blob: cef872f7ea9a432935a413a07c714aef10b9f095 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from copy import deepcopy
import functools
from iceberg.api import DataOperations
from iceberg.api.expressions import Expressions, Literal, Operation, UnboundPredicate
from iceberg.api.types import TimestampType
from .manifest_group import ManifestGroup
from .util import str_as_bool
TIMESTAMP_RANGE_MAP = {Operation.LT: lambda min_val, max_val, val: (min_val, val - 1 if val - 1 < max_val else max_val),
Operation.LT_EQ: lambda min_val, max_val, val: (min_val, val if val < max_val else max_val),
Operation.GT: lambda min_val, max_val, val: (val - 1 if val - 1 > min_val else min_val, max_val),
Operation.GT_EQ: lambda min_val, max_val, val: (val if val > min_val else min_val, max_val),
Operation.EQ: lambda min_val, max_val, val: (val if val > min_val else min_val,
val if val < max_val else max_val)}
class ScanSummary(object):
IGNORED_OPERATIONS = {DataOperations.DELETE, DataOperations.REPLACE}
SCAN_SUMMARY_COLUMNS = ["partition", "record_count", "file_size_in_bytes"]
class ScanSummaryBuilder(object):
TIMESTAMP_NAMES = {"dateCreated", "lastUpdated"}
def __init__(self, scan):
self.scan = scan
self.table = scan.table
self.ops = self.table.ops
self.snapshot_timestamps = {snap.snapshot_id: snap.timestamp_millis
for snap in self.table.snapshots()}
self._throw_if_limited = False
self.force_use_manifests = False
self._limit = 2**31 - 1
self.time_filters = list()
def add_timestamp_filter(self, filter):
self.throw_if_limited()
self.time_filters.append(filter)
return self
def after(self, timestamp):
self.add_timestamp_expression(timestamp, Expressions.greater_than_or_equal)
return self
def before(self, timestamp):
self.add_timestamp_expression(timestamp, Expressions.less_than_or_equal)
return self
def add_timestamp_expression(self, timestamp, expr_func):
if isinstance(timestamp, str):
timestamp = Literal.of(timestamp).to(TimestampType.without_timezone()).value / 1000
self.add_timestamp_filter(expr_func("timestamp_ms", timestamp))
return self
def throw_if_limited(self):
self._throw_if_limited = True
return self
def limit(self, num_partitions):
self._limit = num_partitions
return self
def use_manifests(self):
self.force_use_manifests = True
return self
def remove_time_filters(self, expressions, expression):
if expression.op == Operation.AND:
self.remove_time_filters(expressions, expression.left)
self.remove_time_filters(expressions, expression.right)
return
elif isinstance(expression, UnboundPredicate):
pred = expression
ref = pred.ref
lit = pred.lit
if ref.name in ScanSummaryBuilder.TIMESTAMP_NAMES:
ts_literal = lit.to(TimestampType.without_timezone())
millis = ScanSummaryBuilder.to_millis(ts_literal.value)
self.add_timestamp_filter(Expressions.predicate(pred.op, "timestamp_ms", millis))
return
expressions.append(expression)
def build(self):
if self.table.current_snapshot() is None:
return dict()
filters = list()
self.remove_time_filters(filters, Expressions.rewrite_not(self.scan.row_filter))
row_filter = self.join_filters(filters)
if len(self.time_filters) == 0:
return self.from_manifest_scan(self.table.current_snapshot().manifests, row_filter)
min_timestamp, max_timestamp = self.timestamp_range(self.time_filters)
oldest_snapshot = self.table.current_snapshot()
for key, val in self.snapshot_timestamps.items():
if val < oldest_snapshot.timestamp_millis:
oldest_snapshot = self.ops.current().snapshot(key)
# if oldest known snapshot is in the range, then there may be an expired snapshot that has
# been removed that matched the range. because the timestamp of that snapshot is unknown,
# it can't be included in the results and the results are not reliable."""
if oldest_snapshot.timestamp_millis >= min_timestamp and oldest_snapshot <= max_timestamp:
raise RuntimeError("Cannot satisfy time filters: time range may include expired snapshots")
snapshots = [snapshot for snapshot in ScanSummaryBuilder.snapshots_in_time_range(self.ops.current(),
min_timestamp,
max_timestamp)
if snapshot.operation not in ScanSummary.IGNORED_OPERATIONS]
result = self.from_partition_summaries(snapshots)
if result is not None and not self.force_use_manifests:
return result
# filter down to the the set of manifest files that were created in the time range, ignoring
# the snapshots created by delete or replace operations. this is complete because it finds
# files in the snapshot where they were added to the dataset in either an append or an
# overwrite. if those files are later compacted with a replace or deleted, those changes are
# ignored.
manifests_to_scan = list()
snapshot_ids = set()
for snap in snapshots:
snapshot_ids.add(snap)
for manifest in snap.manifests:
if manifest.snapshot_id is not None or manifest.snapshot_id == snap.snapshot_id:
manifests_to_scan.append(manifest)
return self.from_manifest_scan(manifests_to_scan, row_filter, True)
def from_manifest_scan(self, manifests, row_filter, ignore_existing=False):
top_n = TopN(self._limit, self._throw_if_limited, lambda x, y: 0 if x == y else -1 if x < y else 1)
entries = (ManifestGroup(self.ops, manifests)
.filter_data(row_filter)
.ignore_deleted()
.ignore_existing(ignore_existing)
.select(ScanSummary.SCAN_SUMMARY_COLUMNS)
.entries())
spec = self.table.spec()
for entry in entries:
timestamp = self.snapshot_timestamps.get(entry.snapshot_id)
partition = spec.partition_to_path(entry.file.partition())
top_n.update(partition,
lambda metrics: ((metrics if metrics is not None else PartitionMetrics())
.update_from_file(entry.file, timestamp)))
return top_n.get()
def from_partition_summaries(self, snapshots):
# try to build the result from snapshot metadata, but fall back if:
# any snapshot has no summary
# any snapshot has
top_n = TopN(self._limit, self._throw_if_limited, lambda x, y: 0 if x == y else -1 if x < y else 1)
for snap in snapshots:
if snap.operation is None or snap.summary is None \
or str_as_bool(snap.summary.get(SnapshotSummary.PARTITION_SUMMARY_PROP, "false")):
return None
for key, _ in snap.summary.items():
if key.startswith(SnapshotSummary.CHANGED_PARTITION_PREFIX):
part_key = key[len(SnapshotSummary.CHANGED_PARTITION_PREFIX):]
# part = dict(entry.split("=") for entry in val.split(","))
# UPDATE THIS BEFORE FINISHING
added_files = 0
added_records = 0
added_size = 0
top_n.update(part_key,
lambda metrics: ((PartitionMetrics() if metrics is None else metrics)
.update_from_counts(added_files,
added_records,
added_size,
snap.timestamp_millis)))
return top_n.get()
@staticmethod
def snapshots_in_time_range(meta, min_ts, max_ts):
snapshots = []
current = meta.current_snapshot()
while current is not None and current.timestamp_millis >= min_ts:
current = meta.snapshot(current.parent_id)
if current.timestamp_millis <= max_ts:
snapshots.add(current)
snapshots.reverse()
return snapshots
@staticmethod
def timestamp_range(time_filters):
min_timestamp = float('-inf')
max_timestamp = float('inf')
for pred in time_filters:
value = pred.lit.val
try:
min_timestamp, max_timestamp = TIMESTAMP_RANGE_MAP[pred.op](min_timestamp, max_timestamp, value)
except KeyError:
raise RuntimeError("Cannot filter timestamps using predicate: %s" % pred)
if max_timestamp < min_timestamp:
raise RuntimeError("No timestamps can match filters: %s" % ", ".join([str(pred)
for pred in time_filters]))
return min_timestamp, max_timestamp
@staticmethod
def join_filters(expressions):
result = Expressions.always_true()
for expression in expressions:
result = Expressions.and_(result, expression)
return result
@staticmethod
def to_millis(timestamp):
if timestamp < 10000000000:
# in seconds
return timestamp * 1000
elif timestamp < 10000000000000:
# in millis
return timestamp
# in micros
return timestamp / 1000
class TopN(object):
def __init__(self, N, throw_if_limited, key_comparator):
self.max_size = N
self.throw_if_limited = throw_if_limited
self.map = dict()
self.key_comparator = key_comparator
self.cut = None
def update(self, key, update_func):
if self.cut is not None and self.key_comparator(self.cut, key) <= 0:
return
self.map[key] = update_func(self.map.get(key))
while len(map.keys()) > self.max_size:
if self.throw_if_limited:
raise RuntimeError("Too many matching keys: more than %s" % self.max_size)
self.cut = sorted(self.map, key=functools.cmp_to_key(self.key_comparator))[-1]
del self.map[self.cut]
def get(self):
return deepcopy(self.map)
class PartitionMetrics(object):
def __init__(self):
self.file_count = 0
self.record_count = 0
self.total_size = 0
self.data_timestamp_millis = None
def update_from_counts(self, file_count, record_count, files_size, timestamp_millis):
self.file_count += file_count
self.record_count += record_count
self.total_size += files_size
if self.data_timestamp_millis is None or self.data_timestamp_millis < timestamp_millis:
self.data_timestamp_millis = timestamp_millis
return self
def update_from_file(self, file, timestamp_millis):
self.file_count += 1
self.record_count += file.record_count()
self.total_size += file.files_size_in_bytes()
if self.data_timestamp_millis is None or self.data_timestamp_millis < timestamp_millis:
self.data_timestamp_millis = timestamp_millis
return self
def __repr__(self):
items = ("%s=%r" % (k, v) for k, v in self.__dict__.items())
return "%s(%s)" % (self.__class__.__name__, ','.join(items))
def __str__(self):
return self.__repr__()
class SnapshotSummary(object):
GENIE_ID_PROP = "genie-id"
ADDED_FILES_PROP = "added-data-files"
DELETED_FILES_PROP = "deleted-data-files"
TOTAL_FILES_PROP = "total-data-files"
ADDED_RECORDS_PROP = "added-records"
DELETED_RECORDS_PROP = "deleted-records"
TOTAL_RECORDS_PROP = "total-records"
ADDED_FILE_SIZE_PROP = "added-files-size"
DELETED_DUPLICATE_FILES = "deleted-duplicate-files"
CHANGED_PARTITION_COUNT_PROP = "changed-partition-count"
CHANGED_PARTITION_PREFIX = "partitions."
PARTITION_SUMMARY_PROP = "partition-summaries-included"
def __init__(self):
pass
class SnapshotSummaryBuilder(object):
def __init__(self):
self.changed_partitions = dict()
self.added_files = 0
self.deleted_files = 0
self.deleted_dupicate_files = 0
self.added_records = 0
self.deleted_records = 0
self.properties = dict()
def clear(self):
self.changed_partitions = dict()
self.added_files = 0
self.deleted_files = 0
self.deleted_dupicate_files = 0
self.added_records = 0
self.deleted_records = 0
def increment_duplicate_deletes(self):
self.deleted_dupicate_files += 1
def deleted_file(self, spec, data_file):
self.update_partitiomns(spec, data_file, False)
self.deleted_files += 1
self.deleted_records += data_file.record_count
def added_file(self, spec, data_file):
self.update_partitiomns(spec, data_file, True)
self.added_files += 1
self.added_records += data_file.record_count
def update_partitions(self, spec, file, is_addition):
key = spec.partition_to_path(file.partition())
metrics = self.changed_partitions.get(key, PartitionMetrics())
if is_addition:
self.changed_partitions[key] = metrics.update_from_file(file, None)
def build(self):
builder = dict()
builder.update(self.properties)
SnapshotSummaryBuilder.set_if(self.added_files > 0, builder,
SnapshotSummary.ADDED_FILES_PROP, self.added_files)
SnapshotSummaryBuilder.set_if(self.deleted_files > 0,
builder, SnapshotSummary.DELETED_FILES_PROP, self.deleted_files)
SnapshotSummaryBuilder.set_if(self.deleted_dupicate_files > 0,
builder, SnapshotSummary.DELETED_DUPLICATE_FILES, self.deleted_dupicate_files)
SnapshotSummaryBuilder.set_if(self.added_records > 0,
builder, SnapshotSummary.ADDED_RECORDS_PROP, self.added_records)
SnapshotSummaryBuilder.set_if(self.deleted_records > 0,
builder, SnapshotSummary.DELETED_RECORDS_PROP, self.deleted_records)
builder[SnapshotSummary.CHANGED_PARTITION_COUNT_PROP] = len(self.changed_partitions.items())
if len(self.changed_partitions.items()) < 100:
builder[SnapshotSummary.PARTITION_SUMMARY_PROP] = "true"
for key, metrics in self.changed_partitions:
metric_dict = {SnapshotSummary.ADDED_FILES_PROP: metrics.file_count,
SnapshotSummary.ADDED_RECORDS_PROP: metrics.record_count,
SnapshotSummary.ADDED_FILE_SIZE_PROP: metrics.total_size}
builder[SnapshotSummary.CHANGED_PARTITION_PREFIX + key] = ",".join(["{}={}".format(key, val)
for inner_key, val
in metric_dict.items()])
return builder
def set(self, prop, value):
self.properties[prop] = value
@staticmethod
def set_if(expression, builder, prop, value):
if expression:
builder[prop] = value