blob: ab164280613408429f3420ce7bb3425da31bbcba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.table.timeline;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Represents the Active Timeline for the Hoodie table. Instants for the last 12 hours (configurable) are in the
* ActiveTimeline and the rest are archived. ActiveTimeline is a special timeline that allows for creation of instants
* on the timeline.
* <p>
* </p>
* The timeline is not automatically reloaded on any mutation operation, clients have to manually call reload() so that
* they can chain multiple mutations to the timeline and then call reload() once.
* <p>
* </p>
* This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized.
*/
public class HoodieActiveTimeline extends HoodieDefaultTimeline {
/**
 * Formatter for instant times, second granularity ("yyyyMMddHHmmss").
 * NOTE(review): java.text.SimpleDateFormat is documented as not thread-safe; this instance is a shared public
 * static, so concurrent callers need external synchronization.
 */
public static final SimpleDateFormat COMMIT_FORMATTER = new SimpleDateFormat("yyyyMMddHHmmss");
/** File extensions that the public constructors pass to the instant scan of the meta path. */
public static final Set<String> VALID_EXTENSIONS_IN_ACTIVE_TIMELINE = new HashSet<>(Arrays.asList(
COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, REQUESTED_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION,
INFLIGHT_DELTA_COMMIT_EXTENSION, REQUESTED_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION,
INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION,
INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION));
private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class);
/** Meta client used by this class to resolve the meta path, auxiliary path, layout version and file system. */
protected HoodieTableMetaClient metaClient;
/**
 * Last instant time returned by {@link #createNewInstantTime()}; seeded with the string form of
 * Integer.MIN_VALUE so that the first generated time is accepted.
 */
private static AtomicReference<String> lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE));
/**
 * Returns the next instant time in the {@link #COMMIT_FORMATTER} format.
 * Ensures each instant time is at least 1 second apart since instant times are created at second granularity:
 * the current time is re-formatted until it compares strictly greater than the previously returned value.
 *
 * <p>{@link SimpleDateFormat} is not thread-safe and {@code updateAndGet} may run the update function on several
 * threads at once, so formatting is serialized on the formatter instance. Code elsewhere that uses the public
 * {@link #COMMIT_FORMATTER} directly is not covered by this lock.
 *
 * @return a new instant time, strictly greater than any value previously returned by this method
 */
public static String createNewInstantTime() {
  return lastInstantTime.updateAndGet((oldVal) -> {
    String newCommitTime;
    do {
      // Guard the shared formatter: concurrent format() calls can corrupt its internal calendar state.
      synchronized (HoodieActiveTimeline.COMMIT_FORMATTER) {
        newCommitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date());
      }
      // Spin until the wall clock moves past the last handed-out second.
    } while (HoodieTimeline.compareTimestamps(newCommitTime, oldVal, LESSER_OR_EQUAL));
    return newCommitTime;
  });
}
/**
 * Builds the timeline from the given extensions with layout filters enabled.
 *
 * @param metaClient meta client to scan instants from
 * @param includedExtensions extensions passed to the instant scan
 */
protected HoodieActiveTimeline(HoodieTableMetaClient metaClient, Set<String> includedExtensions) {
  // Delegate with applyLayoutFilters = true.
  this(metaClient, includedExtensions, true);
}
/**
 * Builds the timeline by asking the meta client to scan for instants, then wires up the details function.
 *
 * @param metaClient meta client to scan instants from; also retained for later path and file-system lookups
 * @param includedExtensions extensions passed to the instant scan
 * @param applyLayoutFilters forwarded unchanged to the instant scan
 * @throws HoodieIOException if the scan fails with an {@link IOException}
 */
protected HoodieActiveTimeline(HoodieTableMetaClient metaClient, Set<String> includedExtensions,
    boolean applyLayoutFilters) {
  // Scan the files in the meta path, keep only those with the requested extensions,
  // and load them as HoodieInstants.
  try {
    this.setInstants(metaClient.scanHoodieInstantsFromFileSystem(includedExtensions, applyLayoutFilters));
  } catch (IOException ioe) {
    throw new HoodieIOException("Failed to scan metadata", ioe);
  }
  this.metaClient = metaClient;
  // The intersection cast makes the method reference serializable -
  // http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
  this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails;
  LOG.info("Loaded instants " + getInstants().collect(Collectors.toList()));
}
/**
 * Builds the active timeline using an immutable copy of {@link #VALID_EXTENSIONS_IN_ACTIVE_TIMELINE}.
 *
 * @param metaClient meta client to scan instants from
 */
public HoodieActiveTimeline(HoodieTableMetaClient metaClient) {
  this(metaClient, ImmutableSet.copyOf(VALID_EXTENSIONS_IN_ACTIVE_TIMELINE));
}
/**
 * Builds the active timeline using an immutable copy of {@link #VALID_EXTENSIONS_IN_ACTIVE_TIMELINE}.
 *
 * @param metaClient meta client to scan instants from
 * @param applyLayoutFilter forwarded unchanged to the instant scan
 */
public HoodieActiveTimeline(HoodieTableMetaClient metaClient, boolean applyLayoutFilter) {
  this(metaClient, ImmutableSet.copyOf(VALID_EXTENSIONS_IN_ACTIVE_TIMELINE), applyLayoutFilter);
}
/**
 * No-argument constructor, for serialization and de-serialization only. It leaves every field unset.
 *
 * @deprecated not intended for direct use; prefer the constructors that take a meta client
 */
public HoodieActiveTimeline() {
  // Intentionally empty.
}
/**
 * Java serialization hook, invoked when this object is deserialized (for example in a spark executor).
 * It only performs default field deserialization.
 *
 * @param in stream the object is being read from
 * @throws IOException if reading from the stream fails
 * @throws ClassNotFoundException if the class of a serialized field cannot be found
 * @deprecated
 */
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
  // No custom state to restore; defer entirely to the default mechanism.
  in.defaultReadObject();
}
/**
 * Creates the file for the given instant in the meta path, with no content and without allowing overwrite.
 *
 * @param instant instant whose file should be created
 */
public void createNewInstant(HoodieInstant instant) {
  LOG.info("Creating a new instant " + instant);
  // Empty instant file; overwrite is not requested.
  String instantFileName = instant.getFileName();
  createFileInMetaPath(instantFileName, Option.empty(), false);
}
/**
 * Marks an inflight instant as completed, storing the given data with the completed instant.
 *
 * @param instant instant to complete; validated to be inflight
 * @param data optional content for the completed instant
 */
public void saveAsComplete(HoodieInstant instant, Option<byte[]> data) {
  LOG.info("Marking instant complete " + instant);
  ValidationUtils.checkArgument(instant.isInflight(),
      "Could not mark an already completed instant as complete again " + instant);
  HoodieInstant completedInstant = HoodieTimeline.getCompletedInstant(instant);
  transitionState(instant, completedInstant, data);
  LOG.info("Completed " + instant);
}
/**
 * Reverts the given instant back to its inflight counterpart.
 *
 * @param instant instant to revert
 * @return the inflight instant derived from {@code instant} and the table type
 */
public HoodieInstant revertToInflight(HoodieInstant instant) {
  LOG.info("Reverting instant to inflight " + instant);
  HoodieInstant inflightInstant = HoodieTimeline.getInflightInstant(instant, metaClient.getTableType());
  revertCompleteToInflight(instant, inflightInstant);
  LOG.info("Reverted " + instant + " to inflight " + inflightInstant);
  return inflightInstant;
}
/**
 * Deletes the file of an inflight instant from the meta path.
 *
 * @param instant instant to delete; validated to be inflight
 */
public void deleteInflight(HoodieInstant instant) {
  // Only inflight instants may be removed through this entry point.
  ValidationUtils.checkArgument(instant.isInflight());
  deleteInstantFile(instant);
}
/**
 * Deletes the file of an instant that is not completed from the meta path.
 *
 * @param instant instant to delete; validated to not be completed
 */
public void deletePending(HoodieInstant instant) {
  // Completed instants are rejected.
  ValidationUtils.checkArgument(!instant.isCompleted());
  deleteInstantFile(instant);
}
/**
 * Deletes the file of a requested compaction instant from the meta path.
 *
 * @param instant instant to delete; validated to be in requested state and to be a compaction action
 */
public void deleteCompactionRequested(HoodieInstant instant) {
  ValidationUtils.checkArgument(instant.isRequested());
  ValidationUtils.checkArgument(instant.getAction().equals(COMPACTION_ACTION));
  deleteInstantFile(instant);
}
/**
 * Deletes (non-recursively) the file of the given instant from the meta path.
 *
 * @param instant instant whose file should be removed
 * @throws HoodieIOException if the file system reports the delete as unsuccessful or throws an IOException
 */
private void deleteInstantFile(HoodieInstant instant) {
  LOG.info("Deleting instant " + instant);
  Path instantFilePath = new Path(metaClient.getMetaPath(), instant.getFileName());
  boolean deleted;
  try {
    deleted = metaClient.getFs().delete(instantFilePath, false);
  } catch (IOException ioe) {
    throw new HoodieIOException("Could not remove inflight commit " + instantFilePath, ioe);
  }
  // A false return means nothing was deleted; surface that as a failure.
  if (!deleted) {
    throw new HoodieIOException("Could not delete instant " + instant);
  }
  LOG.info("Removed instant " + instant);
}
/**
 * Reads the content of the instant's file in the meta path.
 *
 * @param instant instant to read
 * @return the bytes read from the instant file
 */
@Override
public Option<byte[]> getInstantDetails(HoodieInstant instant) {
  return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName()));
}
/**
 * Reads cleaner metadata for the given instant from its file in the meta path.
 *
 * @param instant clean instant to read
 * @return the bytes read from the instant file
 */
public Option<byte[]> readCleanerInfoAsBytes(HoodieInstant instant) {
  // Cleaner metadata is only ever stored in the timeline (.hoodie) folder.
  Path cleanerInfoPath = new Path(metaClient.getMetaPath(), instant.getFileName());
  return readDataFromPath(cleanerInfoPath);
}
//-----------------------------------------------------------------
// BEGIN - COMPACTION RELATED META-DATA MANAGEMENT.
//-----------------------------------------------------------------
/**
 * Reads the compaction plan for the given instant, trying the auxiliary path first and falling back to the
 * meta path when the auxiliary file is not found.
 *
 * @param instant compaction instant to read
 * @return the bytes read from the instant file
 * @throws HoodieIOException if the read fails for a reason other than the auxiliary file being absent
 */
public Option<byte[]> readCompactionPlanAsBytes(HoodieInstant instant) {
  try {
    // Auxiliary path is consulted first. A future release is expected to write compaction plans
    // only to the timeline; the fallback below already covers that layout.
    return readDataFromPath(new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName()));
  } catch (HoodieIOException hioe) {
    // Anything other than a missing auxiliary file is a genuine failure.
    if (!(hioe.getIOException() instanceof FileNotFoundException)) {
      throw hioe;
    }
    // This fallback will be removed in future release. See HUDI-546
    return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName()));
  }
}
/**
 * Reverts a compaction from inflight back to requested.
 *
 * @param inflightInstant inflight compaction instant; validated to be a compaction action and inflight
 * @return the requested compaction instant with the same timestamp
 */
public HoodieInstant revertCompactionInflightToRequested(HoodieInstant inflightInstant) {
  ValidationUtils.checkArgument(inflightInstant.getAction().equals(COMPACTION_ACTION));
  ValidationUtils.checkArgument(inflightInstant.isInflight());
  HoodieInstant requested =
      new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, inflightInstant.getTimestamp());
  boolean nullLayoutVersion = metaClient.getTimelineLayoutVersion().isNullVersion();
  if (!nullLayoutVersion) {
    // With a non-null layout version the inflight file is simply removed.
    deleteInflight(inflightInstant);
  } else {
    // Pass empty data since it is read from the corresponding .aux/.compaction instant file
    transitionState(inflightInstant, requested, Option.empty());
  }
  return requested;
}
/**
 * Moves a compaction from requested to inflight, writing no content.
 *
 * @param requestedInstant requested compaction instant; validated to be a compaction action and requested
 * @return the inflight compaction instant with the same timestamp
 */
public HoodieInstant transitionCompactionRequestedToInflight(HoodieInstant requestedInstant) {
  ValidationUtils.checkArgument(requestedInstant.getAction().equals(COMPACTION_ACTION));
  ValidationUtils.checkArgument(requestedInstant.isRequested());
  String timestamp = requestedInstant.getTimestamp();
  HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, timestamp);
  transitionState(requestedInstant, inflight, Option.empty());
  return inflight;
}
/**
 * Moves a compaction from inflight to completed. The completed instant uses the commit action.
 *
 * @param inflightInstant inflight compaction instant; validated to be a compaction action and inflight
 * @param data extra metadata stored with the completed instant
 * @return the completed commit instant with the same timestamp
 */
public HoodieInstant transitionCompactionInflightToComplete(HoodieInstant inflightInstant, Option<byte[]> data) {
  ValidationUtils.checkArgument(inflightInstant.getAction().equals(COMPACTION_ACTION));
  ValidationUtils.checkArgument(inflightInstant.isInflight());
  String timestamp = inflightInstant.getTimestamp();
  HoodieInstant completedCommit = new HoodieInstant(State.COMPLETED, COMMIT_ACTION, timestamp);
  transitionState(inflightInstant, completedCommit, data);
  return completedCommit;
}
/**
 * Writes the instant's file into the auxiliary meta folder.
 *
 * @param instant instant whose file name is used
 * @param data optional content to store
 */
private void createFileInAuxiliaryFolder(HoodieInstant instant, Option<byte[]> data) {
  // This will be removed in future release. See HUDI-546
  createFileInPath(new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName()), data);
}
//-----------------------------------------------------------------
// END - COMPACTION RELATED META-DATA MANAGEMENT
//-----------------------------------------------------------------
/**
 * Moves a clean from inflight to completed.
 *
 * @param inflightInstant inflight clean instant; validated to be a clean action and inflight
 * @param data extra metadata stored with the completed instant
 * @return the completed clean instant with the same timestamp
 */
public HoodieInstant transitionCleanInflightToComplete(HoodieInstant inflightInstant, Option<byte[]> data) {
  ValidationUtils.checkArgument(inflightInstant.getAction().equals(CLEAN_ACTION));
  ValidationUtils.checkArgument(inflightInstant.isInflight());
  String timestamp = inflightInstant.getTimestamp();
  HoodieInstant completedClean = new HoodieInstant(State.COMPLETED, CLEAN_ACTION, timestamp);
  // Record the completed state on the timeline.
  transitionState(inflightInstant, completedClean, data);
  return completedClean;
}
/**
 * Moves a clean from requested to inflight.
 *
 * @param requestedInstant requested clean instant; validated to be a clean action and requested
 * @param data optional data to be stored with the inflight instant
 * @return the inflight clean instant with the same timestamp
 */
public HoodieInstant transitionCleanRequestedToInflight(HoodieInstant requestedInstant, Option<byte[]> data) {
  ValidationUtils.checkArgument(requestedInstant.getAction().equals(CLEAN_ACTION));
  ValidationUtils.checkArgument(requestedInstant.isRequested());
  String timestamp = requestedInstant.getTimestamp();
  HoodieInstant inflightClean = new HoodieInstant(State.INFLIGHT, CLEAN_ACTION, timestamp);
  transitionState(requestedInstant, inflightClean, data);
  return inflightClean;
}
/**
 * Transitions {@code fromInstant} to {@code toInstant} without allowing redundant transitions.
 *
 * @param fromInstant current instant
 * @param toInstant target instant
 * @param data optional content for the target instant
 */
private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data) {
  // Delegate with allowRedundantTransitions = false.
  transitionState(fromInstant, toInstant, data, false);
}
/**
 * Transitions {@code fromInstant} to {@code toInstant} in the meta path.
 *
 * <p>With a null timeline layout version, the source file is (re)written with {@code data} and then renamed to
 * the target file. Otherwise the source file must already exist and a separate target file is created.
 *
 * @param fromInstant current instant
 * @param toInstant target instant; validated to share the timestamp of {@code fromInstant}
 * @param data optional content for the target instant
 * @param allowRedundantTransitions when true, the target file may be written even if it already exists
 * @throws HoodieIOException if the rename is unsuccessful or a file-system call throws an IOException
 */
private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data,
    boolean allowRedundantTransitions) {
  ValidationUtils.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp()));
  try {
    if (!metaClient.getTimelineLayoutVersion().isNullVersion()) {
      // Ensures old state exists in timeline
      Path sourcePath = new Path(metaClient.getMetaPath(), fromInstant.getFileName());
      LOG.info("Checking for file exists ?" + sourcePath);
      ValidationUtils.checkArgument(metaClient.getFs().exists(sourcePath));
      // Use Write Once to create Target File, unless redundant transitions are allowed
      Path targetPath = new Path(metaClient.getMetaPath(), toInstant.getFileName());
      if (allowRedundantTransitions) {
        createFileInPath(targetPath, data);
      } else {
        createImmutableFileInPath(targetPath, data);
      }
      LOG.info("Create new file for toInstant ?" + targetPath);
      return;
    }
    // Null layout version: write the metadata into the source (.inflight) file, then rename it to the target.
    createFileInMetaPath(fromInstant.getFileName(), data, allowRedundantTransitions);
    Path renameSource = new Path(metaClient.getMetaPath(), fromInstant.getFileName());
    Path renameTarget = new Path(metaClient.getMetaPath(), toInstant.getFileName());
    if (!metaClient.getFs().rename(renameSource, renameTarget)) {
      throw new HoodieIOException("Could not rename " + renameSource + " to " + renameTarget);
    }
  } catch (IOException ioe) {
    throw new HoodieIOException("Could not complete " + fromInstant, ioe);
  }
}
/**
 * Reverts a completed instant to inflight in the meta path.
 *
 * <p>With a null timeline layout version, the completed file is renamed to the inflight file unless the
 * inflight file already exists. Otherwise missing requested/inflight files are created empty and the completed
 * file is deleted.
 *
 * @param completed completed instant to revert
 * @param inflight inflight instant to revert to; validated to share the timestamp of {@code completed}
 * @throws HoodieIOException if the rename is unsuccessful or a file-system call throws an IOException
 */
private void revertCompleteToInflight(HoodieInstant completed, HoodieInstant inflight) {
  ValidationUtils.checkArgument(completed.getTimestamp().equals(inflight.getTimestamp()));
  Path inflightPath = new Path(metaClient.getMetaPath(), inflight.getFileName());
  Path completedPath = new Path(metaClient.getMetaPath(), completed.getFileName());
  try {
    if (!metaClient.getTimelineLayoutVersion().isNullVersion()) {
      HoodieInstant requested = new HoodieInstant(State.REQUESTED, inflight.getAction(), inflight.getTimestamp());
      Path requestedPath = new Path(metaClient.getMetaPath(), requested.getFileName());
      // Create whichever of the requested / inflight files is missing (empty, no overwrite).
      if (!metaClient.getFs().exists(requestedPath)) {
        metaClient.getFs().create(requestedPath, false).close();
      }
      if (!metaClient.getFs().exists(inflightPath)) {
        metaClient.getFs().create(inflightPath, false).close();
      }
      // Finally drop the completed file.
      boolean deleted = metaClient.getFs().delete(completedPath, false);
      ValidationUtils.checkArgument(deleted, "State Reverting failed");
      return;
    }
    // Null layout version: rename completed -> inflight, only if inflight is not already there.
    if (metaClient.getFs().exists(inflightPath)) {
      return;
    }
    if (!metaClient.getFs().rename(completedPath, inflightPath)) {
      throw new HoodieIOException(
          "Could not rename " + completedPath + " to " + inflightPath);
    }
  } catch (IOException ioe) {
    throw new HoodieIOException("Could not complete revert " + completed, ioe);
  }
}
/**
 * Moves a requested instant to inflight without allowing redundant transitions.
 *
 * @param requested requested instant
 * @param content optional content for the inflight instant
 */
public void transitionRequestedToInflight(HoodieInstant requested, Option<byte[]> content) {
  // Delegate with allowRedundantTransitions = false.
  transitionRequestedToInflight(requested, content, false);
}
/**
 * Moves a requested instant to inflight, keeping its action and timestamp.
 *
 * @param requested requested instant; validated to be in requested state
 * @param content optional content for the inflight instant
 * @param allowRedundantTransitions forwarded to the state transition
 */
public void transitionRequestedToInflight(HoodieInstant requested, Option<byte[]> content,
    boolean allowRedundantTransitions) {
  HoodieInstant inflightInstant =
      new HoodieInstant(State.INFLIGHT, requested.getAction(), requested.getTimestamp());
  ValidationUtils.checkArgument(requested.isRequested(), "Instant " + requested + " in wrong state");
  transitionState(requested, inflightInstant, content, allowRedundantTransitions);
}
/**
 * Saves a requested compaction without allowing overwrite of the meta-path file.
 *
 * @param instant compaction instant
 * @param content optional content to store
 */
public void saveToCompactionRequested(HoodieInstant instant, Option<byte[]> content) {
  // Delegate with overwrite = false.
  saveToCompactionRequested(instant, content, false);
}
/**
 * Saves a requested compaction, writing the content to both the auxiliary folder and the meta path.
 *
 * @param instant compaction instant; validated to be a compaction action
 * @param content optional content to store
 * @param overwrite whether the meta-path file may be overwritten
 */
public void saveToCompactionRequested(HoodieInstant instant, Option<byte[]> content, boolean overwrite) {
  ValidationUtils.checkArgument(instant.getAction().equals(COMPACTION_ACTION));
  // First the auxiliary folder copy of the workload ...
  createFileInAuxiliaryFolder(instant, content);
  // ... then the timeline copy.
  String instantFileName = instant.getFileName();
  createFileInMetaPath(instantFileName, content, overwrite);
}
/**
 * Saves a requested clean, writing the content to the meta path without allowing overwrite.
 *
 * @param instant clean instant; validated to be a clean action in requested state
 * @param content optional content to store
 */
public void saveToCleanRequested(HoodieInstant instant, Option<byte[]> content) {
  ValidationUtils.checkArgument(instant.getAction().equals(CLEAN_ACTION));
  ValidationUtils.checkArgument(instant.getState().equals(State.REQUESTED));
  // The plan lives in the meta path only.
  String instantFileName = instant.getFileName();
  createFileInMetaPath(instantFileName, content, false);
}
/**
 * Creates a file in the meta path. The write-once variant is used only when overwrite is not allowed and the
 * timeline layout version is not the null version.
 *
 * @param filename name of the file inside the meta path
 * @param content optional content to store
 * @param allowOverwrite whether an existing file may be overwritten
 */
private void createFileInMetaPath(String filename, Option<byte[]> content, boolean allowOverwrite) {
  Path targetPath = new Path(metaClient.getMetaPath(), filename);
  if (!allowOverwrite && !metaClient.getTimelineLayoutVersion().isNullVersion()) {
    createImmutableFileInPath(targetPath, content);
    return;
  }
  createFileInPath(targetPath, content);
}
/**
 * Creates the file at {@code fullPath} if it does not exist and, when content is present, (over)writes it.
 *
 * @param fullPath path of the file to create
 * @param content optional content; when absent only the (possibly empty) file is ensured to exist
 * @throws HoodieIOException if the file cannot be created or a file-system call throws an IOException
 */
private void createFileInPath(Path fullPath, Option<byte[]> content) {
  try {
    // If the path does not exist, create it first
    if (!metaClient.getFs().exists(fullPath)) {
      if (metaClient.getFs().createNewFile(fullPath)) {
        LOG.info("Created a new file in meta path: " + fullPath);
      } else {
        throw new HoodieIOException("Failed to create file " + fullPath);
      }
    }
    if (content.isPresent()) {
      // try-with-resources guarantees the stream is closed even when write() throws.
      try (FSDataOutputStream fsout = metaClient.getFs().create(fullPath, true)) {
        fsout.write(content.get());
      }
    }
  } catch (IOException e) {
    throw new HoodieIOException("Failed to create file " + fullPath, e);
  }
}
/**
 * Creates a new file in timeline with overwrite set to false. This ensures
 * files are created only once and never rewritten.
 *
 * <p>The stream is managed with try-with-resources, so a failure while closing no longer replaces an earlier
 * write failure; it is attached to it as a suppressed exception. A close failure on an otherwise successful
 * write is reported as a failure to create the file.
 *
 * @param fullPath File Path
 * @param content Content to be stored
 * @throws HoodieIOException if creating, writing or closing the file throws an IOException
 */
private void createImmutableFileInPath(Path fullPath, Option<byte[]> content) {
  try (FSDataOutputStream fsout = metaClient.getFs().create(fullPath, false)) {
    if (content.isPresent()) {
      fsout.write(content.get());
    }
  } catch (IOException e) {
    throw new HoodieIOException("Failed to create file " + fullPath, e);
  }
}
/**
 * Reads the whole file at the given path into a byte array.
 *
 * @param detailPath path of the file to read
 * @return the file content wrapped in an {@link Option}
 * @throws HoodieIOException if opening or reading the file throws an IOException
 */
private Option<byte[]> readDataFromPath(Path detailPath) {
  try (FSDataInputStream input = metaClient.getFs().open(detailPath)) {
    byte[] bytes = FileIOUtils.readAsByteArray(input);
    return Option.of(bytes);
  } catch (IOException ioe) {
    throw new HoodieIOException("Could not read commit details from " + detailPath, ioe);
  }
}
/**
 * Builds a fresh active timeline from the same meta client, which re-scans the instants.
 *
 * @return a new {@link HoodieActiveTimeline}; this instance is left untouched
 */
public HoodieActiveTimeline reload() {
  HoodieActiveTimeline reloaded = new HoodieActiveTimeline(metaClient);
  return reloaded;
}
}