# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import logging

from iceberg.api import Schema
from iceberg.api.types import (BinaryType,
                               BooleanType,
                               DateType,
                               DecimalType,
                               DoubleType,
                               FixedType,
                               FloatType,
                               IntegerType,
                               ListType,
                               LongType,
                               MapType,
                               NestedField,
                               StringType,
                               StructType,
                               TimestampType)
from iceberg.api.types import Type
import pyarrow as pa
from pyarrow.parquet import lib, ParquetFile

_logger = logging.getLogger(__name__)
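
# Mapping from Arrow type ids to factories that build the equivalent Iceberg
# type. Parameterized types (decimal, fixed-size binary, timestamp) read their
# parameters from the Arrow type instance passed in; scalar types ignore it.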
arrow_type_map = {lib.Type_BOOL: lambda x=None: BooleanType.get(),
                  lib.Type_DATE32: lambda x=None: DateType.get(),
                  lib.Type_DECIMAL: lambda x=None: DecimalType.of(x.precision, x.scale),
                  lib.Type_DOUBLE: lambda x=None: DoubleType.get(),
                  lib.Type_FIXED_SIZE_BINARY: lambda x=None: FixedType.of_length(x.byte_width),
                  lib.Type_BINARY: lambda x=None: BinaryType.get(),
                  lib.Type_FLOAT: lambda x=None: FloatType.get(),
                  lib.Type_STRING: lambda x=None: StringType.get(),
                  lib.Type_INT32: lambda x=None: IntegerType.get(),
                  lib.Type_INT64: lambda x=None: LongType.get(),
                  lib.Type_TIMESTAMP: lambda x=None: (TimestampType.without_timezone()
                                                      if x.tz is None
                                                      else TimestampType.with_timezone())
                  }
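
# For example (a sketch, not part of the module's API; entries are keyed by
# Arrow type id and called with the Arrow type instance when parameters matter):
#
#   arrow_type_map[pa.int32().id]()    # -> IntegerType.get()
#   arrow_type_map[pa.string().id]()   # -> StringType.get()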


def get_nested_field(field_id: int, field_name: str, field_type: Type, nullable: bool) -> NestedField:
    if nullable:
        return NestedField.optional(field_id, field_name, field_type)
    else:
        return NestedField.required(field_id, field_name, field_type)


def get_list_field(col_type: pa.ListType) -> ListType:
    # The element's Parquet field id travels in the Arrow field metadata and
    # must be decoded to an int before it is handed to the Iceberg API.
    field_id = int(col_type.value_field.metadata[b"PARQUET:field_id"].decode("utf-8"))
    element_type = arrow_type_map[col_type.value_field.type.id](col_type.value_field.type)
    if col_type.value_field.nullable:
        return ListType.of_optional(field_id, element_type)
    else:
        return ListType.of_required(field_id, element_type)


def get_field(col: pa.Field) -> NestedField:
    try:
        return get_nested_field(int(col.metadata[b"PARQUET:field_id"].decode("utf-8")),
                                col.name,
                                arrow_type_map[col.type.id](col.type),
                                col.nullable)
    except KeyError:
        # A KeyError means either the PARQUET:field_id metadata is missing or
        # the Arrow type has no Iceberg mapping; log the column and re-raise.
        _logger.error("Unable to convert column %s of type %s", col.name, col.type)
        raise


def get_struct_field(col_type: pa.StructType) -> Type:
    if col_type.num_fields > 0:
        # A struct whose first child is named "map" is a Spark-serialized map,
        # not a true struct (see get_inferred_map below).
        if col_type[0].name == "map":
            return get_inferred_map(col_type)
        else:
            return StructType.of([get_field(child) for child in col_type])
    else:
        raise RuntimeError("Unable to convert type to iceberg %s" % col_type)
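

# Spark serializes MapType columns as struct<map: list<struct<key, value>>>,
# so the key and value fields are recovered by drilling through the list's
# element struct.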
def get_inferred_map(col_type: pa.StructType) -> MapType:
    base_field = col_type[0]
    key_field = get_field(base_field.type.value_field.type[0])
    value_field = get_field(base_field.type.value_field.type[1])
    if base_field.nullable:
        return MapType.of_optional(key_field.field_id, value_field.field_id,
                                   key_field.type, value_field.type)
    else:
        return MapType.of_required(key_field.field_id, value_field.field_id,
                                   key_field.type, value_field.type)


def get_map_field(col):
    # A native Arrow map type is not handled yet; Spark serializes MapType to
    # Struct<List<Struct<>>>, which is covered by get_struct_field above.
    raise NotImplementedError("Arrow Map type not implemented yet")


# Register the nested-type mappings now that their handler functions are defined.
arrow_type_map.update({lib.Type_LIST: get_list_field,
                       lib.Type_MAP: get_map_field,
                       lib.Type_STRUCT: get_struct_field})


def convert_parquet_to_iceberg(parquet_file: ParquetFile) -> Schema:  # noqa: ignore=C901
    """
    Extract an equivalent Iceberg schema from a Parquet file by reading the
    Arrow schema, with its Parquet field_id metadata, embedded in the file.

    Parameters
    ----------
    parquet_file : pyarrow.parquet.ParquetFile
        The Parquet file from which to extract the Iceberg schema

    Returns
    -------
    iceberg.api.Schema
        An equivalent Iceberg Schema based on the Arrow schema read from the file
    """
    return arrow_to_iceberg(parquet_file.schema_arrow)


def arrow_to_iceberg(arrow_schema: pa.Schema) -> Schema:
    """
    Use an Arrow schema, which contains the Parquet field_id metadata, to create
    an equivalent Iceberg Schema.

    Parameters
    ----------
    arrow_schema : pyarrow.Schema
        An Arrow schema with the Parquet field_id metadata

    Returns
    -------
    iceberg.api.Schema
        An equivalent Iceberg Schema based on the given Arrow schema
    """
    return Schema([get_field(col) for col in arrow_schema])
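

# Example usage (a minimal sketch; "part-00000.parquet" is a hypothetical file
# that carries Parquet field_id metadata, e.g. one written by Spark or Iceberg):
#
#   from pyarrow.parquet import ParquetFile
#
#   parquet_file = ParquetFile("part-00000.parquet")
#   iceberg_schema = convert_parquet_to_iceberg(parquet_file)
#   print(iceberg_schema)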