| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/parquet-metadata-utils.h" |
| |
| #include <string> |
| #include <sstream> |
| #include <vector> |
| |
| #include <boost/algorithm/string.hpp> |
| #include <gutil/strings/substitute.h> |
| |
| #include "common/logging.h" |
| #include "common/status.h" |
| #include "exec/parquet-common.h" |
| #include "runtime/runtime-state.h" |
| #include "util/debug-util.h" |
| |
| #include "common/names.h" |
| |
| using boost::algorithm::is_any_of; |
| using boost::algorithm::split; |
| using boost::algorithm::token_compress_on; |
| |
| namespace impala { |
| |
| Status ParquetMetadataUtils::ValidateFileVersion( |
| const parquet::FileMetaData& file_metadata, const char* filename) { |
| if (file_metadata.version > PARQUET_CURRENT_VERSION) { |
| stringstream ss; |
| ss << "File: " << filename << " is of an unsupported version. " |
| << "file version: " << file_metadata.version; |
| return Status(ss.str()); |
| } |
| return Status::OK(); |
| } |
| |
| Status ParquetMetadataUtils::ValidateColumnOffsets(const string& filename, |
| int64_t file_length, const parquet::RowGroup& row_group) { |
| for (int i = 0; i < row_group.columns.size(); ++i) { |
| const parquet::ColumnChunk& col_chunk = row_group.columns[i]; |
| RETURN_IF_ERROR(ValidateOffsetInFile(filename, i, file_length, |
| col_chunk.meta_data.data_page_offset, "data page offset")); |
| int64_t col_start = col_chunk.meta_data.data_page_offset; |
| // The file format requires that if a dictionary page exists, it be before data pages. |
| if (col_chunk.meta_data.__isset.dictionary_page_offset) { |
| RETURN_IF_ERROR(ValidateOffsetInFile(filename, i, file_length, |
| col_chunk.meta_data.dictionary_page_offset, "dictionary page offset")); |
| if (col_chunk.meta_data.dictionary_page_offset >= col_start) { |
| return Status(Substitute("Parquet file '$0': metadata is corrupt. Dictionary " |
| "page (offset=$1) must come before any data pages (offset=$2).", |
| filename, col_chunk.meta_data.dictionary_page_offset, col_start)); |
| } |
| col_start = col_chunk.meta_data.dictionary_page_offset; |
| } |
| int64_t col_len = col_chunk.meta_data.total_compressed_size; |
| int64_t col_end = col_start + col_len; |
| if (col_end <= 0 || col_end > file_length) { |
| return Status(Substitute("Parquet file '$0': metadata is corrupt. Column $1 has " |
| "invalid column offsets (offset=$2, size=$3, file_size=$4).", filename, i, |
| col_start, col_len, file_length)); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status ParquetMetadataUtils::ValidateOffsetInFile(const string& filename, int col_idx, |
| int64_t file_length, int64_t offset, const string& offset_name) { |
| if (offset < 0 || offset >= file_length) { |
| return Status(Substitute("File '$0': metadata is corrupt. Column $1 has invalid " |
| "$2 (offset=$3 file_size=$4).", filename, col_idx, offset_name, offset, |
| file_length)); |
| } |
| return Status::OK();; |
| } |
| |
| static bool IsEncodingSupported(parquet::Encoding::type e) { |
| switch (e) { |
| case parquet::Encoding::PLAIN: |
| case parquet::Encoding::PLAIN_DICTIONARY: |
| case parquet::Encoding::BIT_PACKED: |
| case parquet::Encoding::RLE: |
| return true; |
| default: |
| return false; |
| } |
| } |
| |
| Status ParquetMetadataUtils::ValidateColumn(const parquet::FileMetaData& file_metadata, |
| const char* filename, int row_group_idx, int col_idx, |
| const parquet::SchemaElement& schema_element, const SlotDescriptor* slot_desc, |
| RuntimeState* state) { |
| const parquet::ColumnChunk& file_data = |
| file_metadata.row_groups[row_group_idx].columns[col_idx]; |
| |
| // Check the encodings are supported. |
| const vector<parquet::Encoding::type>& encodings = file_data.meta_data.encodings; |
| for (int i = 0; i < encodings.size(); ++i) { |
| if (!IsEncodingSupported(encodings[i])) { |
| stringstream ss; |
| ss << "File '" << filename << "' uses an unsupported encoding: " |
| << PrintEncoding(encodings[i]) << " for column '" << schema_element.name |
| << "'."; |
| return Status(ss.str()); |
| } |
| } |
| |
| // Check the compression is supported. |
| if (file_data.meta_data.codec != parquet::CompressionCodec::UNCOMPRESSED && |
| file_data.meta_data.codec != parquet::CompressionCodec::SNAPPY && |
| file_data.meta_data.codec != parquet::CompressionCodec::GZIP) { |
| stringstream ss; |
| ss << "File '" << filename << "' uses an unsupported compression: " |
| << file_data.meta_data.codec << " for column '" << schema_element.name |
| << "'."; |
| return Status(ss.str()); |
| } |
| |
| // Validation after this point is only if col_reader is reading values. |
| if (slot_desc == NULL) return Status::OK(); |
| |
| parquet::Type::type type = IMPALA_TO_PARQUET_TYPES[slot_desc->type().type]; |
| if (UNLIKELY(type != file_data.meta_data.type)) { |
| return Status(Substitute("Unexpected Parquet type in file '$0' metadata expected $1 " |
| "actual $2: file may be corrupt", filename, type, file_data.meta_data.type)); |
| } |
| |
| // Check the decimal scale in the file matches the metastore scale and precision. |
| // We fail the query if the metadata makes it impossible for us to safely read |
| // the file. If we don't require the metadata, we will fail the query if |
| // abort_on_error is true, otherwise we will just log a warning. |
| bool is_converted_type_decimal = schema_element.__isset.converted_type && |
| schema_element.converted_type == parquet::ConvertedType::DECIMAL; |
| if (slot_desc->type().type == TYPE_DECIMAL) { |
| // We require that the scale and byte length be set. |
| if (schema_element.type != parquet::Type::FIXED_LEN_BYTE_ARRAY) { |
| stringstream ss; |
| ss << "File '" << filename << "' column '" << schema_element.name |
| << "' should be a decimal column encoded using FIXED_LEN_BYTE_ARRAY."; |
| return Status(ss.str()); |
| } |
| |
| if (!schema_element.__isset.type_length) { |
| stringstream ss; |
| ss << "File '" << filename << "' column '" << schema_element.name |
| << "' does not have type_length set."; |
| return Status(ss.str()); |
| } |
| |
| int expected_len = ParquetPlainEncoder::DecimalSize(slot_desc->type()); |
| if (schema_element.type_length != expected_len) { |
| stringstream ss; |
| ss << "File '" << filename << "' column '" << schema_element.name |
| << "' has an invalid type length. Expecting: " << expected_len |
| << " len in file: " << schema_element.type_length; |
| return Status(ss.str()); |
| } |
| |
| if (!schema_element.__isset.scale) { |
| stringstream ss; |
| ss << "File '" << filename << "' column '" << schema_element.name |
| << "' does not have the scale set."; |
| return Status(ss.str()); |
| } |
| |
| if (schema_element.scale != slot_desc->type().scale) { |
| // TODO: we could allow a mismatch and do a conversion at this step. |
| stringstream ss; |
| ss << "File '" << filename << "' column '" << schema_element.name |
| << "' has a scale that does not match the table metadata scale." |
| << " File metadata scale: " << schema_element.scale |
| << " Table metadata scale: " << slot_desc->type().scale; |
| return Status(ss.str()); |
| } |
| |
| // The other decimal metadata should be there but we don't need it. |
| if (!schema_element.__isset.precision) { |
| ErrorMsg msg(TErrorCode::PARQUET_MISSING_PRECISION, filename, |
| schema_element.name); |
| RETURN_IF_ERROR(state->LogOrReturnError(msg)); |
| } else { |
| if (schema_element.precision != slot_desc->type().precision) { |
| // TODO: we could allow a mismatch and do a conversion at this step. |
| ErrorMsg msg(TErrorCode::PARQUET_WRONG_PRECISION, filename, schema_element.name, |
| schema_element.precision, slot_desc->type().precision); |
| RETURN_IF_ERROR(state->LogOrReturnError(msg)); |
| } |
| } |
| |
| if (!is_converted_type_decimal) { |
| // TODO: is this validation useful? It is not required at all to read the data and |
| // might only serve to reject otherwise perfectly readable files. |
| ErrorMsg msg(TErrorCode::PARQUET_BAD_CONVERTED_TYPE, filename, |
| schema_element.name); |
| RETURN_IF_ERROR(state->LogOrReturnError(msg)); |
| } |
| } else if (schema_element.__isset.scale || schema_element.__isset.precision || |
| is_converted_type_decimal) { |
| ErrorMsg msg(TErrorCode::PARQUET_INCOMPATIBLE_DECIMAL, filename, |
| schema_element.name, slot_desc->type().DebugString()); |
| RETURN_IF_ERROR(state->LogOrReturnError(msg)); |
| } |
| return Status::OK(); |
| } |
| |
| ParquetFileVersion::ParquetFileVersion(const string& created_by) { |
| string created_by_lower = created_by; |
| std::transform(created_by_lower.begin(), created_by_lower.end(), |
| created_by_lower.begin(), ::tolower); |
| is_impala_internal = false; |
| |
| vector<string> tokens; |
| split(tokens, created_by_lower, is_any_of(" "), token_compress_on); |
| // Boost always creates at least one token |
| DCHECK_GT(tokens.size(), 0); |
| application = tokens[0]; |
| |
| if (tokens.size() >= 3 && tokens[1] == "version") { |
| string version_string = tokens[2]; |
| // Ignore any trailing nodextra characters |
| int n = version_string.find_first_not_of("0123456789."); |
| string version_string_trimmed = version_string.substr(0, n); |
| |
| vector<string> version_tokens; |
| split(version_tokens, version_string_trimmed, is_any_of(".")); |
| version.major = version_tokens.size() >= 1 ? atoi(version_tokens[0].c_str()) : 0; |
| version.minor = version_tokens.size() >= 2 ? atoi(version_tokens[1].c_str()) : 0; |
| version.patch = version_tokens.size() >= 3 ? atoi(version_tokens[2].c_str()) : 0; |
| |
| if (application == "impala") { |
| if (version_string.find("-internal") != string::npos) is_impala_internal = true; |
| } |
| } else { |
| version.major = 0; |
| version.minor = 0; |
| version.patch = 0; |
| } |
| } |
| |
| bool ParquetFileVersion::VersionLt(int major, int minor, int patch) const { |
| if (version.major < major) return true; |
| if (version.major > major) return false; |
| DCHECK_EQ(version.major, major); |
| if (version.minor < minor) return true; |
| if (version.minor > minor) return false; |
| DCHECK_EQ(version.minor, minor); |
| return version.patch < patch; |
| } |
| |
| bool ParquetFileVersion::VersionEq(int major, int minor, int patch) const { |
| return version.major == major && version.minor == minor && version.patch == patch; |
| } |
| |
| static string PrintRepetitionType(const parquet::FieldRepetitionType::type& t) { |
| switch (t) { |
| case parquet::FieldRepetitionType::REQUIRED: return "required"; |
| case parquet::FieldRepetitionType::OPTIONAL: return "optional"; |
| case parquet::FieldRepetitionType::REPEATED: return "repeated"; |
| default: return "<unknown>"; |
| } |
| } |
| |
| static string PrintParquetType(const parquet::Type::type& t) { |
| switch (t) { |
| case parquet::Type::BOOLEAN: return "boolean"; |
| case parquet::Type::INT32: return "int32"; |
| case parquet::Type::INT64: return "int64"; |
| case parquet::Type::INT96: return "int96"; |
| case parquet::Type::FLOAT: return "float"; |
| case parquet::Type::DOUBLE: return "double"; |
| case parquet::Type::BYTE_ARRAY: return "byte_array"; |
| case parquet::Type::FIXED_LEN_BYTE_ARRAY: return "fixed_len_byte_array"; |
| default: return "<unknown>"; |
| } |
| } |
| |
| string SchemaNode::DebugString(int indent) const { |
| stringstream ss; |
| for (int i = 0; i < indent; ++i) ss << " "; |
| ss << PrintRepetitionType(element->repetition_type) << " "; |
| if (element->num_children > 0) { |
| ss << "struct"; |
| } else { |
| ss << PrintParquetType(element->type); |
| } |
| ss << " " << element->name << " [i:" << col_idx << " d:" << max_def_level |
| << " r:" << max_rep_level << "]"; |
| if (element->num_children > 0) { |
| ss << " {" << endl; |
| for (int i = 0; i < element->num_children; ++i) { |
| ss << children[i].DebugString(indent + 2) << endl; |
| } |
| for (int i = 0; i < indent; ++i) ss << " "; |
| ss << "}"; |
| } |
| return ss.str(); |
| } |
| |
| Status ParquetSchemaResolver::CreateSchemaTree(const vector<parquet::SchemaElement>& schema, |
| SchemaNode* node) const { |
| int idx = 0; |
| int col_idx = 0; |
| RETURN_IF_ERROR(CreateSchemaTree(schema, 0, 0, 0, &idx, &col_idx, node)); |
| if (node->children.empty()) { |
| return Status(Substitute("Invalid file: '$0' has no columns.", filename_)); |
| } |
| return Status::OK(); |
| } |
| |
| Status ParquetSchemaResolver::CreateSchemaTree( |
| const vector<parquet::SchemaElement>& schema, int max_def_level, int max_rep_level, |
| int ira_def_level, int* idx, int* col_idx, SchemaNode* node) |
| const { |
| if (*idx >= schema.size()) { |
| return Status(Substitute("File '$0' corrupt: could not reconstruct schema tree from " |
| "flattened schema in file metadata", filename_)); |
| } |
| node->element = &schema[*idx]; |
| ++(*idx); |
| |
| if (node->element->num_children == 0) { |
| // node is a leaf node, meaning it's materialized in the file and appears in |
| // file_metadata_.row_groups.columns |
| node->col_idx = *col_idx; |
| ++(*col_idx); |
| } else if (node->element->num_children > SCHEMA_NODE_CHILDREN_SANITY_LIMIT) { |
| // Sanity-check the schema to avoid allocating absurdly large buffers below. |
| return Status(Substitute("Schema in Parquet file '$0' has $1 children, more than limit of " |
| "$2. File is likely corrupt", filename_, node->element->num_children, |
| SCHEMA_NODE_CHILDREN_SANITY_LIMIT)); |
| } else if (node->element->num_children < 0) { |
| return Status(Substitute("Corrupt Parquet file '$0': schema element has $1 children.", |
| filename_, node->element->num_children)); |
| } |
| |
| // def_level_of_immediate_repeated_ancestor does not include this node, so set before |
| // updating ira_def_level |
| node->def_level_of_immediate_repeated_ancestor = ira_def_level; |
| |
| if (node->element->repetition_type == parquet::FieldRepetitionType::OPTIONAL) { |
| ++max_def_level; |
| } else if (node->element->repetition_type == parquet::FieldRepetitionType::REPEATED) { |
| ++max_rep_level; |
| // Repeated fields add a definition level. This is used to distinguish between an |
| // empty list and a list with an item in it. |
| ++max_def_level; |
| // node is the new most immediate repeated ancestor |
| ira_def_level = max_def_level; |
| } |
| node->max_def_level = max_def_level; |
| node->max_rep_level = max_rep_level; |
| |
| node->children.resize(node->element->num_children); |
| for (int i = 0; i < node->element->num_children; ++i) { |
| RETURN_IF_ERROR(CreateSchemaTree(schema, max_def_level, max_rep_level, ira_def_level, |
| idx, col_idx, &node->children[i])); |
| } |
| return Status::OK(); |
| } |
| |
| Status ParquetSchemaResolver::ResolvePath(const SchemaPath& path, SchemaNode** node, |
| bool* pos_field, bool* missing_field) const { |
| *missing_field = false; |
| // First try two-level array encoding. |
| bool missing_field_two_level; |
| Status status_two_level = |
| ResolvePathHelper(TWO_LEVEL, path, node, pos_field, &missing_field_two_level); |
| if (missing_field_two_level) DCHECK(status_two_level.ok()); |
| if (status_two_level.ok() && !missing_field_two_level) return Status::OK(); |
| // The two-level resolution failed or reported a missing field, try three-level array |
| // encoding. |
| bool missing_field_three_level; |
| Status status_three_level = |
| ResolvePathHelper(THREE_LEVEL, path, node, pos_field, &missing_field_three_level); |
| if (missing_field_three_level) DCHECK(status_three_level.ok()); |
| if (status_three_level.ok() && !missing_field_three_level) return Status::OK(); |
| // The three-level resolution failed or reported a missing field, try one-level array |
| // encoding. |
| bool missing_field_one_level; |
| Status status_one_level = |
| ResolvePathHelper(ONE_LEVEL, path, node, pos_field, &missing_field_one_level); |
| if (missing_field_one_level) DCHECK(status_one_level.ok()); |
| if (status_one_level.ok() && !missing_field_one_level) return Status::OK(); |
| // None of resolutions yielded a node. Set *missing_field to true if any of the |
| // resolutions reported a missing a field. |
| if (missing_field_one_level || missing_field_two_level || missing_field_three_level) { |
| *node = NULL; |
| *missing_field = true; |
| return Status::OK(); |
| } |
| // All resolutions failed. Log and return the status from the three-level resolution |
| // (which is technically the standard). |
| DCHECK(!status_one_level.ok() && !status_two_level.ok() && !status_three_level.ok()); |
| *node = NULL; |
| VLOG_QUERY << status_three_level.msg().msg() << "\n" << GetStackTrace(); |
| return status_three_level; |
| } |
| |
| Status ParquetSchemaResolver::ResolvePathHelper(ArrayEncoding array_encoding, |
| const SchemaPath& path, SchemaNode** node, bool* pos_field, bool* missing_field) const { |
| DCHECK(schema_.element != NULL) |
| << "schema_ must be initialized before calling ResolvePath()"; |
| |
| *pos_field = false; |
| *missing_field = false; |
| *node = const_cast<SchemaNode*>(&schema_); |
| const ColumnType* col_type = NULL; |
| |
| // Traverse 'path' and resolve 'node' to the corresponding SchemaNode in 'schema_' (by |
| // ordinal), or set 'node' to NULL if 'path' doesn't exist in this file's schema. |
| for (int i = 0; i < path.size(); ++i) { |
| // Advance '*node' if necessary |
| if (i == 0 || col_type->type != TYPE_ARRAY || array_encoding == THREE_LEVEL) { |
| *node = NextSchemaNode(col_type, path, i, *node, missing_field); |
| if (*missing_field) return Status::OK(); |
| } else { |
| // We just resolved an array, meaning *node is set to the repeated field of the |
| // array. Since we are trying to resolve using one- or two-level array encoding, the |
| // repeated field represents both the array and the array's item (i.e. there is no |
| // explict item field), so we don't advance *node in this case. |
| DCHECK(col_type != NULL); |
| DCHECK_EQ(col_type->type, TYPE_ARRAY); |
| DCHECK(array_encoding == ONE_LEVEL || array_encoding == TWO_LEVEL); |
| DCHECK((*node)->is_repeated()); |
| } |
| |
| // Advance 'col_type' |
| int table_idx = path[i]; |
| col_type = i == 0 ? &tbl_desc_.col_descs()[table_idx].type() |
| : &col_type->children[table_idx]; |
| |
| // Resolve path[i] |
| if (col_type->type == TYPE_ARRAY) { |
| DCHECK_EQ(col_type->children.size(), 1); |
| RETURN_IF_ERROR( |
| ResolveArray(array_encoding, path, i, node, pos_field, missing_field)); |
| if (*missing_field || *pos_field) return Status::OK(); |
| } else if (col_type->type == TYPE_MAP) { |
| DCHECK_EQ(col_type->children.size(), 2); |
| RETURN_IF_ERROR(ResolveMap(path, i, node, missing_field)); |
| if (*missing_field) return Status::OK(); |
| } else if (col_type->type == TYPE_STRUCT) { |
| DCHECK_GT(col_type->children.size(), 0); |
| // Nothing to do for structs |
| } else { |
| DCHECK(!col_type->IsComplexType()); |
| DCHECK_EQ(i, path.size() - 1); |
| RETURN_IF_ERROR(ValidateScalarNode(**node, *col_type, path, i)); |
| } |
| } |
| DCHECK(*node != NULL); |
| return Status::OK(); |
| } |
| |
| SchemaNode* ParquetSchemaResolver::NextSchemaNode( |
| const ColumnType* col_type, const SchemaPath& path, int next_idx, SchemaNode* node, |
| bool* missing_field) const { |
| DCHECK_LT(next_idx, path.size()); |
| if (next_idx != 0) DCHECK(col_type != NULL); |
| |
| int file_idx; |
| int table_idx = path[next_idx]; |
| if (fallback_schema_resolution_ == TParquetFallbackSchemaResolution::type::NAME) { |
| if (next_idx == 0) { |
| // Resolve top-level table column by name. |
| DCHECK_LT(table_idx, tbl_desc_.col_descs().size()); |
| const string& name = tbl_desc_.col_descs()[table_idx].name(); |
| file_idx = FindChildWithName(node, name); |
| } else if (col_type->type == TYPE_STRUCT) { |
| // Resolve struct field by name. |
| DCHECK_LT(table_idx, col_type->field_names.size()); |
| const string& name = col_type->field_names[table_idx]; |
| file_idx = FindChildWithName(node, name); |
| } else if (col_type->type == TYPE_ARRAY) { |
| // Arrays have only one child in the file. |
| DCHECK_EQ(table_idx, SchemaPathConstants::ARRAY_ITEM); |
| file_idx = table_idx; |
| } else { |
| DCHECK_EQ(col_type->type, TYPE_MAP); |
| // Maps have two values, "key" and "value". These are supposed to be ordered and may |
| // not have the right field names, but try to resolve by name in case they're |
| // switched and otherwise use the order. See |
| // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for |
| // more details. |
| DCHECK(table_idx == SchemaPathConstants::MAP_KEY || |
| table_idx == SchemaPathConstants::MAP_VALUE); |
| const string& name = table_idx == SchemaPathConstants::MAP_KEY ? "key" : "value"; |
| file_idx = FindChildWithName(node, name); |
| if (file_idx >= node->children.size()) { |
| // Couldn't resolve by name, fall back to resolution by position. |
| file_idx = table_idx; |
| } |
| } |
| } else { |
| // Resolution by position. |
| DCHECK_EQ(fallback_schema_resolution_, |
| TParquetFallbackSchemaResolution::type::POSITION); |
| if (next_idx == 0) { |
| // For top-level columns, the first index in a path includes the table's partition |
| // keys. |
| file_idx = table_idx - tbl_desc_.num_clustering_cols(); |
| } else { |
| file_idx = table_idx; |
| } |
| } |
| |
| if (file_idx >= node->children.size()) { |
| string schema_resolution_mode = "unknown"; |
| auto entry = _TParquetFallbackSchemaResolution_VALUES_TO_NAMES.find( |
| fallback_schema_resolution_); |
| if (entry != _TParquetFallbackSchemaResolution_VALUES_TO_NAMES.end()) { |
| schema_resolution_mode = entry->second; |
| } |
| VLOG_FILE << Substitute( |
| "File '$0' does not contain path '$1' (resolving by $2)", filename_, |
| PrintPath(tbl_desc_, path), schema_resolution_mode); |
| *missing_field = true; |
| return NULL; |
| } |
| return &node->children[file_idx]; |
| } |
| |
| int ParquetSchemaResolver::FindChildWithName(SchemaNode* node, |
| const string& name) const { |
| int idx; |
| for (idx = 0; idx < node->children.size(); ++idx) { |
| if (node->children[idx].element->name == name) break; |
| } |
| return idx; |
| } |
| |
| // There are three types of array encodings: |
| // |
| // 1. One-level encoding |
| // A bare repeated field. This is interpreted as a required array of required |
| // items. |
| // Example: |
| // repeated <item-type> item; |
| // |
| // 2. Two-level encoding |
| // A group containing a single repeated field. This is interpreted as a |
| // <list-repetition> array of required items (<list-repetition> is either |
| // optional or required). |
| // Example: |
| // <list-repetition> group <name> { |
| // repeated <item-type> item; |
| // } |
| // |
| // 3. Three-level encoding |
| // The "official" encoding according to the parquet spec. A group containing a |
| // single repeated group containing the item field. This is interpreted as a |
| // <list-repetition> array of <item-repetition> items (<list-repetition> and |
| // <item-repetition> are each either optional or required). |
| // Example: |
| // <list-repetition> group <name> { |
| // repeated group list { |
| // <item-repetition> <item-type> item; |
| // } |
| // } |
| // |
| // We ignore any field annotations or names, making us more permissive than the |
| // Parquet spec dictates. Note that in any of the encodings, <item-type> may be a |
| // group containing more fields, which corresponds to a complex item type. See |
| // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists for |
| // more details and examples. |
| // |
| // This function resolves the array at '*node' assuming one-, two-, or three-level |
| // encoding, determined by 'array_encoding'. '*node' is set to the repeated field for all |
| // three encodings (unless '*pos_field' or '*missing_field' are set to true). |
| Status ParquetSchemaResolver::ResolveArray(ArrayEncoding array_encoding, |
| const SchemaPath& path, int idx, SchemaNode** node, bool* pos_field, |
| bool* missing_field) const { |
| if (array_encoding == ONE_LEVEL) { |
| if (!(*node)->is_repeated()) { |
| ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_, |
| PrintSubPath(tbl_desc_, path, idx), "array", (*node)->DebugString()); |
| return Status::Expected(msg); |
| } |
| } else { |
| // In the multi-level case, we always expect the outer group to contain a single |
| // repeated field |
| if ((*node)->children.size() != 1 || !(*node)->children[0].is_repeated()) { |
| ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_, |
| PrintSubPath(tbl_desc_, path, idx), "array", (*node)->DebugString()); |
| return Status::Expected(msg); |
| } |
| // Set *node to the repeated field |
| *node = &(*node)->children[0]; |
| } |
| DCHECK((*node)->is_repeated()); |
| |
| if (idx + 1 < path.size()) { |
| if (path[idx + 1] == SchemaPathConstants::ARRAY_POS) { |
| // The next index in 'path' is the artifical position field. |
| DCHECK_EQ(path.size(), idx + 2) << "position field cannot have children!"; |
| *pos_field = true; |
| *node = NULL; |
| return Status::OK(); |
| } else { |
| // The next value in 'path' should be the item index |
| DCHECK_EQ(path[idx + 1], SchemaPathConstants::ARRAY_ITEM); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| // According to the parquet spec, map columns are represented like: |
| // <map-repetition> group <name> (MAP) { |
| // repeated group key_value { |
| // required <key-type> key; |
| // <value-repetition> <value-type> value; |
| // } |
| // } |
| // We ignore any field annotations or names, making us more permissive than the |
| // Parquet spec dictates. See |
| // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for |
| // more details. |
| Status ParquetSchemaResolver::ResolveMap(const SchemaPath& path, int idx, SchemaNode** node, |
| bool* missing_field) const { |
| if ((*node)->children.size() != 1 || !(*node)->children[0].is_repeated() || |
| (*node)->children[0].children.size() != 2) { |
| ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_, |
| PrintSubPath(tbl_desc_, path, idx), "map", (*node)->DebugString()); |
| return Status::Expected(msg); |
| } |
| *node = &(*node)->children[0]; |
| |
| // The next index in 'path' should be the key or the value. |
| if (idx + 1 < path.size()) { |
| DCHECK(path[idx + 1] == SchemaPathConstants::MAP_KEY || |
| path[idx + 1] == SchemaPathConstants::MAP_VALUE); |
| } |
| return Status::OK(); |
| } |
| |
| Status ParquetSchemaResolver::ValidateScalarNode(const SchemaNode& node, |
| const ColumnType& col_type, const SchemaPath& path, int idx) const { |
| if (!node.children.empty()) { |
| ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_, |
| PrintSubPath(tbl_desc_, path, idx), col_type.DebugString(), node.DebugString()); |
| return Status::Expected(msg); |
| } |
| parquet::Type::type type = IMPALA_TO_PARQUET_TYPES[col_type.type]; |
| if (type != node.element->type) { |
| ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_, |
| PrintSubPath(tbl_desc_, path, idx), col_type.DebugString(), node.DebugString()); |
| return Status::Expected(msg); |
| } |
| return Status::OK(); |
| } |
| |
| } |