# blob: 8e62c946e04de31b76ac6ce432e1c36237ff3ea4 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Conflict detection for commit operations.
"""
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.file_entry import FileEntry
from pypaimon.utils.range import Range
from pypaimon.utils.range_helper import RangeHelper
from pypaimon.write.commit.commit_scanner import CommitScanner
class ConflictDetection:
    """Detects conflicts between base and delta files during commit.

    This class provides row ID range conflict checks and row ID from snapshot
    conflict checks for Data Evolution tables.

    Detected conflicts are returned to the caller as ``RuntimeError``
    instances instead of being raised; only an unusable check snapshot makes
    :meth:`check_row_id_from_snapshot` raise.
    """

    def __init__(self, data_evolution_enabled, snapshot_manager,
                 manifest_list_manager: "ManifestListManager", table,
                 commit_scanner: "CommitScanner"):
        """Initialize ConflictDetection.

        Args:
            data_evolution_enabled: Whether data evolution feature is enabled.
            snapshot_manager: Manager for reading snapshot metadata.
            manifest_list_manager: Manager for reading manifest lists.
            table: The FileStoreTable instance.
            commit_scanner: Scanner used to read the incremental entries of
                the intermediate snapshots during the row ID snapshot check.
        """
        self.data_evolution_enabled = data_evolution_enabled
        self.snapshot_manager = snapshot_manager
        self.manifest_list_manager = manifest_list_manager
        self.table = table
        # Id of the snapshot from which row ID conflicts are checked.
        # None (the initial value) disables the snapshot-based check.
        self._row_id_check_from_snapshot = None
        self.commit_scanner = commit_scanner

    def should_be_overwrite_commit(self):
        """Check if the commit should be treated as an overwrite commit.

        Returns:
            True if a row ID check snapshot is set, i.e. the commit should be
            treated as OVERWRITE; False otherwise.
        """
        return self._row_id_check_from_snapshot is not None

    def check_conflicts(self, latest_snapshot, base_entries, delta_entries, commit_kind):
        """Run all conflict checks and return the first detected conflict.

        Merges ``base_entries`` and ``delta_entries``, runs the row ID range
        check on the merged result, then runs the row ID snapshot check on the
        delta entries.

        Args:
            latest_snapshot: The latest snapshot at commit time.
            base_entries: All entries read from the latest snapshot.
            delta_entries: The delta entries being committed.
            commit_kind: The kind of commit (e.g. "APPEND", "COMPACT", "OVERWRITE").

        Returns:
            A RuntimeError if a conflict is detected, otherwise None. When the
            merge itself fails, the returned error carries the original
            exception as ``__cause__``.

        Raises:
            RuntimeError: Propagated from :meth:`check_row_id_from_snapshot`
                when the check snapshot is missing or has no next row id.
        """
        all_entries = list(base_entries) + list(delta_entries)
        try:
            merged_entries = FileEntry.merge_entries(all_entries)
        except Exception as e:
            conflict = RuntimeError(
                "File deletion conflicts detected! Give up committing. " + str(e))
            # The error is returned, not raised, so ``raise ... from`` is not
            # available; link the cause by hand to keep the original traceback.
            conflict.__cause__ = e
            return conflict

        conflict = self.check_row_id_range_conflicts(commit_kind, merged_entries)
        if conflict is not None:
            return conflict
        return self.check_row_id_from_snapshot(latest_snapshot, delta_entries)

    def check_row_id_range_conflicts(self, commit_kind, commit_entries):
        """Check for row ID range conflicts among merged entries.

        Only enabled when data evolution is active. Entries with a row ID are
        grouped by overlapping row ID range, and within each group all
        non-blob data files must cover the same range.

        Args:
            commit_kind: The kind of commit (e.g. "APPEND", "COMPACT").
            commit_entries: The entries being committed.

        Returns:
            A RuntimeError if conflict is detected, otherwise None.
        """
        if not self.data_evolution_enabled:
            return None
        # Without a check snapshot, only COMPACT commits are verified.
        if self._row_id_check_from_snapshot is None and commit_kind != "COMPACT":
            return None

        entries_with_row_id = [
            entry for entry in commit_entries
            if entry.file.first_row_id is not None
        ]
        if not entries_with_row_id:
            return None

        range_helper = RangeHelper(lambda entry: entry.file.row_id_range())
        merged_groups = range_helper.merge_overlapping_ranges(entries_with_row_id)
        for group in merged_groups:
            # Blob files are excluded from the "identical range" requirement.
            data_files = [
                entry for entry in group
                if not DataFileMeta.is_blob_file(entry.file.file_name)
            ]
            if range_helper.are_all_ranges_same(data_files):
                continue
            file_descriptions = [
                "{name}(rowId={row_id}, count={count})".format(
                    name=entry.file.file_name,
                    row_id=entry.file.first_row_id,
                    count=entry.file.row_count,
                )
                for entry in data_files
            ]
            return RuntimeError(
                "For Data Evolution table, multiple 'MERGE INTO' and 'COMPACT' "
                "operations have encountered conflicts, data files: "
                + str(file_descriptions))
        return None

    def check_row_id_from_snapshot(self, latest_snapshot, commit_entries):
        """Check for row ID conflicts from a specific snapshot onwards.

        Collects the row ID ranges of the delta entries, then checks whether
        any incremental change committed after the check snapshot, up to and
        including the latest snapshot, overlaps one of those ranges. COMPACT
        snapshots and snapshots that cannot be loaded are skipped.

        Args:
            latest_snapshot: The latest snapshot at commit time.
            commit_entries: The delta entries being committed.

        Returns:
            A RuntimeError if conflict is detected, otherwise None.

        Raises:
            RuntimeError: If the check snapshot cannot be loaded or has no
                next row id.
        """
        if not self.data_evolution_enabled:
            return None
        check_snapshot_id = self._row_id_check_from_snapshot
        if check_snapshot_id is None:
            return None

        history_id_ranges = [
            Range(entry.file.first_row_id,
                  entry.file.first_row_id + entry.file.row_count - 1)
            for entry in commit_entries
            if entry.file.first_row_id is not None
        ]

        check_snapshot = self.snapshot_manager.get_snapshot_by_id(check_snapshot_id)
        if check_snapshot is None or check_snapshot.next_row_id is None:
            raise RuntimeError(
                "Next row id cannot be null for snapshot "
                "{snapshot}.".format(snapshot=check_snapshot_id))
        check_next_row_id = check_snapshot.next_row_id

        # No committed file carries row ids, so nothing can overlap; skip
        # reading the incremental entries of every intermediate snapshot.
        if not history_id_ranges:
            return None

        for snapshot_id in range(check_snapshot_id + 1, latest_snapshot.id + 1):
            snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id)
            if snapshot is None or snapshot.commit_kind == "COMPACT":
                continue
            incremental_entries = (
                self.commit_scanner.read_incremental_entries_from_changed_partitions(
                    snapshot, commit_entries))
            for entry in incremental_entries:
                file_range = entry.file.row_id_range()
                # Files without a range, or whose range starts at or beyond
                # the check snapshot's next row id, are not considered.
                if file_range is None or file_range.from_ >= check_next_row_id:
                    continue
                if any(history_range.overlaps(file_range)
                       for history_range in history_id_ranges):
                    return RuntimeError(
                        "For Data Evolution table, multiple 'MERGE INTO' "
                        "operations have encountered conflicts, updating "
                        "the same file, which can render some updates "
                        "ineffective.")
        return None