/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.StructLikeWrapper;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
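/**
* An action that rewrites a table's data files, bin-packing small files into larger ones up to a
* target size to reduce metadata and open-file overhead.
* <p>
* A typical invocation through a concrete subclass might look like the following sketch (the
* {@code Actions.forTable} entry point is engine-specific and shown only as an illustration):
* <pre>{@code
* RewriteDataFilesActionResult result = Actions.forTable(table)
*     .rewriteDataFiles()
*     .filter(Expressions.equal("date", "2020-08-18"))  // only rewrite files that may match
*     .targetSizeInBytes(512 * 1024 * 1024)             // aim for 512 MB output files
*     .execute();
* }</pre>
*/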
public abstract class BaseRewriteDataFilesAction<ThisT>
extends BaseSnapshotUpdateAction<ThisT, RewriteDataFilesActionResult> {
private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDataFilesAction.class);
private final Table table;
private final FileIO fileIO;
private final EncryptionManager encryptionManager;
private boolean caseSensitive;
private PartitionSpec spec;
private Expression filter;
private long targetSizeInBytes;
private int splitLookback;
private long splitOpenFileCost;
protected BaseRewriteDataFilesAction(Table table) {
this.table = table;
this.spec = table.spec();
this.filter = Expressions.alwaysTrue();
this.caseSensitive = false;
long splitSize = PropertyUtil.propertyAsLong(
table.properties(),
TableProperties.SPLIT_SIZE,
TableProperties.SPLIT_SIZE_DEFAULT);
long targetFileSize = PropertyUtil.propertyAsLong(
table.properties(),
TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
this.splitLookback = PropertyUtil.propertyAsInt(
table.properties(),
TableProperties.SPLIT_LOOKBACK,
TableProperties.SPLIT_LOOKBACK_DEFAULT);
this.splitOpenFileCost = PropertyUtil.propertyAsLong(
table.properties(),
TableProperties.SPLIT_OPEN_FILE_COST,
TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
this.fileIO = fileIO();
this.encryptionManager = table.encryption();
}
@Override
protected Table table() {
return table;
}
protected EncryptionManager encryptionManager() {
return encryptionManager;
}
protected boolean caseSensitive() {
return caseSensitive;
}
/**
* Set whether the rewrite should be case sensitive when binding the filter expression.
*
* @param newCaseSensitive true if column name resolution should be case sensitive
* @return this for method chaining
*/
public BaseRewriteDataFilesAction<ThisT> caseSensitive(boolean newCaseSensitive) {
this.caseSensitive = newCaseSensitive;
return this;
}
/**
* Pass a PartitionSpec id to specify which PartitionSpec should be used when rewriting DataFiles.
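* <p>
* For example, to write rewritten files with the table's current default spec (illustrative;
* {@code action} stands for any concrete subclass instance):
* <pre>{@code
* action.outputSpecId(table.spec().specId());
* }</pre>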
*
* @param specId id of the PartitionSpec to use for rewritten DataFiles
* @return this for method chaining
*/
public BaseRewriteDataFilesAction<ThisT> outputSpecId(int specId) {
Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %s", specId);
this.spec = table.specs().get(specId);
return this;
}
/**
* Specify the target rewrite data file size in bytes
*
* @param targetSize size in bytes of rewrite data file
* @return this for method chaining
*/
public BaseRewriteDataFilesAction<ThisT> targetSizeInBytes(long targetSize) {
Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %s",
targetSize);
this.targetSizeInBytes = targetSize;
return this;
}
/**
* Specify the number of "bins" considered when trying to pack the next file split into a task. Increasing this
* usually makes tasks a bit more even by considering more ways to pack file regions into a single task with extra
* planning cost.
* <p>
* This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
* metadata, user can use a lookback of 1.
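* <p>
* For example, to keep the incoming file order (illustrative; {@code action} stands for any
* concrete subclass instance):
* <pre>{@code
* action.splitLookback(1); // pack files strictly in input order
* }</pre>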
*
* @param lookback number of "bins" considered when trying to pack the next file split into a task.
* @return this for method chaining
*/
public BaseRewriteDataFilesAction<ThisT> splitLookback(int lookback) {
Preconditions.checkArgument(lookback > 0, "Invalid split lookback %s", lookback);
this.splitLookback = lookback;
return this;
}
/**
* Specify the minimum size, in bytes, that a file is counted as when packing files into one "bin".
* If a file is smaller than this threshold, Iceberg uses this value as its weight instead of the
* actual file size.
* <p>
* This configuration controls how many files are compacted by each task: a smaller value packs
* more files into a single task. The default value is 4 MB.
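* <p>
* For example, with the default 4 MB open file cost and a 128 MB target size, a single task packs
* at most 128 / 4 = 32 small files, regardless of how little data they hold.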
*
* @param openFileCost minimum size, in bytes, that a file is counted as when packing into one "bin"
* @return this for method chaining
*/
public BaseRewriteDataFilesAction<ThisT> splitOpenFileCost(long openFileCost) {
Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %s", openFileCost);
this.splitOpenFileCost = openFileCost;
return this;
}
/**
* Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain
* data matching the filter will be rewritten.
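* <p>
* For example (illustrative; {@code action} stands for any concrete subclass instance):
* <pre>{@code
* action.filter(Expressions.equal("date", "2020-08-18")); // rewrite files that may contain this date
* }</pre>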
*
* @param expr Expression used to select the DataFiles to rewrite
* @return this for method chaining
*/
public BaseRewriteDataFilesAction<ThisT> filter(Expression expr) {
this.filter = Expressions.and(filter, expr);
return this;
}
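/**
* Plans and executes the rewrite: scans the table with the configured filter, groups the matching
* file scan tasks by partition, skips partitions that hold a single file, splits and bin-packs the
* remaining tasks to the target size, rewrites each combined task, and commits a rewrite that
* atomically replaces the old data files with the new ones.
*/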
@Override
public RewriteDataFilesActionResult execute() {
CloseableIterable<FileScanTask> fileScanTasks = null;
try {
fileScanTasks = table.newScan()
.caseSensitive(caseSensitive)
.ignoreResiduals()
.filter(filter)
.planFiles();
} finally {
try {
if (fileScanTasks != null) {
fileScanTasks.close();
}
} catch (IOException ioe) {
LOG.warn("Failed to close task iterable", ioe);
}
}
Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(fileScanTasks.iterator());
Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = groupedTasks.entrySet().stream()
.filter(kv -> kv.getValue().size() > 1)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// Nothing to rewrite if there's only one DataFile in each partition.
if (filteredGroupedTasks.isEmpty()) {
return RewriteDataFilesActionResult.empty();
}
// Split and combine tasks under each partition
List<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream()
.map(scanTasks -> {
CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes);
return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
})
.flatMap(Streams::stream)
.filter(task -> task.files().size() > 1)
.collect(Collectors.toList());
if (combinedScanTasks.isEmpty()) {
return RewriteDataFilesActionResult.empty();
}
List<DataFile> addedDataFiles = rewriteDataForTasks(combinedScanTasks);
List<DataFile> currentDataFiles = combinedScanTasks.stream()
.flatMap(tasks -> tasks.files().stream().map(FileScanTask::file))
.collect(Collectors.toList());
replaceDataFiles(currentDataFiles, addedDataFiles);
return new RewriteDataFilesActionResult(currentDataFiles, addedDataFiles);
}
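/**
* Groups file scan tasks by the partition of the file they read. Partition structs are wrapped in
* {@link StructLikeWrapper} so that they compare by value rather than by identity.
*/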
private Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
CloseableIterator<FileScanTask> tasksIter) {
ListMultimap<StructLikeWrapper, FileScanTask> tasksGroupedByPartition = Multimaps.newListMultimap(
Maps.newHashMap(), Lists::newArrayList);
try (CloseableIterator<FileScanTask> iterator = tasksIter) {
iterator.forEachRemaining(task -> {
StructLikeWrapper structLike = StructLikeWrapper.forType(spec.partitionType()).set(task.file().partition());
tasksGroupedByPartition.put(structLike, task);
});
} catch (IOException e) {
LOG.warn("Failed to close task iterator", e);
}
return tasksGroupedByPartition.asMap();
}
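/**
* Commits a {@link RewriteFiles} operation replacing the rewritten data files with the newly
* written ones. If the commit fails, the new files are deleted on a best-effort basis before the
* exception is rethrown.
*/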
private void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles) {
try {
RewriteFiles rewriteFiles = table.newRewrite();
rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
commit(rewriteFiles);
} catch (Exception e) {
Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
.noRetry()
.suppressFailureWhenFinished()
.onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
.run(fileIO::deleteFile);
throw e;
}
}
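/**
* @return the {@link FileIO} used to clean up newly written files if the commit fails
*/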
protected abstract FileIO fileIO();
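/**
* Rewrites the rows of the given combined scan tasks into new data files at the target size.
*
* @param combinedScanTasks tasks to rewrite
* @return the newly written data files
*/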
protected abstract List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks);
}