# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
import decimal
import logging
import typing
from iceberg.api import Schema
from iceberg.api.expressions import Expression
from iceberg.api.io import InputFile
from iceberg.api.types import NestedField, Type, TypeID
from iceberg.core.filesystem import FileSystem, LocalFileSystem, S3FileSystem
from iceberg.core.util.profile import profile
from iceberg.exceptions import FileSystemNotFound, InvalidCastException
import numpy as np
import pyarrow as pa
from pyarrow import fs
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from .dataset_utils import get_dataset_filter
from .parquet_schema_utils import prune_columns
from .parquet_to_iceberg import convert_parquet_to_iceberg
_logger = logging.getLogger(__name__)
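
# Maps an Iceberg TypeID to a callable that, given a NestedField, returns the pyarrow
# type to use for that field plus a placeholder fill value used when materializing
# all-null columns for fields missing from the file.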
DTYPE_MAP: typing.Dict[TypeID,
                       typing.Callable[[NestedField], typing.Tuple[pa.DataType, typing.Any]]] = \
    {TypeID.BINARY: lambda field: (pa.binary(), b""),
TypeID.BOOLEAN: lambda field: (pa.bool_(), False),
TypeID.DATE: lambda field: (pa.date32(), datetime.now()),
TypeID.DECIMAL: lambda field: (pa.decimal128(field.type.precision, field.type.scale),
decimal.Decimal()),
TypeID.DOUBLE: lambda field: (pa.float64(), np.nan),
     TypeID.FIXED: lambda field: (pa.binary(field.type.length), b""),
TypeID.FLOAT: lambda field: (pa.float32(), np.nan),
TypeID.INTEGER: lambda field: (pa.int32(), np.nan),
TypeID.LIST: lambda field: (pa.list_(pa.field("element",
DTYPE_MAP[field.type.element_type.type_id](field.type)[0])),
None),
TypeID.LONG: lambda field: (pa.int64(), np.nan),
# To-Do: update to support reading map fields
# TypeID.MAP: lambda field: (,),
TypeID.STRING: lambda field: (pa.string(), ""),
     TypeID.STRUCT: lambda field: (pa.struct([(nested_field.name,
                                               DTYPE_MAP[nested_field.type.type_id](nested_field)[0])
                                              for nested_field in field.type.fields]), {}),
TypeID.TIMESTAMP: lambda field: (pa.timestamp("us"), datetime.now()),
     # TIME is not used by Spark, so it is not implemented for now
# TypeID.TIME: pa.time64(None)
}
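
# Maps Iceberg filesystem implementations to the pyarrow filesystem used to open
# the underlying Parquet files.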
FS_MAP: typing.Dict[typing.Type[FileSystem], typing.Type[fs.FileSystem]] = {LocalFileSystem: fs.LocalFileSystem}
try:
    # pyarrow exposes fs.S3FileSystem only when built with S3 support; without it
    # the attribute lookup below fails with AttributeError rather than ImportError.
    FS_MAP[S3FileSystem] = fs.S3FileSystem
except (ImportError, AttributeError):
    _logger.warning("S3 filesystem mapping not available; S3 paths cannot be read")


class ParquetReader(object):
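    """Reads a single Parquet file into a pyarrow Table projected onto an expected
    Iceberg schema.

    Columns are pruned to the projected fields present in the file, the pushed-down
    filter expression is applied at scan time, and schema evolution is handled by
    renaming/casting existing columns and appending all-null columns for fields
    missing from the file.

    Usage sketch (assumes `input_file`, `expected_schema`, and `filter_expr` were
    built elsewhere with the iceberg API; the options mapping is stored but not
    interpreted by this class):

        with ParquetReader(input_file, expected_schema, {}, filter_expr,
                           case_sensitive=True) as reader:
            table = reader.read()
    """
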
def __init__(self, input: InputFile, expected_schema: Schema, options, filter_expr: Expression,
case_sensitive: bool, start: int = None, end: int = None):
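        """Opens the file, converts its Parquet schema to Iceberg, and prepares the
        column projection, dataset filter, and file-to-expected name mapping.

        Partial file reads (start/end) are not yet supported.
        """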
self._stats: typing.Dict[str, int] = dict()
self._input = input
self._input_fo = input.new_fo()
self._arrow_file = pq.ParquetFile(self._input_fo)
self._file_schema = convert_parquet_to_iceberg(self._arrow_file)
self._expected_schema = expected_schema
self._file_to_expected_name_map = ParquetReader.get_field_map(self._file_schema,
self._expected_schema)
self._options = options
self._filter = get_dataset_filter(filter_expr, ParquetReader.get_reverse_field_map(self._file_schema,
self._expected_schema))
self._case_sensitive = case_sensitive
if start is not None or end is not None:
raise NotImplementedError("Partial file reads are not yet supported")
# self.start = start
# self.end = end
self.materialized_table = False
self._table = None
_logger.debug("Reader initialized for %s" % self._input.path)
@property
def stats(self) -> typing.Dict[str, int]:
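        """Returns a copy of the profiling stats gathered while reading."""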
return dict(self._stats)
def read(self, force=False) -> pa.Table:
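        """Returns the file contents as a pyarrow Table, reading them on first use.

        The materialized table is cached; pass force=True to re-read the file.
        """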
if not self.materialized_table or force:
self._read_data()
return self._table
def _read_data(self) -> None:
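        """Scans the pruned, filtered dataset and applies schema evolution to the result."""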
_logger.debug("Starting data read")
# only scan the columns projected and in our file
cols_to_read = prune_columns(self._file_schema, self._expected_schema)
with profile("read data", self._stats):
try:
read_fs = FS_MAP[type(self._input.fs)]
except KeyError:
raise FileSystemNotFound(f"No mapped filesystem found for {type(self._input.fs)}")
arrow_dataset = ds.FileSystemDataset.from_paths([self._input.location()],
schema=self._arrow_file.schema_arrow,
format=ds.ParquetFileFormat(),
filesystem=read_fs())
arrow_table = arrow_dataset.to_table(columns=cols_to_read, filter=self._filter)
# process schema evolution if needed
with profile("schema_evol_proc", self._stats):
processed_tbl = self.migrate_schema(arrow_table)
for i, field in self.get_missing_fields():
dtype_func = DTYPE_MAP.get(field.type.type_id)
if dtype_func is None:
raise RuntimeError("Unable to create null column for type %s" % field.type.type_id)
dtype = dtype_func(field)
processed_tbl = (processed_tbl.add_column(i,
pa.field(field.name, dtype[0], True, None),
ParquetReader.create_null_column(processed_tbl[0],
dtype)))
self._table = processed_tbl
self.materialized_table = True
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
_logger.debug(self._stats)
self.close()
def close(self):
self._input_fo.close()
def get_missing_fields(self) -> typing.List[typing.Tuple[int, NestedField]]:
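        """Returns (position, field) pairs for expected fields that are absent from the file schema."""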
return [(i, field) for i, field in enumerate(self._expected_schema.as_struct().fields)
if self._file_schema.find_field(field.id) is None]
@staticmethod
def get_field_map(file_schema, expected_schema) -> typing.Dict[str, str]:
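        """Maps column names as written in the file to their expected (current) names."""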
return {file_schema.find_field(field.id).name: field.name
for field in expected_schema.as_struct().fields
if file_schema.find_field(field.id) is not None}
@staticmethod
def get_reverse_field_map(file_schema, expected_schema) -> typing.Dict[str, str]:
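        """Maps expected (current) column names back to the names written in the file."""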
return {expected_schema.find_field(field.id).name: field.name
for field in file_schema.as_struct().fields
if expected_schema.find_field(field.id) is not None}
def migrate_schema(self, table: pa.Table) -> pa.Table:
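        """Renames columns to their expected names and casts them to the expected types.

        Raises InvalidCastException when a required type change is not an allowed promotion.
        """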
data_arrays: typing.List[pa.ChunkedArray] = []
schema: typing.List[pa.Field] = []
for key, value in self._file_to_expected_name_map.items():
column_idx: int = table.schema.get_field_index(key)
column_field: pa.Field = table.schema[column_idx]
column_arrow_type: pa.DataType = column_field.type
column_data: pa.ChunkedArray = table[column_idx]
iceberg_field: NestedField = self._expected_schema.find_field(value)
converted_field: NestedField = self._file_schema.find_field(key)
if iceberg_field.type != converted_field.type:
if not ParquetReader.is_supported_cast(converted_field.type, iceberg_field.type):
_logger.error(f"unsupported cast {converted_field.type} -> {iceberg_field.type}")
                    raise InvalidCastException(f"Unsupported cast {converted_field.type} -> {iceberg_field.type}")
try:
column_arrow_type = DTYPE_MAP[iceberg_field.type.type_id](iceberg_field)[0]
column_data = column_data.cast(column_arrow_type)
except KeyError:
_logger.error(f"Unable to map {iceberg_field.type} to an arrow type")
raise
data_arrays.append(column_data)
schema.append(pa.field(value, column_arrow_type, column_field.nullable, column_field.metadata))
return pa.table(data_arrays, schema=pa.schema(schema))
@staticmethod
def create_null_column(reference_column: pa.ChunkedArray, dtype_tuple: typing.Tuple[pa.DataType, typing.Any]) -> pa.ChunkedArray:
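        """Builds an all-null ChunkedArray of the given type, chunked to match the reference column."""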
dtype, init_val = dtype_tuple
        return pa.chunked_array([pa.array(np.full(len(c), init_val),
                                          type=dtype,
                                          mask=np.array([True] * len(c), dtype="bool"))
                                 for c in reference_column.chunks], type=dtype)
@staticmethod
def is_supported_cast(old_type: Type, new_type: Type) -> bool:
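        """Checks whether old_type -> new_type is an allowed promotion: int -> long,
        float -> double, or a decimal widened in precision with the same scale.
        """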
if old_type.type_id == TypeID.INTEGER and new_type.type_id == TypeID.LONG:
return True
elif old_type.type_id == TypeID.FLOAT and new_type.type_id == TypeID.DOUBLE:
return True
elif old_type.type_id == TypeID.DECIMAL and new_type.type_id == TypeID.DECIMAL \
and old_type.precision < new_type.precision \
and old_type.scale == new_type.scale:
return True
return False