blob: 9f9fd1f7ce601068e80147cfc6e33b39700c73d3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.table.view;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineDiffHelper;
import org.apache.hudi.common.table.timeline.TimelineDiffHelper.TimelineDiffResult;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Adds the capability to incrementally sync the changes to file-system view as and when new instants gets completed.
*/
public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTableFileSystemView {
private static final Logger LOG = LogManager.getLogger(IncrementalTimelineSyncFileSystemView.class);
// Allows incremental Timeline syncing
private final boolean incrementalTimelineSyncEnabled;
// This is the visible active timeline used only for incremental view syncing
private HoodieTimeline visibleActiveTimeline;
protected IncrementalTimelineSyncFileSystemView(boolean enableIncrementalTimelineSync) {
this.incrementalTimelineSyncEnabled = enableIncrementalTimelineSync;
}
@Override
protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
this.visibleActiveTimeline = visibleActiveTimeline;
super.refreshTimeline(visibleActiveTimeline);
}
@Override
protected void runSync(HoodieTimeline oldTimeline, HoodieTimeline newTimeline) {
try {
if (incrementalTimelineSyncEnabled) {
TimelineDiffResult diffResult = TimelineDiffHelper.getNewInstantsForIncrementalSync(oldTimeline, newTimeline);
if (diffResult.canSyncIncrementally()) {
LOG.info("Doing incremental sync");
runIncrementalSync(newTimeline, diffResult);
LOG.info("Finished incremental sync");
// Reset timeline to latest
refreshTimeline(newTimeline);
return;
}
}
} catch (Exception ioe) {
LOG.error("Got exception trying to perform incremental sync. Reverting to complete sync", ioe);
}
super.runSync(oldTimeline, newTimeline);
}
/**
* Run incremental sync based on the diff result produced.
*
* @param timeline New Timeline
* @param diffResult Timeline Diff Result
*/
private void runIncrementalSync(HoodieTimeline timeline, TimelineDiffResult diffResult) {
LOG.info("Timeline Diff Result is :" + diffResult);
// First remove pending compaction instants which were completed
diffResult.getFinishedCompactionInstants().stream().forEach(instant -> {
try {
removePendingCompactionInstant(timeline, instant);
} catch (IOException e) {
throw new HoodieException(e);
}
});
// Add new completed instants found in the latest timeline
diffResult.getNewlySeenInstants().stream()
.filter(instant -> instant.isCompleted() || instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
.forEach(instant -> {
try {
if (instant.getAction().equals(HoodieTimeline.COMMIT_ACTION)
|| instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
addCommitInstant(timeline, instant);
} else if (instant.getAction().equals(HoodieTimeline.RESTORE_ACTION)) {
addRestoreInstant(timeline, instant);
} else if (instant.getAction().equals(HoodieTimeline.CLEAN_ACTION)) {
addCleanInstant(timeline, instant);
} else if (instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)) {
addPendingCompactionInstant(timeline, instant);
} else if (instant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION)) {
addRollbackInstant(timeline, instant);
} else if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
addReplaceInstant(timeline, instant);
}
} catch (IOException ioe) {
throw new HoodieException(ioe);
}
});
}
/**
* Remove Pending compaction instant.
*
* @param timeline New Hoodie Timeline
* @param instant Compaction Instant to be removed
*/
private void removePendingCompactionInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
LOG.info("Removing completed compaction instant (" + instant + ")");
HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp());
removePendingCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant, plan)
.map(instantPair -> Pair.of(instantPair.getValue().getKey(),
CompactionOperation.convertFromAvroRecordInstance(instantPair.getValue().getValue()))));
}
/**
* Add newly found compaction instant.
*
* @param timeline Hoodie Timeline
* @param instant Compaction Instant
*/
private void addPendingCompactionInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
LOG.info("Syncing pending compaction instant (" + instant + ")");
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp());
List<Pair<String, CompactionOperation>> pendingOps =
CompactionUtils.getPendingCompactionOperations(instant, compactionPlan)
.map(p -> Pair.of(p.getValue().getKey(),
CompactionOperation.convertFromAvroRecordInstance(p.getValue().getValue())))
.collect(Collectors.toList());
// First, update Pending compaction instants
addPendingCompactionOperations(pendingOps.stream());
Map<String, List<Pair<String, HoodieFileGroup>>> partitionToFileGroups = pendingOps.stream().map(opPair -> {
String compactionInstantTime = opPair.getKey();
HoodieFileGroup fileGroup = new HoodieFileGroup(opPair.getValue().getFileGroupId(), timeline);
fileGroup.addNewFileSliceAtInstant(compactionInstantTime);
return Pair.of(compactionInstantTime, fileGroup);
}).collect(Collectors.groupingBy(x -> x.getValue().getPartitionPath()));
partitionToFileGroups.entrySet().forEach(entry -> {
if (isPartitionAvailableInStore(entry.getKey())) {
applyDeltaFileSlicesToPartitionView(entry.getKey(),
entry.getValue().stream().map(Pair::getValue).collect(Collectors.toList()), DeltaApplyMode.ADD);
}
});
}
/**
* Add newly found commit/delta-commit instant.
*
* @param timeline Hoodie Timeline
* @param instant Instant
*/
private void addCommitInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
LOG.info("Syncing committed instant (" + instant + ")");
HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
updatePartitionWriteFileGroups(commitMetadata.getPartitionToWriteStats(), timeline, instant);
LOG.info("Done Syncing committed instant (" + instant + ")");
}
private void updatePartitionWriteFileGroups(Map<String, List<HoodieWriteStat>> partitionToWriteStats,
HoodieTimeline timeline,
HoodieInstant instant) {
partitionToWriteStats.entrySet().stream().forEach(entry -> {
String partition = entry.getKey();
if (isPartitionAvailableInStore(partition)) {
LOG.info("Syncing partition (" + partition + ") of instant (" + instant + ")");
FileStatus[] statuses = entry.getValue().stream().map(p -> {
FileStatus status = new FileStatus(p.getFileSizeInBytes(), false, 0, 0, 0, 0, null, null, null,
new Path(String.format("%s/%s", metaClient.getBasePath(), p.getPath())));
return status;
}).toArray(FileStatus[]::new);
List<HoodieFileGroup> fileGroups =
buildFileGroups(statuses, timeline.filterCompletedAndCompactionInstants(), false);
applyDeltaFileSlicesToPartitionView(partition, fileGroups, DeltaApplyMode.ADD);
} else {
LOG.warn("Skipping partition (" + partition + ") when syncing instant (" + instant + ") as it is not loaded");
}
});
LOG.info("Done Syncing committed instant (" + instant + ")");
}
/**
* Add newly found restore instant.
*
* @param timeline Hoodie Timeline
* @param instant Restore Instant
*/
private void addRestoreInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
LOG.info("Syncing restore instant (" + instant + ")");
HoodieRestoreMetadata metadata =
TimelineMetadataUtils.deserializeAvroMetadata(timeline.getInstantDetails(instant).get(), HoodieRestoreMetadata.class);
Map<String, List<Pair<String, String>>> partitionFiles =
metadata.getHoodieRestoreMetadata().entrySet().stream().flatMap(entry -> {
return entry.getValue().stream().flatMap(e -> e.getPartitionMetadata().entrySet().stream().flatMap(e2 -> {
return e2.getValue().getSuccessDeleteFiles().stream().map(x -> Pair.of(e2.getKey(), x));
}));
}).collect(Collectors.groupingBy(Pair::getKey));
partitionFiles.entrySet().stream().forEach(e -> {
removeFileSlicesForPartition(timeline, instant, e.getKey(),
e.getValue().stream().map(x -> x.getValue()).collect(Collectors.toList()));
});
if (metadata.getRestoreInstantInfo() != null) {
Set<String> rolledbackInstants = metadata.getRestoreInstantInfo().stream()
.filter(instantInfo -> HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instantInfo.getAction()))
.map(instantInfo -> instantInfo.getCommitTime()).collect(Collectors.toSet());
removeReplacedFileIdsAtInstants(rolledbackInstants);
}
LOG.info("Done Syncing restore instant (" + instant + ")");
}
/**
* Add newly found rollback instant.
*
* @param timeline Hoodie Timeline
* @param instant Rollback Instant
*/
private void addRollbackInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
LOG.info("Syncing rollback instant (" + instant + ")");
HoodieRollbackMetadata metadata =
TimelineMetadataUtils.deserializeAvroMetadata(timeline.getInstantDetails(instant).get(), HoodieRollbackMetadata.class);
metadata.getPartitionMetadata().entrySet().stream().forEach(e -> {
removeFileSlicesForPartition(timeline, instant, e.getKey(), e.getValue().getSuccessDeleteFiles());
});
LOG.info("Done Syncing rollback instant (" + instant + ")");
}
/**
* Add newly found REPLACE instant.
*
* @param timeline Hoodie Timeline
* @param instant REPLACE Instant
*/
private void addReplaceInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
LOG.info("Syncing replace instant (" + instant + ")");
HoodieReplaceCommitMetadata replaceMetadata =
HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
updatePartitionWriteFileGroups(replaceMetadata.getPartitionToWriteStats(), timeline, instant);
replaceMetadata.getPartitionToReplaceFileIds().entrySet().stream().forEach(entry -> {
String partition = entry.getKey();
Map<HoodieFileGroupId, HoodieInstant> replacedFileIds = entry.getValue().stream()
.collect(Collectors.toMap(replaceStat -> new HoodieFileGroupId(partition, replaceStat), replaceStat -> instant));
LOG.info("For partition (" + partition + ") of instant (" + instant + "), excluding " + replacedFileIds.size() + " file groups");
addReplacedFileGroups(replacedFileIds);
});
LOG.info("Done Syncing REPLACE instant (" + instant + ")");
}
/**
* Add newly found clean instant. Note that cleaner metadata (.clean.completed)
* contains only relative paths unlike clean plans (.clean.requested) which contains absolute paths.
*
* @param timeline Timeline
* @param instant Clean instant
*/
private void addCleanInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
LOG.info("Syncing cleaner instant (" + instant + ")");
HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(metaClient, instant);
cleanMetadata.getPartitionMetadata().entrySet().stream().forEach(entry -> {
final String basePath = metaClient.getBasePath();
final String partitionPath = entry.getValue().getPartitionPath();
List<String> fullPathList = entry.getValue().getSuccessDeleteFiles()
.stream().map(fileName -> new Path(FSUtils
.getPartitionPath(basePath, partitionPath), fileName).toString())
.collect(Collectors.toList());
removeFileSlicesForPartition(timeline, instant, entry.getKey(), fullPathList);
});
LOG.info("Done Syncing cleaner instant (" + instant + ")");
}
private void removeFileSlicesForPartition(HoodieTimeline timeline, HoodieInstant instant, String partition,
List<String> paths) {
if (isPartitionAvailableInStore(partition)) {
LOG.info("Removing file slices for partition (" + partition + ") for instant (" + instant + ")");
FileStatus[] statuses = paths.stream().map(p -> {
FileStatus status = new FileStatus();
status.setPath(new Path(p));
return status;
}).toArray(FileStatus[]::new);
List<HoodieFileGroup> fileGroups =
buildFileGroups(statuses, timeline.filterCompletedAndCompactionInstants(), false);
applyDeltaFileSlicesToPartitionView(partition, fileGroups, DeltaApplyMode.REMOVE);
} else {
LOG.warn("Skipping partition (" + partition + ") when syncing instant (" + instant + ") as it is not loaded");
}
}
/**
* Apply mode whether to add or remove the delta view.
*/
enum DeltaApplyMode {
ADD, REMOVE
}
/**
* Apply changes to partition file-system view. Base Implementation overwrites the entire partitions view assuming
* some sort of map (in-mem/disk-based) is used. For View implementation which supports fine-granular updates (e:g
* RocksDB), override this method.
*
* @param partition PartitionPath
* @param deltaFileGroups Changed file-slices aggregated as file-groups
* @param mode Delta Apply mode
*/
protected void applyDeltaFileSlicesToPartitionView(String partition, List<HoodieFileGroup> deltaFileGroups,
DeltaApplyMode mode) {
if (deltaFileGroups.isEmpty()) {
LOG.info("No delta file groups for partition :" + partition);
return;
}
List<HoodieFileGroup> fileGroups = fetchAllStoredFileGroups(partition).collect(Collectors.toList());
/**
* Note that while finding the new data/log files added/removed, the path stored in metadata will be missing the
* base-path,scheme and authority. Ensure the matching process takes care of this discrepancy.
*/
Map<String, HoodieBaseFile> viewDataFiles = fileGroups.stream().flatMap(HoodieFileGroup::getAllRawFileSlices)
.map(FileSlice::getBaseFile).filter(Option::isPresent).map(Option::get)
.map(df -> Pair.of(Path.getPathWithoutSchemeAndAuthority(new Path(df.getPath())).toString(), df))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
// Note: Delta Log Files and Data FIles can be empty when adding/removing pending compactions
Map<String, HoodieBaseFile> deltaDataFiles = deltaFileGroups.stream().flatMap(HoodieFileGroup::getAllRawFileSlices)
.map(FileSlice::getBaseFile).filter(Option::isPresent).map(Option::get)
.map(df -> Pair.of(Path.getPathWithoutSchemeAndAuthority(new Path(df.getPath())).toString(), df))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
Map<String, HoodieLogFile> viewLogFiles =
fileGroups.stream().flatMap(HoodieFileGroup::getAllRawFileSlices).flatMap(FileSlice::getLogFiles)
.map(lf -> Pair.of(Path.getPathWithoutSchemeAndAuthority(lf.getPath()).toString(), lf))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
Map<String, HoodieLogFile> deltaLogFiles =
deltaFileGroups.stream().flatMap(HoodieFileGroup::getAllRawFileSlices).flatMap(FileSlice::getLogFiles)
.map(lf -> Pair.of(Path.getPathWithoutSchemeAndAuthority(lf.getPath()).toString(), lf))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
switch (mode) {
case ADD:
viewDataFiles.putAll(deltaDataFiles);
viewLogFiles.putAll(deltaLogFiles);
break;
case REMOVE:
deltaDataFiles.keySet().stream().forEach(p -> viewDataFiles.remove(p));
deltaLogFiles.keySet().stream().forEach(p -> viewLogFiles.remove(p));
break;
default:
throw new IllegalStateException("Unknown diff apply mode=" + mode);
}
HoodieTimeline timeline = deltaFileGroups.stream().map(df -> df.getTimeline()).findAny().get();
List<HoodieFileGroup> fgs =
buildFileGroups(viewDataFiles.values().stream(), viewLogFiles.values().stream(), timeline, true);
storePartitionView(partition, fgs);
}
@Override
public HoodieTimeline getTimeline() {
return visibleActiveTimeline;
}
}