blob: 775771eed1c75c5f369cbb85651cd9f7301cc2b1 [file] [log] [blame]
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from collections import defaultdict
from typing import List, Dict, Tuple
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.read.scanner.split_generator import AbstractSplitGenerator
from pypaimon.read.split import Split
from pypaimon.read.sliced_split import SlicedSplit
class AppendTableSplitGenerator(AbstractSplitGenerator):
    """
    Split generator for append-only tables.

    Groups manifest entries by (partition, bucket), packs each group's data
    files into splits bounded by ``self.target_split_size``, and — when this
    subtask reads only a slice/shard of the data — wraps the affected splits
    in ``SlicedSplit`` so that only rows inside the shard range are read.

    NOTE(review): ``NEXT_POS_KEY``, ``_compute_shard_range``,
    ``_compute_file_range``, ``_pack_for_ordered`` and
    ``_build_split_from_pack`` are inherited from ``AbstractSplitGenerator``
    (defined outside this file); their exact contracts are assumed here.
    """

    def create_splits(self, file_entries: List[ManifestEntry]) -> List[Split]:
        """Build read splits from the given manifest entries.

        :param file_entries: manifest entries describing data files to read.
        :return: list of ``Split`` objects; when slicing/sharding is active,
            splits that need row trimming are wrapped in ``SlicedSplit``.
        """
        # Group entries by (partition values, bucket); files from different
        # groups are never packed into the same split.
        partitioned_files = defaultdict(list)
        for entry in file_entries:
            partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
        plan_start_pos = 0
        plan_end_pos = 0
        if self.start_pos_of_this_subtask is not None:
            # shard data range: [plan_start_pos, plan_end_pos)
            # An explicit row range was assigned to this subtask.
            partitioned_files, plan_start_pos, plan_end_pos = \
                self.__filter_by_slice(
                    partitioned_files,
                    self.start_pos_of_this_subtask,
                    self.end_pos_of_this_subtask
                )
        elif self.idx_of_this_subtask is not None:
            # No explicit range: derive this subtask's shard range from its
            # index and the total row count, then filter the same way.
            partitioned_files, plan_start_pos, plan_end_pos = self._filter_by_shard(partitioned_files)

        def weight_func(f: DataFileMeta) -> int:
            # Packing weight of a file: its byte size, floored at the fixed
            # cost of opening a file so many tiny files don't collapse into
            # one oversized split.
            return max(f.file_size, self.open_file_cost)

        splits = []
        for key, file_entries_list in partitioned_files.items():
            if not file_entries_list:
                continue
            data_files: List[DataFileMeta] = [e.file for e in file_entries_list]
            # Pack files in order into groups whose total weight stays near
            # target_split_size; each pack becomes one split.
            packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(
                data_files, weight_func, self.target_split_size
            )
            splits += self._build_split_from_pack(
                packed_files, file_entries_list, False
            )
        if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None:
            # Sharded read: trim first/last files so only rows inside
            # [plan_start_pos, plan_end_pos) are produced.
            splits = self._wrap_to_sliced_splits(splits, plan_start_pos, plan_end_pos)
        return splits

    def _wrap_to_sliced_splits(self, splits: List[Split], plan_start_pos: int, plan_end_pos: int) -> List[Split]:
        """Wrap splits that straddle the shard boundary in ``SlicedSplit``.

        Walks the splits in the same order their files were counted during
        filtering, so ``file_end_pos`` tracks each file's global row offset.

        :param splits: splits produced from the already-filtered files.
        :param plan_start_pos: shard start row, relative to the first
            surviving file (as returned by ``__filter_by_slice``).
        :param plan_end_pos: shard end row (exclusive), same reference point.
        :return: splits, with row-trimmed ones replaced by ``SlicedSplit``.
        """
        sliced_splits = []
        file_end_pos = 0  # end row position of current file in all splits data
        for split in splits:
            shard_file_idx_map = self.__compute_split_file_idx_map(
                plan_start_pos, plan_end_pos, split, file_end_pos
            )
            # The map smuggles the running row position back out under
            # NEXT_POS_KEY; pop it before using the map as a file-range map.
            file_end_pos = shard_file_idx_map[self.NEXT_POS_KEY]
            del shard_file_idx_map[self.NEXT_POS_KEY]
            if shard_file_idx_map:
                # At least one file needs row trimming within this split.
                sliced_splits.append(SlicedSplit(split, shard_file_idx_map))
            else:
                # Split lies entirely inside the shard range; keep as-is.
                sliced_splits.append(split)
        return sliced_splits

    @staticmethod
    def __filter_by_slice(
            partitioned_files: defaultdict,
            start_pos: int,
            end_pos: int
    ) -> tuple:
        """Keep only files overlapping the global row range [start_pos, end_pos).

        Row positions are counted cumulatively across all files in iteration
        order of ``partitioned_files`` (the same order later used by
        ``_wrap_to_sliced_splits``, which must match for the offsets to line
        up).

        :param partitioned_files: (partition, bucket) -> list of entries.
        :param start_pos: global start row of the shard (inclusive).
        :param end_pos: global end row of the shard (exclusive).
        :return: (filtered mapping, plan_start_pos, plan_end_pos) where the
            plan positions are relative to the first overlapping file.
        """
        plan_start_pos = 0
        plan_end_pos = 0
        entry_end_pos = 0  # end row position of current file in all data
        splits_start_pos = 0  # global start row of the first overlapping file
        filtered_partitioned_files = defaultdict(list)
        # Iterate through all file entries to find files that overlap with current shard range
        for key, file_entries in partitioned_files.items():
            filtered_entries = []
            for entry in file_entries:
                entry_begin_pos = entry_end_pos  # Starting row position of current file in all data
                entry_end_pos += entry.file.row_count  # Update to row position after current file
                # If current file is completely after shard range, stop iteration
                if entry_begin_pos >= end_pos:
                    break
                # If current file is completely before shard range, skip it
                if entry_end_pos <= start_pos:
                    continue
                # Shard start falls inside this file: record its global start
                # so plan positions can be expressed relative to it.
                if entry_begin_pos <= start_pos < entry_end_pos:
                    splits_start_pos = entry_begin_pos
                    plan_start_pos = start_pos - entry_begin_pos
                # If shard end position is within current file, record relative end position
                if entry_begin_pos < end_pos <= entry_end_pos:
                    plan_end_pos = end_pos - splits_start_pos
                # Add files that overlap with shard range to result
                filtered_entries.append(entry)
            if filtered_entries:
                filtered_partitioned_files[key] = filtered_entries
        return filtered_partitioned_files, plan_start_pos, plan_end_pos

    def _filter_by_shard(self, partitioned_files: defaultdict) -> tuple:
        """
        Filter file entries by shard. Only keep the files within the range, which means
        that only the starting and ending files need to be further divided subsequently.

        :param partitioned_files: (partition, bucket) -> list of entries.
        :return: same tuple shape as ``__filter_by_slice``.
        """
        # Calculate total rows
        total_row = sum(
            entry.file.row_count
            for file_entries in partitioned_files.values()
            for entry in file_entries
        )
        # Calculate shard range using shared helper
        start_pos, end_pos = self._compute_shard_range(total_row)
        return self.__filter_by_slice(partitioned_files, start_pos, end_pos)

    @staticmethod
    def __compute_split_file_idx_map(
            plan_start_pos: int,
            plan_end_pos: int,
            split: Split,
            file_end_pos: int
    ) -> Dict[str, Tuple[int, int]]:
        """
        Compute file index map for a split, determining which rows to read from each file.

        :param plan_start_pos: shard start row relative to the first kept file.
        :param plan_end_pos: shard end row (exclusive), same reference point.
        :param split: the split whose files are being examined.
        :param file_end_pos: running global row position before this split.
        :return: ``file_name -> (row range)`` for files needing trimming
            (range shape determined by the inherited ``_compute_file_range``),
            plus ``NEXT_POS_KEY -> updated running position`` which the caller
            must pop off before use.
        """
        shard_file_idx_map = {}
        for file in split.files:
            file_begin_pos = file_end_pos  # Starting row position of current file in all data
            file_end_pos += file.row_count  # Update to row position after current file
            # Use shared helper to compute file range
            file_range = AppendTableSplitGenerator._compute_file_range(
                plan_start_pos, plan_end_pos, file_begin_pos, file.row_count
            )
            # None means the whole file lies inside the shard — no trimming
            # entry needed for it (presumed contract of _compute_file_range;
            # defined in the parent class, not visible here).
            if file_range is not None:
                shard_file_idx_map[file.file_name] = file_range
        # Hand the running row position back to the caller in-band.
        shard_file_idx_map[AppendTableSplitGenerator.NEXT_POS_KEY] = file_end_pos
        return shard_file_idx_map