blob: 3fcd6ec037da939b49b9c866d22b17c3e3b6ba13 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;
import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT;
import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.iceberg.events.CreateSnapshotEvent;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Predicate;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionSet;
import org.apache.iceberg.util.SnapshotUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
private static final Logger LOG = LoggerFactory.getLogger(MergingSnapshotProducer.class);
// data is only added in "append" and "overwrite" operations
private static final Set<String> VALIDATE_ADDED_FILES_OPERATIONS =
ImmutableSet.of(DataOperations.APPEND, DataOperations.OVERWRITE);
// data files are removed in "overwrite", "replace", and "delete"
private static final Set<String> VALIDATE_DATA_FILES_EXIST_OPERATIONS =
ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.REPLACE, DataOperations.DELETE);
private static final Set<String> VALIDATE_DATA_FILES_EXIST_SKIP_DELETE_OPERATIONS =
ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.REPLACE);
// delete files can be added in "overwrite" or "delete" operations
private static final Set<String> VALIDATE_ADDED_DELETE_FILES_OPERATIONS =
ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.DELETE);
private final String tableName;
private final TableOperations ops;
private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
private final ManifestMergeManager<DataFile> mergeManager;
private final ManifestFilterManager<DataFile> filterManager;
private final ManifestMergeManager<DeleteFile> deleteMergeManager;
private final ManifestFilterManager<DeleteFile> deleteFilterManager;
private final boolean snapshotIdInheritanceEnabled;
// update data
private final List<DataFile> newDataFiles = Lists.newArrayList();
private Long newDataFilesDataSequenceNumber;
private final Map<Integer, List<DeleteFileHolder>> newDeleteFilesBySpec = Maps.newHashMap();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder();
private final SnapshotSummary.Builder appendedManifestsSummary = SnapshotSummary.builder();
private Expression deleteExpression = Expressions.alwaysFalse();
private PartitionSpec dataSpec;
// cache new data manifests after writing
private List<ManifestFile> cachedNewDataManifests = null;
private boolean hasNewDataFiles = false;
// cache new manifests for delete files
private final List<ManifestFile> cachedNewDeleteManifests = Lists.newLinkedList();
private boolean hasNewDeleteFiles = false;
private boolean caseSensitive = true;
MergingSnapshotProducer(String tableName, TableOperations ops) {
super(ops);
this.tableName = tableName;
this.ops = ops;
this.dataSpec = null;
long targetSizeBytes =
ops.current()
.propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
int minCountToMerge =
ops.current().propertyAsInt(MANIFEST_MIN_MERGE_COUNT, MANIFEST_MIN_MERGE_COUNT_DEFAULT);
boolean mergeEnabled =
ops.current()
.propertyAsBoolean(
TableProperties.MANIFEST_MERGE_ENABLED,
TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT);
this.mergeManager = new DataFileMergeManager(targetSizeBytes, minCountToMerge, mergeEnabled);
this.filterManager = new DataFileFilterManager();
this.deleteMergeManager =
new DeleteFileMergeManager(targetSizeBytes, minCountToMerge, mergeEnabled);
this.deleteFilterManager = new DeleteFileFilterManager();
this.snapshotIdInheritanceEnabled =
ops.current()
.propertyAsBoolean(
SNAPSHOT_ID_INHERITANCE_ENABLED, SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
}
@Override
public ThisT set(String property, String value) {
summaryBuilder.set(property, value);
return self();
}
public ThisT caseSensitive(boolean isCaseSensitive) {
this.caseSensitive = isCaseSensitive;
filterManager.caseSensitive(isCaseSensitive);
deleteFilterManager.caseSensitive(isCaseSensitive);
return self();
}
protected boolean isCaseSensitive() {
return caseSensitive;
}
protected PartitionSpec dataSpec() {
Preconditions.checkState(
dataSpec != null, "Cannot determine partition spec: no data files have been added");
// the spec is set when the write is started
return dataSpec;
}
protected Expression rowFilter() {
return deleteExpression;
}
protected List<DataFile> addedDataFiles() {
return ImmutableList.copyOf(newDataFiles);
}
protected void failAnyDelete() {
filterManager.failAnyDelete();
deleteFilterManager.failAnyDelete();
}
protected void failMissingDeletePaths() {
filterManager.failMissingDeletePaths();
deleteFilterManager.failMissingDeletePaths();
}
/**
* Add a filter to match files to delete. A file will be deleted if all of the rows it contains
* match this or any other filter passed to this method.
*
* @param expr an expression to match rows.
*/
protected void deleteByRowFilter(Expression expr) {
this.deleteExpression = expr;
filterManager.deleteByRowFilter(expr);
// if a delete file matches the row filter, then it can be deleted because the rows will also be
// deleted
deleteFilterManager.deleteByRowFilter(expr);
}
/** Add a partition tuple to drop from the table during the delete phase. */
protected void dropPartition(int specId, StructLike partition) {
// dropping the data in a partition also drops all deletes in the partition
filterManager.dropPartition(specId, partition);
deleteFilterManager.dropPartition(specId, partition);
}
/** Add a specific data file to be deleted in the new snapshot. */
protected void delete(DataFile file) {
filterManager.delete(file);
}
/** Add a specific delete file to be deleted in the new snapshot. */
protected void delete(DeleteFile file) {
deleteFilterManager.delete(file);
}
/** Add a specific data path to be deleted in the new snapshot. */
protected void delete(CharSequence path) {
// this is an old call that never worked for delete files and can only be used to remove data
// files.
filterManager.delete(path);
}
protected boolean deletesDataFiles() {
return filterManager.containsDeletes();
}
protected boolean deletesDeleteFiles() {
return deleteFilterManager.containsDeletes();
}
protected boolean addsDataFiles() {
return newDataFiles.size() > 0;
}
protected boolean addsDeleteFiles() {
return newDeleteFilesBySpec.size() > 0;
}
/** Add a data file to the new snapshot. */
protected void add(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
setDataSpec(file);
addedFilesSummary.addedFile(dataSpec(), file);
hasNewDataFiles = true;
newDataFiles.add(file);
}
/** Add a delete file to the new snapshot. */
protected void add(DeleteFile file) {
Preconditions.checkNotNull(file, "Invalid delete file: null");
add(new DeleteFileHolder(file));
}
/** Add a delete file to the new snapshot. */
protected void add(DeleteFile file, long dataSequenceNumber) {
Preconditions.checkNotNull(file, "Invalid delete file: null");
add(new DeleteFileHolder(file, dataSequenceNumber));
}
private void add(DeleteFileHolder fileHolder) {
int specId = fileHolder.deleteFile().specId();
PartitionSpec fileSpec = ops.current().spec(specId);
List<DeleteFileHolder> deleteFiles =
newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList());
deleteFiles.add(fileHolder);
addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile());
hasNewDeleteFiles = true;
}
private void setDataSpec(DataFile file) {
PartitionSpec fileSpec = ops.current().spec(file.specId());
Preconditions.checkNotNull(
fileSpec, "Cannot find partition spec for data file: %s", file.path());
if (dataSpec == null) {
dataSpec = fileSpec;
} else if (dataSpec.specId() != file.specId()) {
throw new ValidationException("Invalid data file, expected spec id: %d", dataSpec.specId());
}
}
/** Add all files in a manifest to the new snapshot. */
protected void add(ManifestFile manifest) {
Preconditions.checkArgument(
manifest.content() == ManifestContent.DATA, "Cannot append delete manifest: %s", manifest);
if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) {
appendedManifestsSummary.addedManifest(manifest);
appendManifests.add(manifest);
} else {
// the manifest must be rewritten with this update's snapshot ID
ManifestFile copiedManifest = copyManifest(manifest);
rewrittenAppendManifests.add(copiedManifest);
}
}
private ManifestFile copyManifest(ManifestFile manifest) {
TableMetadata current = ops.current();
InputFile toCopy = ops.io().newInputFile(manifest.path());
OutputFile newManifestPath = newManifestOutput();
return ManifestFiles.copyAppendManifest(
current.formatVersion(),
manifest.partitionSpecId(),
toCopy,
current.specsById(),
newManifestPath,
snapshotId(),
appendedManifestsSummary);
}
/**
* Validates that no files matching given partitions have been added to the table since a starting
* snapshot.
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
* @param partitionSet a set of partitions to filter new conflicting data files
* @param parent ending snapshot on the lineage being validated
*/
protected void validateAddedDataFiles(
TableMetadata base, Long startingSnapshotId, PartitionSet partitionSet, Snapshot parent) {
CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
addedDataFiles(base, startingSnapshotId, null, partitionSet, parent);
try (CloseableIterator<ManifestEntry<DataFile>> conflicts = conflictEntries.iterator()) {
if (conflicts.hasNext()) {
throw new ValidationException(
"Found conflicting files that can contain records matching partitions %s: %s",
partitionSet,
Iterators.toString(
Iterators.transform(conflicts, entry -> entry.file().path().toString())));
}
} catch (IOException e) {
throw new UncheckedIOException(
String.format("Failed to validate no appends matching %s", partitionSet), e);
}
}
/**
* Validates that no files matching a filter have been added to the table since a starting
* snapshot.
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
* @param conflictDetectionFilter an expression used to find new conflicting data files
*/
protected void validateAddedDataFiles(
TableMetadata base,
Long startingSnapshotId,
Expression conflictDetectionFilter,
Snapshot parent) {
CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
addedDataFiles(base, startingSnapshotId, conflictDetectionFilter, null, parent);
try (CloseableIterator<ManifestEntry<DataFile>> conflicts = conflictEntries.iterator()) {
if (conflicts.hasNext()) {
throw new ValidationException(
"Found conflicting files that can contain records matching %s: %s",
conflictDetectionFilter,
Iterators.toString(
Iterators.transform(conflicts, entry -> entry.file().path().toString())));
}
} catch (IOException e) {
throw new UncheckedIOException(
String.format("Failed to validate no appends matching %s", conflictDetectionFilter), e);
}
}
/**
* Returns an iterable of files matching a filter have been added to a branch since a starting
* snapshot.
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
* @param dataFilter an expression used to find new data files
* @param partitionSet a set of partitions to find new data files
* @param parent ending snapshot of the branch
*/
private CloseableIterable<ManifestEntry<DataFile>> addedDataFiles(
TableMetadata base,
Long startingSnapshotId,
Expression dataFilter,
PartitionSet partitionSet,
Snapshot parent) {
// if there is no current table state, no files have been added
if (parent == null) {
return CloseableIterable.empty();
}
Pair<List<ManifestFile>, Set<Long>> history =
validationHistory(
base,
startingSnapshotId,
VALIDATE_ADDED_FILES_OPERATIONS,
ManifestContent.DATA,
parent);
List<ManifestFile> manifests = history.first();
Set<Long> newSnapshots = history.second();
ManifestGroup manifestGroup =
new ManifestGroup(ops.io(), manifests, ImmutableList.of())
.caseSensitive(caseSensitive)
.filterManifestEntries(entry -> newSnapshots.contains(entry.snapshotId()))
.specsById(base.specsById())
.ignoreDeleted()
.ignoreExisting();
if (dataFilter != null) {
manifestGroup = manifestGroup.filterData(dataFilter);
}
if (partitionSet != null) {
manifestGroup =
manifestGroup.filterManifestEntries(
entry -> partitionSet.contains(entry.file().specId(), entry.file().partition()));
}
return manifestGroup.entries();
}
/**
* Validates that no new delete files that must be applied to the given data files have been added
* to the table since a starting snapshot.
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
* @param dataFiles data files to validate have no new row deletes
* @param parent ending snapshot on the branch being validated
*/
protected void validateNoNewDeletesForDataFiles(
TableMetadata base, Long startingSnapshotId, Iterable<DataFile> dataFiles, Snapshot parent) {
validateNoNewDeletesForDataFiles(
base, startingSnapshotId, null, dataFiles, newDataFilesDataSequenceNumber != null, parent);
}
/**
* Validates that no new delete files that must be applied to the given data files have been added
* to the table since a starting snapshot.
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
* @param dataFilter a data filter
* @param dataFiles data files to validate have no new row deletes
* @param parent ending snapshot on the branch being validated
*/
protected void validateNoNewDeletesForDataFiles(
TableMetadata base,
Long startingSnapshotId,
Expression dataFilter,
Iterable<DataFile> dataFiles,
Snapshot parent) {
validateNoNewDeletesForDataFiles(
base, startingSnapshotId, dataFilter, dataFiles, false, parent);
}
/**
* Validates that no new delete files that must be applied to the given data files have been added
* to the table since a starting snapshot, with the option to ignore equality deletes during the
* validation.
*
* <p>For example, in the case of rewriting data files, if the added data files have the same
* sequence number as the replaced data files, equality deletes added at a higher sequence number
* are still effective against the added data files, so there is no risk of commit conflict
* between RewriteFiles and RowDelta. In cases like this, validation against equality delete files
* can be omitted.
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
* @param dataFilter a data filter
* @param dataFiles data files to validate have no new row deletes
* @param ignoreEqualityDeletes whether equality deletes should be ignored in validation
* @param parent ending snapshot on the branch being validated
*/
private void validateNoNewDeletesForDataFiles(
TableMetadata base,
Long startingSnapshotId,
Expression dataFilter,
Iterable<DataFile> dataFiles,
boolean ignoreEqualityDeletes,
Snapshot parent) {
// if there is no current table state, no files have been added
if (parent == null || base.formatVersion() < 2) {
return;
}
DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, dataFilter, null, parent);
long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId);
for (DataFile dataFile : dataFiles) {
// if any delete is found that applies to files written in or before the starting snapshot,
// fail
DeleteFile[] deleteFiles = deletes.forDataFile(startingSequenceNumber, dataFile);
if (ignoreEqualityDeletes) {
ValidationException.check(
Arrays.stream(deleteFiles)
.noneMatch(deleteFile -> deleteFile.content() == FileContent.POSITION_DELETES),
"Cannot commit, found new position delete for replaced data file: %s",
dataFile);
} else {
ValidationException.check(
deleteFiles.length == 0,
"Cannot commit, found new delete for replaced data file: %s",
dataFile);
}
}
}
/**
* Validates that no delete files matching a filter have been added to the table since a starting
* snapshot.
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
* @param dataFilter an expression used to find new conflicting delete files
* @param parent ending snapshot on the branch being validated
*/
protected void validateNoNewDeleteFiles(
TableMetadata base, Long startingSnapshotId, Expression dataFilter, Snapshot parent) {
DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, dataFilter, null, parent);
ValidationException.check(
deletes.isEmpty(),
"Found new conflicting delete files that can apply to records matching %s: %s",
dataFilter,
Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
}
/**
* Validates that no delete files matching a partition set have been added to the table since a
* starting snapshot.
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
* @param partitionSet a partition set used to find new conflicting delete files
* @param parent ending snapshot on the branch being validated
*/
protected void validateNoNewDeleteFiles(
TableMetadata base, Long startingSnapshotId, PartitionSet partitionSet, Snapshot parent) {
DeleteFileIndex deletes =
addedDeleteFiles(base, startingSnapshotId, null, partitionSet, parent);
ValidationException.check(
deletes.isEmpty(),
"Found new conflicting delete files that can apply to records matching %s: %s",
partitionSet,
Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
}
/**
* Returns matching delete files have been added to the table since a starting snapshot.
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
* @param dataFilter an expression used to find delete files
* @param partitionSet a partition set used to find delete files
* @param parent parent snapshot of the branch
*/
protected DeleteFileIndex addedDeleteFiles(
TableMetadata base,
Long startingSnapshotId,
Expression dataFilter,
PartitionSet partitionSet,
Snapshot parent) {
// if there is no current table state, return empty delete file index
if (parent == null || base.formatVersion() < 2) {
return DeleteFileIndex.builderFor(ops.io(), ImmutableList.of())
.specsById(base.specsById())
.build();
}
Pair<List<ManifestFile>, Set<Long>> history =
validationHistory(
base,
startingSnapshotId,
VALIDATE_ADDED_DELETE_FILES_OPERATIONS,
ManifestContent.DELETES,
parent);
List<ManifestFile> deleteManifests = history.first();
long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId);
return buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter, partitionSet);
}
/**
* Validates that no files matching a filter have been deleted from the table since a starting
* snapshot.
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
* @param dataFilter an expression used to find deleted data files
* @param parent ending snapshot on the branch being validated
*/
protected void validateDeletedDataFiles(
TableMetadata base, Long startingSnapshotId, Expression dataFilter, Snapshot parent) {
CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
deletedDataFiles(base, startingSnapshotId, dataFilter, null, parent);
try (CloseableIterator<ManifestEntry<DataFile>> conflicts = conflictEntries.iterator()) {
if (conflicts.hasNext()) {
throw new ValidationException(
"Found conflicting deleted files that can contain records matching %s: %s",
dataFilter,
Iterators.toString(
Iterators.transform(conflicts, entry -> entry.file().path().toString())));
}
} catch (IOException e) {
throw new UncheckedIOException(
String.format("Failed to validate no deleted data files matching %s", dataFilter), e);
}
}
/**
* Validates that no files matching a filter have been deleted from the table since a starting
* snapshot.
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
* @param partitionSet a partition set used to find deleted data files
* @param parent ending snapshot on the branch being validated
*/
protected void validateDeletedDataFiles(
TableMetadata base, Long startingSnapshotId, PartitionSet partitionSet, Snapshot parent) {
CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
deletedDataFiles(base, startingSnapshotId, null, partitionSet, parent);
try (CloseableIterator<ManifestEntry<DataFile>> conflicts = conflictEntries.iterator()) {
if (conflicts.hasNext()) {
throw new ValidationException(
"Found conflicting deleted files that can apply to records matching %s: %s",
partitionSet,
Iterators.toString(
Iterators.transform(conflicts, entry -> entry.file().path().toString())));
}
} catch (IOException e) {
throw new UncheckedIOException(
String.format("Failed to validate no appends matching %s", partitionSet), e);
}
}
/**
* Returns an iterable of files matching a filter have been added to the table since a starting
* snapshot.
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
* @param dataFilter an expression used to find deleted data files
* @param partitionSet a set of partitions to find deleted data files
* @param parent ending snapshot on the branch being validated
*/
private CloseableIterable<ManifestEntry<DataFile>> deletedDataFiles(
TableMetadata base,
Long startingSnapshotId,
Expression dataFilter,
PartitionSet partitionSet,
Snapshot parent) {
// if there is no current table state, no files have been deleted
if (parent == null) {
return CloseableIterable.empty();
}
Pair<List<ManifestFile>, Set<Long>> history =
validationHistory(
base,
startingSnapshotId,
VALIDATE_DATA_FILES_EXIST_OPERATIONS,
ManifestContent.DATA,
parent);
List<ManifestFile> manifests = history.first();
Set<Long> newSnapshots = history.second();
ManifestGroup manifestGroup =
new ManifestGroup(ops.io(), manifests, ImmutableList.of())
.caseSensitive(caseSensitive)
.filterManifestEntries(entry -> newSnapshots.contains(entry.snapshotId()))
.filterManifestEntries(entry -> entry.status().equals(ManifestEntry.Status.DELETED))
.specsById(base.specsById())
.ignoreExisting();
if (dataFilter != null) {
manifestGroup = manifestGroup.filterData(dataFilter);
}
if (partitionSet != null) {
manifestGroup =
manifestGroup.filterManifestEntries(
entry -> partitionSet.contains(entry.file().specId(), entry.file().partition()));
}
return manifestGroup.entries();
}
protected void setNewDataFilesDataSequenceNumber(long sequenceNumber) {
this.newDataFilesDataSequenceNumber = sequenceNumber;
}
private long startingSequenceNumber(TableMetadata metadata, Long startingSnapshotId) {
if (startingSnapshotId != null && metadata.snapshot(startingSnapshotId) != null) {
Snapshot startingSnapshot = metadata.snapshot(startingSnapshotId);
return startingSnapshot.sequenceNumber();
} else {
return TableMetadata.INITIAL_SEQUENCE_NUMBER;
}
}
private DeleteFileIndex buildDeleteFileIndex(
List<ManifestFile> deleteManifests,
long startingSequenceNumber,
Expression dataFilter,
PartitionSet partitionSet) {
DeleteFileIndex.Builder builder =
DeleteFileIndex.builderFor(ops.io(), deleteManifests)
.afterSequenceNumber(startingSequenceNumber)
.caseSensitive(caseSensitive)
.specsById(ops.current().specsById());
if (dataFilter != null) {
builder.filterData(dataFilter);
}
if (partitionSet != null) {
builder.filterPartitions(partitionSet);
}
return builder.build();
}
@SuppressWarnings("CollectionUndefinedEquality")
protected void validateDataFilesExist(
TableMetadata base,
Long startingSnapshotId,
CharSequenceSet requiredDataFiles,
boolean skipDeletes,
Expression conflictDetectionFilter,
Snapshot parent) {
// if there is no current table state, no files have been removed
if (parent == null) {
return;
}
Set<String> matchingOperations =
skipDeletes
? VALIDATE_DATA_FILES_EXIST_SKIP_DELETE_OPERATIONS
: VALIDATE_DATA_FILES_EXIST_OPERATIONS;
Pair<List<ManifestFile>, Set<Long>> history =
validationHistory(
base, startingSnapshotId, matchingOperations, ManifestContent.DATA, parent);
List<ManifestFile> manifests = history.first();
Set<Long> newSnapshots = history.second();
ManifestGroup matchingDeletesGroup =
new ManifestGroup(ops.io(), manifests, ImmutableList.of())
.filterManifestEntries(
entry ->
entry.status() != ManifestEntry.Status.ADDED
&& newSnapshots.contains(entry.snapshotId())
&& requiredDataFiles.contains(entry.file().path()))
.specsById(base.specsById())
.ignoreExisting();
if (conflictDetectionFilter != null) {
matchingDeletesGroup.filterData(conflictDetectionFilter);
}
try (CloseableIterator<ManifestEntry<DataFile>> deletes =
matchingDeletesGroup.entries().iterator()) {
if (deletes.hasNext()) {
throw new ValidationException(
"Cannot commit, missing data files: %s",
Iterators.toString(
Iterators.transform(deletes, entry -> entry.file().path().toString())));
}
} catch (IOException e) {
throw new UncheckedIOException("Failed to validate required files exist", e);
}
}
private Pair<List<ManifestFile>, Set<Long>> validationHistory(
TableMetadata base,
Long startingSnapshotId,
Set<String> matchingOperations,
ManifestContent content,
Snapshot parent) {
List<ManifestFile> manifests = Lists.newArrayList();
Set<Long> newSnapshots = Sets.newHashSet();
Snapshot lastSnapshot = null;
Iterable<Snapshot> snapshots =
SnapshotUtil.ancestorsBetween(parent.snapshotId(), startingSnapshotId, base::snapshot);
for (Snapshot currentSnapshot : snapshots) {
lastSnapshot = currentSnapshot;
if (matchingOperations.contains(currentSnapshot.operation())) {
newSnapshots.add(currentSnapshot.snapshotId());
if (content == ManifestContent.DATA) {
for (ManifestFile manifest : currentSnapshot.dataManifests(ops.io())) {
if (manifest.snapshotId() == currentSnapshot.snapshotId()) {
manifests.add(manifest);
}
}
} else {
for (ManifestFile manifest : currentSnapshot.deleteManifests(ops.io())) {
if (manifest.snapshotId() == currentSnapshot.snapshotId()) {
manifests.add(manifest);
}
}
}
}
}
ValidationException.check(
lastSnapshot == null || Objects.equals(lastSnapshot.parentId(), startingSnapshotId),
"Cannot determine history between starting snapshot %s and the last known ancestor %s",
startingSnapshotId,
lastSnapshot != null ? lastSnapshot.snapshotId() : null);
return Pair.of(manifests, newSnapshots);
}
@Override
protected Map<String, String> summary() {
summaryBuilder.setPartitionSummaryLimit(
ops.current()
.propertyAsInt(
TableProperties.WRITE_PARTITION_SUMMARY_LIMIT,
TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT));
return summaryBuilder.build();
}
@Override
public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
// filter any existing manifests
List<ManifestFile> filtered =
filterManager.filterManifests(
SnapshotUtil.schemaFor(base, targetBranch()),
snapshot != null ? snapshot.dataManifests(ops.io()) : null);
long minDataSequenceNumber =
filtered.stream()
.map(ManifestFile::minSequenceNumber)
.filter(
seq ->
seq
!= ManifestWriter
.UNASSIGNED_SEQ) // filter out unassigned in rewritten manifests
.reduce(base.lastSequenceNumber(), Math::min);
deleteFilterManager.dropDeleteFilesOlderThan(minDataSequenceNumber);
List<ManifestFile> filteredDeletes =
deleteFilterManager.filterManifests(
SnapshotUtil.schemaFor(base, targetBranch()),
snapshot != null ? snapshot.deleteManifests(ops.io()) : null);
// only keep manifests that have live data files or that were written by this commit
Predicate<ManifestFile> shouldKeep =
manifest ->
manifest.hasAddedFiles()
|| manifest.hasExistingFiles()
|| manifest.snapshotId() == snapshotId();
Iterable<ManifestFile> unmergedManifests =
Iterables.filter(Iterables.concat(prepareNewDataManifests(), filtered), shouldKeep);
Iterable<ManifestFile> unmergedDeleteManifests =
Iterables.filter(Iterables.concat(prepareDeleteManifests(), filteredDeletes), shouldKeep);
// update the snapshot summary
summaryBuilder.clear();
summaryBuilder.merge(addedFilesSummary);
summaryBuilder.merge(appendedManifestsSummary);
summaryBuilder.merge(filterManager.buildSummary(filtered));
summaryBuilder.merge(deleteFilterManager.buildSummary(filteredDeletes));
List<ManifestFile> manifests = Lists.newArrayList();
Iterables.addAll(manifests, mergeManager.mergeManifests(unmergedManifests));
Iterables.addAll(manifests, deleteMergeManager.mergeManifests(unmergedDeleteManifests));
return manifests;
}
@Override
public Object updateEvent() {
long snapshotId = snapshotId();
Snapshot justSaved = ops.refresh().snapshot(snapshotId);
long sequenceNumber = TableMetadata.INVALID_SEQUENCE_NUMBER;
Map<String, String> summary;
if (justSaved == null) {
// The snapshot just saved may not be present if the latest metadata couldn't be loaded due to
// eventual
// consistency problems in refresh.
LOG.warn("Failed to load committed snapshot: omitting sequence number from notifications");
summary = summary();
} else {
sequenceNumber = justSaved.sequenceNumber();
summary = justSaved.summary();
}
return new CreateSnapshotEvent(tableName, operation(), snapshotId, sequenceNumber, summary);
}
@SuppressWarnings("checkstyle:CyclomaticComplexity")
private void cleanUncommittedAppends(Set<ManifestFile> committed) {
if (cachedNewDataManifests != null) {
boolean hasDeletes = false;
for (ManifestFile manifest : cachedNewDataManifests) {
if (!committed.contains(manifest)) {
deleteFile(manifest.path());
hasDeletes = true;
}
}
if (hasDeletes) {
this.cachedNewDataManifests = null;
}
}
boolean hasDeleteDeletes = false;
for (ManifestFile cachedNewDeleteManifest : cachedNewDeleteManifests) {
if (!committed.contains(cachedNewDeleteManifest)) {
deleteFile(cachedNewDeleteManifest.path());
hasDeleteDeletes = true;
}
}
if (hasDeleteDeletes) {
this.cachedNewDeleteManifests.clear();
}
// rewritten manifests are always owned by the table
for (ManifestFile manifest : rewrittenAppendManifests) {
if (!committed.contains(manifest)) {
deleteFile(manifest.path());
}
}
// manifests that are not rewritten are only owned by the table if the commit succeeded
if (!committed.isEmpty()) {
// the commit succeeded if at least one manifest was committed
// the table now owns appendManifests; clean up any that are not used
for (ManifestFile manifest : appendManifests) {
if (!committed.contains(manifest)) {
deleteFile(manifest.path());
}
}
}
}
@Override
protected void cleanUncommitted(Set<ManifestFile> committed) {
mergeManager.cleanUncommitted(committed);
filterManager.cleanUncommitted(committed);
deleteMergeManager.cleanUncommitted(committed);
deleteFilterManager.cleanUncommitted(committed);
cleanUncommittedAppends(committed);
}
private Iterable<ManifestFile> prepareNewDataManifests() {
Iterable<ManifestFile> newManifests;
if (newDataFiles.size() > 0) {
List<ManifestFile> dataFileManifests = newDataFilesAsManifests();
newManifests = Iterables.concat(dataFileManifests, appendManifests, rewrittenAppendManifests);
} else {
newManifests = Iterables.concat(appendManifests, rewrittenAppendManifests);
}
return Iterables.transform(
newManifests,
manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build());
}
private List<ManifestFile> newDataFilesAsManifests() {
if (hasNewDataFiles && cachedNewDataManifests != null) {
cachedNewDataManifests.forEach(file -> deleteFile(file.path()));
cachedNewDataManifests = null;
}
if (cachedNewDataManifests == null) {
try {
RollingManifestWriter<DataFile> writer = newRollingManifestWriter(dataSpec());
try {
if (newDataFilesDataSequenceNumber == null) {
newDataFiles.forEach(writer::add);
} else {
newDataFiles.forEach(f -> writer.add(f, newDataFilesDataSequenceNumber));
}
} finally {
writer.close();
}
this.cachedNewDataManifests = writer.toManifestFiles();
this.hasNewDataFiles = false;
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest writer");
}
}
return cachedNewDataManifests;
}
private Iterable<ManifestFile> prepareDeleteManifests() {
if (newDeleteFilesBySpec.isEmpty()) {
return ImmutableList.of();
}
return newDeleteFilesAsManifests();
}
private List<ManifestFile> newDeleteFilesAsManifests() {
if (hasNewDeleteFiles && cachedNewDeleteManifests.size() > 0) {
for (ManifestFile cachedNewDeleteManifest : cachedNewDeleteManifests) {
deleteFile(cachedNewDeleteManifest.path());
}
// this triggers a rewrite of all delete manifests even if there is only one new delete file
// if there is a relevant use case in the future, the behavior can be optimized
cachedNewDeleteManifests.clear();
}
if (cachedNewDeleteManifests.isEmpty()) {
newDeleteFilesBySpec.forEach(
(specId, deleteFiles) -> {
PartitionSpec spec = ops.current().spec(specId);
try {
RollingManifestWriter<DeleteFile> writer = newRollingDeleteManifestWriter(spec);
try {
deleteFiles.forEach(
df -> {
if (df.dataSequenceNumber() != null) {
writer.add(df.deleteFile(), df.dataSequenceNumber());
} else {
writer.add(df.deleteFile());
}
});
} finally {
writer.close();
}
cachedNewDeleteManifests.addAll(writer.toManifestFiles());
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest writer");
}
});
this.hasNewDeleteFiles = false;
}
return cachedNewDeleteManifests;
}
private class DataFileFilterManager extends ManifestFilterManager<DataFile> {
private DataFileFilterManager() {
super(ops.current().specsById(), MergingSnapshotProducer.this::workerPool);
}
@Override
protected void deleteFile(String location) {
MergingSnapshotProducer.this.deleteFile(location);
}
@Override
protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec manifestSpec) {
return MergingSnapshotProducer.this.newManifestWriter(manifestSpec);
}
@Override
protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) {
return MergingSnapshotProducer.this.newManifestReader(manifest);
}
}
private class DataFileMergeManager extends ManifestMergeManager<DataFile> {
DataFileMergeManager(long targetSizeBytes, int minCountToMerge, boolean mergeEnabled) {
super(
targetSizeBytes, minCountToMerge, mergeEnabled, MergingSnapshotProducer.this::workerPool);
}
@Override
protected long snapshotId() {
return MergingSnapshotProducer.this.snapshotId();
}
@Override
protected PartitionSpec spec(int specId) {
return ops.current().spec(specId);
}
@Override
protected void deleteFile(String location) {
MergingSnapshotProducer.this.deleteFile(location);
}
@Override
protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec manifestSpec) {
return MergingSnapshotProducer.this.newManifestWriter(manifestSpec);
}
@Override
protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) {
return MergingSnapshotProducer.this.newManifestReader(manifest);
}
}
private class DeleteFileFilterManager extends ManifestFilterManager<DeleteFile> {
private DeleteFileFilterManager() {
super(ops.current().specsById(), MergingSnapshotProducer.this::workerPool);
}
@Override
protected void deleteFile(String location) {
MergingSnapshotProducer.this.deleteFile(location);
}
@Override
protected ManifestWriter<DeleteFile> newManifestWriter(PartitionSpec manifestSpec) {
return MergingSnapshotProducer.this.newDeleteManifestWriter(manifestSpec);
}
@Override
protected ManifestReader<DeleteFile> newManifestReader(ManifestFile manifest) {
return MergingSnapshotProducer.this.newDeleteManifestReader(manifest);
}
}
private class DeleteFileMergeManager extends ManifestMergeManager<DeleteFile> {
DeleteFileMergeManager(long targetSizeBytes, int minCountToMerge, boolean mergeEnabled) {
super(
targetSizeBytes, minCountToMerge, mergeEnabled, MergingSnapshotProducer.this::workerPool);
}
@Override
protected long snapshotId() {
return MergingSnapshotProducer.this.snapshotId();
}
@Override
protected PartitionSpec spec(int specId) {
return ops.current().spec(specId);
}
@Override
protected void deleteFile(String location) {
MergingSnapshotProducer.this.deleteFile(location);
}
@Override
protected ManifestWriter<DeleteFile> newManifestWriter(PartitionSpec manifestSpec) {
return MergingSnapshotProducer.this.newDeleteManifestWriter(manifestSpec);
}
@Override
protected ManifestReader<DeleteFile> newManifestReader(ManifestFile manifest) {
return MergingSnapshotProducer.this.newDeleteManifestReader(manifest);
}
}
private static class DeleteFileHolder {
private final DeleteFile deleteFile;
private final Long dataSequenceNumber;
/**
* Wrap a delete file for commit with a given data sequence number
*
* @param deleteFile delete file
* @param dataSequenceNumber data sequence number to apply
*/
DeleteFileHolder(DeleteFile deleteFile, long dataSequenceNumber) {
this.deleteFile = deleteFile;
this.dataSequenceNumber = dataSequenceNumber;
}
/**
* Wrap a delete file for commit with the latest sequence number
*
* @param deleteFile delete file
*/
DeleteFileHolder(DeleteFile deleteFile) {
this.deleteFile = deleteFile;
this.dataSequenceNumber = null;
}
public DeleteFile deleteFile() {
return deleteFile;
}
public Long dataSequenceNumber() {
return dataSequenceNumber;
}
}
}