// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_EXEC_PARQUET_METADATA_UTILS_H
#define IMPALA_EXEC_PARQUET_METADATA_UTILS_H
#include <string>
#include "runtime/descriptors.h"
#include "gen-cpp/parquet_types.h"
namespace impala {
class RuntimeState;
class ParquetMetadataUtils {
public:
/// Checks the version of the given file and returns a non-OK status if
/// Impala does not support that version.
static Status ValidateFileVersion(const parquet::FileMetaData& file_metadata,
const char* filename);
/// Validates column offsets by checking that the dictionary page (if any) comes before
/// the data pages and that the column offsets lie within the file.
static Status ValidateColumnOffsets(const std::string& filename, int64_t file_length,
const parquet::RowGroup& row_group);
/// Checks that a file offset is in the file. Returns an error status with a detailed
/// error message if it is not.
static Status ValidateOffsetInFile(const std::string& filename, int col_idx,
int64_t file_length, int64_t offset, const std::string& offset_name);
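/// Call-site sketch (illustrative only, not part of this header's API; 'metadata',
/// 'filename', and 'file_len' are assumed to come from the scanner that parsed the
/// file footer, and error propagation uses Impala's RETURN_IF_ERROR macro):
///   RETURN_IF_ERROR(ParquetMetadataUtils::ValidateFileVersion(metadata, filename));
///   for (const parquet::RowGroup& row_group : metadata.row_groups) {
///     RETURN_IF_ERROR(ParquetMetadataUtils::ValidateColumnOffsets(
///         filename, file_len, row_group));
///   }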
/// Validates the column metadata inside a row group to make sure this column is
/// supported (e.g., its encoding and type).
static Status ValidateRowGroupColumn(const parquet::FileMetaData& file_metadata,
const char* filename, int row_group_idx, int col_idx,
const parquet::SchemaElement& schema_element, RuntimeState* state);
/// Validates the column metadata to make sure the column is supported and its type
/// attributes conform to the parquet spec.
static Status ValidateColumn(const char* filename,
const parquet::SchemaElement& schema_element, const SlotDescriptor* slot_desc,
RuntimeState* state);
};
struct ParquetFileVersion {
/// Application that wrote the file, e.g. "IMPALA".
std::string application;
/// Version of the application that wrote the file, expressed in three parts
/// (<major>.<minor>.<patch>). Unspecified parts default to 0, and extra parts are
/// ignored. e.g.:
/// "1.2.3" => {1, 2, 3}
/// "1.2" => {1, 2, 0}
/// "1.2-cdh5" => {1, 2, 0}
struct {
int major;
int minor;
int patch;
} version;
/// If true, this file was generated by an Impala internal release
bool is_impala_internal;
ParquetFileVersion() : is_impala_internal(false) { }
/// Parses the version from the created_by string
ParquetFileVersion(const std::string& created_by);
/// Returns true if version is strictly less than <major>.<minor>.<patch>
bool VersionLt(int major, int minor = 0, int patch = 0) const;
/// Returns true if version is equal to <major>.<minor>.<patch>
bool VersionEq(int major, int minor, int patch) const;
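/// Illustrative sketch (the created_by string below is hypothetical; writers typically
/// produce strings of the form "<application> version <x.y.z> (build <hash>)"):
///   ParquetFileVersion v("parquet-mr version 1.2.8 (build abcd)");
///   // v.application == "parquet-mr", v.version == {1, 2, 8}
///   if (v.VersionLt(1, 2, 9)) { /* e.g. apply a workaround for older writers */ }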
};
/// Internal representation of a Parquet schema (including nested-type columns).
struct SchemaNode {
/// The corresponding schema element defined in the file metadata
const parquet::SchemaElement* element;
/// The index into the RowGroup::columns list if this column is materialized in the
/// file (i.e. it's a scalar type). -1 for nested types.
int col_idx;
/// The maximum definition level of this column, i.e., the definition level that
/// corresponds to a non-NULL value. Valid values are >= 0.
int max_def_level;
/// The maximum repetition level of this column. Valid values are >= 0.
int max_rep_level;
/// The definition level of the most immediate ancestor of this node with repeated
/// field repetition type. 0 if there are no repeated ancestors.
int def_level_of_immediate_repeated_ancestor;
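/// Illustrative example, assuming the standard three-level list encoding: for
///   optional group tags (LIST) { repeated group list { optional int32 element; } }
/// the 'element' node has max_def_level == 3, max_rep_level == 1, and
/// def_level_of_immediate_repeated_ancestor == 2 (the def level of the repeated
/// 'list' group).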
/// Any nested schema nodes. Empty for non-nested types.
std::vector<SchemaNode> children;
SchemaNode() : element(NULL), col_idx(-1), max_def_level(-1), max_rep_level(-1),
def_level_of_immediate_repeated_ancestor(-1) { }
std::string DebugString(int indent = 0) const;
bool is_repeated() const {
return element->repetition_type == parquet::FieldRepetitionType::REPEATED;
}
};
/// Utility class to resolve SchemaPaths (e.g., from a table descriptor) against a
/// Parquet file schema. Supports resolution by field index or by field name.
/// Supports different policies for resolving nested arrays based on the modern
/// three-level encoding or the legacy encodings (one and two level).
class ParquetSchemaResolver {
public:
ParquetSchemaResolver(const HdfsTableDescriptor& tbl_desc,
TParquetFallbackSchemaResolution::type fallback_schema_resolution,
TParquetArrayResolution::type array_resolution)
: tbl_desc_(tbl_desc),
fallback_schema_resolution_(fallback_schema_resolution),
array_resolution_(array_resolution),
filename_(NULL) {
}
/// Parses the schema of the given file metadata into an internal schema
/// representation used in path resolution. Remembers the filename for error
/// reporting. Returns a non-OK status if the Parquet schema could not be parsed.
Status Init(const parquet::FileMetaData* file_metadata, const char* filename) {
DCHECK(filename != NULL);
filename_ = filename;
return CreateSchemaTree(file_metadata->schema, &schema_);
}
/// Traverses 'schema_' according to 'path', returning the result in 'node'. If 'path'
/// does not exist in this file's schema, 'missing_field' is set to true and
/// Status::OK() is returned, otherwise 'missing_field' is set to false. If 'path'
/// resolves to a collection position field, 'pos_field' is set to true; otherwise
/// 'pos_field' is set to false. Returns a non-OK status if 'path' cannot be resolved
/// against the file's schema (e.g., unrecognized collection schema).
///
/// Tries to resolve fields within lists according to the ordered array encodings
/// selected by 'array_resolution_' (see ORDERED_ARRAY_ENCODINGS). Returns a bad
/// status if resolution fails for all attempted array encodings.
Status ResolvePath(const SchemaPath& path, SchemaNode** node, bool* pos_field,
bool* missing_field) const;
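/// Call-site sketch (illustrative only; 'tbl_desc', 'file_metadata', 'filename', and
/// 'slot_desc' are assumed to be provided by the scanner, and 'slot_desc->col_path()'
/// is assumed to yield the SchemaPath of the slot being materialized):
///   ParquetSchemaResolver resolver(tbl_desc, fallback_resolution, array_resolution);
///   RETURN_IF_ERROR(resolver.Init(&file_metadata, filename));
///   SchemaNode* node = NULL;
///   bool pos_field = false;
///   bool missing_field = false;
///   RETURN_IF_ERROR(resolver.ResolvePath(slot_desc->col_path(), &node, &pos_field,
///       &missing_field));
///   if (missing_field) { /* materialize NULLs for this slot */ }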
private:
/// The array encodings that can be attempted when resolving nested array fields: the
/// modern three-level encoding and the legacy two- and one-level encodings.
enum ArrayEncoding {
ONE_LEVEL,
TWO_LEVEL,
THREE_LEVEL,
NUM_ARRAY_ENCODINGS
};
/// An arbitrary limit on the number of children per schema node we support.
/// Used to sanity-check Parquet schemas.
static const int SCHEMA_NODE_CHILDREN_SANITY_LIMIT = 64 * 1024;
/// Maps from the array-resolution policy to the ordered array encodings that should
/// be tried during path resolution. All entries have the ONE_LEVEL encoding at the end
/// because there is no ambiguity between the one-level and the other encodings (there
/// is no harm in trying it).
static const std::vector<ArrayEncoding> ORDERED_ARRAY_ENCODINGS[];
/// Unflattens the schema metadata from a Parquet file metadata and converts it to our
/// SchemaNode representation. Returns the result in 'node' unless an error status is
/// returned. Does not set the slot_desc field of any SchemaNode.
Status CreateSchemaTree(const std::vector<parquet::SchemaElement>& schema,
SchemaNode* node) const;
/// Recursive implementation used internally by the above CreateSchemaTree() function.
Status CreateSchemaTree(const std::vector<parquet::SchemaElement>& schema,
int max_def_level, int max_rep_level, int ira_def_level, int* idx, int* col_idx,
SchemaNode* node) const;
/// The 'array_encoding' parameter determines whether to assume one-, two-, or
/// three-level array encoding. The returned status is not logged (i.e. it's an
/// expected error).
Status ResolvePathHelper(ArrayEncoding array_encoding, const SchemaPath& path,
SchemaNode** node, bool* pos_field, bool* missing_field) const;
/// Helper functions for ResolvePathHelper().
/// Advances 'node' to one of its children based on path[next_idx] and
/// 'col_type'. 'col_type' is NULL if 'node' is the root node, otherwise it's the type
/// associated with 'node'. Returns the child node or sets 'missing_field' to true.
SchemaNode* NextSchemaNode(const ColumnType* col_type, const SchemaPath& path,
int next_idx, SchemaNode* node, bool* missing_field) const;
/// Returns the index of 'node's child with 'name', or the number of children if not
/// found. The name comparison is case-insensitive because that's how Impala treats
/// db/table/column/field names. If there are several matches with different casing,
/// then the index of the first match is returned.
int FindChildWithName(SchemaNode* node, const std::string& name) const;
/// The ResolvePathHelper() logic for arrays.
Status ResolveArray(ArrayEncoding array_encoding, const SchemaPath& path, int idx,
SchemaNode** node, bool* pos_field, bool* missing_field) const;
/// The ResolvePathHelper() logic for maps.
Status ResolveMap(const SchemaPath& path, int idx, SchemaNode** node,
bool* missing_field) const;
/// The ResolvePathHelper() logic for scalars (just does validation since there's no
/// more actual work to be done).
Status ValidateScalarNode(const SchemaNode& node, const ColumnType& col_type,
const SchemaPath& path, int idx) const;
const HdfsTableDescriptor& tbl_desc_;
const TParquetFallbackSchemaResolution::type fallback_schema_resolution_;
const TParquetArrayResolution::type array_resolution_;
const char* filename_;
/// Root node of our internal schema representation populated in Init().
SchemaNode schema_;
};
} // impala namespace
#endif