/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import java.io.IOException;
import java.lang.reflect.Array;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.util.BinPacking.ListPacker;
import org.apache.iceberg.util.Exceptions;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
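
/**
 * Base class for merging manifest files when producing a new snapshot.
 *
 * <p>Manifests are grouped by partition spec ID and each group is bin-packed by manifest length up
 * to {@code targetSizeBytes}. Bins that contain more than one manifest are rewritten into a single
 * merged manifest. Results are cached so that commit retries can reuse already-written files, and
 * {@link #cleanUncommitted(Set)} removes any merged manifests that were never committed.
 *
 * <p>Concrete update operations supply the snapshot ID, spec lookup, file deletion, and manifest
 * reader/writer factories through the abstract methods below. A purely illustrative sketch (the
 * class name and constructor arguments here are hypothetical, not defaults of this class):
 *
 * <pre>{@code
 * class MyMergeManager extends ManifestMergeManager<DataFile> {
 *   MyMergeManager() {
 *     super(8 * 1024 * 1024, 100, true); // target size, min count to merge, merge enabled
 *   }
 *   // implement snapshotId(), spec(int), deleteFile(String),
 *   // newManifestWriter(PartitionSpec), and newManifestReader(ManifestFile)
 * }
 * }</pre>
 */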
abstract class ManifestMergeManager<F extends ContentFile<F>> {
private final long targetSizeBytes;
private final int minCountToMerge;
private final boolean mergeEnabled;
// cache merge results to reuse when retrying
private final Map<List<ManifestFile>, ManifestFile> mergedManifests = Maps.newConcurrentMap();
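
  /**
   * @param targetSizeBytes target size used when bin-packing manifests for merging
   * @param minCountToMerge minimum number of manifests in a bin that contains the newly appended
   *     manifest before that bin is rewritten
   * @param mergeEnabled whether merging is enabled; when false, {@link #mergeManifests(Iterable)}
   *     returns its input unchanged
   */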
ManifestMergeManager(long targetSizeBytes, int minCountToMerge, boolean mergeEnabled) {
this.targetSizeBytes = targetSizeBytes;
this.minCountToMerge = minCountToMerge;
this.mergeEnabled = mergeEnabled;
}
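
  // template methods implemented by the concrete update operation that owns this manager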
protected abstract long snapshotId();
protected abstract PartitionSpec spec(int specId);
protected abstract void deleteFile(String location);
protected abstract ManifestWriter<F> newManifestWriter(PartitionSpec spec);
protected abstract ManifestReader<F> newManifestReader(ManifestFile manifest);
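
  /**
   * Groups the given manifests by partition spec ID and merges each group.
   *
   * <p>Returns the input unchanged when merging is disabled or there are no manifests.
   */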
Iterable<ManifestFile> mergeManifests(Iterable<ManifestFile> manifests) {
Iterator<ManifestFile> manifestIter = manifests.iterator();
if (!mergeEnabled || !manifestIter.hasNext()) {
return manifests;
}
ManifestFile first = manifestIter.next();
List<ManifestFile> merged = Lists.newArrayList();
ListMultimap<Integer, ManifestFile> groups = groupBySpec(first, manifestIter);
for (Integer specId : groups.keySet()) {
Iterables.addAll(merged, mergeGroup(first, specId, groups.get(specId)));
}
return merged;
}
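
  /**
   * Deletes any merged manifests written by this manager that are not in the committed set, and
   * removes them from the result cache.
   */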
void cleanUncommitted(Set<ManifestFile> committed) {
// iterate over a copy of entries to avoid concurrent modification
List<Map.Entry<List<ManifestFile>, ManifestFile>> entries =
Lists.newArrayList(mergedManifests.entrySet());
for (Map.Entry<List<ManifestFile>, ManifestFile> entry : entries) {
// delete any new merged manifests that aren't in the committed list
ManifestFile merged = entry.getValue();
if (!committed.contains(merged)) {
deleteFile(merged.path());
// remove the deleted file from the cache
mergedManifests.remove(entry.getKey());
}
}
}
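
  // group manifests by partition spec ID; the reverse-ordered tree map places the group for the
  // most recent (highest) spec ID first in the merged output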
private ListMultimap<Integer, ManifestFile> groupBySpec(ManifestFile first, Iterator<ManifestFile> remaining) {
ListMultimap<Integer, ManifestFile> groups = Multimaps.newListMultimap(
Maps.newTreeMap(Comparator.<Integer>reverseOrder()),
Lists::newArrayList);
groups.put(first.partitionSpecId(), first);
remaining.forEachRemaining(manifest -> groups.put(manifest.partitionSpecId(), manifest));
return groups;
}
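
  /**
   * Bin-packs one spec's manifests and rewrites bins that contain more than one manifest, unless a
   * bin contains the in-memory manifest and is below the minimum merge count.
   *
   * <p>Packing from the end with a lookback of 1 preserves manifest order and leaves the
   * under-filled bin first so it can be merged in a later commit. Bins are processed in parallel
   * on the worker pool, but results are returned in bin order.
   */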
@SuppressWarnings("unchecked")
private Iterable<ManifestFile> mergeGroup(ManifestFile first, int specId, List<ManifestFile> group) {
// use a lookback of 1 to avoid reordering the manifests. using 1 also means this should pack
// from the end so that the manifest that gets under-filled is the first one, which will be
// merged the next time.
ListPacker<ManifestFile> packer = new ListPacker<>(targetSizeBytes, 1, false);
List<List<ManifestFile>> bins = packer.packEnd(group, ManifestFile::length);
// process bins in parallel, but put results in the order of the bins into an array to preserve
// the order of manifests and contents. preserving the order helps avoid random deletes when
// data files are eventually aged off.
List<ManifestFile>[] binResults = (List<ManifestFile>[])
Array.newInstance(List.class, bins.size());
Tasks.range(bins.size())
.stopOnFailure().throwFailureWhenFinished()
.executeWith(ThreadPools.getWorkerPool())
.run(index -> {
List<ManifestFile> bin = bins.get(index);
List<ManifestFile> outputManifests = Lists.newArrayList();
binResults[index] = outputManifests;
if (bin.size() == 1) {
// no need to rewrite
outputManifests.add(bin.get(0));
return;
}
          // if the bin has the first manifest (the new data files or an appended manifest file),
          // only merge it if the number of manifests is at or above the minimum count. this check
          // is applied only to bins with an in-memory manifest so that large manifests don't
          // prevent merging older groups.
if (bin.contains(first) && bin.size() < minCountToMerge) {
// not enough to merge, add all manifest files to the output list
outputManifests.addAll(bin);
} else {
// merge the group
outputManifests.add(createManifest(specId, bin));
}
});
return Iterables.concat(binResults);
}
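
  /**
   * Rewrites a bin of manifests into a single new manifest, reusing a cached result when the same
   * bin was already rewritten (for example, by an earlier commit attempt).
   *
   * <p>Deletes from previous snapshots are dropped, deletes from this snapshot are carried
   * forward, adds from this snapshot remain adds, and all other entries are written as existing.
   */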
private ManifestFile createManifest(int specId, List<ManifestFile> bin) {
// if this merge was already rewritten, use the existing file.
// if the new files are in this merge, then the ManifestFile for the new files has changed and
// will be a cache miss.
if (mergedManifests.containsKey(bin)) {
return mergedManifests.get(bin);
}
ManifestWriter<F> writer = newManifestWriter(spec(specId));
boolean threw = true;
try {
for (ManifestFile manifest : bin) {
try (ManifestReader<F> reader = newManifestReader(manifest)) {
for (ManifestEntry<F> entry : reader.entries()) {
if (entry.status() == Status.DELETED) {
// suppress deletes from previous snapshots. only files deleted by this snapshot
// should be added to the new manifest
if (entry.snapshotId() == snapshotId()) {
writer.delete(entry);
}
} else if (entry.status() == Status.ADDED && entry.snapshotId() == snapshotId()) {
// adds from this snapshot are still adds, otherwise they should be existing
writer.add(entry);
} else {
// add all files from the old manifest as existing files
writer.existing(entry);
}
}
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest reader");
}
}
threw = false;
} finally {
Exceptions.close(writer, threw);
}
ManifestFile manifest = writer.toManifestFile();
// update the cache
mergedManifests.put(bin, manifest);
return manifest;
}
}