/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeWrapper;
import org.apache.iceberg.util.Tasks;

/**
* An index of {@link DeleteFile delete files} by sequence number.
* <p>
* Use {@link #builderFor(FileIO, Iterable)} to construct an index, and {@link #forDataFile(long, DataFile)} or
 * {@link #forEntry(ManifestEntry)} to get the delete files to apply to a given data file.
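 * <p>
 * For example, a minimal usage sketch (here {@code io}, {@code deleteManifests}, {@code specsById},
 * {@code dataSequenceNumber}, and {@code dataFile} are assumed to be supplied by the caller):
 * <pre>{@code
 *   DeleteFileIndex deletes = DeleteFileIndex.builderFor(io, deleteManifests)
 *       .specsById(specsById)
 *       .caseSensitive(true)
 *       .build();
 *   DeleteFile[] matching = deletes.forDataFile(dataSequenceNumber, dataFile);
 * }</pre>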
*/
class DeleteFileIndex {
private final Map<Integer, PartitionSpec> specsById;
private final Map<Integer, Types.StructType> partitionTypeById;
private final Map<Integer, ThreadLocal<StructLikeWrapper>> wrapperById;
private final long[] globalSeqs;
private final DeleteFile[] globalDeletes;
private final Map<Pair<Integer, StructLikeWrapper>, Pair<long[], DeleteFile[]>> sortedDeletesByPartition;
DeleteFileIndex(Map<Integer, PartitionSpec> specsById, long[] globalSeqs, DeleteFile[] globalDeletes,
Map<Pair<Integer, StructLikeWrapper>, Pair<long[], DeleteFile[]>> sortedDeletesByPartition) {
this.specsById = specsById;
ImmutableMap.Builder<Integer, Types.StructType> builder = ImmutableMap.builder();
specsById.forEach((specId, spec) -> builder.put(specId, spec.partitionType()));
this.partitionTypeById = builder.build();
this.wrapperById = Maps.newConcurrentMap();
this.globalSeqs = globalSeqs;
this.globalDeletes = globalDeletes;
this.sortedDeletesByPartition = sortedDeletesByPartition;
}
public boolean isEmpty() {
return (globalDeletes == null || globalDeletes.length == 0) && sortedDeletesByPartition.isEmpty();
}
private StructLikeWrapper newWrapper(int specId) {
return StructLikeWrapper.forType(partitionTypeById.get(specId));
}
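
  // pairs a spec ID with a reusable, thread-local wrapper around the partition tuple so that
  // (specId, partition) can be used as a map key with value-based equality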
private Pair<Integer, StructLikeWrapper> partition(int specId, StructLike struct) {
ThreadLocal<StructLikeWrapper> wrapper = wrapperById.computeIfAbsent(specId,
id -> ThreadLocal.withInitial(() -> newWrapper(id)));
return Pair.of(specId, wrapper.get().set(struct));
}
DeleteFile[] forEntry(ManifestEntry<DataFile> entry) {
return forDataFile(entry.sequenceNumber(), entry.file());
}
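
  // returns the delete files that apply to the given data file: global equality deletes plus
  // deletes from the file's partition, limited to those whose apply sequence number is at least
  // the data file's sequence number and whose stats may match the data file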
DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {
Pair<Integer, StructLikeWrapper> partition = partition(file.specId(), file.partition());
Pair<long[], DeleteFile[]> partitionDeletes = sortedDeletesByPartition.get(partition);
Stream<DeleteFile> matchingDeletes;
if (partitionDeletes == null) {
matchingDeletes = limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes);
} else if (globalDeletes == null) {
matchingDeletes = limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second());
} else {
matchingDeletes = Stream.concat(
limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes),
limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second()));
}
return matchingDeletes
.filter(deleteFile -> canContainDeletesForFile(file, deleteFile, specsById.get(file.specId()).schema()))
.toArray(DeleteFile[]::new);
}
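
  // dispatches to the stats-based check for the delete file's content type; unknown content is
  // assumed to match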
private static boolean canContainDeletesForFile(DataFile dataFile, DeleteFile deleteFile, Schema schema) {
switch (deleteFile.content()) {
case POSITION_DELETES:
return canContainPosDeletesForFile(dataFile, deleteFile);
case EQUALITY_DELETES:
return canContainEqDeletesForFile(dataFile, deleteFile, schema);
}
return true;
}
private static boolean canContainPosDeletesForFile(DataFile dataFile, DeleteFile deleteFile) {
// check that the delete file can contain the data file's file_path
Map<Integer, ByteBuffer> lowers = deleteFile.lowerBounds();
Map<Integer, ByteBuffer> uppers = deleteFile.upperBounds();
if (lowers == null || uppers == null) {
return true;
}
Type pathType = MetadataColumns.DELETE_FILE_PATH.type();
int pathId = MetadataColumns.DELETE_FILE_PATH.fieldId();
Comparator<CharSequence> comparator = Comparators.charSequences();
ByteBuffer lower = lowers.get(pathId);
if (lower != null && comparator.compare(dataFile.path(), Conversions.fromByteBuffer(pathType, lower)) < 0) {
return false;
}
ByteBuffer upper = uppers.get(pathId);
if (upper != null && comparator.compare(dataFile.path(), Conversions.fromByteBuffer(pathType, upper)) > 0) {
return false;
}
return true;
}
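
  // uses null counts and lower/upper bounds for each equality field to decide whether the delete
  // file could contain rows matching the data file; returns false only when the stats prove that
  // no rows can match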
@SuppressWarnings("checkstyle:CyclomaticComplexity")
private static boolean canContainEqDeletesForFile(DataFile dataFile, DeleteFile deleteFile, Schema schema) {
// whether to check data ranges or to assume that the ranges match
// if upper/lower bounds are missing, null counts may still be used to determine delete files can be skipped
boolean checkRanges = dataFile.lowerBounds() != null && dataFile.upperBounds() != null &&
deleteFile.lowerBounds() != null && deleteFile.upperBounds() != null;
Map<Integer, ByteBuffer> dataLowers = dataFile.lowerBounds();
Map<Integer, ByteBuffer> dataUppers = dataFile.upperBounds();
Map<Integer, ByteBuffer> deleteLowers = deleteFile.lowerBounds();
Map<Integer, ByteBuffer> deleteUppers = deleteFile.upperBounds();
Map<Integer, Long> dataNullCounts = dataFile.nullValueCounts();
Map<Integer, Long> dataValueCounts = dataFile.valueCounts();
Map<Integer, Long> deleteNullCounts = deleteFile.nullValueCounts();
Map<Integer, Long> deleteValueCounts = deleteFile.valueCounts();
for (int id : deleteFile.equalityFieldIds()) {
Types.NestedField field = schema.findField(id);
if (!field.type().isPrimitiveType()) {
// stats are not kept for nested types. assume that the delete file may match
continue;
}
if (containsNull(dataNullCounts, field) && containsNull(deleteNullCounts, field)) {
// the data has null values and null has been deleted, so the deletes must be applied
continue;
}
if (allNull(dataNullCounts, dataValueCounts, field) && allNonNull(deleteNullCounts, field)) {
// the data file contains only null values for this field, but there are no deletes for null values
return false;
}
if (allNull(deleteNullCounts, deleteValueCounts, field) && allNonNull(dataNullCounts, field)) {
// the delete file removes only null rows with null for this field, but there are no data rows with null
return false;
}
if (!checkRanges) {
// some upper and lower bounds are missing, assume they match
continue;
}
ByteBuffer dataLower = dataLowers.get(id);
ByteBuffer dataUpper = dataUppers.get(id);
ByteBuffer deleteLower = deleteLowers.get(id);
ByteBuffer deleteUpper = deleteUppers.get(id);
if (dataLower == null || dataUpper == null || deleteLower == null || deleteUpper == null) {
// at least one bound is not known, assume the delete file may match
continue;
}
if (!rangesOverlap(field.type().asPrimitiveType(), dataLower, dataUpper, deleteLower, deleteUpper)) {
// no values overlap between the data file and the deletes
return false;
}
}
return true;
}
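
  // returns true when the data range [dataLower, dataUpper] and the delete range
  // [deleteLower, deleteUpper] intersect, treating both bounds as inclusive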
private static <T> boolean rangesOverlap(Type.PrimitiveType type,
ByteBuffer dataLowerBuf, ByteBuffer dataUpperBuf,
ByteBuffer deleteLowerBuf, ByteBuffer deleteUpperBuf) {
Comparator<T> comparator = Comparators.forType(type);
T dataLower = Conversions.fromByteBuffer(type, dataLowerBuf);
T dataUpper = Conversions.fromByteBuffer(type, dataUpperBuf);
T deleteLower = Conversions.fromByteBuffer(type, deleteLowerBuf);
T deleteUpper = Conversions.fromByteBuffer(type, deleteUpperBuf);
return comparator.compare(deleteLower, dataUpper) <= 0 && comparator.compare(dataLower, deleteUpper) <= 0;
}
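
  // returns true when the stats show the field has no null values; required fields never do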
private static boolean allNonNull(Map<Integer, Long> nullValueCounts, Types.NestedField field) {
if (field.isRequired()) {
return true;
}
if (nullValueCounts == null) {
return false;
}
Long nullValueCount = nullValueCounts.get(field.fieldId());
if (nullValueCount == null) {
return false;
}
return nullValueCount <= 0;
}
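
  // returns true when the stats show that every value of the field is null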
private static boolean allNull(Map<Integer, Long> nullValueCounts, Map<Integer, Long> valueCounts,
Types.NestedField field) {
if (field.isRequired()) {
return false;
}
if (nullValueCounts == null || valueCounts == null) {
return false;
}
Long nullValueCount = nullValueCounts.get(field.fieldId());
Long valueCount = valueCounts.get(field.fieldId());
if (nullValueCount == null || valueCount == null) {
return false;
}
return nullValueCount.equals(valueCount);
}
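
  // returns true when the field may contain null values; missing stats are assumed to include null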
private static boolean containsNull(Map<Integer, Long> nullValueCounts, Types.NestedField field) {
if (field.isRequired()) {
return false;
}
if (nullValueCounts == null) {
return true;
}
Long nullValueCount = nullValueCounts.get(field.fieldId());
if (nullValueCount == null) {
return true;
}
return nullValueCount > 0;
}
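
  // binary-searches the sorted apply sequence numbers and returns the delete files whose apply
  // sequence number is greater than or equal to the given data sequence number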
private static Stream<DeleteFile> limitBySequenceNumber(long sequenceNumber, long[] seqs, DeleteFile[] files) {
if (files == null) {
return Stream.empty();
}
int pos = Arrays.binarySearch(seqs, sequenceNumber);
int start;
if (pos < 0) {
// the sequence number was not found, where it would be inserted is -(pos + 1)
start = -(pos + 1);
} else {
// the sequence number was found, but may not be the first
// find the first delete file with the given sequence number by decrementing the position
start = pos;
while (start > 0 && seqs[start - 1] >= sequenceNumber) {
start -= 1;
}
}
return Arrays.stream(files, start, files.length);
}
static Builder builderFor(FileIO io, Iterable<ManifestFile> deleteManifests) {
return new Builder(io, Sets.newHashSet(deleteManifests));
}
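
  // builds a DeleteFileIndex by reading the matching delete manifests, optionally in parallel,
  // and grouping the delete files by (specId, partition)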
static class Builder {
private final FileIO io;
private final Set<ManifestFile> deleteManifests;
private Map<Integer, PartitionSpec> specsById = null;
private Expression dataFilter = Expressions.alwaysTrue();
private Expression partitionFilter = Expressions.alwaysTrue();
private boolean caseSensitive = true;
private ExecutorService executorService = null;
Builder(FileIO io, Set<ManifestFile> deleteManifests) {
this.io = io;
this.deleteManifests = Sets.newHashSet(deleteManifests);
}
Builder specsById(Map<Integer, PartitionSpec> newSpecsById) {
this.specsById = newSpecsById;
return this;
}
Builder filterData(Expression newDataFilter) {
this.dataFilter = Expressions.and(dataFilter, newDataFilter);
return this;
}
Builder filterPartitions(Expression newPartitionFilter) {
this.partitionFilter = Expressions.and(partitionFilter, newPartitionFilter);
return this;
}
Builder caseSensitive(boolean newCaseSensitive) {
this.caseSensitive = newCaseSensitive;
return this;
}
Builder planWith(ExecutorService newExecutorService) {
this.executorService = newExecutorService;
return this;
}
DeleteFileIndex build() {
// read all of the matching delete manifests in parallel and accumulate the matching files in a queue
Queue<ManifestEntry<DeleteFile>> deleteEntries = new ConcurrentLinkedQueue<>();
Tasks.foreach(deleteManifestReaders())
.stopOnFailure().throwFailureWhenFinished()
.executeWith(executorService)
.run(deleteFile -> {
try (CloseableIterable<ManifestEntry<DeleteFile>> reader = deleteFile) {
for (ManifestEntry<DeleteFile> entry : reader) {
// copy with stats for better filtering against data file stats
deleteEntries.add(entry.copy());
}
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close");
}
});
// build a map from (specId, partition) to delete file entries
ListMultimap<Pair<Integer, StructLikeWrapper>, ManifestEntry<DeleteFile>> deleteFilesByPartition =
Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
for (ManifestEntry<DeleteFile> entry : deleteEntries) {
int specId = entry.file().specId();
StructLikeWrapper wrapper = StructLikeWrapper.forType(specsById.get(specId).partitionType())
.set(entry.file().partition());
deleteFilesByPartition.put(Pair.of(specId, wrapper), entry);
}
// sort the entries in each map value by sequence number and split into sequence numbers and delete files lists
Map<Pair<Integer, StructLikeWrapper>, Pair<long[], DeleteFile[]>> sortedDeletesByPartition = Maps.newHashMap();
// also, separate out equality deletes in an unpartitioned spec that should be applied globally
long[] globalApplySeqs = null;
DeleteFile[] globalDeletes = null;
for (Pair<Integer, StructLikeWrapper> partition : deleteFilesByPartition.keySet()) {
if (specsById.get(partition.first()).isUnpartitioned()) {
Preconditions.checkState(globalDeletes == null, "Detected multiple partition specs with no partitions");
List<Pair<Long, DeleteFile>> eqFilesSortedBySeq = deleteFilesByPartition.get(partition).stream()
.filter(entry -> entry.file().content() == FileContent.EQUALITY_DELETES)
            .map(entry ->
                // equality deletes apply to rows with strictly smaller sequence numbers,
                // so each file is indexed by the sequence number it should be applied to: seq - 1
                Pair.of(entry.sequenceNumber() - 1, entry.file()))
.sorted(Comparator.comparingLong(Pair::first))
.collect(Collectors.toList());
globalApplySeqs = eqFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
globalDeletes = eqFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);
List<Pair<Long, DeleteFile>> posFilesSortedBySeq = deleteFilesByPartition.get(partition).stream()
.filter(entry -> entry.file().content() == FileContent.POSITION_DELETES)
.map(entry -> Pair.of(entry.sequenceNumber(), entry.file()))
.sorted(Comparator.comparingLong(Pair::first))
.collect(Collectors.toList());
long[] seqs = posFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
DeleteFile[] files = posFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);
sortedDeletesByPartition.put(partition, Pair.of(seqs, files));
} else {
List<Pair<Long, DeleteFile>> filesSortedBySeq = deleteFilesByPartition.get(partition).stream()
.map(entry -> {
// a delete file is indexed by the sequence number it should be applied to
long applySeq = entry.sequenceNumber() -
(entry.file().content() == FileContent.EQUALITY_DELETES ? 1 : 0);
return Pair.of(applySeq, entry.file());
})
.sorted(Comparator.comparingLong(Pair::first))
.collect(Collectors.toList());
long[] seqs = filesSortedBySeq.stream().mapToLong(Pair::first).toArray();
DeleteFile[] files = filesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);
sortedDeletesByPartition.put(partition, Pair.of(seqs, files));
}
}
return new DeleteFileIndex(specsById, globalApplySeqs, globalDeletes, sortedDeletesByPartition);
}
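
    // when specs are available, filters delete manifests with a per-spec ManifestEvaluator built
    // from the partition filter and an inclusive projection of the data filter, then returns
    // readers over each matching manifest's live delete file entries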
private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> deleteManifestReaders() {
LoadingCache<Integer, ManifestEvaluator> evalCache = specsById == null ? null :
Caffeine.newBuilder().build(specId -> {
PartitionSpec spec = specsById.get(specId);
return ManifestEvaluator.forPartitionFilter(
Expressions.and(partitionFilter, Projections.inclusive(spec, caseSensitive).project(dataFilter)),
spec, caseSensitive);
});
Iterable<ManifestFile> matchingManifests = evalCache == null ? deleteManifests :
Iterables.filter(deleteManifests, manifest ->
manifest.content() == ManifestContent.DELETES &&
(manifest.hasAddedFiles() || manifest.hasDeletedFiles()) &&
evalCache.get(manifest.partitionSpecId()).eval(manifest));
return Iterables.transform(
matchingManifests,
manifest ->
ManifestFiles.readDeleteManifest(manifest, io, specsById)
.filterRows(dataFilter)
.filterPartitions(partitionFilter)
.caseSensitive(caseSensitive)
.liveEntries()
);
}
}
}