/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.SyncableFileSystemView;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.MergeOnReadLazyInsertIterable;
import org.apache.hudi.io.HoodieAppendHandle;
import org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor;
import org.apache.hudi.table.rollback.RollbackHelper;
import org.apache.hudi.table.rollback.RollbackRequest;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
 * Implementation of a more real-time Hoodie Table that provides tradeoffs on read and write cost/amplification.
*
* <p>
* INSERTS - Same as HoodieCopyOnWriteTable - Produce new files, block aligned to desired size (or) Merge with the
* smallest existing file, to expand it
* </p>
* <p>
* UPDATES - Appends the changes to a rolling log file maintained per file Id. Compaction merges the log file into the
* base file.
* </p>
* <p>
* WARNING - MOR table type does not support nested rollbacks, every rollback must be followed by an attempted commit
* action
* </p>
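 * <p>
 * Illustrative usage sketch (the write-client API shown here, e.g. {@code HoodieWriteClient}, lives outside this
 * class; the names and calls below are indicative only, not a contract of this table implementation):
 * <pre>{@code
 *   HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath).build();
 *   HoodieWriteClient<T> client = new HoodieWriteClient<>(jsc, cfg);
 *   String instantTime = client.startCommit();
 *   client.upsert(recordsRDD, instantTime); // inserts create/expand base files, updates append to log files
 * }</pre>
 * </p>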
*/
public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends HoodieCopyOnWriteTable<T> {
private static final Logger LOG = LogManager.getLogger(HoodieMergeOnReadTable.class);
// UpsertPartitioner for MergeOnRead table type
private MergeOnReadUpsertPartitioner mergeOnReadUpsertPartitioner;
public HoodieMergeOnReadTable(HoodieWriteConfig config, JavaSparkContext jsc) {
super(config, jsc);
}
@Override
public Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
if (profile == null) {
throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
}
mergeOnReadUpsertPartitioner = new MergeOnReadUpsertPartitioner(profile, jsc);
return mergeOnReadUpsertPartitioner;
}
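  /**
   * Handles updates for a single file id. If the index cannot look up records in log files and the target file is
   * one of the small files selected by the upsert partitioner, the update is routed through the copy-on-write merge
   * path; otherwise the records are appended to the file id's log files via {@link HoodieAppendHandle}.
   */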
@Override
public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId, Iterator<HoodieRecord<T>> recordItr)
throws IOException {
LOG.info("Merging updates for commit " + commitTime + " for file " + fileId);
if (!index.canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
LOG.info("Small file corrections for updates for commit " + commitTime + " for file " + fileId);
return super.handleUpdate(commitTime, fileId, recordItr);
} else {
HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, commitTime, this, fileId, recordItr);
appendHandle.doAppend();
appendHandle.close();
return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator();
}
}
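  /**
   * Handles inserts for the given id prefix. If the index can look up records in log files, inserts are written to
   * log files via {@link MergeOnReadLazyInsertIterable}; otherwise they fall back to the copy-on-write parquet
   * insert path.
   */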
@Override
public Iterator<List<WriteStatus>> handleInsert(String commitTime, String idPfx, Iterator<HoodieRecord<T>> recordItr)
throws Exception {
// If canIndexLogFiles, write inserts to log files else write inserts to parquet files
if (index.canIndexLogFiles()) {
return new MergeOnReadLazyInsertIterable<>(recordItr, config, commitTime, this, idPfx);
} else {
return super.handleInsert(commitTime, idPfx, recordItr);
}
}
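  /**
   * Schedules a compaction when enough delta commits have accumulated since the last completed compaction. As a
   * worked example (illustrative values): with {@code getInlineCompactDeltaCommitMax()} returning 5 and only 3
   * delta commits found after the last compaction instant, an empty {@link HoodieCompactionPlan} is returned and
   * compaction is deferred.
   */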
@Override
public HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime) {
LOG.info("Checking if compaction needs to be run on " + config.getBasePath());
Option<HoodieInstant> lastCompaction =
getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
String deltaCommitsSinceTs = "0";
if (lastCompaction.isPresent()) {
deltaCommitsSinceTs = lastCompaction.get().getTimestamp();
}
int deltaCommitsSinceLastCompaction = getActiveTimeline().getDeltaCommitTimeline()
.findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants();
if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) {
LOG.info("Not running compaction as only " + deltaCommitsSinceLastCompaction
+ " delta commits was found since last compaction " + deltaCommitsSinceTs + ". Waiting for "
+ config.getInlineCompactDeltaCommitMax());
return new HoodieCompactionPlan();
}
LOG.info("Compacting merge on read table " + config.getBasePath());
HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor();
try {
return compactor.generateCompactionPlan(jsc, this, config, instantTime,
((SyncableFileSystemView) getSliceView()).getPendingCompactionOperations()
              .map(instantTimeCompactionOpPair -> instantTimeCompactionOpPair.getValue().getFileGroupId())
.collect(Collectors.toSet()));
} catch (IOException e) {
throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
}
}
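  /**
   * Executes the given compaction plan at {@code compactionInstantTime}, merging log files into new base files via
   * {@link HoodieMergeOnReadTableCompactor}.
   */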
@Override
public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionInstantTime,
HoodieCompactionPlan compactionPlan) {
HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor();
try {
return compactor.compact(jsc, compactionPlan, this, config, compactionInstantTime);
} catch (IOException e) {
throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e);
}
}
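  /**
   * Rolls back the given instant: un-publishes it if it is completed, generates per-partition rollback requests
   * (deleting data/log files and/or appending rollback blocks), and optionally deletes the inflight/requested
   * instant files.
   */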
@Override
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, HoodieInstant instant,
boolean deleteInstants) throws IOException {
long startTime = System.currentTimeMillis();
String commit = instant.getTimestamp();
LOG.error("Rolling back instant " + instant);
// Atomically un-publish all non-inflight commits
if (instant.isCompleted()) {
LOG.error("Un-publishing instant " + instant + ", deleteInstants=" + deleteInstants);
instant = this.getActiveTimeline().revertToInflight(instant);
// reload meta-client to reflect latest timeline status
metaClient.reloadActiveTimeline();
}
List<HoodieRollbackStat> allRollbackStats = new ArrayList<>();
    // At the moment, the MOR table type does not support bulk nested rollbacks. Nested rollback is an experimental
    // feature that is expensive. To perform nested rollbacks, issue multiple client.rollback(commitToRollback)
    // requests, one per commit.
// NOTE {@link HoodieCompactionConfig#withCompactionLazyBlockReadEnabled} needs to be set to TRUE. This is
// required to avoid OOM when merging multiple LogBlocks performed during nested rollbacks.
    // For the Requested state (e.g. a failure during index lookup), there is nothing to roll back other than
    // deleting the timeline file.
if (!instant.isRequested()) {
LOG.info("Unpublished " + commit);
List<RollbackRequest> rollbackRequests = generateRollbackRequests(jsc, instant);
// TODO: We need to persist this as rollback workload and use it in case of partial failures
allRollbackStats = new RollbackHelper(metaClient, config).performRollback(jsc, instant, rollbackRequests);
}
    // Delete the inflight and requested instant files if enabled
    deleteInflightAndRequestedInstant(deleteInstants, this.getActiveTimeline(), instant);
    LOG.info("Time (in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
return allRollbackStats;
}
/**
   * Generate all rollback requests that we need to perform for rolling back this action, without actually
   * performing the rollback.
*
* @param jsc JavaSparkContext
* @param instantToRollback Instant to Rollback
* @return list of rollback requests
* @throws IOException
*/
private List<RollbackRequest> generateRollbackRequests(JavaSparkContext jsc, HoodieInstant instantToRollback)
throws IOException {
String commit = instantToRollback.getTimestamp();
List<String> partitions = FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning());
int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> {
HoodieActiveTimeline activeTimeline = this.getActiveTimeline().reload();
List<RollbackRequest> partitionRollbackRequests = new ArrayList<>();
switch (instantToRollback.getAction()) {
case HoodieTimeline.COMMIT_ACTION:
          LOG.info("Rolling back commit action by deleting the data and log files written by this instant");
partitionRollbackRequests.add(
RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback));
break;
case HoodieTimeline.COMPACTION_ACTION:
          // If delta commits are present after this compaction instant, the rollback must delete only the data
          // files newly created by the compaction and leave the log files intact, since succeeding deltacommits
          // have already appended updates to them. Otherwise both data and log files can be deleted.
boolean higherDeltaCommits =
!activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1).empty();
if (higherDeltaCommits) {
            // There are deltacommits after this compaction instant, which means updates have already been
            // appended to log files whose base commit is this compaction. In this scenario we should delete
            // only the newly created parquet files and retain the corresponding log files.
LOG.info("Rolling back compaction. There are higher delta commits. So only deleting data files");
partitionRollbackRequests.add(
RollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(partitionPath, instantToRollback));
} else {
// No deltacommits present after this compaction commit (inflight or requested). In this case, we
// can also delete any log files that were created with this compaction commit as base
// commit.
LOG.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data and"
+ " log files");
partitionRollbackRequests.add(
RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback));
}
break;
case HoodieTimeline.DELTA_COMMIT_ACTION:
// --------------------------------------------------------------------------------------------------
// (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal
// --------------------------------------------------------------------------------------------------
// (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In
// this scenario we would want to delete these log files.
// (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario,
// HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks.
// (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is
          // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime,
          // and hence we will end up deleting these log files. This is done so there are no orphan log files
// lying around.
// (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions
// taken in this scenario is a combination of (A.2) and (A.3)
// ---------------------------------------------------------------------------------------------------
// (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal
// ---------------------------------------------------------------------------------------------------
// (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries.
// In this scenario, we delete all the parquet files written for the failed commit.
// (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In
// this scenario, perform (A.1) and for updates written to log files, write rollback blocks.
// (B.3) Rollback triggered for first commit - Same as (B.1)
// (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files
// as well if the base parquet file gets deleted.
try {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
metaClient.getCommitTimeline()
.getInstantDetails(
new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp()))
.get(),
HoodieCommitMetadata.class);
            // In case all the data was inserts and the commit failed, delete the files belonging to that commit.
// We do not know fileIds for inserts (first inserts are either log files or parquet files),
// delete all files for the corresponding failed commit, if present (same as COW)
partitionRollbackRequests.add(
RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback));
// append rollback blocks for updates
if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
partitionRollbackRequests
.addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata));
}
break;
} catch (IOException io) {
throw new UncheckedIOException("Failed to collect rollback actions for commit " + commit, io);
}
default:
break;
}
return partitionRollbackRequests.iterator();
}).filter(Objects::nonNull).collect();
}
@Override
public void finalizeWrite(JavaSparkContext jsc, String instantTs, List<HoodieWriteStat> stats)
throws HoodieIOException {
// delegate to base class for MOR tables
super.finalizeWrite(jsc, instantTs, stats);
}
/**
   * UpsertPartitioner for the MergeOnRead table type. This allows auto-correcting small parquet files into larger
   * ones, without the need for an index on the log files.
*/
class MergeOnReadUpsertPartitioner extends HoodieCopyOnWriteTable.UpsertPartitioner {
MergeOnReadUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
super(profile, jsc);
}
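    /**
     * Finds the "small" file slices in the given partition that new inserts can be packed into. When the index
     * cannot look up log files, only the smallest base-file-only slice under the parquet small-file limit is
     * considered; otherwise any file slice whose estimated total size is below the max parquet file size qualifies.
     */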
@Override
protected List<SmallFile> getSmallFiles(String partitionPath) {
// smallFiles only for partitionPath
List<SmallFile> smallFileLocations = new ArrayList<>();
// Init here since this class (and member variables) might not have been initialized
HoodieTimeline commitTimeline = getCompletedCommitsTimeline();
// Find out all eligible small file slices
if (!commitTimeline.empty()) {
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
// find smallest file in partition and append to it
List<FileSlice> allSmallFileSlices = new ArrayList<>();
        // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts
        // to it. Doing this over time for a partition ensures that we handle small file issues.
if (!index.canIndexLogFiles()) {
// TODO : choose last N small files since there can be multiple small files written to a single partition
// by different spark partitions in a single batch
          Option<FileSlice> smallFileSlice = Option.fromJavaOptional(getSliceView()
              .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
              .filter(fileSlice -> fileSlice.getLogFiles().count() < 1
                  && fileSlice.getBaseFile().get().getFileSize() < config.getParquetSmallFileLimit())
              .min((FileSlice left, FileSlice right) ->
                  left.getBaseFile().get().getFileSize() < right.getBaseFile().get().getFileSize() ? -1 : 1));
if (smallFileSlice.isPresent()) {
allSmallFileSlices.add(smallFileSlice.get());
}
} else {
// If we can index log files, we can add more inserts to log files for fileIds including those under
// pending compaction.
List<FileSlice> allFileSlices =
getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
.collect(Collectors.toList());
for (FileSlice fileSlice : allFileSlices) {
if (isSmallFile(fileSlice)) {
allSmallFileSlices.add(fileSlice);
}
}
}
// Create SmallFiles from the eligible file slices
for (FileSlice smallFileSlice : allSmallFileSlices) {
SmallFile sf = new SmallFile();
if (smallFileSlice.getBaseFile().isPresent()) {
// TODO : Move logic of file name, file id, base commit time handling inside file slice
String filename = smallFileSlice.getBaseFile().get().getFileName();
sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf);
} else {
HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
FSUtils.getFileIdFromLogPath(logFile.getPath()));
sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf);
}
}
}
return smallFileLocations;
}
public List<String> getSmallFileIds() {
return (List<String>) smallFiles.stream().map(smallFile -> ((SmallFile) smallFile).location.getFileId())
.collect(Collectors.toList());
}
private long getTotalFileSize(FileSlice fileSlice) {
if (!fileSlice.getBaseFile().isPresent()) {
return convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList()));
} else {
return fileSlice.getBaseFile().get().getFileSize()
+ convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList()));
}
}
private boolean isSmallFile(FileSlice fileSlice) {
long totalSize = getTotalFileSize(fileSlice);
return totalSize < config.getParquetMaxFileSize();
}
// TODO (NA) : Make this static part of utility
public long convertLogFilesSizeToExpectedParquetSize(List<HoodieLogFile> hoodieLogFiles) {
long totalSizeOfLogFiles = hoodieLogFiles.stream().map(HoodieLogFile::getFileSize)
.filter(size -> size > 0).reduce(Long::sum).orElse(0L);
// Here we assume that if there is no base parquet file, all log files contain only inserts.
// We can then just get the parquet equivalent size of these log files, compare that with
// {@link config.getParquetMaxFileSize()} and decide if there is scope to insert more rows
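      // Worked example (illustrative numbers): three log files of 40MB each with a compression ratio of 0.35
      // yield an expected parquet size of 120MB * 0.35 = 42MB.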
return (long) (totalSizeOfLogFiles * config.getLogFileToParquetCompressionRatio());
}
}
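  /**
   * Builds rollback requests that append rollback blocks to the log files updated by the given delta commit. The
   * base commit time is always resolved by listing the latest file slices, since the write stat's prevCommit may
   * point at a compaction-requested instant rather than the base file's commit.
   */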
private List<RollbackRequest> generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant,
HoodieCommitMetadata commitMetadata) {
ValidationUtils.checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
    // wStat.getPrevCommit() might not give the right commit time in the following scenario: if a compaction was
    // scheduled, the new commitTime associated with the requested compaction is used to write the new log files.
    // In that case, the commit time for the log file is the compaction-requested time, but the (global) index might
    // store the baseCommit of the parquet file rather than the requested time. Hence always resolve the baseCommit
    // by listing the file slices.
Map<String, String> fileIdToBaseCommitTimeForLogMap = this.getSliceView().getLatestFileSlices(partitionPath)
.collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> {
      // Filter out stats without a prevCommit, since those correspond to inserts
      boolean validForRollback = (wStat != null) && (wStat.getPrevCommit() != null)
          && (!wStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT))
          && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId());
if (validForRollback) {
        // Sanity check: the base commit time of the log file can never be after the instant being rolled back
ValidationUtils
.checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()),
rollbackInstant.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL));
}
return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(
// Base Ts should be strictly less. If equal (for inserts-to-logs), the caller employs another option
// to delete and we should not step on it
wStat.getFileId()), rollbackInstant.getTimestamp(), HoodieTimeline.LESSER);
}).map(wStat -> {
String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
return RollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(),
baseCommitTime, rollbackInstant);
}).collect(Collectors.toList());
}
}