blob: f36fcf86f742b6dedb039c6d75ace8bb9587ace5 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from iceberg.api import FileScanTask
from .partition_spec_parser import PartitionSpecParser
from .schema_parser import SchemaParser
class BaseFileScanTask(FileScanTask):
    """A scan task that covers one entire data file.

    The partition spec is kept as JSON and parsed only on first access to
    ``spec``; the parsed result is cached on the instance.
    """

    def __init__(self, file, schema_str, spec_str, residuals):
        """Create a task for ``file``.

        :param file: the data file to scan
        :param schema_str: schema as a JSON string, parsed with ``SchemaParser``
        :param spec_str: partition spec as a JSON string, parsed with
            ``PartitionSpecParser`` against the schema
        :param residuals: object whose ``residual_for(partition)`` returns the
            residual filter for a partition
        """
        self._file = file
        self._schema_str = schema_str
        self._spec_str = spec_str
        self._spec = None  # parsed lazily by the ``spec`` property
        self._residuals = residuals

    @property
    def file(self):
        """The data file scanned by this task."""
        return self._file

    @property
    def spec(self):
        """The partition spec, parsed from JSON on first access and then cached."""
        if self._spec is None:
            schema = SchemaParser.from_json(self._schema_str)
            self._spec = PartitionSpecParser.from_json(schema, self._spec_str)
        return self._spec

    @property
    def start(self):
        """Starting byte offset; always 0 because the task covers the whole file."""
        return 0

    @property
    def length(self):
        """Number of bytes covered: the full size of the file."""
        return self._file.file_size_in_bytes()

    @property
    def residual(self):
        """Residual filter expression for this file's partition."""
        return self._residuals.residual_for(self._file.partition())

    def split(self, split_size):
        """Split this task into tasks of at most ``split_size`` bytes each.

        :param split_size: maximum byte length of each resulting task
        :returns: a list of tasks. When the file format is not splittable the
            list contains only this task, so callers can always iterate the
            result.
        """
        if self.file.format().is_splittable():
            return list(SplitScanTaskIterator(split_size, self))
        # Previously this returned the bare task, which is not iterable and
        # broke callers that loop over the result of ``split``.
        return [self]

    def __repr__(self):
        fields = ["file: {}".format(self._file.path()),
                  "partition_data: {}".format(self._file.partition()),
                  "residual: {}".format(self.residual)]
        return "BaseFileScanTask({})".format(", ".join(fields))

    def __str__(self):
        return self.__repr__()
class SplitScanTaskIterator(object):
    """Iterator that cuts a file scan task into consecutive byte ranges.

    Each yielded ``SplitScanTask`` covers at most ``split_size`` bytes. The
    ranges start at offset 0 and together cover ``file_scan_task.length``
    bytes. A task of length 0 yields nothing.
    """

    def __init__(self, split_size, file_scan_task):
        """Prepare to split ``file_scan_task``.

        :param split_size: maximum byte length of each split; must be positive
        :param file_scan_task: task whose ``length`` is divided into splits
        :raises ValueError: if ``split_size`` is not positive. Such a size
            would never reduce the remaining length, so iteration would not
            terminate.
        """
        if split_size <= 0:
            raise ValueError(
                "Invalid split size (must be positive): {}".format(split_size))
        self._offset = 0
        self._remaining_len = file_scan_task.length
        self._split_size = split_size
        self._file_scan_task = file_scan_task

    def has_next(self):
        """Return True while there are bytes left to hand out."""
        return self._remaining_len > 0

    def __iter__(self):
        return self

    def __next__(self):
        if not self.has_next():
            raise StopIteration
        # ``split_len`` rather than ``len`` so the builtin is not shadowed.
        split_len = min(self._split_size, self._remaining_len)
        split_task = SplitScanTask(self._offset, split_len, self._file_scan_task)
        self._offset += split_len
        self._remaining_len -= split_len
        return split_task
class SplitScanTask(FileScanTask):
    """A scan task covering one byte range of a parent file scan task.

    File, spec and residual are delegated to the parent task. Only the
    ``start``/``length`` range is specific to this split.
    """

    def __init__(self, offset, len, file_scan_task):
        """Create a split of ``file_scan_task``.

        :param offset: starting byte offset of this split
        :param len: number of bytes covered by this split. The name shadows
            the builtin but is kept so keyword callers keep working.
        :param file_scan_task: the parent task being split
        """
        self._offset = offset
        self._len = len
        self._file_scan_task = file_scan_task

    @property
    def file(self):
        """The data file of the parent task."""
        return self._file_scan_task.file

    @property
    def spec(self):
        """The partition spec of the parent task."""
        return self._file_scan_task.spec

    @property
    def start(self):
        """Starting byte offset of this split."""
        return self._offset

    @property
    def length(self):
        """Number of bytes covered by this split."""
        return self._len

    @property
    def residual(self):
        """Residual filter expression of the parent task."""
        # ``residual`` is a property on the tasks in this module. The old
        # ``residual()`` call invoked the returned expression instead of
        # fetching it.
        return self._file_scan_task.residual

    def split(self, split_size=None):
        """Always fails: a split task cannot be split again.

        :param split_size: accepted (and ignored) so the signature matches
            ``BaseFileScanTask.split``. A generic ``task.split(n)`` then
            raises the intended error rather than a ``TypeError``.
        :raises RuntimeError: always
        """
        raise RuntimeError("Cannot split a task which is already split")

    def __repr__(self):
        return "SplitScanTask(start: {}, length: {}, task: {!r})".format(
            self._offset, self._len, self._file_scan_task)