blob: e648684f29fe66d63a82a71e1dcc586226a43111 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import fastavro
from iceberg.api import FileFormat, Filterable
from iceberg.api.expressions import Expressions, inclusive
from iceberg.api.io import CloseableGroup
from .avro import AvroToIceberg
from .filtered_manifest import FilteredManifest
from .manifest_entry import ManifestEntry, Status
from .partition_spec_parser import PartitionSpecParser
from .schema_parser import SchemaParser
from .table_metadata import TableMetadata
# Module-level logger named after this module's import path.
_logger = logging.getLogger(__name__)
class ManifestReader(CloseableGroup, Filterable):
    """Reader for the entries of a manifest.

    A reader is backed either by a manifest file (see :meth:`read`) or by a
    list of entries that is already in memory (see :meth:`in_memory`).
    Entries read from a file are materialized once and cached on the reader.

    NOTE(review): the base-class initializers are not invoked by ``__init__``;
    confirm whether ``CloseableGroup`` relies on its own ``__init__`` running.
    """

    # Column selection meaning "every column".
    ALL_COLUMNS = ("*",)
    # Columns projected when collecting added / deleted entries in cache_changes().
    CHANGE_COLUMNS = ("file_path", "file_format", "partition", "record_count", "file_size_in_bytes")

    @staticmethod
    def read(file, spec_lookup=None):
        """Create a reader for the manifest stored in ``file``.

        :param file: input file object; it must provide ``new_fo()`` and ``location()``.
        :param spec_lookup: optional callable mapping a partition spec id to a spec.
            When omitted, the schema and spec are parsed from the file metadata.
        """
        return ManifestReader(file=file, spec_lookup=spec_lookup)

    def select(self, columns):
        """Return a ``FilteredManifest`` over this reader restricted to ``columns``."""
        # Pass the stored boolean flag; ``self.case_sensitive`` is the builder
        # method of the same name and would always be truthy.
        return FilteredManifest(self,
                                Expressions.always_true(),
                                Expressions.always_true(),
                                list(columns),
                                self._case_sensitive)

    def filter_partitions(self, expr):
        """Return a ``FilteredManifest`` over this reader using ``expr`` as the partition expression."""
        return FilteredManifest(self,
                                expr,
                                Expressions.always_true(),
                                ManifestReader.ALL_COLUMNS,
                                self._case_sensitive)

    def filter_rows(self, expr):
        """Return a ``FilteredManifest`` for the row expression ``expr``.

        The partition expression is derived by projecting ``expr`` through this
        reader's spec with the inclusive projection.
        """
        return FilteredManifest(self,
                                inclusive(self.spec).project(expr),
                                expr,
                                ManifestReader.ALL_COLUMNS,
                                self._case_sensitive)

    @staticmethod
    def in_memory(spec, entries):
        """Create a reader that serves the supplied ``entries`` for ``spec`` without reading a file."""
        return ManifestReader(spec=spec, entries=entries)

    def __init__(self, file=None, spec=None, metadata=None, schema=None, case_sensitive=True, spec_lookup=None,
                 entries=None):
        """Initialize the reader.

        When ``file``, ``metadata``, ``spec`` and ``schema`` are all supplied they
        are used as given.  Otherwise the missing pieces are derived from ``spec``
        when one is given, or else read from ``file``.

        :param file: input file holding the manifest, or None for in-memory readers.
        :param spec: partition spec, or None to resolve it from the file.
        :param metadata: manifest metadata mapping, or None.
        :param schema: table schema, or None.
        :param case_sensitive: flag forwarded to the ``FilteredManifest`` objects this reader creates.
        :param spec_lookup: optional callable mapping a spec id to a partition spec.
        :param entries: optional iterable of already-materialized entries; when
            given, ``entries()`` returns them instead of reading ``file``.
        """
        self.file = file
        self.schema = schema
        self.metadata = metadata
        self.spec = spec
        self._case_sensitive = case_sensitive
        # Copy so the cache is a list independent of the caller's iterable.
        self._entries = list(entries) if entries is not None else None
        self._fo = None
        self._avro_reader = None
        self._avro_rows = None

        if not all([item is not None for item in [self.file, self.metadata, self.spec, self.schema]]):
            if self.spec is not None:
                self.__init_from_spec()
            else:
                self.__init_from_file(spec_lookup)

        self._adds = None
        self._deletes = None

    def __open_avro_reader(self):
        """Open ``self.file`` and attach a fastavro reader to the new file object."""
        fo = self.file.new_fo()
        try:
            reader = fastavro.reader(fo)
        except Exception:
            # Do not leak the file object when the Avro header cannot be read.
            fo.close()
            raise
        self._fo = fo
        self._avro_reader = reader

    def __init_from_file(self, spec_lookup):
        """Populate metadata, schema and spec from the manifest file."""
        self.__open_avro_reader()
        self.metadata = self._avro_reader.metadata
        spec_id = int(self.metadata.get("partition-spec-id", TableMetadata.INITIAL_SPEC_ID))

        if spec_lookup is not None:
            self.spec = spec_lookup(spec_id)
            self.schema = self.spec.schema
        else:
            self.schema = SchemaParser.from_json(self.metadata.get("schema"))
            self.spec = PartitionSpecParser.from_json_fields(self.schema, spec_id, self.metadata.get("partition-spec"))

    def __init_from_spec(self):
        """Populate an empty metadata mapping and take the schema from the spec."""
        self.metadata = dict()
        self.schema = self.spec.schema

    def case_sensitive(self, case_sensitive):
        """Return a new reader over the same manifest with the given case-sensitivity flag."""
        # Carry the cached / in-memory entries over so the new reader does not lose them.
        return ManifestReader(file=self.file, metadata=self.metadata, spec=self.spec,
                              schema=self.schema, case_sensitive=case_sensitive,
                              entries=self._entries)

    def cache_changes(self):
        """Collect copies of the added and deleted entries into ``_adds`` / ``_deletes``."""
        adds = list()
        deletes = list()

        for entry in self.entries(ManifestReader.CHANGE_COLUMNS):
            # NOTE(review): status is compared with string literals here while
            # iterator() compares against Status.DELETED; confirm which
            # representation ManifestEntry.status actually uses.
            if entry.status == "ADDED":
                adds.append(entry.copy())
            elif entry.status == "DELETED":
                deletes.append(entry.copy())

        self._adds = adds
        self._deletes = deletes

    def added_files(self):
        """Return the cached list of added entries, computing it on first use."""
        if self._adds is None:
            self.cache_changes()
        return self._adds

    def deleted_files(self):
        """Return the cached list of deleted entries, computing it on first use."""
        if self._deletes is None:
            self.cache_changes()
        return self._deletes

    def entries(self, columns=None):
        """Return the manifest entries, reading them from the file on first use.

        :param columns: columns to project when the entries are first read;
            defaults to ``ALL_COLUMNS``.
        :return: the cached list of entries, or None when the manifest format is
            recognized but is not Avro.
        :raises RuntimeError: if the manifest format cannot be determined from the file name.

        NOTE(review): the cache is shared by every call, so ``columns`` only takes
        effect on the call that populates it.
        """
        # In-memory readers and already-read manifests never touch the file.
        if self._entries is not None:
            return self._entries

        if columns is None:
            columns = ManifestReader.ALL_COLUMNS

        file_format = FileFormat.from_file_name(self.file.location())
        if file_format is None:
            raise RuntimeError("Unable to determine format of manifest: %s" % self.file)

        proj_schema = ManifestEntry.project_schema(self.spec.partition_type(), columns)
        if file_format is FileFormat.AVRO:
            # Readers built with all fields supplied (e.g. by case_sensitive())
            # have not opened the file yet.
            if self._avro_reader is None:
                self.__open_avro_reader()

            loaded = list()
            try:
                for read_entry in AvroToIceberg.read_avro_row(proj_schema, self._avro_reader):
                    entry = ManifestEntry(schema=proj_schema, partition_type=self.spec.partition_type())
                    for i, key in enumerate(read_entry.keys()):
                        entry.put(i, read_entry[key])
                    loaded.append(entry)
            finally:
                # Release the file object whether or not reading succeeded.
                self._fo.close()
                self._fo = None
                self._avro_reader = None

            # Publish only a complete list so a failed read is never cached.
            self._entries = loaded

        return self._entries

    def iterator(self, part_filter=None, columns=None):
        """Return the ``file`` of every entry whose status is not ``Status.DELETED``.

        ``part_filter`` and ``columns`` are accepted but not applied here; when
        both are omitted the call is re-issued with the match-all expression and
        ``Filterable.ALL_COLUMNS``.
        """
        if part_filter is None and columns is None:
            return self.iterator(Expressions.always_true(), Filterable.ALL_COLUMNS)

        return [entry.file for entry in self.entries() if entry.status != Status.DELETED]