/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.ReplaceArchivalHelper;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieRollingStatMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
/**
* Archiver to bound the growth of files under the .hoodie meta path.
*/
public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
private static final Logger LOG = LogManager.getLogger(HoodieTimelineArchiveLog.class);
private final Path archiveFilePath;
private final HoodieWriteConfig config;
private Writer writer;
private final int maxInstantsToKeep;
private final int minInstantsToKeep;
private final HoodieTable<T, I, K, O> table;
private final HoodieTableMetaClient metaClient;
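/**
 * @param config write config supplying the archival thresholds (min/max commits to keep)
 * @param table  the Hudi table whose timeline is to be archived
 */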
public HoodieTimelineArchiveLog(HoodieWriteConfig config, HoodieTable<T, I, K, O> table) {
this.config = config;
this.table = table;
this.metaClient = table.getMetaClient();
this.archiveFilePath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
this.maxInstantsToKeep = config.getMaxCommitsToKeep();
this.minInstantsToKeep = config.getMinCommitsToKeep();
}
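/**
 * Lazily build a {@link HoodieLogFormat.Writer} for the archive log file, reusing the
 * writer if one has already been opened.
 */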
private Writer openWriter() {
try {
if (this.writer == null) {
return HoodieLogFormat.newWriterBuilder().onParentPath(archiveFilePath.getParent())
.withFileId(archiveFilePath.getName()).withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
.withFs(metaClient.getFs()).overBaseCommit("").build();
} else {
return this.writer;
}
} catch (InterruptedException | IOException e) {
throw new HoodieException("Unable to initialize HoodieLogFormat writer", e);
}
}
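/**
 * Close the underlying archive log writer if it has been opened.
 */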
private void close() {
try {
if (this.writer != null) {
this.writer.close();
}
} catch (IOException e) {
throw new HoodieException("Unable to close HoodieLogFormat writer", e);
}
}
/**
 * Check if commits need to be archived. If yes, archive commits.
 *
 * @return true when there was nothing to archive, or when all archived instant files were deleted successfully
 */
public boolean archiveIfRequired(HoodieEngineContext context) throws IOException {
try {
List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList());
boolean success = true;
if (!instantsToArchive.isEmpty()) {
this.writer = openWriter();
LOG.info("Archiving instants " + instantsToArchive);
archive(context, instantsToArchive);
LOG.info("Deleting archived instants " + instantsToArchive);
success = deleteArchivedInstants(instantsToArchive);
} else {
LOG.info("No Instants to archive");
}
return success;
} finally {
close();
}
}
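/**
 * Select completed clean instants eligible for archival. Only when more than maxInstantsToKeep
 * clean instants exist are the oldest ones returned, always retaining the latest minInstantsToKeep.
 */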
private Stream<HoodieInstant> getCleanInstantsToArchive() {
HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
.getTimelineOfActions(Collections.singleton(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants();
return cleanAndRollbackTimeline.getInstants()
.collect(Collectors.groupingBy(HoodieInstant::getAction)).values().stream()
.map(hoodieInstants -> {
if (hoodieInstants.size() > this.maxInstantsToKeep) {
return hoodieInstants.subList(0, hoodieInstants.size() - this.minInstantsToKeep);
} else {
return new ArrayList<HoodieInstant>();
}
}).flatMap(Collection::stream);
}
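/**
 * Select completed commit/delta-commit instants eligible for archival. Instants at or after the first
 * savepoint, or at or after the oldest pending compaction, are never archived, and the latest
 * minInstantsToKeep instants are always retained.
 */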
private Stream<HoodieInstant> getCommitInstantsToArchive() {
// TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
// with logic above to avoid Stream.concats
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
Option<HoodieInstant> oldestPendingCompactionInstant =
table.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
// We cannot have any holes in the commit timeline. We cannot archive any commits made
// at or after the first savepoint present.
Option<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant();
if (!commitTimeline.empty() && commitTimeline.countInstants() > maxInstantsToKeep) {
// Select the commits to archive: drop the ones that must be retained, then take the oldest remaining
return commitTimeline.getInstants()
.filter(s -> {
// if no savepoint is present, don't filter
return !(firstSavepoint.isPresent() && HoodieTimeline.compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
}).filter(s -> {
// Ensure commits >= the oldest pending compaction commit are retained
return oldestPendingCompactionInstant
.map(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
.orElse(true);
}).limit(commitTimeline.countInstants() - minInstantsToKeep);
} else {
return Stream.empty();
}
}
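/**
 * Combine the clean and commit instants eligible for archival, expanding each instant to include its
 * requested/inflight state files from the raw active timeline so intermediate states are archived too.
 */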
private Stream<HoodieInstant> getInstantsToArchive() {
// TODO: Handle ROLLBACK_ACTION in future
Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
// For archiving and cleaning instants, we need to include intermediate state files if they exist
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
Map<Pair<String, String>, List<HoodieInstant>> groupByTsAction = rawActiveTimeline.getInstants()
.collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),
HoodieInstant.getComparableAction(i.getAction()))));
return instants.flatMap(hoodieInstant ->
groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
HoodieInstant.getComparableAction(hoodieInstant.getAction()))).stream());
}
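/**
 * Delete the metadata files of the archived instants from the active timeline, then clean up the
 * corresponding older entries from the auxiliary meta folder.
 */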
private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants) throws IOException {
LOG.info("Deleting instants " + archivedInstants);
boolean success = true;
for (HoodieInstant archivedInstant : archivedInstants) {
Path commitFile = new Path(metaClient.getMetaPath(), archivedInstant.getFileName());
try {
if (metaClient.getFs().exists(commitFile)) {
success &= metaClient.getFs().delete(commitFile, false);
LOG.info("Archived and deleted instant file " + commitFile);
}
} catch (IOException e) {
throw new HoodieIOException("Failed to delete archived instant " + archivedInstant, e);
}
}
// Remove older meta-data from auxiliary path too
Option<HoodieInstant> latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION)
|| (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp)));
LOG.info("Latest Committed Instant=" + latestCommitted);
if (latestCommitted.isPresent()) {
success &= deleteAllInstantsOlderorEqualsInAuxMetaFolder(latestCommitted.get());
}
return success;
}
/**
 * Remove from the auxiliary meta folder all instants older than or equal to the given threshold instant.
 *
 * @param thresholdInstant Hoodie Instant
 * @return true if all eligible files were deleted successfully
 * @throws IOException in case of error
 */
private boolean deleteAllInstantsOlderorEqualsInAuxMetaFolder(HoodieInstant thresholdInstant) throws IOException {
List<HoodieInstant> instants = null;
boolean success = true;
try {
instants =
metaClient.scanHoodieInstantsFromFileSystem(
new Path(metaClient.getMetaAuxiliaryPath()),
HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE,
false);
} catch (FileNotFoundException e) {
/*
 * On some FSs, deleting all files in a directory can automatically remove the directory itself.
 * GCS is one example, since it doesn't have real directories and subdirectories. When a client
 * removes all the files from a "folder" on GCS, it has to create a special "/" placeholder to keep
 * the folder around. If that doesn't happen (timeout, misconfigured client, ...), the folder will be
 * deleted, and in that case we should not fail when the aux folder is not found.
 * GCS information: https://cloud.google.com/storage/docs/gsutil/addlhelp/HowSubdirectoriesWork
 */
LOG.warn("Aux path not found. Skipping: " + metaClient.getMetaAuxiliaryPath());
return success;
}
List<HoodieInstant> instantsToBeDeleted =
instants.stream().filter(instant1 -> HoodieTimeline.compareTimestamps(instant1.getTimestamp(),
LESSER_THAN_OR_EQUALS, thresholdInstant.getTimestamp())).collect(Collectors.toList());
for (HoodieInstant deleteInstant : instantsToBeDeleted) {
LOG.info("Deleting instant " + deleteInstant + " in auxiliary meta path " + metaClient.getMetaAuxiliaryPath());
Path metaFile = new Path(metaClient.getMetaAuxiliaryPath(), deleteInstant.getFileName());
if (metaClient.getFs().exists(metaFile)) {
success &= metaClient.getFs().delete(metaFile, false);
LOG.info("Deleted instant file in auxiliary metapath : " + metaFile);
}
}
return success;
}
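/**
 * Convert the given instants to archived metadata records and append them to the archive log in batches,
 * deleting replaced file groups and leftover marker files along the way.
 */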
public void archive(HoodieEngineContext context, List<HoodieInstant> instants) throws HoodieCommitException {
try {
HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants();
Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
LOG.info("Wrapper schema " + wrapperSchema.toString());
List<IndexedRecord> records = new ArrayList<>();
for (HoodieInstant hoodieInstant : instants) {
boolean deleteSuccess = deleteReplacedFileGroups(context, hoodieInstant);
if (!deleteSuccess) {
// throw error and stop archival if deleting replaced file groups failed.
throw new HoodieCommitException("Unable to delete file(s) for " + hoodieInstant.getFileName());
}
try {
deleteAnyLeftOverMarkerFiles(context, hoodieInstant);
records.add(convertToAvroRecord(commitTimeline, hoodieInstant));
if (records.size() >= this.config.getCommitArchivalBatchSize()) {
writeToFile(wrapperSchema, records);
}
} catch (Exception e) {
LOG.error("Failed to archive commits, .commit file: " + hoodieInstant.getFileName(), e);
if (this.config.isFailOnTimelineArchivingEnabled()) {
throw e;
}
}
}
writeToFile(wrapperSchema, records);
} catch (Exception e) {
throw new HoodieCommitException("Failed to archive commits", e);
}
}
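/**
 * Delete any marker directory left behind for the given instant.
 */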
private void deleteAnyLeftOverMarkerFiles(HoodieEngineContext context, HoodieInstant instant) {
MarkerFiles markerFiles = new MarkerFiles(table, instant.getTimestamp());
if (markerFiles.deleteMarkerDir(context, config.getMarkersDeleteParallelism())) {
LOG.info("Cleaned up left over marker directory for instant :" + instant);
}
}
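/**
 * Delete the file groups replaced by a completed replacecommit instant. Returns true for instants that
 * are not completed replacecommits, since there is nothing to delete.
 */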
private boolean deleteReplacedFileGroups(HoodieEngineContext context, HoodieInstant instant) {
if (!instant.isCompleted() || !HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
// only delete files for completed replace instants
return true;
}
TableFileSystemView fileSystemView = this.table.getFileSystemView();
List<String> replacedPartitions = getReplacedPartitions(instant);
return ReplaceArchivalHelper.deleteReplacedFileGroups(context, metaClient, fileSystemView, instant, replacedPartitions);
}
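/**
 * Read the replacecommit metadata of the given instant and return the partitions whose file groups were replaced.
 */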
private List<String> getReplacedPartitions(HoodieInstant instant) {
try {
HoodieReplaceCommitMetadata metadata = HoodieReplaceCommitMetadata.fromBytes(
metaClient.getActiveTimeline().getInstantDetails(instant).get(),
HoodieReplaceCommitMetadata.class);
return new ArrayList<>(metadata.getPartitionToReplaceFileIds().keySet());
} catch (IOException e) {
throw new HoodieCommitException("Failed to archive because cannot delete replace files", e);
}
}
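/**
 * Append the buffered records to the archive log as a single Avro data block and clear the buffer.
 */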
private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) throws Exception {
if (records.size() > 0) {
Map<HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header);
this.writer = writer.appendBlock(block);
records.clear();
}
}
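/**
 * Wrap the metadata of the given instant into a {@link HoodieArchivedMetaEntry} record, based on its action type.
 */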
private IndexedRecord convertToAvroRecord(HoodieTimeline commitTimeline, HoodieInstant hoodieInstant)
throws IOException {
HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry();
archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp());
archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
switch (hoodieInstant.getAction()) {
case HoodieTimeline.CLEAN_ACTION: {
if (hoodieInstant.isCompleted()) {
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient, hoodieInstant));
} else {
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient, hoodieInstant));
}
archivedMetaWrapper.setActionType(ActionType.clean.name());
break;
}
case HoodieTimeline.COMMIT_ACTION:
case HoodieTimeline.DELTA_COMMIT_ACTION: {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class);
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata));
archivedMetaWrapper.setActionType(ActionType.commit.name());
break;
}
case HoodieTimeline.REPLACE_COMMIT_ACTION: {
HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
.fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieReplaceCommitMetadata.class);
archivedMetaWrapper.setHoodieReplaceCommitMetadata(ReplaceArchivalHelper.convertReplaceCommitMetadata(replaceCommitMetadata));
archivedMetaWrapper.setActionType(ActionType.replacecommit.name());
break;
}
case HoodieTimeline.ROLLBACK_ACTION: {
archivedMetaWrapper.setHoodieRollbackMetadata(TimelineMetadataUtils.deserializeAvroMetadata(
commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieRollbackMetadata.class));
archivedMetaWrapper.setActionType(ActionType.rollback.name());
break;
}
case HoodieTimeline.SAVEPOINT_ACTION: {
archivedMetaWrapper.setHoodieSavePointMetadata(TimelineMetadataUtils.deserializeAvroMetadata(
commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieSavepointMetadata.class));
archivedMetaWrapper.setActionType(ActionType.savepoint.name());
break;
}
case HoodieTimeline.COMPACTION_ACTION: {
HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, hoodieInstant.getTimestamp());
archivedMetaWrapper.setHoodieCompactionPlan(plan);
archivedMetaWrapper.setActionType(ActionType.compaction.name());
break;
}
default: {
throw new UnsupportedOperationException("Action not fully supported yet: " + hoodieInstant.getAction());
}
}
return archivedMetaWrapper;
}
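/**
 * Convert {@link HoodieCommitMetadata} to its Avro model, dropping rolling stat metadata from the archive.
 */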
public static org.apache.hudi.avro.model.HoodieCommitMetadata convertCommitMetadata(
HoodieCommitMetadata hoodieCommitMetadata) {
ObjectMapper mapper = new ObjectMapper();
// Need this to ignore other public get() methods
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
org.apache.hudi.avro.model.HoodieCommitMetadata avroMetaData =
mapper.convertValue(hoodieCommitMetadata, org.apache.hudi.avro.model.HoodieCommitMetadata.class);
// Do not archive rolling stats; they cannot be set to null since Avro would throw a NullPointerException
avroMetaData.getExtraMetadata().put(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, "");
return avroMetaData;
}
}