blob: e56c568b95a3385d03159b484cee382f39d31917 [file] [log] [blame]
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import time
import logging
from typing import List, Optional, Dict, Set, Callable
logger = logging.getLogger(__name__)
from pypaimon.common.predicate import Predicate
from pypaimon.globalindex import ScoredGlobalIndexResult
from pypaimon.table.source.deletion_file import DeletionFile
from pypaimon.manifest.index_manifest_file import IndexManifestFile
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
from pypaimon.read.plan import Plan
from pypaimon.read.push_down_utils import (
remove_row_id_filter,
trim_and_transform_predicate,
)
from pypaimon.read.scanner.append_table_split_generator import AppendTableSplitGenerator
from pypaimon.read.scanner.data_evolution_split_generator import DataEvolutionSplitGenerator
from pypaimon.read.scanner.primary_key_table_split_generator import PrimaryKeyTableSplitGenerator
from pypaimon.read.split import DataSplit
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.bucket_mode import BucketMode
from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions
def _row_ranges_from_predicate(predicate: Optional[Predicate]) -> Optional[List]:
    """Derive row-id ranges implied by a predicate tree.

    Returns:
        None when the predicate places no (or an unsupported) constraint on
        the row-id field; an empty list when nothing can match; otherwise a
        list of Range objects covering the candidate row ids.
    """
    from pypaimon.utils.range import Range
    from pypaimon.table.special_fields import SpecialFields

    if predicate is None:
        return None

    def to_ranges(node: Predicate):
        method = node.method
        if method == 'and':
            merged = None
            for operand in node.literals:
                child_ranges = to_ranges(operand)
                if child_ranges is None:
                    # Unconstrained child does not narrow the conjunction.
                    continue
                merged = child_ranges if merged is None else Range.and_(merged, child_ranges)
                if not merged:
                    # Intersection became empty: short-circuit.
                    return merged
            return merged
        if method == 'or':
            collected = []
            for operand in node.literals:
                child_ranges = to_ranges(operand)
                if child_ranges is None:
                    # One unconstrained branch makes the whole OR unconstrained.
                    return None
                collected.extend(child_ranges)
            if not collected:
                return []
            return Range.sort_and_merge_overlap(collected, merge=True, adjacent=True)
        # Leaf predicates only translate when they target the row-id field.
        if node.field != SpecialFields.ROW_ID.name:
            return None
        if method == 'equal':
            return Range.to_ranges([int(node.literals[0])]) if node.literals else []
        if method == 'in':
            return Range.to_ranges([int(v) for v in node.literals]) if node.literals else []
        if method == 'between':
            if not node.literals or len(node.literals) < 2:
                return []
            return [Range(int(node.literals[0]), int(node.literals[1]))]
        return None

    return to_ranges(predicate)
def _filter_manifest_files_by_row_ranges(
        manifest_files: List[ManifestFileMeta],
        row_ranges: List) -> List[ManifestFileMeta]:
    """
    Filter manifest files by row ranges.
    A manifest is kept when its [min_row_id, max_row_id] span intersects any
    expected range, or when its row-id bounds are unknown (cannot be pruned).
    Args:
        manifest_files: List of manifest file metadata
        row_ranges: List of row ranges to filter by
    Returns:
        Filtered list of manifest files
    """
    from pypaimon.utils.range import Range

    kept = []
    for meta in manifest_files:
        lo = meta.min_row_id
        hi = meta.max_row_id
        # Unknown bounds: we cannot prove the file irrelevant, so keep it.
        if lo is None or hi is None:
            kept.append(meta)
            continue
        span = Range(lo, hi)
        overlaps = any(
            Range.intersect(span.from_, span.to, expected.from_, expected.to)
            for expected in row_ranges
        )
        if overlaps:
            kept.append(meta)
    return kept
def _filter_manifest_entries_by_row_ranges(
entries: List[ManifestEntry],
row_ranges: List) -> List[ManifestEntry]:
if not row_ranges:
return []
filtered = []
for entry in entries:
first_row_id = entry.file.first_row_id
if first_row_id is None:
filtered.append(entry)
continue
file_range = entry.file.row_id_range()
for r in row_ranges:
if file_range.overlaps(r):
filtered.append(entry)
break
return filtered
class FileScanner:
    """Plans table reads: scans manifests, filters entries, and generates splits.

    Handles primary-key tables, append tables and data-evolution tables, with
    predicate/limit push-down, optional sharding or slicing for parallel
    subtasks, deletion-vector lookup and global-index evaluation.
    """

    def __init__(
            self,
            table,
            manifest_scanner: Callable[[], List[ManifestFileMeta]],
            predicate: Optional[Predicate] = None,
            limit: Optional[int] = None,
            vector_search: Optional['VectorSearch'] = None
    ):
        """
        Args:
            table: the FileStoreTable to scan.
            manifest_scanner: callable producing the manifest file metas to read.
            predicate: optional filter pushed down to manifest/entry pruning.
            limit: optional row-count limit pushed down to split selection.
            vector_search: optional vector search evaluated via the global index.
        """
        from pypaimon.table.file_store_table import FileStoreTable
        self.table: FileStoreTable = table
        self.manifest_scanner = manifest_scanner
        self.predicate = predicate
        # Stats-based filtering must not see row-id predicates; those are
        # handled separately via row ranges.
        self.predicate_for_stats = remove_row_id_filter(predicate) if predicate else None
        self.limit = limit
        self.vector_search = vector_search
        self.snapshot_manager = SnapshotManager(table)
        self.manifest_list_manager = ManifestListManager(table)
        self.manifest_file_manager = ManifestFileManager(table)
        # Predicate projected onto the trimmed primary-key fields.
        self.primary_key_predicate = trim_and_transform_predicate(
            self.predicate, self.table.field_names, self.table.trimmed_primary_keys)
        # Predicate projected onto the partition-key fields.
        self.partition_key_predicate = trim_and_transform_predicate(
            self.predicate, self.table.field_names, self.table.partition_keys)
        options = self.table.options
        # Get split target size and open file cost from table options
        self.target_split_size = options.source_split_target_size()
        self.open_file_cost = options.source_split_open_file_cost()
        # Sharding (index/count) and slicing (start/end) are mutually
        # exclusive; see with_shard() / with_slice().
        self.idx_of_this_subtask = None
        self.number_of_para_subtasks = None
        self.start_pos_of_this_subtask = None
        self.end_pos_of_this_subtask = None
        self.only_read_real_buckets = options.bucket() == BucketMode.POSTPONE_BUCKET.value
        self.data_evolution = options.data_evolution_enabled()
        self.deletion_vectors_enabled = options.deletion_vectors_enabled()

        def schema_fields_func(schema_id: int):
            # Resolve the fields of a historical schema for stats evolution.
            return self.table.schema_manager.get_schema(schema_id).fields

        self.simple_stats_evolutions = SimpleStatsEvolutions(
            schema_fields_func,
            self.table.table_schema.id
        )

    def _deletion_files_map(self, entries: List[ManifestEntry]) -> Dict[tuple, Dict[str, DeletionFile]]:
        """Build {(partition, bucket) -> {data file name -> DeletionFile}}.

        Returns an empty map when deletion vectors are disabled.
        """
        if not self.deletion_vectors_enabled:
            return {}
        # Extract unique partition-bucket pairs from file entries
        bucket_files = set()
        for e in entries:
            bucket_files.add((tuple(e.partition.values), e.bucket))
        return self._scan_dv_index(self.snapshot_manager.get_latest_snapshot(), bucket_files)

    def scan(self) -> Plan:
        """Plan the scan and return the resulting splits.

        Returns:
            Plan wrapping the generated data splits (empty when no entries).
        """
        start_ms = time.time() * 1000
        # Create appropriate split generator based on table type
        if self.table.is_primary_key_table:
            entries = self.plan_files()
            split_generator = PrimaryKeyTableSplitGenerator(
                self.table,
                self.target_split_size,
                self.open_file_cost,
                self._deletion_files_map(entries)
            )
        elif self.data_evolution:
            entries, split_generator = self._create_data_evolution_split_generator()
        else:
            entries = self.plan_files()
            split_generator = AppendTableSplitGenerator(
                self.table,
                self.target_split_size,
                self.open_file_cost,
                self._deletion_files_map(entries)
            )
        if not entries:
            return Plan([])
        # Configure sharding if needed
        if self.idx_of_this_subtask is not None:
            split_generator.with_shard(self.idx_of_this_subtask, self.number_of_para_subtasks)
        elif self.start_pos_of_this_subtask is not None:
            split_generator.with_slice(self.start_pos_of_this_subtask, self.end_pos_of_this_subtask)
        # Generate splits
        splits = split_generator.create_splits(entries)
        splits = self._apply_push_down_limit(splits)
        duration_ms = int(time.time() * 1000 - start_ms)
        logger.info(
            "File store scan plan completed in %d ms. Files size: %d",
            duration_ms, len(entries)
        )
        return Plan(splits)

    def _create_data_evolution_split_generator(self):
        """Plan entries for a data-evolution table and build its split generator.

        Row-id ranges come from the global index when available, otherwise from
        row-id predicates; when present they prune both manifest files and
        individual entries.

        Returns:
            Tuple of (entries, DataEvolutionSplitGenerator).
        """
        row_ranges = None
        score_getter = None
        global_index_result = self._eval_global_index()
        if global_index_result is not None:
            row_ranges = global_index_result.results().to_range_list()
            if isinstance(global_index_result, ScoredGlobalIndexResult):
                score_getter = global_index_result.score_getter()
        if row_ranges is None and self.predicate is not None:
            row_ranges = _row_ranges_from_predicate(self.predicate)
        manifest_files = self.manifest_scanner()
        # Filter manifest files by row ranges if available
        if row_ranges is not None:
            manifest_files = _filter_manifest_files_by_row_ranges(manifest_files, row_ranges)
        entries = self.read_manifest_entries(manifest_files)
        if row_ranges is not None:
            entries = _filter_manifest_entries_by_row_ranges(entries, row_ranges)
        return entries, DataEvolutionSplitGenerator(
            self.table,
            self.target_split_size,
            self.open_file_cost,
            self._deletion_files_map(entries),
            row_ranges,
            score_getter
        )

    def plan_files(self) -> List[ManifestEntry]:
        """Read all (filtered) manifest entries from the scanned manifests."""
        manifest_files = self.manifest_scanner()
        if len(manifest_files) == 0:
            return []
        return self.read_manifest_entries(manifest_files)

    def _eval_global_index(self):
        """Evaluate predicate/vector search against the global index.

        Returns a GlobalIndexResult (possibly scored) over matching rows, or
        None when the index is disabled, absent, or not applicable. Rows that
        fall outside the indexed ranges are OR-ed back in unfiltered, since
        the index cannot prune them.
        """
        from pypaimon.globalindex.global_index_result import GlobalIndexResult
        from pypaimon.globalindex.global_index_scan_builder import (
            GlobalIndexScanBuilder
        )
        from pypaimon.utils.range import Range
        # No filter and no vector search - nothing to evaluate
        if self.predicate is None and self.vector_search is None:
            return None
        # Check if global index is enabled
        if not self.table.options.global_index_enabled():
            return None
        # Get latest snapshot
        snapshot = self.snapshot_manager.get_latest_snapshot()
        if snapshot is None:
            return None
        # Check if table has store with global index scan builder
        index_scan_builder = self.table.new_global_index_scan_builder()
        if index_scan_builder is None:
            return None
        # Set partition predicate and snapshot
        index_scan_builder.with_partition_predicate(
            self.partition_key_predicate
        ).with_snapshot(snapshot)
        # Get indexed row ranges
        indexed_row_ranges = index_scan_builder.shard_list()
        if not indexed_row_ranges:
            return None
        # Get next row ID from snapshot
        next_row_id = snapshot.next_row_id
        if next_row_id is None:
            return None
        # Calculate non-indexed row ranges
        non_indexed_row_ranges = Range(0, next_row_id - 1).exclude(indexed_row_ranges)
        # Get thread number from options (can be None, meaning use default)
        thread_num = self.table.options.global_index_thread_num()
        # Scan global index in parallel
        result = GlobalIndexScanBuilder.parallel_scan(
            indexed_row_ranges,
            index_scan_builder,
            self.predicate,
            self.vector_search,
            thread_num
        )
        if result is None:
            return None
        # Non-indexed rows cannot be filtered by the index: include them all.
        for row_range in non_indexed_row_ranges:
            result = result.or_(GlobalIndexResult.from_range(row_range))
        return result

    def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> List[ManifestEntry]:
        """Read entries from manifests in parallel, applying file/entry filters."""
        # NOTE(review): floor of 8 workers even when the configured scan
        # parallelism is lower — confirm this is intentional.
        max_workers = max(8, self.table.options.scan_manifest_parallelism(os.cpu_count() or 8))
        manifest_files = [entry for entry in manifest_files if self._filter_manifest_file(entry)]
        return self.manifest_file_manager.read_entries_parallel(
            manifest_files,
            self._filter_manifest_entry,
            max_workers=max_workers
        )

    def with_shard(self, idx_of_this_subtask: int, number_of_para_subtasks: int) -> 'FileScanner':
        """Restrict this scanner to one shard of a parallel scan (fluent).

        Raises:
            ValueError: if idx_of_this_subtask >= number_of_para_subtasks.
            Exception: if with_slice() was already configured.
        """
        if idx_of_this_subtask >= number_of_para_subtasks:
            raise ValueError("idx_of_this_subtask must be less than number_of_para_subtasks")
        if self.start_pos_of_this_subtask is not None:
            raise Exception("with_shard and with_slice cannot be used simultaneously")
        self.idx_of_this_subtask = idx_of_this_subtask
        self.number_of_para_subtasks = number_of_para_subtasks
        return self

    def with_slice(self, start_pos: int, end_pos: int) -> 'FileScanner':
        """Restrict this scanner to a positional slice of splits (fluent).

        Bounds are forwarded to the split generator's with_slice; presumably
        end-exclusive — confirm against the generator implementation.

        Raises:
            ValueError: if start_pos >= end_pos.
            Exception: if with_shard() was already configured.
        """
        if start_pos >= end_pos:
            raise ValueError("start_pos must be less than end_pos")
        if self.idx_of_this_subtask is not None:
            raise Exception("with_slice and with_shard cannot be used simultaneously")
        self.start_pos_of_this_subtask = start_pos
        self.end_pos_of_this_subtask = end_pos
        return self

    def _apply_push_down_limit(self, splits: List[DataSplit]) -> List[DataSplit]:
        """Apply the pushed-down limit to the generated splits.

        Only raw-convertible splits are kept and counted; iteration stops as
        soon as their accumulated row count reaches the limit. Splits that are
        not raw-convertible are excluded from the returned list.
        """
        if self.limit is None:
            return splits
        scanned_row_count = 0
        limited_splits = []
        for split in splits:
            if split.raw_convertible:
                limited_splits.append(split)
                scanned_row_count += split.row_count
                if scanned_row_count >= self.limit:
                    return limited_splits
        return limited_splits

    def _filter_manifest_file(self, file: ManifestFileMeta) -> bool:
        """Return True if the manifest file may contain matching partitions."""
        if not self.partition_key_predicate:
            return True
        return self.partition_key_predicate.test_by_simple_stats(
            file.partition_stats,
            file.num_added_files + file.num_deleted_files)

    def _filter_manifest_entry(self, entry: ManifestEntry) -> bool:
        """Return True if the manifest entry may satisfy the scan filters."""
        if self.only_read_real_buckets and entry.bucket < 0:
            return False
        if self.partition_key_predicate and not self.partition_key_predicate.test(entry.partition):
            return False
        # Get SimpleStatsEvolution for this schema
        evolution = self.simple_stats_evolutions.get_or_create(entry.file.schema_id)
        # Apply evolution to stats
        if self.table.is_primary_key_table:
            if self.deletion_vectors_enabled and entry.file.level == 0:  # do not read level 0 file
                return False
            if not self.primary_key_predicate:
                return True
            return self.primary_key_predicate.test_by_simple_stats(
                entry.file.key_stats,
                entry.file.row_count
            )
        else:
            if not self.predicate:
                return True
            if self.predicate_for_stats is None:
                return True
            # Data evolution: file stats may be from another schema, skip stats filter and filter in reader.
            if self.data_evolution:
                return True
            if entry.file.value_stats_cols is None and entry.file.write_cols is not None:
                stats_fields = entry.file.write_cols
            else:
                stats_fields = entry.file.value_stats_cols
            evolved_stats = evolution.evolution(
                entry.file.value_stats,
                entry.file.row_count,
                stats_fields
            )
            return self.predicate_for_stats.test_by_simple_stats(
                evolved_stats,
                entry.file.row_count
            )

    def _scan_dv_index(self, snapshot, buckets: Set[tuple]) -> Dict[tuple, Dict[str, DeletionFile]]:
        """
        Scan deletion vector index from snapshot.
        Returns a map of (partition, bucket) -> {filename -> DeletionFile}
        Reference: SnapshotReaderImpl.scanDvIndex() in Java
        """
        if not snapshot or not snapshot.index_manifest:
            return {}
        result = {}
        # Read index manifest file
        index_manifest_file = IndexManifestFile(self.table)
        index_entries = index_manifest_file.read(snapshot.index_manifest)
        # Filter by DELETION_VECTORS_INDEX type and requested buckets
        for entry in index_entries:
            if entry.index_file.index_type != IndexManifestFile.DELETION_VECTORS_INDEX:
                continue
            partition_bucket = (tuple(entry.partition.values), entry.bucket)
            if partition_bucket not in buckets:
                continue
            # Convert to deletion files
            deletion_files = self._to_deletion_files(entry)
            if deletion_files:
                result[partition_bucket] = deletion_files
        return result

    def _to_deletion_files(self, index_entry) -> Dict[str, DeletionFile]:
        """
        Convert index manifest entry to deletion files map.
        Returns {filename -> DeletionFile}
        """
        deletion_files = {}
        index_file = index_entry.index_file
        # Check if dv_ranges exists
        if not index_file.dv_ranges:
            return deletion_files
        # Build the deletion vector index file path: {table_path}/index/{file_name}
        index_path = self.table.table_path.rstrip('/') + '/index'
        dv_file_path = f"{index_path}/{index_file.file_name}"
        # Convert each DeletionVectorMeta to DeletionFile
        for data_file_name, dv_meta in index_file.dv_ranges.items():
            deletion_file = DeletionFile(
                dv_index_path=dv_file_path,
                offset=dv_meta.offset,
                length=dv_meta.length,
                cardinality=dv_meta.cardinality
            )
            deletion_files[data_file_name] = deletion_file
        return deletion_files