blob: fde4e36fa786e1441c2e339426f8ef37979dc655 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from enum import Enum
from iceberg.api import (DataFile,
FileFormat,
Metrics,
Schema)
from iceberg.api.types import (IntegerType,
LongType,
NestedField)
from iceberg.core.avro import IcebergToAvro
from .generic_data_file import GenericDataFile
from .partition_data import PartitionData
class ManifestEntry():
AVRO_NAME = "manifest_entry"
def __init__(self, schema=None, partition_type=None, to_copy=None):
self.schema = schema
self.snapshot_id = 0
self.file = None
self.status = Status.EXISTING
if self.schema is None and partition_type is not None:
self.schema = IcebergToAvro.type_to_schema(ManifestEntry.get_schema(partition_type))
elif self.schema is None and partition_type is None and to_copy is not None:
self.schema = to_copy.schema
self.status = to_copy.status
self.snapshot_id = to_copy.snapshot_id
self.file = to_copy.file.copy()
if self.schema is None:
raise RuntimeError("Invalid arguments schema=%s, partition_type=%s, to_copy=%s" % (self.schema,
partition_type,
to_copy))
def wrap_existing(self, snapshot_id, file):
self.status = Status.EXISTING
self.snapshot_id = snapshot_id
self.file = file
return self
def wrap_append(self, snapshot_id, file):
self.status = Status.ADDED
self.snapshot_id = snapshot_id
self.file = file
return self
def wrap_delete(self, snapshot_id, file):
self.status = Status.DELETED
self.snapshot_id = snapshot_id
self.file = file
return self
def copy(self):
return ManifestEntry(to_copy=self)
def put(self, i, v):
if i == 0:
self.status = Status.from_id(v)
elif i == 1:
self.snapshot_id = v
elif i == 2:
if isinstance(v, dict):
metrics = Metrics(row_count=v.get("record_count"),
column_sizes=v.get("column_sizes"),
value_counts=v.get("value_counts"),
null_value_counts=v.get("null_value_counts"),
lower_bounds=v.get("lower_bounds"),
upper_bounds=v.get("upper_bounds"))
data_file_schema = self.schema.as_struct().field(name="data_file")
part_data = PartitionData.from_json(data_file_schema
.type
.field(name="partition").type, v.get("partition"))
v = GenericDataFile(v.get("file_path"),
FileFormat[v.get("file_format")],
v.get("file_size_in_bytes"),
v.get("block_size_in_byte"),
row_count=v.get("record_count"),
partition=part_data,
metrics=metrics
)
self.file = v
def get(self, i):
if i == 0:
return self.status.value
elif i == 1:
return self.snapshot_id
elif i == 2:
return self.file
def __repr__(self):
return "ManifestEntry(status=%s, snapshot_id=%s, file=%s" % (self.status, self.snapshot_id, self.file)
def __str__(self):
return self.__repr__()
@staticmethod
def project_schema(part_type, columns):
return ManifestEntry.wrap_file_schema(Schema(DataFile.get_type(part_type).fields)
.select(columns)
.as_struct())
@staticmethod
def get_schema(partition_type):
return ManifestEntry.wrap_file_schema(DataFile.get_type(partition_type))
@staticmethod
def wrap_file_schema(file_struct):
return Schema(NestedField.required(0, "status", IntegerType.get()),
NestedField.required(1, "snapshot_id", LongType.get()),
NestedField.required(2, "data_file", file_struct))
class Status(Enum):
EXISTING = 0
ADDED = 1
DELETED = 2
@staticmethod
def from_id(id):
return Status(id)