blob: 0c38598c2a0dd3f6ea70b485322beea835541e6f [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import heapq
from dataclasses import dataclass
from functools import cmp_to_key
from typing import List, Callable
from pypaimon.pynative.row.binary_row import BinaryRow
from pypaimon.pynative.table.data_file_meta import DataFileMeta
@dataclass
class SortedRun:
"""
A SortedRun is a list of files sorted by their keys.
The key intervals [minKey, maxKey] of these files do not overlap.
"""
files: List[DataFileMeta]
class IntervalPartition:
"""
Algorithm to partition several data files into the minimum number of SortedRuns.
"""
def __init__(self, input_files: List[DataFileMeta]):
self.files = input_files.copy()
self.key_comparator = default_key_comparator
self.files.sort(key=cmp_to_key(self._compare_files))
def partition(self) -> List[List[SortedRun]]:
result = []
section: List[DataFileMeta] = []
bound = None
for meta in self.files:
if section and self.key_comparator(meta.min_key, bound) > 0:
result.append(self._partition_section(section))
section.clear()
bound = None
section.append(meta)
if bound is None or self.key_comparator(meta.max_key, bound) > 0:
bound = meta.max_key
if section:
result.append(self._partition_section(section))
return result
def _partition_section(self, metas: List[DataFileMeta]) -> List[SortedRun]:
heap: List[HeapRun] = []
first_run = [metas[0]]
heapq.heappush(heap, HeapRun(first_run, self.key_comparator))
for i in range(1, len(metas)):
meta = metas[i]
earliest_finishing_run = heap[0]
last_max_key = earliest_finishing_run.run[-1].max_key
if self.key_comparator(meta.min_key, last_max_key) > 0:
top = heapq.heappop(heap)
top.run.append(meta)
heapq.heappush(heap, top)
else:
new_run = [meta]
heapq.heappush(heap, HeapRun(new_run, self.key_comparator))
return [SortedRun(files=h.run) for h in heap]
def _compare_files(self, f1: DataFileMeta, f2: DataFileMeta) -> int:
min_key_cmp = self.key_comparator(f1.min_key, f2.min_key)
if min_key_cmp != 0:
return min_key_cmp
return self.key_comparator(f1.max_key, f2.max_key)
@dataclass
class HeapRun:
run: List[DataFileMeta]
comparator: Callable[[BinaryRow, BinaryRow], int]
def __lt__(self, other) -> bool:
my_last_max = self.run[-1].max_key
other_last_max = other.run[-1].max_key
return self.comparator(my_last_max, other_last_max) < 0
def default_key_comparator(key1: BinaryRow, key2: BinaryRow) -> int:
if not key1 or not key1.values:
if not key2 or not key2.values:
return 0
return -1
if not key2 or not key2.values:
return 1
min_field_count = min(len(key1.values), len(key2.values))
for i in range(min_field_count):
val1 = key1.values[i]
val2 = key2.values[i]
if val1 is None and val2 is None:
continue
if val1 is None:
return -1
if val2 is None:
return 1
if val1 < val2:
return -1
elif val1 > val2:
return 1
if len(key1.values) < len(key2.values):
return -1
elif len(key1.values) > len(key2.values):
return 1
else:
return 0