blob: 1b28f364cb35770ca52e9165890e1a4be8b7a926 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT;
public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> implements RewriteManifests {
private static final String KEPT_MANIFESTS_COUNT = "manifests-kept";
private static final String CREATED_MANIFESTS_COUNT = "manifests-created";
private static final String REPLACED_MANIFESTS_COUNT = "manifests-replaced";
private static final String PROCESSED_ENTRY_COUNT = "entries-processed";
private final TableOperations ops;
private final Map<Integer, PartitionSpec> specsById;
private final long manifestTargetSizeBytes;
private final boolean snapshotIdInheritanceEnabled;
private final Set<ManifestFile> deletedManifests = Sets.newHashSet();
private final List<ManifestFile> addedManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAddedManifests = Lists.newArrayList();
private final Collection<ManifestFile> keptManifests = new ConcurrentLinkedQueue<>();
private final Collection<ManifestFile> newManifests = new ConcurrentLinkedQueue<>();
private final Set<ManifestFile> rewrittenManifests = Sets.newConcurrentHashSet();
private final Map<Object, WriterWrapper> writers = Maps.newConcurrentMap();
private final AtomicLong entryCount = new AtomicLong(0);
private Function<DataFile, Object> clusterByFunc;
private Predicate<ManifestFile> predicate;
private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
BaseRewriteManifests(TableOperations ops) {
super(ops);
this.ops = ops;
this.specsById = ops.current().specsById();
this.manifestTargetSizeBytes =
ops.current().propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
this.snapshotIdInheritanceEnabled = ops.current()
.propertyAsBoolean(SNAPSHOT_ID_INHERITANCE_ENABLED, SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
}
@Override
protected RewriteManifests self() {
return this;
}
@Override
protected String operation() {
return DataOperations.REPLACE;
}
@Override
public RewriteManifests set(String property, String value) {
summaryBuilder.set(property, value);
return this;
}
@Override
protected Map<String, String> summary() {
int createdManifestsCount = newManifests.size() + addedManifests.size() + rewrittenAddedManifests.size();
summaryBuilder.set(CREATED_MANIFESTS_COUNT, String.valueOf(createdManifestsCount));
summaryBuilder.set(KEPT_MANIFESTS_COUNT, String.valueOf(keptManifests.size()));
summaryBuilder.set(REPLACED_MANIFESTS_COUNT, String.valueOf(rewrittenManifests.size() + deletedManifests.size()));
summaryBuilder.set(PROCESSED_ENTRY_COUNT, String.valueOf(entryCount.get()));
summaryBuilder.setPartitionSummaryLimit(0); // do not include partition summaries because data did not change
return summaryBuilder.build();
}
@Override
public RewriteManifests clusterBy(Function<DataFile, Object> func) {
this.clusterByFunc = func;
return this;
}
@Override
public RewriteManifests rewriteIf(Predicate<ManifestFile> pred) {
this.predicate = pred;
return this;
}
@Override
public RewriteManifests deleteManifest(ManifestFile manifest) {
deletedManifests.add(manifest);
return this;
}
@Override
public RewriteManifests addManifest(ManifestFile manifest) {
Preconditions.checkArgument(!manifest.hasAddedFiles(), "Cannot add manifest with added files");
Preconditions.checkArgument(!manifest.hasDeletedFiles(), "Cannot add manifest with deleted files");
Preconditions.checkArgument(
manifest.snapshotId() == null || manifest.snapshotId() == -1,
"Snapshot id must be assigned during commit");
Preconditions.checkArgument(manifest.sequenceNumber() == -1,
"Sequence must be assigned during commit");
if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) {
addedManifests.add(manifest);
} else {
// the manifest must be rewritten with this update's snapshot ID
ManifestFile copiedManifest = copyManifest(manifest);
rewrittenAddedManifests.add(copiedManifest);
}
return this;
}
private ManifestFile copyManifest(ManifestFile manifest) {
TableMetadata current = ops.current();
InputFile toCopy = ops.io().newInputFile(manifest.path());
OutputFile newFile = newManifestOutput();
return ManifestFiles.copyRewriteManifest(
current.formatVersion(), toCopy, specsById, newFile, snapshotId(), summaryBuilder);
}
@Override
public List<ManifestFile> apply(TableMetadata base) {
List<ManifestFile> currentManifests = base.currentSnapshot().dataManifests();
Set<ManifestFile> currentManifestSet = ImmutableSet.copyOf(currentManifests);
validateDeletedManifests(currentManifestSet);
if (requiresRewrite(currentManifestSet)) {
performRewrite(currentManifests);
} else {
keepActiveManifests(currentManifests);
}
validateFilesCounts();
Iterable<ManifestFile> newManifestsWithMetadata = Iterables.transform(
Iterables.concat(newManifests, addedManifests, rewrittenAddedManifests),
manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build());
// put new manifests at the beginning
List<ManifestFile> apply = Lists.newArrayList();
Iterables.addAll(apply, newManifestsWithMetadata);
apply.addAll(keptManifests);
apply.addAll(base.currentSnapshot().deleteManifests());
return apply;
}
private boolean requiresRewrite(Set<ManifestFile> currentManifests) {
if (clusterByFunc == null) {
// manifests are deleted and added directly so don't perform a rewrite
return false;
}
if (rewrittenManifests.size() == 0) {
// nothing yet processed so perform a full rewrite
return true;
}
// if any processed manifest is not in the current manifest list, perform a full rewrite
return rewrittenManifests.stream().anyMatch(manifest -> !currentManifests.contains(manifest));
}
private void keepActiveManifests(List<ManifestFile> currentManifests) {
// keep any existing manifests as-is that were not processed
keptManifests.clear();
currentManifests.stream()
.filter(manifest -> !rewrittenManifests.contains(manifest) && !deletedManifests.contains(manifest))
.forEach(keptManifests::add);
}
private void reset() {
cleanUncommitted(newManifests, ImmutableSet.of());
entryCount.set(0);
keptManifests.clear();
rewrittenManifests.clear();
newManifests.clear();
writers.clear();
}
private void performRewrite(List<ManifestFile> currentManifests) {
reset();
List<ManifestFile> remainingManifests = currentManifests.stream()
.filter(manifest -> !deletedManifests.contains(manifest))
.collect(Collectors.toList());
try {
Tasks.foreach(remainingManifests)
.executeWith(ThreadPools.getWorkerPool())
.run(manifest -> {
if (predicate != null && !predicate.test(manifest)) {
keptManifests.add(manifest);
} else {
rewrittenManifests.add(manifest);
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())
.select(Arrays.asList("*"))) {
reader.liveEntries().forEach(
entry -> appendEntry(entry, clusterByFunc.apply(entry.file()), manifest.partitionSpecId())
);
} catch (IOException x) {
throw new RuntimeIOException(x);
}
}
});
} finally {
Tasks.foreach(writers.values()).executeWith(ThreadPools.getWorkerPool()).run(WriterWrapper::close);
}
}
private void validateDeletedManifests(Set<ManifestFile> currentManifests) {
// directly deleted manifests must be still present in the current snapshot
deletedManifests.stream()
.filter(manifest -> !currentManifests.contains(manifest))
.findAny()
.ifPresent(manifest -> {
throw new ValidationException("Manifest is missing: %s", manifest.path());
});
}
private void validateFilesCounts() {
Iterable<ManifestFile> createdManifests = Iterables.concat(newManifests, addedManifests, rewrittenAddedManifests);
int createdManifestsFilesCount = activeFilesCount(createdManifests);
Iterable<ManifestFile> replacedManifests = Iterables.concat(rewrittenManifests, deletedManifests);
int replacedManifestsFilesCount = activeFilesCount(replacedManifests);
if (createdManifestsFilesCount != replacedManifestsFilesCount) {
throw new ValidationException(
"Replaced and created manifests must have the same number of active files: %d (new), %d (old)",
createdManifestsFilesCount, replacedManifestsFilesCount);
}
}
private int activeFilesCount(Iterable<ManifestFile> manifests) {
int activeFilesCount = 0;
for (ManifestFile manifest : manifests) {
Preconditions.checkNotNull(manifest.addedFilesCount(), "Missing file counts in %s", manifest.path());
Preconditions.checkNotNull(manifest.existingFilesCount(), "Missing file counts in %s", manifest.path());
activeFilesCount += manifest.addedFilesCount();
activeFilesCount += manifest.existingFilesCount();
}
return activeFilesCount;
}
private void appendEntry(ManifestEntry<DataFile> entry, Object key, int partitionSpecId) {
Preconditions.checkNotNull(entry, "Manifest entry cannot be null");
Preconditions.checkNotNull(key, "Key cannot be null");
WriterWrapper writer = getWriter(key, partitionSpecId);
writer.addEntry(entry);
entryCount.incrementAndGet();
}
private WriterWrapper getWriter(Object key, int partitionSpecId) {
return writers.computeIfAbsent(
Pair.of(key, partitionSpecId), k -> new WriterWrapper(specsById.get(partitionSpecId)));
}
@Override
protected void cleanUncommitted(Set<ManifestFile> committed) {
cleanUncommitted(newManifests, committed);
// clean up only rewrittenAddedManifests as they are always owned by the table
// don't clean up addedManifests as they are added to the manifest list and are not compacted
cleanUncommitted(rewrittenAddedManifests, committed);
}
private void cleanUncommitted(Iterable<ManifestFile> manifests, Set<ManifestFile> committedManifests) {
for (ManifestFile manifest : manifests) {
if (!committedManifests.contains(manifest)) {
deleteFile(manifest.path());
}
}
}
long getManifestTargetSizeBytes() {
return manifestTargetSizeBytes;
}
class WriterWrapper {
private final PartitionSpec spec;
private ManifestWriter<DataFile> writer;
WriterWrapper(PartitionSpec spec) {
this.spec = spec;
}
synchronized void addEntry(ManifestEntry<DataFile> entry) {
if (writer == null) {
writer = newManifestWriter(spec);
} else if (writer.length() >= getManifestTargetSizeBytes()) {
close();
writer = newManifestWriter(spec);
}
writer.existing(entry);
}
synchronized void close() {
if (writer != null) {
try {
writer.close();
newManifests.add(writer.toManifestFile());
} catch (IOException x) {
throw new RuntimeIOException(x);
}
}
}
}
}