# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import itertools
import logging
from multiprocessing import cpu_count
from multiprocessing.dummy import Pool

from iceberg.api.expressions import (InclusiveManifestEvaluator,
                                     ResidualEvaluator)

from .base_file_scan_task import BaseFileScanTask
from .base_table_scan import BaseTableScan
from .manifest_reader import ManifestReader
from .partition_spec_parser import PartitionSpecParser
from .schema_parser import SchemaParser
from .table_properties import TableProperties
from .util import SCAN_THREAD_POOL_ENABLED, WORKER_THREAD_POOL_SIZE_PROP

_logger = logging.getLogger(__name__)


class DataTableScan(BaseTableScan):
    SNAPSHOT_COLUMNS = ("snapshot_id", "file_path", "file_ordinal", "file_format", "block_size_in_bytes",
                        "file_size_in_bytes", "record_count", "partition", "value_counts", "null_value_counts",
                        "lower_bounds", "upper_bounds")

    def __init__(self, ops, table, schema=None, snapshot_id=None, row_filter=None,
                 case_sensitive=True, selected_columns=None, options=None, minused_cols=None):
        super(DataTableScan, self).__init__(ops, table, schema if schema is not None else table.schema(),
                                            snapshot_id=snapshot_id, row_filter=row_filter,
                                            case_sensitive=case_sensitive, selected_columns=selected_columns,
                                            options=options, minused_cols=minused_cols)
        self._cached_evaluators = dict()

    def new_refined_scan(self, ops, table, schema, snapshot_id=None, row_filter=None, case_sensitive=None,
                         selected_columns=None, options=None, minused_cols=None):
        return DataTableScan(ops, table, schema,
                             snapshot_id=snapshot_id, row_filter=row_filter, case_sensitive=case_sensitive,
                             selected_columns=selected_columns, options=options, minused_cols=minused_cols)
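
    # Prune the snapshot's manifests with an inclusive evaluator over their partition summaries,
    # then expand each surviving manifest into file scan tasks, using a worker thread pool when
    # SCAN_THREAD_POOL_ENABLED is set in the table operations configuration.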
    def plan_files(self, ops=None, snapshot=None, row_filter=None):
        if all(i is None for i in [ops, snapshot, row_filter]):
            return super(DataTableScan, self).plan_files()

        matching_manifests = [manifest for manifest in snapshot.manifests
                              if self.cache_loader(manifest.spec_id).eval(manifest)]

        if self.ops.conf.get(SCAN_THREAD_POOL_ENABLED):
            with Pool(self.ops.conf.get(WORKER_THREAD_POOL_SIZE_PROP,
                                        cpu_count())) as reader_scan_pool:
                return itertools.chain.from_iterable(reader_scan_pool.map(self.get_scans_for_manifest,
                                                                          matching_manifests))
        else:
            return itertools.chain.from_iterable([self.get_scans_for_manifest(manifest)
                                                  for manifest in matching_manifests])
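
    # Build the InclusiveManifestEvaluator used to prune manifests written with the given
    # partition spec id, caching one evaluator per spec id so the filter is only bound once.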
    def cache_loader(self, spec_id):
        if spec_id not in self._cached_evaluators:
            spec = self.ops.current().spec_id(spec_id)
            self._cached_evaluators[spec_id] = InclusiveManifestEvaluator(spec, self.row_filter)
        return self._cached_evaluators[spec_id]
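
    # Read one manifest file and turn each matching data file entry into a BaseFileScanTask
    # that carries the serialized schema, partition spec, and residual evaluator.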
    def get_scans_for_manifest(self, manifest):
        from .filesystem import FileSystemInputFile
        input_file = FileSystemInputFile.from_location(manifest.manifest_path, self.ops.conf)
        reader = ManifestReader.read(input_file)
        schema_str = SchemaParser.to_json(reader.spec.schema)
        spec_str = PartitionSpecParser.to_json(reader.spec)
        residuals = ResidualEvaluator(reader.spec, self.row_filter)
        return [BaseFileScanTask(file, schema_str, spec_str, residuals)
                for file in reader.filter_rows(self.row_filter).select(DataTableScan.SNAPSHOT_COLUMNS).iterator()]
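
    # Resolve the target split size: a per-scan override from the scan options wins; otherwise
    # fall back to the table property, or its default when the property is unset.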
    def target_split_size(self, ops):
        scan_split_size_str = self.options.get(TableProperties.SPLIT_SIZE)
        if scan_split_size_str is not None:
            try:
                return int(scan_split_size_str)
            except ValueError:
                _logger.warning("Invalid %s option: %s", TableProperties.SPLIT_SIZE, scan_split_size_str)

        return int(self.ops.current().properties.get(TableProperties.SPLIT_SIZE,
                                                     TableProperties.SPLIT_SIZE_DEFAULT))