blob: 8aedf5b926add96b88791e42c22db229ddcc9cdc [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "parquet/metadata.h"
#include <algorithm>
#include <cinttypes>
#include <ostream>
#include <string>
#include <string_view>
#include <utility>
#include <vector>
#include "arrow/io/memory.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "parquet/encryption/encryption_internal.h"
#include "parquet/encryption/internal_file_decryptor.h"
#include "parquet/exception.h"
#include "parquet/schema.h"
#include "parquet/schema_internal.h"
#include "parquet/thrift_internal.h"
namespace parquet {
// Reference writer versions used for version comparisons elsewhere. Each accessor
// returns a function-local static, constructed on first call and shared afterwards.

/// parquet-mr 1.8.0 (PARQUET-251).
const ApplicationVersion& ApplicationVersion::PARQUET_251_FIXED_VERSION() {
  static ApplicationVersion instance("parquet-mr", 1, 8, 0);
  return instance;
}

/// parquet-mr 1.2.9 (PARQUET-816).
const ApplicationVersion& ApplicationVersion::PARQUET_816_FIXED_VERSION() {
  static ApplicationVersion instance("parquet-mr", 1, 2, 9);
  return instance;
}

/// parquet-cpp 1.3.0, statistics fix.
const ApplicationVersion& ApplicationVersion::PARQUET_CPP_FIXED_STATS_VERSION() {
  static ApplicationVersion instance("parquet-cpp", 1, 3, 0);
  return instance;
}

/// parquet-mr 1.10.0, statistics fix.
const ApplicationVersion& ApplicationVersion::PARQUET_MR_FIXED_STATS_VERSION() {
  static ApplicationVersion instance("parquet-mr", 1, 10, 0);
  return instance;
}

/// Synthetic parquet-cpp 2.0.0 marking the ARROW-10353 fix.
///
/// parquet-cpp versions released prior to Arrow 3.0 would write DataPageV2 pages with
/// is_compressed==0 but still write compressed data. Parquet 1.5.1 had this problem;
/// afterwards the application name became "parquet-cpp-arrow", so no real parquet-cpp
/// release carries this version -- it is fake.
const ApplicationVersion& ApplicationVersion::PARQUET_CPP_10353_FIXED_VERSION() {
  static ApplicationVersion instance("parquet-cpp", 2, 0, 0);
  return instance;
}
/// Human-readable label for a Parquet format version ("1.0", "pseudo-2.0", "2.4",
/// "2.6"); "UNKNOWN" for values outside the enum.
std::string ParquetVersionToString(ParquetVersion::type ver) {
  // Fallback used only if `ver` matches none of the named enumerators.
  const char* label = "UNKNOWN";
  switch (ver) {
    case ParquetVersion::PARQUET_1_0:
      label = "1.0";
      break;
    ARROW_SUPPRESS_DEPRECATION_WARNING
    case ParquetVersion::PARQUET_2_0:
      label = "pseudo-2.0";
      break;
    ARROW_UNSUPPRESS_DEPRECATION_WARNING
    case ParquetVersion::PARQUET_2_4:
      label = "2.4";
      break;
    case ParquetVersion::PARQUET_2_6:
      label = "2.6";
      break;
  }
  return label;
}
/// Builds typed Statistics from the Thrift column chunk statistics.
///
/// When the column's order is TYPE_DEFINED_ORDER the `min_value`/`max_value` fields are
/// used; otherwise the legacy `min`/`max` fields are used. The non-null count is
/// derived as `num_values - null_count`.
template <typename DType>
static std::shared_ptr<Statistics> MakeTypedColumnStats(
    const format::ColumnMetaData& metadata, const ColumnDescriptor* descr) {
  const auto& stats = metadata.statistics;
  const auto non_null_count = metadata.num_values - stats.null_count;
  const bool type_defined_order =
      descr->column_order().get_order() == ColumnOrder::TYPE_DEFINED_ORDER;

  if (!type_defined_order) {
    // Default behavior: legacy min/max fields.
    return MakeStatistics<DType>(descr, stats.min, stats.max, non_null_count,
                                 stats.null_count, stats.distinct_count,
                                 stats.__isset.max || stats.__isset.min,
                                 stats.__isset.null_count, stats.__isset.distinct_count);
  }
  // ColumnOrder is defined: use min_value/max_value.
  return MakeStatistics<DType>(
      descr, stats.min_value, stats.max_value, non_null_count, stats.null_count,
      stats.distinct_count, stats.__isset.max_value || stats.__isset.min_value,
      stats.__isset.null_count, stats.__isset.distinct_count);
}
/// Dispatches to MakeTypedColumnStats for the column chunk's physical type.
///
/// \throws ParquetException for Type::UNDEFINED or an unrecognized type value.
std::shared_ptr<Statistics> MakeColumnStats(const format::ColumnMetaData& meta_data,
                                            const ColumnDescriptor* descr) {
  const auto physical_type = static_cast<Type::type>(meta_data.type);
  switch (physical_type) {
    case Type::BOOLEAN:
      return MakeTypedColumnStats<BooleanType>(meta_data, descr);
    case Type::INT32:
      return MakeTypedColumnStats<Int32Type>(meta_data, descr);
    case Type::INT64:
      return MakeTypedColumnStats<Int64Type>(meta_data, descr);
    case Type::INT96:
      return MakeTypedColumnStats<Int96Type>(meta_data, descr);
    case Type::FLOAT:
      return MakeTypedColumnStats<FloatType>(meta_data, descr);
    case Type::DOUBLE:
      return MakeTypedColumnStats<DoubleType>(meta_data, descr);
    case Type::BYTE_ARRAY:
      return MakeTypedColumnStats<ByteArrayType>(meta_data, descr);
    case Type::FIXED_LEN_BYTE_ARRAY:
      return MakeTypedColumnStats<FLBAType>(meta_data, descr);
    case Type::UNDEFINED:
      break;
  }
  // Reached for UNDEFINED and for any value not named above.
  throw ParquetException("Can't decode page statistics for selected column type");
}
// MetaData Accessor
// ColumnCryptoMetaData
class ColumnCryptoMetaData::ColumnCryptoMetaDataImpl {
 public:
  /// Views a Thrift ColumnCryptoMetaData; the pointee is not owned or copied.
  explicit ColumnCryptoMetaDataImpl(const format::ColumnCryptoMetaData* crypto_metadata)
      : crypto_metadata_(crypto_metadata) {}

  /// True when the footer-key variant of the Thrift union is set.
  bool encrypted_with_footer_key() const {
    const auto& isset = crypto_metadata_->__isset;
    return isset.ENCRYPTION_WITH_FOOTER_KEY;
  }

  /// True when the column-key variant of the Thrift union is set.
  bool encrypted_with_column_key() const {
    const auto& isset = crypto_metadata_->__isset;
    return isset.ENCRYPTION_WITH_COLUMN_KEY;
  }

  /// Column path taken from the column-key variant.
  std::shared_ptr<schema::ColumnPath> path_in_schema() const {
    const auto& column_key = crypto_metadata_->ENCRYPTION_WITH_COLUMN_KEY;
    return std::make_shared<schema::ColumnPath>(column_key.path_in_schema);
  }

  /// Key metadata taken from the column-key variant.
  const std::string& key_metadata() const {
    const auto& column_key = crypto_metadata_->ENCRYPTION_WITH_COLUMN_KEY;
    return column_key.key_metadata;
  }

 private:
  // Non-owning view; the Thrift struct must outlive this object.
  const format::ColumnCryptoMetaData* crypto_metadata_;
};
std::unique_ptr<ColumnCryptoMetaData> ColumnCryptoMetaData::Make(
const uint8_t* metadata) {
return std::unique_ptr<ColumnCryptoMetaData>(new ColumnCryptoMetaData(metadata));
}
ColumnCryptoMetaData::ColumnCryptoMetaData(const uint8_t* metadata)
: impl_(std::make_unique<ColumnCryptoMetaDataImpl>(
reinterpret_cast<const format::ColumnCryptoMetaData*>(metadata))) {}
ColumnCryptoMetaData::~ColumnCryptoMetaData() = default;
std::shared_ptr<schema::ColumnPath> ColumnCryptoMetaData::path_in_schema() const {
return impl_->path_in_schema();
}
bool ColumnCryptoMetaData::encrypted_with_footer_key() const {
return impl_->encrypted_with_footer_key();
}
const std::string& ColumnCryptoMetaData::key_metadata() const {
return impl_->key_metadata();
}
// ColumnChunk metadata
class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
 public:
  /// Wraps a Thrift ColumnChunk.
  ///
  /// When the chunk's crypto metadata selects column-key encryption, the encrypted
  /// ColumnMetaData is decrypted into `decrypted_metadata_`, which is then used in place
  /// of `column->meta_data`.
  ///
  /// \param column Thrift column chunk; not owned, must outlive this object.
  /// \param descr column descriptor; not owned.
  /// \param row_group_ordinal, column_ordinal used to build the module AAD when
  ///        decrypting column metadata.
  /// \param properties reader properties (copied); used for Thrift deserialization.
  /// \param writer_version not owned; must be non-null when statistics are queried.
  /// \param file_decryptor needed only for column-key-encrypted metadata.
  /// \throws ParquetException if the metadata is column-key encrypted but
  ///         `file_decryptor` is null or has no properties.
  explicit ColumnChunkMetaDataImpl(const format::ColumnChunk* column,
                                   const ColumnDescriptor* descr,
                                   int16_t row_group_ordinal, int16_t column_ordinal,
                                   const ReaderProperties& properties,
                                   const ApplicationVersion* writer_version,
                                   std::shared_ptr<InternalFileDecryptor> file_decryptor)
      : column_(column),
        descr_(descr),
        properties_(properties),
        writer_version_(writer_version) {
    column_metadata_ = &column->meta_data;
    if (column->__isset.crypto_metadata) {  // column metadata is encrypted
      // Read-only access: bind by reference instead of copying the Thrift struct.
      const format::ColumnCryptoMetaData& ccmd = column->crypto_metadata;
      if (ccmd.__isset.ENCRYPTION_WITH_COLUMN_KEY) {
        if (file_decryptor != nullptr && file_decryptor->properties() != nullptr) {
          // should decrypt metadata
          const auto& column_key = ccmd.ENCRYPTION_WITH_COLUMN_KEY;
          std::shared_ptr<schema::ColumnPath> path =
              std::make_shared<schema::ColumnPath>(column_key.path_in_schema);
          const std::string& key_metadata = column_key.key_metadata;
          std::string aad_column_metadata = encryption::CreateModuleAad(
              file_decryptor->file_aad(), encryption::kColumnMetaData, row_group_ordinal,
              column_ordinal, static_cast<int16_t>(-1));
          auto decryptor = file_decryptor->GetColumnMetaDecryptor(
              path->ToDotString(), key_metadata, aad_column_metadata);
          auto len = static_cast<uint32_t>(column->encrypted_column_metadata.size());
          ThriftDeserializer deserializer(properties_);
          deserializer.DeserializeMessage(
              reinterpret_cast<const uint8_t*>(column->encrypted_column_metadata.c_str()),
              &len, &decrypted_metadata_, decryptor);
          column_metadata_ = &decrypted_metadata_;
        } else {
          throw ParquetException(
              "Cannot decrypt ColumnMetadata."
              " FileDecryption is not setup correctly");
        }
      }
    }
    // Convert Thrift enums once so the accessors can return references.
    encodings_.reserve(column_metadata_->encodings.size());
    for (const auto& encoding : column_metadata_->encodings) {
      encodings_.push_back(LoadEnumSafe(&encoding));
    }
    encoding_stats_.reserve(column_metadata_->encoding_stats.size());
    for (const auto& encoding_stats : column_metadata_->encoding_stats) {
      encoding_stats_.push_back({LoadEnumSafe(&encoding_stats.page_type),
                                 LoadEnumSafe(&encoding_stats.encoding),
                                 encoding_stats.count});
    }
    // possible_stats_ starts out null and is filled lazily by is_stats_set().
  }

  /// Compares the effective (possibly decrypted) ColumnMetaData.
  bool Equals(const ColumnChunkMetaDataImpl& other) const {
    return *column_metadata_ == *other.column_metadata_;
  }

  // column chunk
  inline int64_t file_offset() const { return column_->file_offset; }
  inline const std::string& file_path() const { return column_->file_path; }
  inline Type::type type() const { return LoadEnumSafe(&column_metadata_->type); }
  inline int64_t num_values() const { return column_metadata_->num_values; }

  std::shared_ptr<schema::ColumnPath> path_in_schema() const {
    return std::make_shared<schema::ColumnPath>(column_metadata_->path_in_schema);
  }

  /// Check if statistics are set and are valid:
  /// 1) Must be set in the metadata
  /// 2) Column sort order must be known
  /// 3) The writer version must be known to produce correct statistics for this type
  ///
  /// Decodes and caches the statistics on first use.
  /// NOTE(review): the cache is a mutable member written from a const method without
  /// synchronization; confirm callers do not query concurrently.
  inline bool is_stats_set() const {
    DCHECK(writer_version_ != nullptr);
    // If the column statistics don't exist or column sort order is unknown
    // we cannot use the column stats
    if (!column_metadata_->__isset.statistics ||
        descr_->sort_order() == SortOrder::UNKNOWN) {
      return false;
    }
    if (possible_stats_ == nullptr) {
      possible_stats_ = MakeColumnStats(*column_metadata_, descr_);
    }
    EncodedStatistics encodedStatistics = possible_stats_->Encode();
    return writer_version_->HasCorrectStatistics(type(), encodedStatistics,
                                                 descr_->sort_order());
  }

  /// Decoded statistics, or nullptr when is_stats_set() is false.
  inline std::shared_ptr<Statistics> statistics() const {
    return is_stats_set() ? possible_stats_ : nullptr;
  }

  inline Compression::type compression() const {
    return LoadEnumSafe(&column_metadata_->codec);
  }

  const std::vector<Encoding::type>& encodings() const { return encodings_; }

  const std::vector<PageEncodingStats>& encoding_stats() const { return encoding_stats_; }

  /// Bloom filter offset, or std::nullopt if the field is not set.
  inline std::optional<int64_t> bloom_filter_offset() const {
    if (column_metadata_->__isset.bloom_filter_offset) {
      return column_metadata_->bloom_filter_offset;
    }
    return std::nullopt;
  }

  inline bool has_dictionary_page() const {
    return column_metadata_->__isset.dictionary_page_offset;
  }

  inline int64_t dictionary_page_offset() const {
    return column_metadata_->dictionary_page_offset;
  }

  inline int64_t data_page_offset() const { return column_metadata_->data_page_offset; }

  inline bool has_index_page() const {
    return column_metadata_->__isset.index_page_offset;
  }

  inline int64_t index_page_offset() const { return column_metadata_->index_page_offset; }

  inline int64_t total_compressed_size() const {
    return column_metadata_->total_compressed_size;
  }

  inline int64_t total_uncompressed_size() const {
    return column_metadata_->total_uncompressed_size;
  }

  /// Wrapper over the chunk's crypto metadata, or nullptr if the chunk has none.
  inline std::unique_ptr<ColumnCryptoMetaData> crypto_metadata() const {
    if (column_->__isset.crypto_metadata) {
      return ColumnCryptoMetaData::Make(
          reinterpret_cast<const uint8_t*>(&column_->crypto_metadata));
    } else {
      return nullptr;
    }
  }

  /// Column index location; present only when both offset and length are set.
  std::optional<IndexLocation> GetColumnIndexLocation() const {
    if (column_->__isset.column_index_offset && column_->__isset.column_index_length) {
      return IndexLocation{column_->column_index_offset, column_->column_index_length};
    }
    return std::nullopt;
  }

  /// Offset index location; present only when both offset and length are set.
  std::optional<IndexLocation> GetOffsetIndexLocation() const {
    if (column_->__isset.offset_index_offset && column_->__isset.offset_index_length) {
      return IndexLocation{column_->offset_index_offset, column_->offset_index_length};
    }
    return std::nullopt;
  }

 private:
  // Lazily decoded statistics (see is_stats_set()).
  mutable std::shared_ptr<Statistics> possible_stats_;
  std::vector<Encoding::type> encodings_;
  std::vector<PageEncodingStats> encoding_stats_;
  const format::ColumnChunk* column_;
  // Points at either column_->meta_data or decrypted_metadata_.
  const format::ColumnMetaData* column_metadata_;
  format::ColumnMetaData decrypted_metadata_;
  const ColumnDescriptor* descr_;
  const ReaderProperties properties_;
  const ApplicationVersion* writer_version_;
};
std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make(
const void* metadata, const ColumnDescriptor* descr,
const ReaderProperties& properties, const ApplicationVersion* writer_version,
int16_t row_group_ordinal, int16_t column_ordinal,
std::shared_ptr<InternalFileDecryptor> file_decryptor) {
return std::unique_ptr<ColumnChunkMetaData>(
new ColumnChunkMetaData(metadata, descr, row_group_ordinal, column_ordinal,
properties, writer_version, std::move(file_decryptor)));
}
std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make(
const void* metadata, const ColumnDescriptor* descr,
const ApplicationVersion* writer_version, int16_t row_group_ordinal,
int16_t column_ordinal, std::shared_ptr<InternalFileDecryptor> file_decryptor) {
return std::unique_ptr<ColumnChunkMetaData>(new ColumnChunkMetaData(
metadata, descr, row_group_ordinal, column_ordinal, default_reader_properties(),
writer_version, std::move(file_decryptor)));
}
ColumnChunkMetaData::ColumnChunkMetaData(
const void* metadata, const ColumnDescriptor* descr, int16_t row_group_ordinal,
int16_t column_ordinal, const ReaderProperties& properties,
const ApplicationVersion* writer_version,
std::shared_ptr<InternalFileDecryptor> file_decryptor)
: impl_{new ColumnChunkMetaDataImpl(
reinterpret_cast<const format::ColumnChunk*>(metadata), descr,
row_group_ordinal, column_ordinal, properties, writer_version,
std::move(file_decryptor))} {}
ColumnChunkMetaData::~ColumnChunkMetaData() = default;
// column chunk
int64_t ColumnChunkMetaData::file_offset() const { return impl_->file_offset(); }
const std::string& ColumnChunkMetaData::file_path() const { return impl_->file_path(); }
Type::type ColumnChunkMetaData::type() const { return impl_->type(); }
int64_t ColumnChunkMetaData::num_values() const { return impl_->num_values(); }
std::shared_ptr<schema::ColumnPath> ColumnChunkMetaData::path_in_schema() const {
return impl_->path_in_schema();
}
std::shared_ptr<Statistics> ColumnChunkMetaData::statistics() const {
return impl_->statistics();
}
bool ColumnChunkMetaData::is_stats_set() const { return impl_->is_stats_set(); }
std::optional<int64_t> ColumnChunkMetaData::bloom_filter_offset() const {
return impl_->bloom_filter_offset();
}
bool ColumnChunkMetaData::has_dictionary_page() const {
return impl_->has_dictionary_page();
}
int64_t ColumnChunkMetaData::dictionary_page_offset() const {
return impl_->dictionary_page_offset();
}
int64_t ColumnChunkMetaData::data_page_offset() const {
return impl_->data_page_offset();
}
bool ColumnChunkMetaData::has_index_page() const { return impl_->has_index_page(); }
int64_t ColumnChunkMetaData::index_page_offset() const {
return impl_->index_page_offset();
}
Compression::type ColumnChunkMetaData::compression() const {
return impl_->compression();
}
bool ColumnChunkMetaData::can_decompress() const {
return ::arrow::util::Codec::IsAvailable(compression());
}
const std::vector<Encoding::type>& ColumnChunkMetaData::encodings() const {
return impl_->encodings();
}
const std::vector<PageEncodingStats>& ColumnChunkMetaData::encoding_stats() const {
return impl_->encoding_stats();
}
int64_t ColumnChunkMetaData::total_uncompressed_size() const {
return impl_->total_uncompressed_size();
}
int64_t ColumnChunkMetaData::total_compressed_size() const {
return impl_->total_compressed_size();
}
std::unique_ptr<ColumnCryptoMetaData> ColumnChunkMetaData::crypto_metadata() const {
return impl_->crypto_metadata();
}
std::optional<IndexLocation> ColumnChunkMetaData::GetColumnIndexLocation() const {
return impl_->GetColumnIndexLocation();
}
std::optional<IndexLocation> ColumnChunkMetaData::GetOffsetIndexLocation() const {
return impl_->GetOffsetIndexLocation();
}
bool ColumnChunkMetaData::Equals(const ColumnChunkMetaData& other) const {
return impl_->Equals(*other.impl_);
}
// row-group metadata
class RowGroupMetaData::RowGroupMetaDataImpl {
 public:
  /// Views a Thrift RowGroup (not owned, not copied).
  ///
  /// \throws ParquetException if the row group has more columns than fit in an `int`.
  explicit RowGroupMetaDataImpl(const format::RowGroup* row_group,
                                const SchemaDescriptor* schema,
                                const ReaderProperties& properties,
                                const ApplicationVersion* writer_version,
                                std::shared_ptr<InternalFileDecryptor> file_decryptor)
      : row_group_(row_group),
        schema_(schema),
        properties_(properties),
        writer_version_(writer_version),
        file_decryptor_(std::move(file_decryptor)) {
    const size_t column_count = row_group_->columns.size();
    const auto max_columns = static_cast<size_t>(std::numeric_limits<int>::max());
    if (ARROW_PREDICT_FALSE(column_count > max_columns)) {
      throw ParquetException("Row group had too many columns: ", column_count);
    }
  }

  bool Equals(const RowGroupMetaDataImpl& other) const {
    return *row_group_ == *other.row_group_;
  }

  // Safe narrowing: the constructor rejects counts above INT_MAX.
  inline int num_columns() const { return static_cast<int>(row_group_->columns.size()); }

  inline int64_t num_rows() const { return row_group_->num_rows; }

  inline int64_t total_byte_size() const { return row_group_->total_byte_size; }

  inline int64_t total_compressed_size() const {
    return row_group_->total_compressed_size;
  }

  inline int64_t file_offset() const { return row_group_->file_offset; }

  inline const SchemaDescriptor* schema() const { return schema_; }

  /// Metadata for column `i`.
  ///
  /// \throws ParquetException if `i` is out of range.
  std::unique_ptr<ColumnChunkMetaData> ColumnChunk(int i) {
    const bool in_range = i >= 0 && i < num_columns();
    if (!in_range) {
      throw ParquetException("The file only has ", num_columns(),
                             " columns, requested metadata for column: ", i);
    }
    // ColumnChunkMetaData::Make takes the column ordinal as int16_t.
    return ColumnChunkMetaData::Make(&row_group_->columns[i], schema_->Column(i),
                                     properties_, writer_version_, row_group_->ordinal,
                                     static_cast<int16_t>(i), file_decryptor_);
  }

  /// Sorting columns converted from Thrift; empty when the field is unset.
  std::vector<SortingColumn> sorting_columns() const {
    std::vector<SortingColumn> result;
    if (row_group_->__isset.sorting_columns) {
      result.reserve(row_group_->sorting_columns.size());
      for (const auto& thrift_column : row_group_->sorting_columns) {
        result.push_back(FromThrift(thrift_column));
      }
    }
    return result;
  }

 private:
  const format::RowGroup* row_group_;
  const SchemaDescriptor* schema_;
  const ReaderProperties properties_;
  const ApplicationVersion* writer_version_;
  std::shared_ptr<InternalFileDecryptor> file_decryptor_;
};
std::unique_ptr<RowGroupMetaData> RowGroupMetaData::Make(
const void* metadata, const SchemaDescriptor* schema,
const ApplicationVersion* writer_version,
std::shared_ptr<InternalFileDecryptor> file_decryptor) {
return std::unique_ptr<parquet::RowGroupMetaData>(
new RowGroupMetaData(metadata, schema, default_reader_properties(), writer_version,
std::move(file_decryptor)));
}
std::unique_ptr<RowGroupMetaData> RowGroupMetaData::Make(
const void* metadata, const SchemaDescriptor* schema,
const ReaderProperties& properties, const ApplicationVersion* writer_version,
std::shared_ptr<InternalFileDecryptor> file_decryptor) {
return std::unique_ptr<parquet::RowGroupMetaData>(new RowGroupMetaData(
metadata, schema, properties, writer_version, std::move(file_decryptor)));
}
RowGroupMetaData::RowGroupMetaData(const void* metadata, const SchemaDescriptor* schema,
const ReaderProperties& properties,
const ApplicationVersion* writer_version,
std::shared_ptr<InternalFileDecryptor> file_decryptor)
: impl_{new RowGroupMetaDataImpl(reinterpret_cast<const format::RowGroup*>(metadata),
schema, properties, writer_version,
std::move(file_decryptor))} {}
RowGroupMetaData::~RowGroupMetaData() = default;
bool RowGroupMetaData::Equals(const RowGroupMetaData& other) const {
return impl_->Equals(*other.impl_);
}
int RowGroupMetaData::num_columns() const { return impl_->num_columns(); }
int64_t RowGroupMetaData::num_rows() const { return impl_->num_rows(); }
int64_t RowGroupMetaData::total_byte_size() const { return impl_->total_byte_size(); }
int64_t RowGroupMetaData::total_compressed_size() const {
return impl_->total_compressed_size();
}
int64_t RowGroupMetaData::file_offset() const { return impl_->file_offset(); }
const SchemaDescriptor* RowGroupMetaData::schema() const { return impl_->schema(); }
std::unique_ptr<ColumnChunkMetaData> RowGroupMetaData::ColumnChunk(int i) const {
return impl_->ColumnChunk(i);
}
bool RowGroupMetaData::can_decompress() const {
int n_columns = num_columns();
for (int i = 0; i < n_columns; i++) {
if (!ColumnChunk(i)->can_decompress()) {
return false;
}
}
return true;
}
std::vector<SortingColumn> RowGroupMetaData::sorting_columns() const {
return impl_->sorting_columns();
}
// file metadata
class FileMetaData::FileMetaDataImpl {
 public:
  /// Empty implementation; Subset() fills its fields directly.
  FileMetaDataImpl() = default;

  /// Deserializes a Thrift FileMetaData footer.
  ///
  /// \param metadata serialized footer bytes.
  /// \param metadata_len passed by pointer to the Thrift deserializer; its value after
  ///        deserialization is recorded and returned by size().
  /// \param properties reader properties used for deserialization.
  /// \param file_decryptor if non-null, its footer decryptor is used to decrypt the
  ///        footer.
  explicit FileMetaDataImpl(
      const void* metadata, uint32_t* metadata_len, ReaderProperties properties,
      std::shared_ptr<InternalFileDecryptor> file_decryptor = nullptr)
      : properties_(std::move(properties)), file_decryptor_(std::move(file_decryptor)) {
    metadata_ = std::make_unique<format::FileMetaData>();
    auto footer_decryptor =
        file_decryptor_ != nullptr ? file_decryptor_->GetFooterDecryptor() : nullptr;
    ThriftDeserializer deserializer(properties_);
    deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(metadata),
                                    metadata_len, metadata_.get(), footer_decryptor);
    metadata_len_ = *metadata_len;
    if (metadata_->__isset.created_by) {
      writer_version_ = ApplicationVersion(metadata_->created_by);
    } else {
      writer_version_ = ApplicationVersion("unknown 0.0.0");
    }
    InitSchema();
    InitColumnOrders();
    InitKeyValueMetadata();
  }

  /// Verifies the signature of a plaintext footer.
  ///
  /// The footer is re-serialized and encrypted with the footer key, footer AAD and the
  /// nonce taken from `signature`. The resulting GCM tag is compared with the tag that
  /// follows the nonce in `signature`.
  ///
  /// \param signature nonce (kNonceLength bytes) followed by tag (kGcmTagLength bytes).
  /// \return true if the tags match.
  /// \throws ParquetException if no file decryptor is set.
  bool VerifySignature(const void* signature) {
    // verify decryption properties are set
    if (file_decryptor_ == nullptr) {
      throw ParquetException("Decryption not set properly. cannot verify signature");
    }
    // serialize the footer
    uint8_t* serialized_data;
    uint32_t serialized_len = metadata_len_;
    ThriftSerializer serializer;
    serializer.SerializeToBuffer(metadata_.get(), &serialized_len, &serialized_data);

    // encrypt with nonce
    auto nonce = const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(signature));
    auto tag = const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(signature)) +
               encryption::kNonceLength;

    std::string key = file_decryptor_->GetFooterKey();
    std::string aad = encryption::CreateFooterAad(file_decryptor_->file_aad());

    // The encryptor exists only to recompute the footer signature. The deleter wipes
    // and frees it, so it is released even if allocation or encryption throws.
    auto wipe_and_delete = [](encryption::AesEncryptor* encryptor) {
      encryptor->WipeOut();
      delete encryptor;
    };
    std::unique_ptr<encryption::AesEncryptor, decltype(wipe_and_delete)> aes_encryptor(
        encryption::AesEncryptor::Make(file_decryptor_->algorithm(),
                                       static_cast<int>(key.size()), true,
                                       false /*write_length*/, nullptr),
        wipe_and_delete);

    std::shared_ptr<Buffer> encrypted_buffer = std::static_pointer_cast<ResizableBuffer>(
        AllocateBuffer(file_decryptor_->pool(),
                       aes_encryptor->CiphertextSizeDelta() + serialized_len));
    uint32_t encrypted_len = aes_encryptor->SignedFooterEncrypt(
        serialized_data, serialized_len, str2bytes(key), static_cast<int>(key.size()),
        str2bytes(aad), static_cast<int>(aad.size()), nonce,
        encrypted_buffer->mutable_data());
    // Wipe and delete the encryptor now rather than at scope exit.
    aes_encryptor.reset();
    // The GCM tag sits at the end of the encrypted output.
    return 0 ==
           memcmp(encrypted_buffer->data() + encrypted_len - encryption::kGcmTagLength,
                  tag, encryption::kGcmTagLength);
  }

  inline uint32_t size() const { return metadata_len_; }
  inline int num_columns() const { return schema_.num_columns(); }
  inline int64_t num_rows() const { return metadata_->num_rows; }
  inline int num_row_groups() const {
    return static_cast<int>(metadata_->row_groups.size());
  }
  inline int32_t version() const { return metadata_->version; }
  inline const std::string& created_by() const { return metadata_->created_by; }
  inline int num_schema_elements() const {
    return static_cast<int>(metadata_->schema.size());
  }

  inline bool is_encryption_algorithm_set() const {
    return metadata_->__isset.encryption_algorithm;
  }

  inline EncryptionAlgorithm encryption_algorithm() {
    return FromThrift(metadata_->encryption_algorithm);
  }

  inline const std::string& footer_signing_key_metadata() {
    return metadata_->footer_signing_key_metadata;
  }

  const ApplicationVersion& writer_version() const { return writer_version_; }

  /// Serializes the footer to `dst`.
  ///
  /// If encryption_algorithm is set, the footer is written in plaintext followed by a
  /// signature (nonce, then GCM tag) produced by `encryptor`. Otherwise the footer is
  /// serialized through `encryptor` (which may be null for plaintext files).
  void WriteTo(::arrow::io::OutputStream* dst,
               const std::shared_ptr<Encryptor>& encryptor) const {
    ThriftSerializer serializer;
    // Only in encrypted files with plaintext footers the
    // encryption_algorithm is set in footer
    if (is_encryption_algorithm_set()) {
      uint8_t* serialized_data;
      uint32_t serialized_len;
      serializer.SerializeToBuffer(metadata_.get(), &serialized_len, &serialized_data);

      // encrypt the footer key
      std::vector<uint8_t> encrypted_data(encryptor->CiphertextSizeDelta() +
                                          serialized_len);
      unsigned encrypted_len =
          encryptor->Encrypt(serialized_data, serialized_len, encrypted_data.data());

      // write unencrypted footer
      PARQUET_THROW_NOT_OK(dst->Write(serialized_data, serialized_len));
      // Write signature (nonce and tag)
      // NOTE(review): offset 4 presumably skips a length prefix the encryptor writes
      // before the nonce -- confirm against Encryptor::Encrypt.
      PARQUET_THROW_NOT_OK(
          dst->Write(encrypted_data.data() + 4, encryption::kNonceLength));
      PARQUET_THROW_NOT_OK(
          dst->Write(encrypted_data.data() + encrypted_len - encryption::kGcmTagLength,
                     encryption::kGcmTagLength));
    } else {  // either plaintext file (when encryptor is null)
      // or encrypted file with encrypted footer
      serializer.Serialize(metadata_.get(), dst, encryptor);
    }
  }

  /// Metadata for row group `i`.
  ///
  /// \throws ParquetException if `i` is out of range.
  std::unique_ptr<RowGroupMetaData> RowGroup(int i) {
    if (!(i >= 0 && i < num_row_groups())) {
      std::stringstream ss;
      ss << "The file only has " << num_row_groups()
         << " row groups, requested metadata for row group: " << i;
      throw ParquetException(ss.str());
    }
    return RowGroupMetaData::Make(&metadata_->row_groups[i], &schema_, properties_,
                                  &writer_version_, file_decryptor_);
  }

  bool Equals(const FileMetaDataImpl& other) const {
    return *metadata_ == *other.metadata_;
  }

  const SchemaDescriptor* schema() const { return &schema_; }

  const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const {
    return key_value_metadata_;
  }

  /// Sets `file_path` on every column chunk of every row group.
  void set_file_path(const std::string& path) {
    for (format::RowGroup& row_group : metadata_->row_groups) {
      for (format::ColumnChunk& chunk : row_group.columns) {
        chunk.__set_file_path(path);
      }
    }
  }

  /// Mutable access to Thrift row group `i`.
  ///
  /// \throws ParquetException if `i` is out of range.
  format::RowGroup& row_group(int i) {
    if (!(i >= 0 && i < num_row_groups())) {
      std::stringstream ss;
      ss << "The file only has " << num_row_groups()
         << " row groups, requested metadata for row group: " << i;
      throw ParquetException(ss.str());
    }
    return metadata_->row_groups[i];
  }

  /// Appends copies of `other`'s row groups and adds their row counts to num_rows.
  ///
  /// \throws ParquetException if the schemas differ.
  void AppendRowGroups(const std::unique_ptr<FileMetaDataImpl>& other) {
    std::ostringstream diff_output;
    if (!schema()->Equals(*other->schema(), &diff_output)) {
      auto msg = "AppendRowGroups requires equal schemas.\n" + diff_output.str();
      throw ParquetException(msg);
    }

    // ARROW-13654: `other` may point to self, be careful not to enter an infinite loop
    const int n = other->num_row_groups();
    // ARROW-16613: do not use reserve() as that may suppress overallocation
    // and incur O(n²) behavior on repeated calls to AppendRowGroups().
    // (see https://en.cppreference.com/w/cpp/container/vector/reserve
    //  about inappropriate uses of reserve()).
    const auto start = metadata_->row_groups.size();
    metadata_->row_groups.resize(start + n);
    for (int i = 0; i < n; i++) {
      metadata_->row_groups[start + i] = other->row_group(i);
      metadata_->num_rows += metadata_->row_groups[start + i].num_rows;
    }
  }

  /// Builds a new FileMetaData containing only the listed row groups, in the given
  /// order, with num_rows recomputed from them.
  ///
  /// \throws ParquetException if any index is outside [0, num_row_groups()).
  std::shared_ptr<FileMetaData> Subset(const std::vector<int>& row_groups) {
    for (int i : row_groups) {
      if (i >= 0 && i < num_row_groups()) continue;

      throw ParquetException(
          "The file only has ", num_row_groups(),
          " row groups, but requested a subset including row group: ", i);
    }

    std::shared_ptr<FileMetaData> out(new FileMetaData());
    out->impl_ = std::make_unique<FileMetaDataImpl>();
    out->impl_->metadata_ = std::make_unique<format::FileMetaData>();

    auto metadata = out->impl_->metadata_.get();
    metadata->version = metadata_->version;
    metadata->schema = metadata_->schema;

    metadata->row_groups.resize(row_groups.size());
    int i = 0;
    for (int selected_index : row_groups) {
      const format::RowGroup& selected = row_group(selected_index);
      metadata->num_rows += selected.num_rows;
      metadata->row_groups[i++] = selected;
    }

    metadata->key_value_metadata = metadata_->key_value_metadata;
    metadata->created_by = metadata_->created_by;
    metadata->column_orders = metadata_->column_orders;
    metadata->encryption_algorithm = metadata_->encryption_algorithm;
    metadata->footer_signing_key_metadata = metadata_->footer_signing_key_metadata;
    metadata->__isset = metadata_->__isset;

    out->impl_->schema_ = schema_;
    out->impl_->writer_version_ = writer_version_;
    out->impl_->key_value_metadata_ = key_value_metadata_;
    out->impl_->file_decryptor_ = file_decryptor_;

    return out;
  }

  void set_file_decryptor(std::shared_ptr<InternalFileDecryptor> file_decryptor) {
    file_decryptor_ = std::move(file_decryptor);
  }

 private:
  friend FileMetaDataBuilder;
  uint32_t metadata_len_ = 0;
  std::unique_ptr<format::FileMetaData> metadata_;
  SchemaDescriptor schema_;
  ApplicationVersion writer_version_;
  std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
  const ReaderProperties properties_;
  std::shared_ptr<InternalFileDecryptor> file_decryptor_;

  /// Builds schema_ from the flattened Thrift schema.
  /// \throws ParquetException if the schema list is empty.
  void InitSchema() {
    if (metadata_->schema.empty()) {
      throw ParquetException("Empty file schema (no root)");
    }
    schema_.Init(schema::Unflatten(&metadata_->schema[0],
                                   static_cast<int>(metadata_->schema.size())));
  }

  /// Pushes per-column orders into schema_. Without Thrift column_orders, every column
  /// is marked undefined.
  void InitColumnOrders() {
    // update ColumnOrder
    std::vector<parquet::ColumnOrder> column_orders;
    if (metadata_->__isset.column_orders) {
      column_orders.reserve(metadata_->column_orders.size());
      for (const auto& column_order : metadata_->column_orders) {
        if (column_order.__isset.TYPE_ORDER) {
          column_orders.push_back(ColumnOrder::type_defined_);
        } else {
          column_orders.push_back(ColumnOrder::undefined_);
        }
      }
    } else {
      column_orders.resize(schema_.num_columns(), ColumnOrder::undefined_);
    }

    schema_.updateColumnOrders(column_orders);
  }

  /// Copies Thrift key/value metadata; key_value_metadata_ stays null if unset.
  void InitKeyValueMetadata() {
    std::shared_ptr<KeyValueMetadata> metadata = nullptr;
    if (metadata_->__isset.key_value_metadata) {
      metadata = std::make_shared<KeyValueMetadata>();
      for (const auto& it : metadata_->key_value_metadata) {
        metadata->Append(it.key, it.value);
      }
    }
    key_value_metadata_ = std::move(metadata);
  }
};
std::shared_ptr<FileMetaData> FileMetaData::Make(
const void* metadata, uint32_t* metadata_len, const ReaderProperties& properties,
std::shared_ptr<InternalFileDecryptor> file_decryptor) {
// This FileMetaData ctor is private, not compatible with std::make_shared
return std::shared_ptr<FileMetaData>(
new FileMetaData(metadata, metadata_len, properties, std::move(file_decryptor)));
}
std::shared_ptr<FileMetaData> FileMetaData::Make(
const void* metadata, uint32_t* metadata_len,
std::shared_ptr<InternalFileDecryptor> file_decryptor) {
return std::shared_ptr<FileMetaData>(new FileMetaData(
metadata, metadata_len, default_reader_properties(), file_decryptor));
}
// The decryptor is moved into the implementation instead of being copied.
FileMetaData::FileMetaData(const void* metadata, uint32_t* metadata_len,
                           const ReaderProperties& properties,
                           std::shared_ptr<InternalFileDecryptor> file_decryptor)
    : impl_(std::make_unique<FileMetaDataImpl>(metadata, metadata_len, properties,
                                               std::move(file_decryptor))) {}

FileMetaData::FileMetaData() : impl_(std::make_unique<FileMetaDataImpl>()) {}

// Defined here, where FileMetaDataImpl is a complete type.
FileMetaData::~FileMetaData() = default;
bool FileMetaData::Equals(const FileMetaData& other) const {
return impl_->Equals(*other.impl_);
}
std::unique_ptr<RowGroupMetaData> FileMetaData::RowGroup(int i) const {
return impl_->RowGroup(i);
}
bool FileMetaData::VerifySignature(const void* signature) {
return impl_->VerifySignature(signature);
}
uint32_t FileMetaData::size() const { return impl_->size(); }
int FileMetaData::num_columns() const { return impl_->num_columns(); }
int64_t FileMetaData::num_rows() const { return impl_->num_rows(); }
int FileMetaData::num_row_groups() const { return impl_->num_row_groups(); }
bool FileMetaData::can_decompress() const {
int n_row_groups = num_row_groups();
for (int i = 0; i < n_row_groups; i++) {
if (!RowGroup(i)->can_decompress()) {
return false;
}
}
return true;
}
bool FileMetaData::is_encryption_algorithm_set() const {
return impl_->is_encryption_algorithm_set();
}
EncryptionAlgorithm FileMetaData::encryption_algorithm() const {
return impl_->encryption_algorithm();
}
const std::string& FileMetaData::footer_signing_key_metadata() const {
return impl_->footer_signing_key_metadata();
}
void FileMetaData::set_file_decryptor(
std::shared_ptr<InternalFileDecryptor> file_decryptor) {
impl_->set_file_decryptor(file_decryptor);
}
// Maps the raw footer version number onto ParquetVersion. The value 2 maps to
// PARQUET_2_LATEST. The value 1, and any improperly set value, is treated as
// Parquet 1.0.
ParquetVersion::type FileMetaData::version() const {
  const auto raw_version = impl_->version();
  if (raw_version == 2) {
    return ParquetVersion::PARQUET_2_LATEST;
  }
  return ParquetVersion::PARQUET_1_0;
}
// --- Descriptive accessors; each forwards to the implementation. ---

const ApplicationVersion& FileMetaData::writer_version() const {
  return impl_->writer_version();
}

const std::string& FileMetaData::created_by() const {
  return impl_->created_by();
}

int FileMetaData::num_schema_elements() const {
  return impl_->num_schema_elements();
}

const SchemaDescriptor* FileMetaData::schema() const {
  return impl_->schema();
}

const std::shared_ptr<const KeyValueMetadata>& FileMetaData::key_value_metadata() const {
  return impl_->key_value_metadata();
}

// --- Mutators and derived views, also forwarded to the implementation. ---

void FileMetaData::set_file_path(const std::string& path) {
  impl_->set_file_path(path);
}

void FileMetaData::AppendRowGroups(const FileMetaData& other) {
  impl_->AppendRowGroups(other.impl_);
}

std::shared_ptr<FileMetaData> FileMetaData::Subset(
    const std::vector<int>& row_groups) const {
  return impl_->Subset(row_groups);
}

// Serializes through the implementation, passing `encryptor` through unchanged.
void FileMetaData::WriteTo(::arrow::io::OutputStream* dst,
                           const std::shared_ptr<Encryptor>& encryptor) const {
  impl_->WriteTo(dst, encryptor);
}
// Holds the Thrift FileCryptoMetaData structure that precedes an encrypted
// footer. Instances are created either by deserializing bytes (reader side) or
// default-constructed and filled by FileMetaDataBuilder (writer side).
class FileCryptoMetaData::FileCryptoMetaDataImpl {
 public:
  FileCryptoMetaDataImpl() = default;

  // Deserializes `metadata` with a ThriftDeserializer configured from
  // `properties`. After the call, *metadata_len (as updated by
  // DeserializeMessage) is also remembered in metadata_len_.
  explicit FileCryptoMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len,
                                  const ReaderProperties& properties) {
    ThriftDeserializer deserializer(properties);
    deserializer.DeserializeMessage(metadata, metadata_len, &metadata_);
    metadata_len_ = *metadata_len;
  }

  EncryptionAlgorithm encryption_algorithm() const {
    return FromThrift(metadata_.encryption_algorithm);
  }

  const std::string& key_metadata() const { return metadata_.key_metadata; }

  // Thrift-serializes the crypto metadata to `dst`.
  void WriteTo(::arrow::io::OutputStream* dst) const {
    ThriftSerializer serializer;
    serializer.Serialize(&metadata_, dst);
  }

 private:
  // FileMetaDataBuilder populates metadata_ directly when writing files.
  friend FileMetaDataBuilder;
  format::FileCryptoMetaData metadata_;
  // Length reported by the deserializer. It has an explicit initializer so that
  // default-constructed (writer-side) instances never hold an indeterminate
  // value.
  uint32_t metadata_len_ = 0;
};
// --- FileCryptoMetaData: thin public wrapper over FileCryptoMetaDataImpl. ---

EncryptionAlgorithm FileCryptoMetaData::encryption_algorithm() const {
  return impl_->encryption_algorithm();
}

const std::string& FileCryptoMetaData::key_metadata() const {
  return impl_->key_metadata();
}

// Factory: deserializes crypto metadata from `serialized_metadata`.
std::shared_ptr<FileCryptoMetaData> FileCryptoMetaData::Make(
    const uint8_t* serialized_metadata, uint32_t* metadata_len,
    const ReaderProperties& properties) {
  return std::shared_ptr<FileCryptoMetaData>(
      new FileCryptoMetaData(serialized_metadata, metadata_len, properties));
}

FileCryptoMetaData::FileCryptoMetaData(const uint8_t* serialized_metadata,
                                       uint32_t* metadata_len,
                                       const ReaderProperties& properties)
    : impl_(new FileCryptoMetaDataImpl(serialized_metadata, metadata_len,
                                       properties)) {}

// Empty instance; FileMetaDataBuilder fills its impl_ when writing.
FileCryptoMetaData::FileCryptoMetaData() : impl_(new FileCryptoMetaDataImpl()) {}

FileCryptoMetaData::~FileCryptoMetaData() = default;

void FileCryptoMetaData::WriteTo(::arrow::io::OutputStream* dst) const {
  impl_->WriteTo(dst);
}
std::string FileMetaData::SerializeToString() const {
// We need to pass in an initial size. Since it will automatically
// increase the buffer size to hold the metadata, we just leave it 0.
PARQUET_ASSIGN_OR_THROW(auto serializer, ::arrow::io::BufferOutputStream::Create(0));
WriteTo(serializer.get());
PARQUET_ASSIGN_OR_THROW(auto metadata_buffer, serializer->Finish());
return metadata_buffer->ToString();
}
// Builds a version for `application` from explicit numeric components. The
// remaining string components (unknown, pre_release, build_info) are left
// empty.
ApplicationVersion::ApplicationVersion(std::string application, int major, int minor,
                                       int patch)
    : application_(std::move(application)),
      version{major, minor, patch, "", "", ""} {}
namespace {
// Parse the application version format and set parsed values to
// ApplicationVersion.
//
// The application version format must be compatible with parquet-mr's.
// See also:
// * https://github.com/apache/parquet-mr/blob/master/parquet-common/src/main/java/org/apache/parquet/VersionParser.java
// * https://github.com/apache/parquet-mr/blob/master/parquet-common/src/main/java/org/apache/parquet/SemanticVersion.java
//
// The application version format:
// "${APPLICATION_NAME}"
// "${APPLICATION_NAME} version ${VERSION}"
// "${APPLICATION_NAME} version ${VERSION} (build ${BUILD_NAME})"
//
// Eg:
// parquet-cpp
// parquet-cpp version 1.5.0ab-xyz5.5.0+cd
// parquet-cpp version 1.5.0ab-xyz5.5.0+cd (build abcd)
//
// The VERSION format:
// "${MAJOR}"
// "${MAJOR}.${MINOR}"
// "${MAJOR}.${MINOR}.${PATCH}"
// "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}"
// "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}-${PRE_RELEASE}"
// "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}-${PRE_RELEASE}+${BUILD_INFO}"
// "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}+${BUILD_INFO}"
// "${MAJOR}.${MINOR}.${PATCH}-${PRE_RELEASE}"
// "${MAJOR}.${MINOR}.${PATCH}-${PRE_RELEASE}+${BUILD_INFO}"
// "${MAJOR}.${MINOR}.${PATCH}+${BUILD_INFO}"
//
// Eg:
// 1
// 1.5
// 1.5.0
// 1.5.0ab
// 1.5.0ab-cdh5.5.0
// 1.5.0ab-cdh5.5.0+cd
// 1.5.0ab+cd
// 1.5.0-cdh5.5.0
// 1.5.0-cdh5.5.0+cd
// 1.5.0+cd
//
// NOTE(review): the "${MAJOR}" and "${MAJOR}.${MINOR}" forms are only partially
// handled. ParseVersionPatch() rejects a missing PATCH, so MAJOR (and MINOR)
// are stored, but ParseVersion() returns false and BUILD_NAME is then never
// parsed for those inputs.
//
// Usage: construct with the created_by string and the ApplicationVersion to
// fill in, then call Parse() once. The parser keeps a reference to
// `created_by`, so that string must outlive the parser.
class ApplicationVersionParser {
public:
ApplicationVersionParser(const std::string& created_by,
ApplicationVersion& application_version)
: created_by_(created_by),
application_version_(application_version),
spaces_(" \t\v\r\n\f"),
digits_("0123456789") {}
// Resets the target to application "unknown" with version 0.0.0. It then
// parses the application name, the version and the build name, in that order.
// Each stage returns early on failure, so any fields filled by earlier stages
// are kept, including partially parsed version numbers.
void Parse() {
application_version_.application_ = "unknown";
application_version_.version = {0, 0, 0, "", "", ""};
if (!ParseApplicationName()) {
return;
}
if (!ParseVersion()) {
return;
}
if (!ParseBuildName()) {
return;
}
}
private:
// True if the character at `offset` is one of spaces_. An offset equal to
// string.size() yields an empty view and therefore false.
bool IsSpace(const std::string& string, const size_t& offset) {
auto target = ::std::string_view(string).substr(offset, 1);
return target.find_first_of(spaces_) != ::std::string_view::npos;
}
// Advances `start` past leading whitespace, never moving beyond `end`.
void RemovePrecedingSpaces(const std::string& string, size_t& start,
const size_t& end) {
while (start < end && IsSpace(string, start)) {
++start;
}
}
// Moves `end` backwards over trailing whitespace in [start, end). The loop
// never consumes the character at `start`; every caller strips leading
// whitespace first. The `(end - 1) < string.size()` test also stops the loop
// when `end` is 0 and `end - 1` wraps around.
void RemoveTrailingSpaces(const std::string& string, const size_t& start, size_t& end) {
while (start < (end - 1) && (end - 1) < string.size() && IsSpace(string, end - 1)) {
--end;
}
}
// Stores the trimmed text before the first " version " as the application
// name. When the marker is absent, the whole string is used. Records where
// VERSION starts (npos when absent). Always returns true.
bool ParseApplicationName() {
std::string version_mark(" version ");
auto version_mark_position = created_by_.find(version_mark);
size_t application_name_end;
// No VERSION and BUILD_NAME.
if (version_mark_position == std::string::npos) {
version_start_ = std::string::npos;
application_name_end = created_by_.size();
} else {
version_start_ = version_mark_position + version_mark.size();
application_name_end = version_mark_position;
}
size_t application_name_start = 0;
RemovePrecedingSpaces(created_by_, application_name_start, application_name_end);
RemoveTrailingSpaces(created_by_, application_name_start, application_name_end);
application_version_.application_ = created_by_.substr(
application_name_start, application_name_end - application_name_start);
return true;
}
// Extracts VERSION into version_string_ and parses its components. VERSION is
// the trimmed text after " version ", up to the next " (" or the end of the
// string. Sets version_end_, which ParseBuildName() uses as its search start.
// Returns false when VERSION is missing or malformed.
bool ParseVersion() {
// No VERSION.
if (version_start_ == std::string::npos) {
return false;
}
RemovePrecedingSpaces(created_by_, version_start_, created_by_.size());
version_end_ = created_by_.find(" (", version_start_);
// No BUILD_NAME.
if (version_end_ == std::string::npos) {
version_end_ = created_by_.size();
}
RemoveTrailingSpaces(created_by_, version_start_, version_end_);
// No VERSION.
if (version_start_ == version_end_) {
return false;
}
version_string_ = created_by_.substr(version_start_, version_end_ - version_start_);
if (!ParseVersionMajor()) {
return false;
}
if (!ParseVersionMinor()) {
return false;
}
if (!ParseVersionPatch()) {
return false;
}
if (!ParseVersionUnknown()) {
return false;
}
if (!ParseVersionPreRelease()) {
return false;
}
if (!ParseVersionBuildInfo()) {
return false;
}
return true;
}
// Parses the leading digits of version_string_ as MAJOR. Succeeds when the
// digits run to the end of the string or are followed by '.', which is
// consumed. Conversion uses atoi, which does no overflow checking.
bool ParseVersionMajor() {
size_t version_major_start = 0;
auto version_major_end = version_string_.find_first_not_of(digits_);
// MAJOR only.
if (version_major_end == std::string::npos) {
version_major_end = version_string_.size();
version_parsing_position_ = version_major_end;
} else {
// No ".".
if (version_string_[version_major_end] != '.') {
return false;
}
// No MAJOR.
if (version_major_end == version_major_start) {
return false;
}
version_parsing_position_ = version_major_end + 1; // +1 is for '.'.
}
auto version_major_string = version_string_.substr(
version_major_start, version_major_end - version_major_start);
application_version_.version.major = atoi(version_major_string.c_str());
return true;
}
// Same as ParseVersionMajor() but for MINOR, starting at
// version_parsing_position_. If the string already ended after MAJOR, the
// MINOR substring is empty and atoi("") stores 0.
bool ParseVersionMinor() {
auto version_minor_start = version_parsing_position_;
auto version_minor_end =
version_string_.find_first_not_of(digits_, version_minor_start);
// MAJOR.MINOR only.
if (version_minor_end == std::string::npos) {
version_minor_end = version_string_.size();
version_parsing_position_ = version_minor_end;
} else {
// No ".".
if (version_string_[version_minor_end] != '.') {
return false;
}
// No MINOR.
if (version_minor_end == version_minor_start) {
return false;
}
version_parsing_position_ = version_minor_end + 1; // +1 is for '.'.
}
auto version_minor_string = version_string_.substr(
version_minor_start, version_minor_end - version_minor_start);
application_version_.version.minor = atoi(version_minor_string.c_str());
return true;
}
// Parses the digits at the current position as PATCH and fails when there are
// none. Unlike MAJOR and MINOR, any character may follow the digits; the
// remainder is handled by the UNKNOWN, PRE_RELEASE and BUILD_INFO stages.
bool ParseVersionPatch() {
auto version_patch_start = version_parsing_position_;
auto version_patch_end =
version_string_.find_first_not_of(digits_, version_patch_start);
// No UNKNOWN, PRE_RELEASE and BUILD_INFO.
if (version_patch_end == std::string::npos) {
version_patch_end = version_string_.size();
}
// No PATCH.
if (version_patch_end == version_patch_start) {
return false;
}
auto version_patch_string = version_string_.substr(
version_patch_start, version_patch_end - version_patch_start);
application_version_.version.patch = atoi(version_patch_string.c_str());
version_parsing_position_ = version_patch_end;
return true;
}
// Stores the text between PATCH and the first '-' or '+' (or the end) as
// UNKNOWN. Always succeeds.
bool ParseVersionUnknown() {
// No UNKNOWN.
if (version_parsing_position_ == version_string_.size()) {
return true;
}
auto version_unknown_start = version_parsing_position_;
auto version_unknown_end = version_string_.find_first_of("-+", version_unknown_start);
// No PRE_RELEASE and BUILD_INFO
if (version_unknown_end == std::string::npos) {
version_unknown_end = version_string_.size();
}
application_version_.version.unknown = version_string_.substr(
version_unknown_start, version_unknown_end - version_unknown_start);
version_parsing_position_ = version_unknown_end;
return true;
}
// If the current character is '-', stores the text after it up to the next
// '+' (or the end) as PRE_RELEASE. Always succeeds.
bool ParseVersionPreRelease() {
// No PRE_RELEASE.
if (version_parsing_position_ == version_string_.size() ||
version_string_[version_parsing_position_] != '-') {
return true;
}
auto version_pre_release_start = version_parsing_position_ + 1; // +1 is for '-'.
auto version_pre_release_end =
version_string_.find_first_of("+", version_pre_release_start);
// No BUILD_INFO
if (version_pre_release_end == std::string::npos) {
version_pre_release_end = version_string_.size();
}
application_version_.version.pre_release = version_string_.substr(
version_pre_release_start, version_pre_release_end - version_pre_release_start);
version_parsing_position_ = version_pre_release_end;
return true;
}
// If the current character is '+', stores the rest of version_string_ as
// BUILD_INFO. Always succeeds.
bool ParseVersionBuildInfo() {
// No BUILD_INFO.
if (version_parsing_position_ == version_string_.size() ||
version_string_[version_parsing_position_] != '+') {
return true;
}
auto version_build_info_start = version_parsing_position_ + 1; // +1 is for '+'.
application_version_.version.build_info =
version_string_.substr(version_build_info_start);
return true;
}
// Searches for " (build " at or after version_end_ and stores the trimmed text
// up to the next ')' as the build name. Returns false if either marker is
// missing.
bool ParseBuildName() {
std::string build_mark(" (build ");
auto build_mark_position = created_by_.find(build_mark, version_end_);
// No BUILD_NAME.
if (build_mark_position == std::string::npos) {
return false;
}
auto build_name_start = build_mark_position + build_mark.size();
RemovePrecedingSpaces(created_by_, build_name_start, created_by_.size());
auto build_name_end = created_by_.find_first_of(")", build_name_start);
// No end ")".
if (build_name_end == std::string::npos) {
return false;
}
RemoveTrailingSpaces(created_by_, build_name_start, build_name_end);
application_version_.build_ =
created_by_.substr(build_name_start, build_name_end - build_name_start);
return true;
}
// Input string (not owned; must outlive the parser).
const std::string& created_by_;
// Object that receives the parsed values.
ApplicationVersion& application_version_;
// For parsing.
// Characters treated as whitespace / decimal digits.
std::string spaces_;
std::string digits_;
// Index into version_string_ where the next version component starts.
size_t version_parsing_position_;
// Bounds of VERSION within created_by_; version_start_ is npos when there is
// no " version " marker.
size_t version_start_;
size_t version_end_;
// Trimmed VERSION text.
std::string version_string_;
};
} // namespace
// Parses a "created_by" footer string. All parsing lives in
// ApplicationVersionParser, which writes the results directly into *this.
ApplicationVersion::ApplicationVersion(const std::string& created_by) {
  ApplicationVersionParser version_parser(created_by, *this);
  version_parser.Parse();
}
// Returns true if this version is strictly older than `other_version`. The
// comparison looks at major, minor and patch in turn. Versions of different
// applications are not ordered, so this returns false for them.
bool ApplicationVersion::VersionLt(const ApplicationVersion& other_version) const {
  if (application_ != other_version.application_) {
    return false;
  }
  const auto& lhs = version;
  const auto& rhs = other_version.version;
  if (lhs.major != rhs.major) {
    return lhs.major < rhs.major;
  }
  if (lhs.minor != rhs.minor) {
    return lhs.minor < rhs.minor;
  }
  return lhs.patch < rhs.patch;
}
// True when both versions belong to the same application and share the same
// major.minor.patch. The unknown, pre-release and build-info parts are ignored.
bool ApplicationVersion::VersionEq(const ApplicationVersion& other_version) const {
  if (application_ != other_version.application_) {
    return false;
  }
  const auto& lhs = version;
  const auto& rhs = other_version.version;
  return lhs.major == rhs.major && lhs.minor == rhs.minor && lhs.patch == rhs.patch;
}
// Reference:
// parquet-mr/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java
// PARQUET-686 has more discussion on statistics
//
// Decides whether min/max statistics written by this application version can
// be trusted for a column of physical type `col_type` ordered by `sort_order`.
bool ApplicationVersion::HasCorrectStatistics(Type::type col_type,
                                              EncodedStatistics& statistics,
                                              SortOrder::type sort_order) const {
  // parquet-cpp versions before 1.3.0 and parquet-mr versions before 1.10.0 did
  // not compute stats correctly for all types.
  const bool written_before_stats_fix =
      (application_ == "parquet-cpp" && VersionLt(PARQUET_CPP_FIXED_STATS_VERSION())) ||
      (application_ == "parquet-mr" && VersionLt(PARQUET_MR_FIXED_STATS_VERSION()));
  if (written_before_stats_fix) {
    // Only SIGNED ordering is valid, unless min == max (then the sort order
    // does not matter).
    const bool max_equals_min = statistics.has_min && statistics.has_max &&
                                statistics.min() == statistics.max();
    if (sort_order != SortOrder::SIGNED && !max_equals_min) {
      return false;
    }
    // Non-binary types are fine; binary types fall through to the checks below.
    const bool is_binary_type =
        col_type == Type::FIXED_LEN_BYTE_ARRAY || col_type == Type::BYTE_ARRAY;
    if (!is_binary_type) {
      return true;
    }
  }
  // created_by is not populated, which could have been caused by
  // parquet-mr during the same time as PARQUET-251, see PARQUET-297
  if (application_ == "unknown") {
    return true;
  }
  // An unknown sort order means the statistics cannot be trusted.
  if (sort_order == SortOrder::UNKNOWN) {
    return false;
  }
  // PARQUET-251: versions before the fix wrote incorrect statistics.
  return !VersionLt(PARQUET_251_FIXED_VERSION());
}
// MetaData Builders
// row-group metadata

// Fills in a Thrift format::ColumnChunk for a single column. The chunk is
// either owned by this object (two-argument constructor) or points into storage
// owned by the caller (three-argument constructor).
class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
 public:
  explicit ColumnChunkMetaDataBuilderImpl(std::shared_ptr<WriterProperties> props,
                                          const ColumnDescriptor* column)
      : owned_column_chunk_(new format::ColumnChunk),
        properties_(std::move(props)),
        column_(column) {
    Init(owned_column_chunk_.get());
  }

  // `column_chunk` is not owned and must outlive this builder.
  explicit ColumnChunkMetaDataBuilderImpl(std::shared_ptr<WriterProperties> props,
                                          const ColumnDescriptor* column,
                                          format::ColumnChunk* column_chunk)
      : properties_(std::move(props)), column_(column) {
    Init(column_chunk);
  }

  // Opaque pointer to the underlying format::ColumnChunk.
  const void* contents() const { return column_chunk_; }

  // column chunk
  void set_file_path(const std::string& val) { column_chunk_->__set_file_path(val); }

  // column metadata
  void SetStatistics(const EncodedStatistics& val) {
    column_chunk_->meta_data.__set_statistics(ToThrift(val));
  }

  // Records the final offsets, sizes, encodings and page-encoding statistics of
  // the column chunk. For encrypted columns it also attaches the column crypto
  // metadata and, when required, an encrypted copy of ColumnMetaData produced
  // with `encryptor`.
  //
  // `dictionary_fallback` is currently unused.
  // Throws ParquetException if the column metadata must be encrypted but
  // `encryptor` is null (previously this dereferenced a null pointer).
  void Finish(int64_t num_values, int64_t dictionary_page_offset,
              int64_t index_page_offset, int64_t data_page_offset,
              int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary,
              bool dictionary_fallback,
              const std::map<Encoding::type, int32_t>& dict_encoding_stats,
              const std::map<Encoding::type, int32_t>& data_encoding_stats,
              const std::shared_ptr<Encryptor>& encryptor) {
    // file_offset is the offset of the first page (the dictionary page if there
    // is one, otherwise the first data page) plus the compressed size.
    if (dictionary_page_offset > 0) {
      column_chunk_->meta_data.__set_dictionary_page_offset(dictionary_page_offset);
      column_chunk_->__set_file_offset(dictionary_page_offset + compressed_size);
    } else {
      column_chunk_->__set_file_offset(data_page_offset + compressed_size);
    }
    column_chunk_->__isset.meta_data = true;
    column_chunk_->meta_data.__set_num_values(num_values);
    if (index_page_offset >= 0) {
      column_chunk_->meta_data.__set_index_page_offset(index_page_offset);
    }
    column_chunk_->meta_data.__set_data_page_offset(data_page_offset);
    column_chunk_->meta_data.__set_total_uncompressed_size(uncompressed_size);
    column_chunk_->meta_data.__set_total_compressed_size(compressed_size);

    std::vector<format::Encoding::type> thrift_encodings;
    std::vector<format::PageEncodingStats> thrift_encoding_stats;
    // Appends `value` unless it is already present, keeping first-seen order.
    auto add_encoding = [&thrift_encodings](format::Encoding::type value) {
      auto it = std::find(thrift_encodings.begin(), thrift_encodings.end(), value);
      if (it == thrift_encodings.end()) {
        thrift_encodings.push_back(value);
      }
    };
    // Add dictionary page encoding stats
    if (has_dictionary) {
      for (const auto& entry : dict_encoding_stats) {
        format::PageEncodingStats dict_enc_stat;
        dict_enc_stat.__set_page_type(format::PageType::DICTIONARY_PAGE);
        // Dictionary Encoding would be PLAIN_DICTIONARY in v1 and
        // PLAIN in v2.
        format::Encoding::type dict_encoding = ToThrift(entry.first);
        dict_enc_stat.__set_encoding(dict_encoding);
        dict_enc_stat.__set_count(entry.second);
        thrift_encoding_stats.push_back(dict_enc_stat);
        add_encoding(dict_encoding);
      }
    }
    // Always add encoding for RL/DL.
    // BIT_PACKED is supported in `LevelEncoder`, but would only be used
    // in benchmark and testing.
    // And for now, we always add RLE even if there are no levels at all,
    // while parquet-mr is more fine-grained.
    add_encoding(format::Encoding::RLE);
    // Add data page encoding stats
    for (const auto& entry : data_encoding_stats) {
      format::PageEncodingStats data_enc_stat;
      data_enc_stat.__set_page_type(format::PageType::DATA_PAGE);
      format::Encoding::type data_encoding = ToThrift(entry.first);
      data_enc_stat.__set_encoding(data_encoding);
      data_enc_stat.__set_count(entry.second);
      thrift_encoding_stats.push_back(data_enc_stat);
      add_encoding(data_encoding);
    }
    column_chunk_->meta_data.__set_encodings(thrift_encodings);
    column_chunk_->meta_data.__set_encoding_stats(thrift_encoding_stats);

    const auto& encrypt_md =
        properties_->column_encryption_properties(column_->path()->ToDotString());
    // column is encrypted
    if (encrypt_md != nullptr && encrypt_md->is_encrypted()) {
      column_chunk_->__isset.crypto_metadata = true;
      format::ColumnCryptoMetaData ccmd;
      if (encrypt_md->is_encrypted_with_footer_key()) {
        // encrypted with footer key
        ccmd.__isset.ENCRYPTION_WITH_FOOTER_KEY = true;
        ccmd.__set_ENCRYPTION_WITH_FOOTER_KEY(format::EncryptionWithFooterKey());
      } else {  // encrypted with column key
        format::EncryptionWithColumnKey eck;
        eck.__set_key_metadata(encrypt_md->key_metadata());
        eck.__set_path_in_schema(column_->path()->ToDotVector());
        ccmd.__isset.ENCRYPTION_WITH_COLUMN_KEY = true;
        ccmd.__set_ENCRYPTION_WITH_COLUMN_KEY(eck);
      }
      column_chunk_->__set_crypto_metadata(ccmd);

      bool encrypted_footer =
          properties_->file_encryption_properties()->encrypted_footer();
      // Metadata needs its own encryption unless the footer is encrypted with
      // the same (footer) key that protects this column.
      bool encrypt_metadata =
          !encrypted_footer || !encrypt_md->is_encrypted_with_footer_key();
      if (encrypt_metadata) {
        if (encryptor == nullptr) {
          throw ParquetException("Cannot encrypt metadata of column ",
                                 column_->path()->ToDotString(),
                                 ": no encryptor was provided");
        }
        ThriftSerializer serializer;
        // Serialize and encrypt ColumnMetadata separately:
        // Thrift-serialize the ColumnMetaData structure, encrypt it with the
        // column key, and write it to encrypted_column_metadata.
        uint8_t* serialized_data = nullptr;
        uint32_t serialized_len = 0;
        serializer.SerializeToBuffer(&column_chunk_->meta_data, &serialized_len,
                                     &serialized_data);

        std::vector<uint8_t> encrypted_data(encryptor->CiphertextSizeDelta() +
                                            serialized_len);
        unsigned encrypted_len =
            encryptor->Encrypt(serialized_data, serialized_len, encrypted_data.data());

        std::string encrypted_column_metadata(
            reinterpret_cast<const char*>(encrypted_data.data()), encrypted_len);
        column_chunk_->__set_encrypted_column_metadata(encrypted_column_metadata);

        if (encrypted_footer) {
          column_chunk_->__isset.meta_data = false;
        } else {
          // Keep redacted metadata version for old readers
          column_chunk_->__isset.meta_data = true;
          column_chunk_->meta_data.__isset.statistics = false;
          column_chunk_->meta_data.__isset.encoding_stats = false;
        }
      }
    }
  }

  // Thrift-serializes the column chunk to `sink`.
  void WriteTo(::arrow::io::OutputStream* sink) {
    ThriftSerializer serializer;
    serializer.Serialize(column_chunk_, sink);
  }

  const ColumnDescriptor* descr() const { return column_; }

  int64_t total_compressed_size() const {
    return column_chunk_->meta_data.total_compressed_size;
  }

 private:
  // Points column_chunk_ at `column_chunk` and fills the fields derived from the
  // column descriptor and writer properties (type, path, codec).
  void Init(format::ColumnChunk* column_chunk) {
    column_chunk_ = column_chunk;

    column_chunk_->meta_data.__set_type(ToThrift(column_->physical_type()));
    column_chunk_->meta_data.__set_path_in_schema(column_->path()->ToDotVector());
    column_chunk_->meta_data.__set_codec(
        ToThrift(properties_->compression(column_->path())));
  }

  format::ColumnChunk* column_chunk_;
  // Non-null only when this builder owns the chunk (two-argument constructor).
  std::unique_ptr<format::ColumnChunk> owned_column_chunk_;
  const std::shared_ptr<WriterProperties> properties_;
  const ColumnDescriptor* column_;
};
std::unique_ptr<ColumnChunkMetaDataBuilder> ColumnChunkMetaDataBuilder::Make(
std::shared_ptr<WriterProperties> props, const ColumnDescriptor* column,
void* contents) {
return std::unique_ptr<ColumnChunkMetaDataBuilder>(
new ColumnChunkMetaDataBuilder(std::move(props), column, contents));
}
std::unique_ptr<ColumnChunkMetaDataBuilder> ColumnChunkMetaDataBuilder::Make(
std::shared_ptr<WriterProperties> props, const ColumnDescriptor* column) {
return std::unique_ptr<ColumnChunkMetaDataBuilder>(
new ColumnChunkMetaDataBuilder(std::move(props), column));
}
ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilder(
std::shared_ptr<WriterProperties> props, const ColumnDescriptor* column)
: impl_{std::unique_ptr<ColumnChunkMetaDataBuilderImpl>(
new ColumnChunkMetaDataBuilderImpl(std::move(props), column))} {}
ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilder(
std::shared_ptr<WriterProperties> props, const ColumnDescriptor* column,
void* contents)
: impl_{std::unique_ptr<ColumnChunkMetaDataBuilderImpl>(
new ColumnChunkMetaDataBuilderImpl(
std::move(props), column,
reinterpret_cast<format::ColumnChunk*>(contents)))} {}
ColumnChunkMetaDataBuilder::~ColumnChunkMetaDataBuilder() = default;
const void* ColumnChunkMetaDataBuilder::contents() const { return impl_->contents(); }
void ColumnChunkMetaDataBuilder::set_file_path(const std::string& path) {
impl_->set_file_path(path);
}
void ColumnChunkMetaDataBuilder::Finish(
int64_t num_values, int64_t dictionary_page_offset, int64_t index_page_offset,
int64_t data_page_offset, int64_t compressed_size, int64_t uncompressed_size,
bool has_dictionary, bool dictionary_fallback,
const std::map<Encoding::type, int32_t>& dict_encoding_stats,
const std::map<Encoding::type, int32_t>& data_encoding_stats,
const std::shared_ptr<Encryptor>& encryptor) {
impl_->Finish(num_values, dictionary_page_offset, index_page_offset, data_page_offset,
compressed_size, uncompressed_size, has_dictionary, dictionary_fallback,
dict_encoding_stats, data_encoding_stats, encryptor);
}
void ColumnChunkMetaDataBuilder::WriteTo(::arrow::io::OutputStream* sink) {
impl_->WriteTo(sink);
}
const ColumnDescriptor* ColumnChunkMetaDataBuilder::descr() const {
return impl_->descr();
}
void ColumnChunkMetaDataBuilder::SetStatistics(const EncodedStatistics& result) {
impl_->SetStatistics(result);
}
int64_t ColumnChunkMetaDataBuilder::total_compressed_size() const {
return impl_->total_compressed_size();
}
// Fills in a caller-owned Thrift format::RowGroup one column at a time.
class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
 public:
  // `contents` must point to a format::RowGroup that outlives this builder. Its
  // `columns` vector is resized to the schema's column count.
  explicit RowGroupMetaDataBuilderImpl(std::shared_ptr<WriterProperties> props,
                                       const SchemaDescriptor* schema, void* contents)
      : row_group_(static_cast<format::RowGroup*>(contents)),
        properties_(std::move(props)),
        schema_(schema),
        next_column_(0) {
    InitializeColumns(schema->num_columns());
  }

  // Returns a builder for the next column's chunk. Throws ParquetException when
  // every column has already been handed out. The returned pointer stays owned
  // by this object.
  ColumnChunkMetaDataBuilder* NextColumnChunk() {
    if (next_column_ >= num_columns()) {
      std::stringstream ss;
      ss << "The schema only has " << num_columns()
         << " columns, requested metadata for column: " << next_column_;
      throw ParquetException(ss.str());
    }
    auto column = schema_->Column(next_column_);
    auto column_builder = ColumnChunkMetaDataBuilder::Make(
        properties_, column, &row_group_->columns[next_column_++]);
    auto column_builder_ptr = column_builder.get();
    column_builders_.push_back(std::move(column_builder));
    return column_builder_ptr;
  }

  int current_column() { return next_column_ - 1; }

  // Finalizes row-group-level fields (file offset, sizes, ordinal, sorting
  // columns). Throws ParquetException if not every column was initialized or a
  // column chunk has a negative file_offset.
  void Finish(int64_t total_bytes_written, int16_t row_group_ordinal) {
    if (next_column_ != schema_->num_columns()) {
      std::stringstream ss;
      // next_column_ is the number of column chunks handed out so far (the
      // previous message printed next_column_ - 1, which was off by one).
      ss << "Only " << next_column_ << " out of " << schema_->num_columns()
         << " columns are initialized";
      throw ParquetException(ss.str());
    }

    int64_t file_offset = 0;
    int64_t total_compressed_size = 0;
    for (int i = 0; i < schema_->num_columns(); i++) {
      if (row_group_->columns[i].file_offset < 0) {
        std::stringstream ss;
        ss << "Column " << i << " is not complete.";
        throw ParquetException(ss.str());
      }
      if (i == 0) {
        const format::ColumnMetaData& first_col = row_group_->columns[0].meta_data;
        // As per spec, file_offset for the row group points to the first
        // dictionary or data page of the column.
        if (first_col.__isset.dictionary_page_offset &&
            first_col.dictionary_page_offset > 0) {
          file_offset = first_col.dictionary_page_offset;
        } else {
          file_offset = first_col.data_page_offset;
        }
      }
      // sometimes column metadata is encrypted and not available to read,
      // so we must get total_compressed_size from column builder
      total_compressed_size += column_builders_[i]->total_compressed_size();
    }

    const auto& sorting_columns = properties_->sorting_columns();
    if (!sorting_columns.empty()) {
      std::vector<format::SortingColumn> thrift_sorting_columns(sorting_columns.size());
      for (size_t i = 0; i < sorting_columns.size(); ++i) {
        thrift_sorting_columns[i] = ToThrift(sorting_columns[i]);
      }
      row_group_->__set_sorting_columns(std::move(thrift_sorting_columns));
    }

    row_group_->__set_file_offset(file_offset);
    row_group_->__set_total_compressed_size(total_compressed_size);
    row_group_->__set_total_byte_size(total_bytes_written);
    row_group_->__set_ordinal(row_group_ordinal);
  }

  void set_num_rows(int64_t num_rows) { row_group_->num_rows = num_rows; }

  int num_columns() { return static_cast<int>(row_group_->columns.size()); }

  int64_t num_rows() { return row_group_->num_rows; }

 private:
  void InitializeColumns(int ncols) { row_group_->columns.resize(ncols); }

  format::RowGroup* row_group_;  // not owned
  const std::shared_ptr<WriterProperties> properties_;
  const SchemaDescriptor* schema_;
  std::vector<std::unique_ptr<ColumnChunkMetaDataBuilder>> column_builders_;
  int next_column_;
};
// Factory: `contents` is the caller-owned format::RowGroup to populate.
std::unique_ptr<RowGroupMetaDataBuilder> RowGroupMetaDataBuilder::Make(
    std::shared_ptr<WriterProperties> props, const SchemaDescriptor* schema_,
    void* contents) {
  return std::unique_ptr<RowGroupMetaDataBuilder>(
      new RowGroupMetaDataBuilder(std::move(props), schema_, contents));
}

RowGroupMetaDataBuilder::RowGroupMetaDataBuilder(std::shared_ptr<WriterProperties> props,
                                                 const SchemaDescriptor* schema_,
                                                 void* contents)
    : impl_(new RowGroupMetaDataBuilderImpl(std::move(props), schema_, contents)) {}

RowGroupMetaDataBuilder::~RowGroupMetaDataBuilder() = default;

// --- Everything below forwards to RowGroupMetaDataBuilderImpl. ---

ColumnChunkMetaDataBuilder* RowGroupMetaDataBuilder::NextColumnChunk() {
  return impl_->NextColumnChunk();
}

int RowGroupMetaDataBuilder::current_column() const {
  return impl_->current_column();
}

int RowGroupMetaDataBuilder::num_columns() {
  return impl_->num_columns();
}

int64_t RowGroupMetaDataBuilder::num_rows() {
  return impl_->num_rows();
}

void RowGroupMetaDataBuilder::set_num_rows(int64_t num_rows) {
  impl_->set_num_rows(num_rows);
}

void RowGroupMetaDataBuilder::Finish(int64_t total_bytes_written,
                                     int16_t row_group_ordinal) {
  impl_->Finish(total_bytes_written, row_group_ordinal);
}
// file metadata
class FileMetaDataBuilder::FileMetaDataBuilderImpl {
public:
explicit FileMetaDataBuilderImpl(
const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props,
std::shared_ptr<const KeyValueMetadata> key_value_metadata)
: metadata_(new format::FileMetaData()),
properties_(std::move(props)),
schema_(schema),
key_value_metadata_(std::move(key_value_metadata)) {
if (properties_->file_encryption_properties() != nullptr &&
properties_->file_encryption_properties()->encrypted_footer()) {
crypto_metadata_.reset(new format::FileCryptoMetaData());
}
}
RowGroupMetaDataBuilder* AppendRowGroup() {
row_groups_.emplace_back();
current_row_group_builder_ =
RowGroupMetaDataBuilder::Make(properties_, schema_, &row_groups_.back());
return current_row_group_builder_.get();
}
void SetPageIndexLocation(const PageIndexLocation& location) {
auto set_index_location =
[this](size_t row_group_ordinal,
const PageIndexLocation::FileIndexLocation& file_index_location,
bool column_index) {
auto& row_group_metadata = this->row_groups_.at(row_group_ordinal);
auto iter = file_index_location.find(row_group_ordinal);
if (iter != file_index_location.cend()) {
const auto& row_group_index_location = iter->second;
for (size_t i = 0; i < row_group_index_location.size(); ++i) {
if (i >= row_group_metadata.columns.size()) {
throw ParquetException("Cannot find metadata for column ordinal ", i);
}
auto& column_metadata = row_group_metadata.columns.at(i);
const auto& index_location = row_group_index_location.at(i);
if (index_location.has_value()) {
if (column_index) {
column_metadata.__set_column_index_offset(index_location->offset);
column_metadata.__set_column_index_length(index_location->length);
} else {
column_metadata.__set_offset_index_offset(index_location->offset);
column_metadata.__set_offset_index_length(index_location->length);
}
}
}
}
};
for (size_t i = 0; i < row_groups_.size(); ++i) {
set_index_location(i, location.column_index_location, true);
set_index_location(i, location.offset_index_location, false);
}
}
std::unique_ptr<FileMetaData> Finish(
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
int64_t total_rows = 0;
for (auto row_group : row_groups_) {
total_rows += row_group.num_rows;
}
metadata_->__set_num_rows(total_rows);
metadata_->__set_row_groups(row_groups_);
if (key_value_metadata_ || key_value_metadata) {
if (!key_value_metadata_) {
key_value_metadata_ = key_value_metadata;
} else if (key_value_metadata) {
key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata);
}
metadata_->key_value_metadata.clear();
metadata_->key_value_metadata.reserve(key_value_metadata_->size());
for (int64_t i = 0; i < key_value_metadata_->size(); ++i) {
format::KeyValue kv_pair;
kv_pair.__set_key(key_value_metadata_->key(i));
kv_pair.__set_value(key_value_metadata_->value(i));
metadata_->key_value_metadata.push_back(kv_pair);
}
metadata_->__isset.key_value_metadata = true;
}
int32_t file_version = 0;
switch (properties_->version()) {
case ParquetVersion::PARQUET_1_0:
file_version = 1;
break;
default:
file_version = 2;
break;
}
metadata_->__set_version(file_version);
metadata_->__set_created_by(properties_->created_by());
// Users cannot set the `ColumnOrder` since we do not have user defined sort order
// in the spec yet.
// We always default to `TYPE_DEFINED_ORDER`. We can expose it in
// the API once we have user defined sort orders in the Parquet format.
// TypeDefinedOrder implies choose SortOrder based on ConvertedType/PhysicalType
format::TypeDefinedOrder type_defined_order;
format::ColumnOrder column_order;
column_order.__set_TYPE_ORDER(type_defined_order);
column_order.__isset.TYPE_ORDER = true;
metadata_->column_orders.resize(schema_->num_columns(), column_order);
metadata_->__isset.column_orders = true;
// if plaintext footer, set footer signing algorithm
auto file_encryption_properties = properties_->file_encryption_properties();
if (file_encryption_properties && !file_encryption_properties->encrypted_footer()) {
EncryptionAlgorithm signing_algorithm;
EncryptionAlgorithm algo = file_encryption_properties->algorithm();
signing_algorithm.aad.aad_file_unique = algo.aad.aad_file_unique;
signing_algorithm.aad.supply_aad_prefix = algo.aad.supply_aad_prefix;
if (!algo.aad.supply_aad_prefix) {
signing_algorithm.aad.aad_prefix = algo.aad.aad_prefix;
}
signing_algorithm.algorithm = ParquetCipher::AES_GCM_V1;
metadata_->__set_encryption_algorithm(ToThrift(signing_algorithm));
const std::string& footer_signing_key_metadata =
file_encryption_properties->footer_key_metadata();
if (footer_signing_key_metadata.size() > 0) {
metadata_->__set_footer_signing_key_metadata(footer_signing_key_metadata);
}
}
ToParquet(static_cast<parquet::schema::GroupNode*>(schema_->schema_root().get()),
&metadata_->schema);
auto file_meta_data = std::unique_ptr<FileMetaData>(new FileMetaData());
file_meta_data->impl_->metadata_ = std::move(metadata_);
file_meta_data->impl_->InitSchema();
file_meta_data->impl_->InitKeyValueMetadata();
return file_meta_data;
}
// Builds the file-level FileCryptoMetaData from the writer's encryption settings.
//
// Returns nullptr when crypto_metadata_ was never created for this builder.
// Otherwise it does three things:
//   - stamps the encryption algorithm from the writer's FileEncryptionProperties;
//   - stores the footer key metadata, but only when it is non-empty;
//   - moves the accumulated Thrift struct into a freshly allocated FileCryptoMetaData.
// NOTE(review): the Thrift struct is moved out, so crypto_metadata_ is left in a
// moved-from (but non-null) state afterwards; presumably called once per file.
std::unique_ptr<FileCryptoMetaData> BuildFileCryptoMetaData() {
  if (crypto_metadata_ == nullptr) {
    return nullptr;
  }
  // NOTE(review): assumes file encryption properties are always present whenever
  // crypto_metadata_ exists; confirm against where crypto_metadata_ is created.
  auto file_encryption_properties = properties_->file_encryption_properties();
  crypto_metadata_->__set_encryption_algorithm(
      ToThrift(file_encryption_properties->algorithm()));
  // Bind by const reference (as Finish() does for this same accessor) so the key
  // metadata is only copied when it is actually stored in the Thrift struct.
  const std::string& key_metadata = file_encryption_properties->footer_key_metadata();
  if (!key_metadata.empty()) {
    crypto_metadata_->__set_key_metadata(key_metadata);
  }
  std::unique_ptr<FileCryptoMetaData> file_crypto_metadata(new FileCryptoMetaData());
  // Hand the accumulated Thrift struct over to the public wrapper.
  file_crypto_metadata->impl_->metadata_ = std::move(*crypto_metadata_);
  return file_crypto_metadata;
}
protected:
std::unique_ptr<format::FileMetaData> metadata_;
std::unique_ptr<format::FileCryptoMetaData> crypto_metadata_;
private:
const std::shared_ptr<WriterProperties> properties_;
std::vector<format::RowGroup> row_groups_;
std::unique_ptr<RowGroupMetaDataBuilder> current_row_group_builder_;
const SchemaDescriptor* schema_;
std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
};
// Creates a FileMetaDataBuilder for `schema` configured by writer properties
// `props`. `key_value_metadata` is forwarded unchanged to the builder.
std::unique_ptr<FileMetaDataBuilder> FileMetaDataBuilder::Make(
    const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props,
    std::shared_ptr<const KeyValueMetadata> key_value_metadata) {
  std::unique_ptr<FileMetaDataBuilder> builder(
      new FileMetaDataBuilder(schema, std::move(props), std::move(key_value_metadata)));
  return builder;
}
// Creates a FileMetaDataBuilder for `schema` configured by writer properties
// `props`, without passing any key-value metadata (the constructor's own
// default for that argument applies).
std::unique_ptr<FileMetaDataBuilder> FileMetaDataBuilder::Make(
    const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props) {
  std::unique_ptr<FileMetaDataBuilder> builder(
      new FileMetaDataBuilder(schema, std::move(props)));
  return builder;
}
// Allocates the implementation object and hands it all constructor arguments;
// every public method of FileMetaDataBuilder forwards to impl_.
FileMetaDataBuilder::FileMetaDataBuilder(
    const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props,
    std::shared_ptr<const KeyValueMetadata> key_value_metadata)
    : impl_(std::unique_ptr<FileMetaDataBuilderImpl>(new FileMetaDataBuilderImpl(
          schema, std::move(props), std::move(key_value_metadata)))) {}
// Defaulted out of line in this translation unit, presumably so that the
// std::unique_ptr<FileMetaDataBuilderImpl> member (pimpl) is destroyed where the
// implementation class is a complete type.
FileMetaDataBuilder::~FileMetaDataBuilder() = default;
// Starts a new row group and returns the implementation's builder for it
// (a non-owning pointer).
RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup() {
  auto& builder_impl = *impl_;
  return builder_impl.AppendRowGroup();
}
// Records the page index location by forwarding it to the implementation.
void FileMetaDataBuilder::SetPageIndexLocation(const PageIndexLocation& location) {
  auto& builder_impl = *impl_;
  builder_impl.SetPageIndexLocation(location);
}
// Produces the final FileMetaData by delegating to the implementation.
// `key_value_metadata` is passed through as-is (it may be null); the
// implementation decides how it combines with previously supplied metadata.
std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish(
    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
  auto& builder_impl = *impl_;
  return builder_impl.Finish(key_value_metadata);
}
// Returns the file crypto metadata assembled by the implementation, or nullptr
// when the implementation has no crypto metadata to build.
std::unique_ptr<FileCryptoMetaData> FileMetaDataBuilder::GetCryptoMetaData() {
  auto& builder_impl = *impl_;
  return builder_impl.BuildFileCryptoMetaData();
}
} // namespace parquet