blob: 506d2579a7f05cc558b71a2e34b3ac77042a46b9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.StrictMetricsEvaluator;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.CharSequenceWrapper;
import org.apache.iceberg.util.ManifestFileUtil;
import org.apache.iceberg.util.PartitionSet;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class ManifestFilterManager<F extends ContentFile<F>> {
// class logger; used in this file to warn when a duplicate path is deleted from a manifest
private static final Logger LOG = LoggerFactory.getLogger(ManifestFilterManager.class);
// joins the missing delete paths into the message of the required-deletes validation error
private static final Joiner COMMA = Joiner.on(",");
/**
 * Validation failure raised when a file would be deleted while deletes are configured to fail.
 * Carries the partition string of the file that triggered the failure.
 */
protected static class DeleteException extends ValidationException {
  private final String partition;

  private DeleteException(String partitionPath) {
    super("Operation would delete existing data");
    this.partition = partitionPath;
  }

  /**
   * @return the partition string supplied when this exception was created
   */
  public String partition() {
    return this.partition;
  }
}
// partition specs keyed by spec id; used to look up the spec of each manifest
private final Map<Integer, PartitionSpec> specsById;
// partitions of the files passed to delete(F); used to skip manifests when there are no path-only deletes
private final PartitionSet deleteFilePartitions;
// partitions registered through dropPartition; every file in one of them is deleted
private final PartitionSet dropPartitions;
// paths of files to delete, added by both delete(F) and delete(CharSequence)
private final Set<CharSequence> deletePaths = CharSequenceSet.empty();
// OR of every row filter passed to deleteByRowFilter; starts as alwaysFalse (no filter)
private Expression deleteExpression = Expressions.alwaysFalse();
// entries of delete manifests with a positive sequence number below this value are removed
private long minSequenceNumber = 0;
// set once a path is deleted without its partition; after that every manifest with live files is scanned
private boolean hasPathOnlyDeletes = false;
// when true, detecting any file to delete throws DeleteException
private boolean failAnyDelete = false;
// when true, filterManifests validates that every path in deletePaths was found in a filtered manifest
private boolean failMissingDeletePaths = false;
// number of duplicate paths encountered while rewriting manifests; reported in the snapshot summary
private int duplicateDeleteCount = 0;
// cache filtered manifests to avoid extra work when commits fail.
// maps each input manifest to its filtered result (the same instance when nothing was removed)
private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();
// tracking where files were deleted to validate retries quickly
// maps each rewritten manifest to the files that were removed from it
private final Map<ManifestFile, Iterable<F>> filteredManifestToDeletedFiles =
Maps.newConcurrentMap();
/**
 * Creates a filter manager that resolves manifests against the given partition specs.
 *
 * @param specsById partition specs keyed by spec id
 */
protected ManifestFilterManager(Map<Integer, PartitionSpec> specsById) {
  this.dropPartitions = PartitionSet.create(specsById);
  this.deleteFilePartitions = PartitionSet.create(specsById);
  this.specsById = specsById;
}
/**
 * Deletes the file at the given location. Within this class it is called to remove filtered
 * manifest copies that were created here but not committed.
 *
 * @param location path of the file to delete
 */
protected abstract void deleteFile(String location);
/**
 * Creates the writer used to produce the filtered copy of a manifest.
 *
 * @param spec partition spec of the manifest being rewritten
 * @return a new manifest writer for that spec
 */
protected abstract ManifestWriter<F> newManifestWriter(PartitionSpec spec);
/**
 * Opens a manifest so its entries can be inspected for files to delete.
 *
 * @param manifest the manifest to read
 * @return a reader over the manifest; it is closed by this class after filtering
 */
protected abstract ManifestReader<F> newManifestReader(ManifestFile manifest);
/**
 * Configures filtering to throw a {@link DeleteException} as soon as a file to delete is found.
 */
protected void failAnyDelete() {
  failAnyDelete = true;
}
/**
 * Configures filtering to fail validation when a path requested for deletion was not found in
 * any filtered manifest.
 */
protected void failMissingDeletePaths() {
  failMissingDeletePaths = true;
}
/**
 * Registers a row filter for deletion. Filters accumulate: the new expression is OR-ed with all
 * previously registered ones, and a file is deleted when every row it contains matches the
 * combined filter. Previously cached filter results are discarded.
 *
 * @param expr an expression to match rows.
 */
protected void deleteByRowFilter(Expression expr) {
  Preconditions.checkNotNull(expr, "Cannot delete files using filter: null");
  // cached results were computed with the old filter and are no longer valid
  invalidateFilteredCache();
  Expression combined = Expressions.or(deleteExpression, expr);
  this.deleteExpression = combined;
}
/**
 * Registers a partition whose files are all removed during the delete phase. Previously cached
 * filter results are discarded.
 *
 * @param specId id of the partition spec the tuple belongs to
 * @param partition the partition tuple to drop; must not be null
 */
protected void dropPartition(int specId, StructLike partition) {
  Preconditions.checkNotNull(partition, "Cannot delete files in invalid partition: null");
  // cached results do not account for the newly dropped partition
  invalidateFilteredCache();
  this.dropPartitions.add(specId, partition);
}
/**
 * Sets the sequence number below which delete files are removed.
 * <p>
 * While filtering a delete manifest, entries whose sequence number is positive and smaller than
 * this value are dropped. Passing the sequence number of the oldest data file in the table
 * therefore continuously removes delete files that can no longer match any existing row.
 *
 * @param sequenceNumber a sequence number used to remove old delete files; must not be negative
 */
protected void dropDeleteFilesOlderThan(long sequenceNumber) {
  boolean nonNegative = sequenceNumber >= 0;
  Preconditions.checkArgument(nonNegative, "Invalid minimum data sequence number: %s", sequenceNumber);
  minSequenceNumber = sequenceNumber;
}
/**
 * Marks a file for deletion in the new snapshot. Both its path and its partition are recorded;
 * the partition is used to skip manifests that cannot contain the file. Previously cached filter
 * results are discarded.
 *
 * @param file the file to delete; must not be null
 */
void delete(F file) {
  Preconditions.checkNotNull(file, "Cannot delete file: null");
  invalidateFilteredCache();
  CharSequence location = file.path();
  deletePaths.add(location);
  deleteFilePartitions.add(file.specId(), file.partition());
}
/**
 * Marks a path for deletion in the new snapshot. Because no partition is known for a bare path,
 * this also flags the manager as having path-only deletes, so manifests can no longer be skipped
 * based on the partitions of deleted files. Previously cached filter results are discarded.
 *
 * @param path location of the file to delete; must not be null
 */
void delete(CharSequence path) {
  Preconditions.checkNotNull(path, "Cannot delete file path: null");
  invalidateFilteredCache();
  hasPathOnlyDeletes = true;
  this.deletePaths.add(path);
}
/**
 * Filters deleted files out of a list of manifests.
 * <p>
 * Manifests are filtered in parallel on the shared worker pool. The result has one manifest per
 * input manifest, in the same order. After filtering, required delete paths are validated.
 *
 * @param tableSchema the current table schema
 * @param manifests a list of manifests to be filtered; may be null or empty
 * @return a list of filtered manifests, empty when there was nothing to filter
 */
List<ManifestFile> filterManifests(Schema tableSchema, List<ManifestFile> manifests) {
  boolean nothingToFilter = manifests == null || manifests.isEmpty();
  if (nothingToFilter) {
    validateRequiredDeletes();
    return ImmutableList.of();
  }

  // one metrics evaluator serves every manifest because it is bound to the table schema
  StrictMetricsEvaluator sharedEvaluator = new StrictMetricsEvaluator(tableSchema, deleteExpression);
  ManifestFile[] results = new ManifestFile[manifests.size()];

  // each task writes to its own slot, so the output keeps the input order
  Tasks.range(results.length)
      .stopOnFailure().throwFailureWhenFinished()
      .executeWith(ThreadPools.getWorkerPool())
      .run(position -> {
        results[position] = filterManifest(sharedEvaluator, manifests.get(position));
      });

  validateRequiredDeletes(results);
  return Arrays.asList(results);
}
/**
 * Builds a snapshot summary from the files that were removed from the given filtered manifests.
 * The number of duplicate deletes seen so far is added to the summary as well.
 *
 * @param manifests a set of filtered manifests
 * @return a summary builder populated with the deleted files
 */
SnapshotSummary.Builder buildSummary(Iterable<ManifestFile> manifests) {
  SnapshotSummary.Builder builder = SnapshotSummary.builder();

  for (ManifestFile manifest : manifests) {
    PartitionSpec spec = specsById.get(manifest.partitionSpecId());
    Iterable<F> removedFiles = filteredManifestToDeletedFiles.get(manifest);
    if (removedFiles == null) {
      // nothing was removed from this manifest
      continue;
    }

    for (F removed : removedFiles) {
      builder.deletedFile(spec, removed);
    }
  }

  builder.incrementDuplicateDeletes(duplicateDeleteCount);
  return builder;
}
/**
 * Throws a {@link ValidationException} if any path requested for deletion was not removed from
 * one of the filtered manifests. Does nothing unless missing delete paths are configured to fail.
 *
 * @param manifests a set of filtered manifests
 */
private void validateRequiredDeletes(ManifestFile... manifests) {
  if (!failMissingDeletePaths) {
    return;
  }

  Set<CharSequence> removedPaths = deletedFiles(manifests);
  boolean allFound = removedPaths.containsAll(deletePaths);
  Iterable<CharSequence> missing = Iterables.filter(deletePaths, path -> !removedPaths.contains(path));
  ValidationException.check(allFound, "Missing required files to delete: %s", COMMA.join(missing));
}
/**
 * Collects the paths of all files that were removed from the given filtered manifests.
 *
 * @param manifests filtered manifests to inspect; may be null
 * @return the set of removed file paths, empty when nothing was removed
 */
private Set<CharSequence> deletedFiles(ManifestFile[] manifests) {
  Set<CharSequence> removedPaths = CharSequenceSet.empty();
  if (manifests == null) {
    return removedPaths;
  }

  for (ManifestFile manifest : manifests) {
    Iterable<F> removedFiles = filteredManifestToDeletedFiles.get(manifest);
    if (removedFiles != null) {
      removedFiles.forEach(removed -> removedPaths.add(removed.path()));
    }
  }

  return removedPaths;
}
/**
 * Deletes filtered manifests that were created by this class but are not part of the committed
 * manifest set, and drops their entries from the filter cache.
 *
 * @param committed the set of manifest files that were committed
 */
void cleanUncommitted(Set<ManifestFile> committed) {
  // work on a snapshot of the cache so removals below do not disturb the iteration
  List<Map.Entry<ManifestFile, ManifestFile>> snapshot = Lists.newArrayList(filteredManifests.entrySet());

  for (Map.Entry<ManifestFile, ManifestFile> cachedEntry : snapshot) {
    ManifestFile source = cachedEntry.getKey();
    ManifestFile result = cachedEntry.getValue();
    if (committed.contains(result)) {
      // the filtered result was committed; keep it cached
      continue;
    }

    // a file was only written when filtering produced a manifest different from its input
    boolean copyWasCreated = !source.equals(result);
    if (copyWasCreated) {
      deleteFile(result.path());
    }

    filteredManifests.remove(source);
  }
}
/**
 * Discards all cached filter results. Cleaning up against an empty committed set treats every
 * cached filtered manifest as uncommitted, so copies created by this class are deleted and the
 * cache is emptied. Called whenever the set of delete conditions changes.
 */
private void invalidateFilteredCache() {
cleanUncommitted(SnapshotProducer.EMPTY_SET);
}
/**
 * Filters a single manifest, reusing a cached result when one exists.
 * <p>
 * The input manifest itself is returned (and cached) when it has no live files, cannot contain
 * deleted files, or turns out to contain none after scanning. Otherwise a rewritten copy with
 * the deleted files removed is produced.
 *
 * @param metricsEvaluator evaluator for the delete expression, bound to the table schema
 * @param manifest the manifest to filter
 * @return the filtered manifest file, which may be the input manifest
 */
private ManifestFile filterManifest(StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest) {
  ManifestFile previousResult = filteredManifests.get(manifest);
  if (previousResult != null) {
    return previousResult;
  }

  boolean noLiveFiles = !manifest.hasAddedFiles() && !manifest.hasExistingFiles();
  if (noLiveFiles || !canContainDeletedFiles(manifest)) {
    filteredManifests.put(manifest, manifest);
    return manifest;
  }

  try (ManifestReader<F> reader = newManifestReader(manifest)) {
    // first stream through the manifest without copying anything; only when a file to remove is
    // found is the manifest actually rewritten
    if (manifestHasDeletedFiles(metricsEvaluator, reader)) {
      return filterManifestWithDeletedFiles(metricsEvaluator, manifest, reader);
    }

    filteredManifests.put(manifest, manifest);
    return manifest;

  } catch (IOException e) {
    throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
  }
}
/**
 * Decides from manifest-level metadata whether a manifest could contain a file to delete, so
 * manifests that cannot match are never opened.
 *
 * @param manifest the manifest to test
 * @return true if the row filter, a dropped partition, a deleted path, or the minimum sequence
 *         number could match a file in the manifest
 */
private boolean canContainDeletedFiles(ManifestFile manifest) {
  boolean mayMatchRowFilter = false;
  if (deleteExpression != null && deleteExpression != Expressions.alwaysFalse()) {
    PartitionSpec manifestSpec = specsById.get(manifest.partitionSpecId());
    mayMatchRowFilter = ManifestEvaluator.forRowFilter(deleteExpression, manifestSpec, true).eval(manifest);
  }

  boolean mayMatchDroppedPartition = dropPartitions.size() > 0 &&
      ManifestFileUtil.canContainAny(manifest, dropPartitions, specsById);

  // with path-only deletes the partitions of deleted files are incomplete, so assume a match;
  // otherwise the recorded partitions of the deleted files are a valid way to prune
  boolean mayMatchDeletedPath = hasPathOnlyDeletes ||
      (deletePaths.size() > 0 && ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, specsById));

  boolean mayHaveOldDeleteFiles = manifest.content() == ManifestContent.DELETES &&
      manifest.minSequenceNumber() < minSequenceNumber;

  return mayMatchRowFilter || mayMatchDroppedPartition || mayMatchDeletedPath || mayHaveOldDeleteFiles;
}
/**
 * Scans a manifest and reports whether it contains at least one live file that must be deleted.
 * <p>
 * A file must be deleted when its path was requested for deletion, its partition was dropped, it
 * is an old delete file (delete manifest entry with a positive sequence number below the
 * configured minimum), or its partition may match the delete expression. Scanning stops at the
 * first such file.
 * <p>
 * Entries whose status is already DELETED are skipped, matching the rewrite step, which also
 * ignores them. Without this, an already-deleted entry could trigger a needless rewrite, a
 * spurious {@link DeleteException}, or a spurious partial-match validation failure.
 *
 * @param metricsEvaluator evaluator for the delete expression, bound to the table schema
 * @param reader an open reader over the manifest to scan
 * @return true if a live file to delete was found, false otherwise
 * @throws ValidationException if a file matches the delete expression for some but not all rows
 * @throws DeleteException if a file to delete is found while deletes are configured to fail
 */
private boolean manifestHasDeletedFiles(
    StrictMetricsEvaluator metricsEvaluator, ManifestReader<F> reader) {
  boolean isDelete = reader.isDeleteManifestReader();
  Evaluator inclusive = inclusiveDeleteEvaluator(reader.spec());
  Evaluator strict = strictDeleteEvaluator(reader.spec());

  for (ManifestEntry<F> entry : reader.entries()) {
    if (entry.status() == ManifestEntry.Status.DELETED) {
      // already removed by an earlier snapshot; it cannot be deleted again
      continue;
    }

    F file = entry.file();
    boolean fileDelete = deletePaths.contains(file.path()) ||
        dropPartitions.contains(file.specId(), file.partition()) ||
        (isDelete && entry.sequenceNumber() > 0 && entry.sequenceNumber() < minSequenceNumber);

    if (fileDelete || inclusive.eval(file.partition())) {
      // a file selected only by the row filter must match it for every row
      ValidationException.check(
          fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
          "Cannot delete file where some, but not all, rows match filter %s: %s",
          this.deleteExpression, file.path());

      if (failAnyDelete) {
        throw new DeleteException(reader.spec().partitionToPath(file.partition()));
      }

      // as soon as a deleted file is detected, stop scanning
      return true;
    }
  }

  return false;
}
/**
 * Writes a filtered copy of a manifest that is known to contain at least one file to delete.
 * <p>
 * Every live entry is either written as deleted (when it matches a delete condition) or carried
 * over as existing. Entries that are already DELETED are not copied. The new manifest is cached
 * as the filter result for the input manifest, and the files removed from it are recorded so the
 * snapshot summary and required-delete validation can use them.
 * <p>
 * This method runs concurrently for different manifests, so duplicates are counted locally and
 * added to the shared counter once, under a lock, to avoid losing updates.
 *
 * @param metricsEvaluator evaluator for the delete expression, bound to the table schema
 * @param manifest the manifest being filtered; used as the cache key and in log messages
 * @param reader an open reader over {@code manifest}
 * @return the newly written manifest with deleted files removed
 * @throws ValidationException if a file matches the delete expression for some but not all rows
 * @throws RuntimeIOException if the manifest writer cannot be closed
 */
private ManifestFile filterManifestWithDeletedFiles(
    StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest, ManifestReader<F> reader) {
  boolean isDelete = reader.isDeleteManifestReader();
  Evaluator inclusive = inclusiveDeleteEvaluator(reader.spec());
  Evaluator strict = strictDeleteEvaluator(reader.spec());

  // when this point is reached, there is at least one file that will be deleted in the
  // manifest. produce a copy of the manifest with all deleted files removed.
  List<F> deletedFiles = Lists.newArrayList();
  Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
  int duplicates = 0;

  try {
    ManifestWriter<F> writer = newManifestWriter(reader.spec());
    try {
      for (ManifestEntry<F> entry : reader.entries()) {
        if (entry.status() == ManifestEntry.Status.DELETED) {
          // entries deleted by earlier snapshots are dropped from the copy
          continue;
        }

        F file = entry.file();
        boolean fileDelete = deletePaths.contains(file.path()) ||
            dropPartitions.contains(file.specId(), file.partition()) ||
            (isDelete && entry.sequenceNumber() > 0 && entry.sequenceNumber() < minSequenceNumber);

        if (fileDelete || inclusive.eval(file.partition())) {
          // a file selected only by the row filter must match it for every row
          ValidationException.check(
              fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
              "Cannot delete file where some, but not all, rows match filter %s: %s",
              this.deleteExpression, file.path());

          writer.delete(entry);

          CharSequenceWrapper wrapper = CharSequenceWrapper.wrap(file.path());
          if (deletedPaths.add(wrapper)) {
            // only add the file to deletes if it is a new delete
            // this keeps the snapshot summary accurate for non-duplicate data
            deletedFiles.add(file.copyWithoutStats());
          } else {
            LOG.warn("Deleting a duplicate path from manifest {}: {}",
                manifest.path(), wrapper.get());
            duplicates += 1;
          }
        } else {
          writer.existing(entry);
        }
      }
    } finally {
      writer.close();
    }

    // the writer must be closed before the manifest file metadata is available
    ManifestFile filtered = writer.toManifestFile();

    if (duplicates > 0) {
      // manifests are filtered in parallel; guard the shared read-modify-write
      synchronized (this) {
        duplicateDeleteCount += duplicates;
      }
    }

    // update caches
    filteredManifests.put(manifest, filtered);
    filteredManifestToDeletedFiles.put(filtered, deletedFiles);

    return filtered;

  } catch (IOException e) {
    throw new RuntimeIOException(e, "Failed to close manifest writer");
  }
}
/**
 * Builds an evaluator over partition tuples from the strict projection of the delete expression
 * onto the given spec.
 *
 * @param spec the partition spec to project the delete expression onto
 * @return an evaluator for partition data of {@code spec}
 */
private Evaluator strictDeleteEvaluator(PartitionSpec spec) {
  Expression projected = Projections.strict(spec).project(this.deleteExpression);
  Evaluator evaluator = new Evaluator(spec.partitionType(), projected);
  return evaluator;
}
/**
 * Builds an evaluator over partition tuples from the inclusive projection of the delete
 * expression onto the given spec.
 *
 * @param spec the partition spec to project the delete expression onto
 * @return an evaluator for partition data of {@code spec}
 */
private Evaluator inclusiveDeleteEvaluator(PartitionSpec spec) {
  Expression projected = Projections.inclusive(spec).project(this.deleteExpression);
  Evaluator evaluator = new Evaluator(spec.partitionType(), projected);
  return evaluator;
}
}