blob: 1acff6bcc8ff91431347b33ea7082e27d0c2c1e0 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Tuple, Set
from pypaimon.common.identifier import Identifier
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.table.row.blob import BlobDescriptor, BlobViewStruct
from pypaimon.table.special_fields import SpecialFields
from pypaimon.utils.range import Range
_PRELOAD_THREAD_NUM = 100
_MIN_ROWS_PER_TASK = 100
class TableReferences:
"""Groups BlobViewStruct references by upstream table."""
def __init__(self, identifier: Identifier):
self.identifier: Identifier = identifier
self.references_by_field: Dict[int, List[BlobViewStruct]] = {}
self.row_ids: List[int] = []
def add(self, view_struct: BlobViewStruct) -> None:
self.references_by_field.setdefault(view_struct.field_id, []).append(view_struct)
self.row_ids.append(int(view_struct.row_id))
class TableReadPlan:
"""A plan for reading blob descriptors from one upstream table."""
def __init__(self, identifier: Identifier, upstream_table,
read_fields: List, row_ranges: List[Range]):
self.identifier: Identifier = identifier
self.upstream_table = upstream_table
self.read_fields: List = read_fields
self.row_ranges: List[Range] = row_ranges
class BlobViewLookup:
"""Resolve BlobViewStruct references by reading upstream blob descriptors."""
def __init__(self, table):
self._table = table
self._descriptor_cache: Dict[BlobViewStruct, BlobDescriptor] = {}
self._null_value_cache: Set[BlobViewStruct] = set()
def preload(self, view_structs: List[BlobViewStruct]):
if not view_structs:
return
grouped: Dict[str, TableReferences] = self._group_by_table(view_structs)
plans: List[TableReadPlan] = []
for table_refs in grouped.values():
plans.append(self._create_table_read_plan(table_refs))
target_rows: int = self._target_rows_per_task(plans)
tasks: List[Tuple[TableReadPlan, List[Range]]] = []
for plan in plans:
for range_chunk in self._split_row_ranges(plan.row_ranges, target_rows):
tasks.append((plan, range_chunk))
if len(tasks) <= 1:
for plan, range_chunk in tasks:
descriptors, null_values = self._load_descriptor_chunk(plan, range_chunk)
self._descriptor_cache.update(descriptors)
self._null_value_cache.update(null_values)
return
with ThreadPoolExecutor(max_workers=min(_PRELOAD_THREAD_NUM, len(tasks))) as executor:
futures = {
executor.submit(self._load_descriptor_chunk, plan, range_chunk): (plan, range_chunk)
for plan, range_chunk in tasks
}
for future in as_completed(futures):
try:
descriptors, null_values = future.result()
self._descriptor_cache.update(descriptors)
self._null_value_cache.update(null_values)
except Exception as exc:
# Cancel remaining futures that have not started yet so a single
# failure can abort the rest of the preload work as early as possible.
for pending_future in futures:
pending_future.cancel()
raise RuntimeError("Failed to preload blob descriptors.") from exc
def resolve_descriptor(self, view_struct: BlobViewStruct) -> BlobDescriptor:
descriptor: BlobDescriptor = self._descriptor_cache.get(view_struct)
if descriptor is None:
if view_struct in self._null_value_cache:
raise ValueError(
"BlobViewStruct {} resolves to a null blob value.".format(view_struct)
)
raise ValueError(
"Cannot resolve BlobViewStruct {} because row id {} was not found "
"in upstream table.".format(view_struct, view_struct.row_id)
)
return descriptor
def resolve_to_null(self, view_struct: BlobViewStruct) -> bool:
if view_struct in self._null_value_cache:
return True
if view_struct not in self._descriptor_cache:
raise ValueError(
"Cannot resolve BlobViewStruct {} because row id {} was not found "
"in upstream table.".format(view_struct, view_struct.row_id)
)
return False
def _group_by_table(
self, view_structs: List[BlobViewStruct]
) -> Dict[str, TableReferences]:
grouped: Dict[str, TableReferences] = {}
for view_struct in view_structs:
key = view_struct.identifier.get_full_name()
if key not in grouped:
grouped[key] = TableReferences(view_struct.identifier)
grouped[key].add(view_struct)
return grouped
def _create_table_read_plan(self, table_refs: TableReferences) -> TableReadPlan:
upstream_table = self._load_table(table_refs.identifier)
fields: List = []
for field_id in table_refs.references_by_field:
fields.append(self._field_by_id(upstream_table, field_id))
read_fields = SpecialFields.row_type_with_row_id(fields)
return TableReadPlan(
table_refs.identifier, upstream_table, read_fields,
Range.to_ranges(table_refs.row_ids))
def _load_descriptor_chunk(
self, plan: TableReadPlan, row_ranges: List[Range]
) -> Tuple[Dict[BlobViewStruct, BlobDescriptor], set]:
identifier: Identifier = plan.identifier
upstream_table = plan.upstream_table
read_fields = plan.read_fields
projection_field_names: List[str] = [f.name for f in read_fields]
descriptor_table = upstream_table.copy({CoreOptions.BLOB_AS_DESCRIPTOR.key(): "true"})
read_builder = descriptor_table.new_read_builder().with_projection(projection_field_names)
if SpecialFields.ROW_ID.name not in [
data_field.name for data_field in read_builder.read_type()
]:
raise ValueError(
"Cannot resolve blob view for table {} because row tracking is not readable."
.format(identifier.get_full_name())
)
predicate_builder = read_builder.new_predicate_builder()
range_predicates: List = []
for r in row_ranges:
if r.from_ == r.to:
range_predicates.append(
predicate_builder.equal(SpecialFields.ROW_ID.name, r.from_))
else:
range_predicates.append(
predicate_builder.between(SpecialFields.ROW_ID.name, r.from_, r.to))
if len(range_predicates) == 1:
predicate = range_predicates[0]
else:
predicate = predicate_builder.or_predicates(range_predicates)
read_builder.with_filter(predicate)
result = read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())
if SpecialFields.ROW_ID.name not in result.schema.names:
raise ValueError(
"Cannot resolve blob view for table {} because row tracking is not readable."
.format(identifier.get_full_name())
)
row_id_values: List = result.column(SpecialFields.ROW_ID.name).to_pylist()
resolved: Dict[BlobViewStruct, BlobDescriptor] = {}
null_values: set = set()
for field in read_fields:
if field.name == SpecialFields.ROW_ID.name:
continue
if field.name not in result.schema.names:
continue
values = result.column(field.name).to_pylist()
for row_id, value in zip(row_id_values, values):
view_struct = BlobViewStruct(
identifier.get_full_name(), field.id, int(row_id))
if value is None:
null_values.add(view_struct)
continue
descriptor = BlobDescriptor.deserialize(value)
resolved[view_struct] = descriptor
return resolved, null_values
@staticmethod
def _split_row_ranges(
row_ranges: List[Range], target_rows_per_task: int
) -> List[List[Range]]:
"""
Split row ranges into multiple chunks for parallel task processing.
"""
if not row_ranges:
return []
chunks: List[List[Range]] = []
current_chunk: List[Range] = []
current_chunk_rows: int = 0
for r in row_ranges:
next_from = r.from_
# Process current range until all rows are allocated
while next_from <= r.to:
# If current chunk is full, save it and start a new one
if current_chunk_rows == target_rows_per_task:
chunks.append(current_chunk)
current_chunk = []
current_chunk_rows = 0
# Calculate remaining capacity in current chunk
remaining = target_rows_per_task - current_chunk_rows
# Determine the end position for this allocation (don't exceed range boundary)
next_to = min(r.to, next_from + remaining - 1)
# Add the allocated range to current chunk
current_chunk.append(Range(next_from, next_to))
current_chunk_rows += next_to - next_from + 1
# Move to next unallocated position
next_from = next_to + 1
# Don't forget the last chunk if it has any ranges
if current_chunk:
chunks.append(current_chunk)
return chunks
@staticmethod
def _target_rows_per_task(plans: List[TableReadPlan]) -> int:
total_rows: int = 0
for plan in plans:
for r in plan.row_ranges:
total_rows += r.count()
if total_rows <= 0:
return _MIN_ROWS_PER_TASK
return max(_MIN_ROWS_PER_TASK, (total_rows + _PRELOAD_THREAD_NUM - 1) // _PRELOAD_THREAD_NUM)
def _load_table(self, identifier: Identifier):
catalog = self._table.catalog_environment.catalog_loader.load()
return catalog.get_table(identifier)
@staticmethod
def _field_by_id(table, field_id: int) -> 'DataField':
for field in table.table_schema.fields:
if field.id == field_id:
return field
raise ValueError(
"Cannot find blob fieldId {} in upstream table {}."
.format(field_id, table.identifier.get_full_name())
)