/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table;

import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.ConsistencyGuard;
import org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
import org.apache.hudi.common.fs.OptimisticConsistencyGuard;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.storage.HoodieLayoutFactory;
import org.apache.hudi.table.storage.HoodieStorageLayout;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.LAZY;
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getMetadataPartitionsNeedingWriteStatusTracking;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;

/**
* Abstract implementation of a HoodieTable.
*
* @param <T> Sub type of HoodieRecordPayload
* @param <I> Type of inputs
* @param <K> Type of keys
* @param <O> Type of outputs
*/
public abstract class HoodieTable<T, I, K, O> implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(HoodieTable.class);
protected final HoodieWriteConfig config;
protected final HoodieTableMetaClient metaClient;
protected final HoodieIndex<?, ?> index;
private SerializableConfiguration hadoopConfiguration;
protected final TaskContextSupplier taskContextSupplier;
private final HoodieTableMetadata metadata;
private final HoodieStorageLayout storageLayout;
private final boolean isMetadataTable;
private transient FileSystemViewManager viewManager;
protected final transient HoodieEngineContext context;
protected HoodieTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
this.config = config;
this.hadoopConfiguration = context.getHadoopConf();
this.context = context;
this.isMetadataTable = HoodieTableMetadata.isMetadataTable(config.getBasePath());
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(config.getMetadataConfig().getProps())
.build();
this.metadata = HoodieTableMetadata.create(context, metadataConfig, config.getBasePath());
this.viewManager = FileSystemViewManager.createViewManager(context, config.getMetadataConfig(), config.getViewStorageConfig(), config.getCommonConfig(), unused -> metadata);
this.metaClient = metaClient;
this.index = getIndex(config, context);
this.storageLayout = getStorageLayout(config);
this.taskContextSupplier = context.getTaskContextSupplier();
}
public boolean isMetadataTable() {
return isMetadataTable;
}
protected abstract HoodieIndex<?, ?> getIndex(HoodieWriteConfig config, HoodieEngineContext context);
protected HoodieStorageLayout getStorageLayout(HoodieWriteConfig config) {
return HoodieLayoutFactory.createLayout(config);
}
private synchronized FileSystemViewManager getViewManager() {
if (null == viewManager) {
viewManager = FileSystemViewManager.createViewManager(getContext(), config.getMetadataConfig(), config.getViewStorageConfig(), config.getCommonConfig(), unused -> metadata);
}
return viewManager;
}
public HoodieTableMetadata getMetadata() {
return metadata;
}
/**
* Upsert a batch of new records into Hoodie table at the supplied instantTime.
* @param context HoodieEngineContext
* @param instantTime Instant Time for the action
* @param records hoodieRecords to upsert
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata<O> upsert(HoodieEngineContext context, String instantTime,
I records);
/**
* Insert a batch of new records into Hoodie table at the supplied instantTime.
* @param context HoodieEngineContext
* @param instantTime Instant Time for the action
* @param records hoodieRecords to insert
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata<O> insert(HoodieEngineContext context, String instantTime,
I records);
/**
* Bulk Insert a batch of new records into Hoodie table at the supplied instantTime.
* @param context HoodieEngineContext
* @param instantTime Instant Time for the action
* @param records hoodieRecords to bulk insert
* @param bulkInsertPartitioner User Defined Partitioner
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, String instantTime,
I records, Option<BulkInsertPartitioner> bulkInsertPartitioner);
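// Illustrative write-path sketch (not prescribed by this class): an engine-specific write client
// typically validates the writer schema and then invokes one of the write operations above. How
// "table", "records" and "instantTime" are obtained below is an assumption for illustration only.
//
//   table.validateUpsertSchema();
//   HoodieWriteMetadata<O> result = table.upsert(context, instantTime, records);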
/**
* Deletes a list of {@link HoodieKey}s from the Hoodie table at the supplied instantTime. The {@link HoodieKey}s will be
* de-duped and non-existent keys will be removed before deleting.
*
* @param context HoodieEngineContext
* @param instantTime Instant Time for the action
* @param keys {@link List} of {@link HoodieKey}s to be deleted
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata<O> delete(HoodieEngineContext context, String instantTime, K keys);
/**
* Delete records from Hoodie table based on {@link HoodieKey} and {@link HoodieRecordLocation} specified in
* preppedRecords.
*
* @param context {@link HoodieEngineContext}.
* @param instantTime Instant Time for the action.
* @param preppedRecords Empty records with key and locator set.
* @return {@link HoodieWriteMetadata}
*/
public abstract HoodieWriteMetadata<O> deletePrepped(HoodieEngineContext context, String instantTime, I preppedRecords);
/**
* Deletes all data of partitions.
* @param context HoodieEngineContext
* @param instantTime Instant Time for the action
* @param partitions {@link List} of partitions to be deleted
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata<O> deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions);
/**
* Upserts the given prepared records into the Hoodie table, at the supplied instantTime.
* <p>
* This implementation requires that the input records are already tagged, and de-duped if needed.
* @param context HoodieEngineContext
* @param instantTime Instant Time for the action
* @param preppedRecords hoodieRecords to upsert
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata<O> upsertPrepped(HoodieEngineContext context, String instantTime,
I preppedRecords);
/**
* Inserts the given prepared records into the Hoodie table, at the supplied instantTime.
* <p>
* This implementation requires that the input records are already tagged, and de-duped if needed.
* @param context HoodieEngineContext
* @param instantTime Instant Time for the action
* @param preppedRecords hoodieRecords to insert
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata<O> insertPrepped(HoodieEngineContext context, String instantTime,
I preppedRecords);
/**
* Bulk Insert the given prepared records into the Hoodie table, at the supplied instantTime.
* <p>
* This implementation requires that the input records are already tagged, and de-duped if needed.
* @param context HoodieEngineContext
* @param instantTime Instant Time for the action
* @param preppedRecords hoodieRecords to bulk insert
* @param bulkInsertPartitioner User Defined Partitioner
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata<O> bulkInsertPrepped(HoodieEngineContext context, String instantTime,
I preppedRecords, Option<BulkInsertPartitioner> bulkInsertPartitioner);
/**
* Replaces all the existing records and inserts the specified new records into Hoodie table at the supplied instantTime,
* for the partition paths contained in input records.
*
* @param context HoodieEngineContext
* @param instantTime Instant time for the replace action
* @param records input records
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata<O> insertOverwrite(HoodieEngineContext context, String instantTime, I records);
/**
* Deletes all the existing records of the Hoodie table and inserts the specified new records into the Hoodie table at the supplied instantTime,
* for the partition paths contained in input records.
*
* @param context HoodieEngineContext
* @param instantTime Instant time for the replace action
* @param records input records
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata<O> insertOverwriteTable(HoodieEngineContext context, String instantTime, I records);
public HoodieWriteConfig getConfig() {
return config;
}
public HoodieTableMetaClient getMetaClient() {
return metaClient;
}
/**
* @return whether the table is physically partitioned, based on the partition fields stored in the table config.
*/
public boolean isPartitioned() {
return getMetaClient().getTableConfig().isTablePartitioned();
}
public Configuration getHadoopConf() {
return metaClient.getHadoopConf();
}
/**
* Get the view of the file system for this table.
*/
public TableFileSystemView getFileSystemView() {
return new HoodieTableFileSystemView(metaClient, getCompletedCommitsTimeline());
}
/**
* Get the base file only view of the file system for this table.
*/
public BaseFileOnlyView getBaseFileOnlyView() {
return getViewManager().getFileSystemView(metaClient);
}
/**
* Get the full view of the file system for this table.
*/
public SliceView getSliceView() {
return getViewManager().getFileSystemView(metaClient);
}
/**
* Get complete view of the file system for this table with ability to force sync.
*/
public SyncableFileSystemView getHoodieView() {
return getViewManager().getFileSystemView(metaClient);
}
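// Illustrative read sketch of the views above (assumption: "partitionPath" is a relative partition
// path of this table; logging the file path is only for illustration):
//
//   table.getBaseFileOnlyView()
//       .getLatestBaseFiles(partitionPath)
//       .forEach(baseFile -> LOG.info("Latest base file: " + baseFile.getPath()));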
/**
* Get only the completed (no-inflights) commit + deltacommit timeline.
*/
public HoodieTimeline getCompletedCommitsTimeline() {
return metaClient.getCommitsTimeline().filterCompletedInstants();
}
/**
* Get only the completed (no-inflights) commit timeline.
*/
public HoodieTimeline getCompletedCommitTimeline() {
return metaClient.getCommitTimeline().filterCompletedInstants();
}
/**
* Get only the pending (not completed) commits timeline, excluding major and minor compaction.
*/
public HoodieTimeline getPendingCommitTimeline() {
return metaClient.getCommitsTimeline().filterPendingExcludingMajorAndMinorCompaction();
}
/**
* Get only the completed (no-inflights) clean timeline.
*/
public HoodieTimeline getCompletedCleanTimeline() {
return getActiveTimeline().getCleanerTimeline().filterCompletedInstants();
}
/**
* Get clean timeline.
*/
public HoodieTimeline getCleanTimeline() {
return getActiveTimeline().getCleanerTimeline();
}
/**
* Get rollback timeline.
*/
public HoodieTimeline getRollbackTimeline() {
return getActiveTimeline().getRollbackTimeline();
}
/**
* Get restore timeline.
*/
public HoodieTimeline getRestoreTimeline() {
return getActiveTimeline().getRestoreTimeline();
}
/**
* Get only the completed (no-inflights) savepoint timeline.
*/
public HoodieTimeline getCompletedSavepointTimeline() {
return getActiveTimeline().getSavePointTimeline().filterCompletedInstants();
}
/**
* Get the list of savepoint timestamps in this table.
*/
public Set<String> getSavepointTimestamps() {
return getCompletedSavepointTimeline().getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
}
public HoodieActiveTimeline getActiveTimeline() {
return metaClient.getActiveTimeline();
}
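// Illustrative timeline sketch (a minimal example of the timeline getters above; the table may have
// no completed commits yet, hence the Option result):
//
//   Option<String> latestCompletedCommitTime = table.getCompletedCommitsTimeline()
//       .lastInstant()
//       .map(HoodieInstant::getTimestamp);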
/**
* Return the index.
*/
public HoodieIndex<?, ?> getIndex() {
return index;
}
public HoodieStorageLayout getStorageLayout() {
return storageLayout;
}
/**
* Schedule compaction for the instant time.
*
* @param context HoodieEngineContext
* @param instantTime Instant Time for scheduling compaction
* @param extraMetadata additional metadata to write into plan
* @return HoodieCompactionPlan, if compaction is scheduled
*/
public abstract Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context,
String instantTime,
Option<Map<String, String>> extraMetadata);
/**
* Run Compaction on the table. Compaction arranges the data so that it is optimized for data access.
*
* @param context HoodieEngineContext
* @param compactionInstantTime Instant Time
*/
public abstract HoodieWriteMetadata<O> compact(HoodieEngineContext context,
String compactionInstantTime);
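// Illustrative compaction sketch (hypothetical driver code; how "compactionInstantTime" is generated
// and whether compaction runs inline or asynchronously are assumptions, not prescribed here):
//
//   Option<HoodieCompactionPlan> compactionPlan = table.scheduleCompaction(context, compactionInstantTime, Option.empty());
//   if (compactionPlan.isPresent()) {
//     HoodieWriteMetadata<O> compactionMetadata = table.compact(context, compactionInstantTime);
//   }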
/**
* Schedule log compaction for the instant time.
*
* @param context HoodieEngineContext
* @param instantTime Instant Time for scheduling log compaction
* @param extraMetadata additional metadata to write into plan
* @return HoodieCompactionPlan, if log compaction is scheduled
*/
public Option<HoodieCompactionPlan> scheduleLogCompaction(HoodieEngineContext context,
String instantTime,
Option<Map<String, String>> extraMetadata) {
throw new UnsupportedOperationException("Log compaction is not supported for this table type");
}
/**
* Run Log Compaction on the table. Log Compaction arranges the data so that it is optimized for data access.
*
* @param context HoodieEngineContext
* @param logCompactionInstantTime Instant Time
*/
public HoodieWriteMetadata<O> logCompact(HoodieEngineContext context,
String logCompactionInstantTime) {
throw new UnsupportedOperationException("Log compaction is not supported for this table type");
}
/**
* Schedule clustering for the instant time.
*
* @param context HoodieEngineContext
* @param instantTime Instant Time for scheduling clustering
* @param extraMetadata additional metadata to write into plan
* @return HoodieClusteringPlan, if there is enough data for clustering.
*/
public abstract Option<HoodieClusteringPlan> scheduleClustering(HoodieEngineContext context,
String instantTime,
Option<Map<String, String>> extraMetadata);
/**
* Execute Clustering on the table. Clustering re-arranges the data so that it is optimized for data access.
*
* @param context HoodieEngineContext
* @param clusteringInstantTime Instant Time
*/
public abstract HoodieWriteMetadata<O> cluster(HoodieEngineContext context, String clusteringInstantTime);
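// Illustrative clustering sketch (hypothetical driver code; a plan is only produced when the
// configured strategy finds enough data to cluster, hence the Option check):
//
//   Option<HoodieClusteringPlan> clusteringPlan = table.scheduleClustering(context, clusteringInstantTime, Option.empty());
//   if (clusteringPlan.isPresent()) {
//     HoodieWriteMetadata<O> clusteringMetadata = table.cluster(context, clusteringInstantTime);
//   }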
/**
* Perform metadata/full bootstrap of a Hudi table.
* @param context HoodieEngineContext
* @param extraMetadata Additional Metadata for storing in commit file.
* @return HoodieBootstrapWriteMetadata
*/
public abstract HoodieBootstrapWriteMetadata<O> bootstrap(HoodieEngineContext context, Option<Map<String, String>> extraMetadata);
/**
* Perform rollback of bootstrap of a Hudi table.
* @param context HoodieEngineContext
*/
public abstract void rollbackBootstrap(HoodieEngineContext context, String instantTime);
/**
* Schedule cleaning for the instant time.
*
* @param context HoodieEngineContext
* @param instantTime Instant Time for scheduling cleaning
* @param extraMetadata additional metadata to write into plan
* @return HoodieCleanerPlan, if there is anything to clean.
*/
public abstract Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context,
String instantTime,
Option<Map<String, String>> extraMetadata);
/**
* Executes a new clean action.
*
* @return information on cleaned file slices
*/
@Deprecated
public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime, boolean skipLocking) {
return clean(context, cleanInstantTime);
}
/**
* Executes a new clean action.
*
* @return information on cleaned file slices
*/
public abstract HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime);
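// Illustrative cleaning sketch (hypothetical driver code; a cleaner plan is only produced when there
// is something to clean, hence the Option check before executing):
//
//   Option<HoodieCleanerPlan> cleanerPlan = table.scheduleCleaning(context, cleanInstantTime, Option.empty());
//   if (cleanerPlan.isPresent()) {
//     HoodieCleanMetadata cleanMetadata = table.clean(context, cleanInstantTime);
//   }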
/**
* Schedule rollback for the instant time.
*
* @param context HoodieEngineContext
* @param instantTime Instant Time for scheduling rollback
* @param instantToRollback instant to be rolled back
* @param skipTimelinePublish when true, the requested rollback is not published to the timeline (e.g. when scheduled as part of a restore)
* @param shouldRollbackUsingMarkers uses marker-based rollback strategy when set to true; uses listing-based rollback when false
* @param isRestore true when this rollback is being scheduled as part of a restore
* @return HoodieRollbackPlan containing info on rollback.
*/
public abstract Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context,
String instantTime,
HoodieInstant instantToRollback,
boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers,
boolean isRestore);
/**
* Rollback the (inflight/committed) record changes with the given commit time.
* <pre>
* Four steps:
* (1) Atomically unpublish this commit
* (2) Clean indexing data
* (3) Clean newly generated parquet files
* (4) Finally, delete the .commit or .inflight file if deleteInstants = true
* </pre>
*/
public abstract HoodieRollbackMetadata rollback(HoodieEngineContext context,
String rollbackInstantTime,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipLocking);
/**
* Schedules Indexing for the table to the given instant.
*
* @param context HoodieEngineContext
* @param indexInstantTime Instant time for scheduling index action.
* @param partitionsToIndex List of {@link MetadataPartitionType} that should be indexed.
* @return HoodieIndexPlan containing metadata partitions and the instant up to which they should be indexed.
*/
public abstract Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List<MetadataPartitionType> partitionsToIndex);
/**
* Execute requested index action.
*
* @param context HoodieEngineContext
* @param indexInstantTime Instant time for which index action was scheduled.
* @return HoodieIndexCommitMetadata containing write stats for each metadata partition.
*/
public abstract Option<HoodieIndexCommitMetadata> index(HoodieEngineContext context, String indexInstantTime);
/**
* Create a savepoint at the specified instant, so that the table can be restored
* to this point in the timeline later if needed.
*/
public abstract HoodieSavepointMetadata savepoint(HoodieEngineContext context,
String instantToSavepoint,
String user,
String comment);
/**
* Restore the table to the given instant. Note that this is an admin table recovery operation
* that would cause any running queries that are accessing file slices written after the instant to fail.
*/
public abstract HoodieRestoreMetadata restore(HoodieEngineContext context,
String restoreInstantTimestamp,
String savepointToRestoreTimestamp);
/**
* Schedules Restore for the table to the given instant.
*/
public abstract Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context,
String restoreInstantTimestamp,
String savepointToRestoreTimestamp);
public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
rollbackInflightCompaction(inflightInstant, s -> Option.empty());
}
public void rollbackInflightLogCompaction(HoodieInstant inflightInstant) {
rollbackInflightLogCompaction(inflightInstant, s -> Option.empty());
}
/**
* Rolls back a failed inflight compaction. Rolling back an inflight compaction reverts the
* .inflight file back to the .requested file.
*
* @param inflightInstant Inflight Compaction Instant
*/
public void rollbackInflightCompaction(HoodieInstant inflightInstant,
Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc);
}
/**
* Rolls back an inflight clustering instant to the requested clustering instant.
*
* @param inflightInstant Inflight clustering instant
* @param getPendingRollbackInstantFunc Function to get rollback instant
*/
public void rollbackInflightClustering(HoodieInstant inflightInstant,
Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc);
}
/**
* Rolls back an inflight instant to the requested instant.
*
* @param inflightInstant Inflight instant
* @param getPendingRollbackInstantFunc Function to get rollback instant
*/
private void rollbackInflightInstant(HoodieInstant inflightInstant,
Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
final String commitTime = getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
-> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers(),
false);
rollback(context, commitTime, inflightInstant, false, false);
getActiveTimeline().revertInstantFromInflightToRequested(inflightInstant);
}
/**
* Rolls back a failed inflight log compaction.
*
* @param inflightInstant Inflight Log Compaction Instant
*/
public void rollbackInflightLogCompaction(HoodieInstant inflightInstant, Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
final String commitTime = getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
-> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers(),
false);
rollback(context, commitTime, inflightInstant, true, false);
}
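// Illustrative rollback sketch (assumption: the pending compaction instant is discovered from the
// timeline as below; callers may obtain it differently):
//
//   Option<HoodieInstant> inflightCompaction = table.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
//   if (inflightCompaction.isPresent()) {
//     table.rollbackInflightCompaction(inflightCompaction.get());
//   }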
/**
* Finalize the written data onto storage. Perform any final cleanups.
*
* @param context HoodieEngineContext
* @param stats List of HoodieWriteStats
* @throws HoodieIOException if some paths can't be finalized on storage
*/
public void finalizeWrite(HoodieEngineContext context, String instantTs, List<HoodieWriteStat> stats) throws HoodieIOException {
reconcileAgainstMarkers(context, instantTs, stats, config.getConsistencyGuardConfig().isConsistencyCheckEnabled());
}
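// Illustrative finalize sketch (assumption: "writeStats" are the HoodieWriteStats collected from the
// write statuses of this instant; finalizeWrite is normally invoked by the write client just before
// committing):
//
//   List<HoodieWriteStat> writeStats = ...; // collected from the completed write statuses
//   table.finalizeWrite(context, instantTime, writeStats);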
private void deleteInvalidFilesByPartitions(HoodieEngineContext context, Map<String, List<Pair<String, String>>> invalidFilesByPartition) {
// Now delete partially written files
context.setJobStatus(this.getClass().getSimpleName(), "Delete invalid files generated during the write operation: " + config.getTableName());
context.map(invalidFilesByPartition.values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toList()),
partitionFilePair -> {
final FileSystem fileSystem = metaClient.getFs();
LOG.info("Deleting invalid data file=" + partitionFilePair);
// Delete
try {
fileSystem.delete(new Path(partitionFilePair.getValue()), false);
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
return true;
}, config.getFinalizeWriteParallelism());
}
/**
* Returns the paths of possibly invalid data files based on the given marker files.
*/
protected Set<String> getInvalidDataPaths(WriteMarkers markers) throws IOException {
return markers.createdAndMergedDataPaths(context, config.getFinalizeWriteParallelism());
}
/**
* Reconciles WriteStats and marker files to detect and safely delete duplicate data files created because of Spark
* retries.
*
* @param context HoodieEngineContext
* @param instantTs Instant Timestamp
* @param stats List of HoodieWriteStats for the committed files
* @param consistencyCheckEnabled Consistency Check Enabled
* @throws HoodieIOException if the reconciliation or deletion of invalid files fails
*/
protected void reconcileAgainstMarkers(HoodieEngineContext context,
String instantTs,
List<HoodieWriteStat> stats,
boolean consistencyCheckEnabled) throws HoodieIOException {
try {
// Reconcile marker and data files with WriteStats so that partially written data-files due to failed
// (but succeeded on retry) tasks are removed.
String basePath = getMetaClient().getBasePath();
WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(), this, instantTs);
if (!markers.doesMarkerDirExist()) {
// can happen, e.g., if it was an empty write.
return;
}
// we are not including log appends here, since they are already fail-safe.
Set<String> invalidDataPaths = getInvalidDataPaths(markers);
Set<String> validDataPaths = stats.stream()
.map(HoodieWriteStat::getPath)
.filter(p -> p.endsWith(this.getBaseFileExtension()))
.collect(Collectors.toSet());
// Contains the list of partially created files. These need to be cleaned up.
invalidDataPaths.removeAll(validDataPaths);
if (!invalidDataPaths.isEmpty()) {
LOG.info("Removing duplicate data files created due to task retries before committing. Paths=" + invalidDataPaths);
Map<String, List<Pair<String, String>>> invalidPathsByPartition = invalidDataPaths.stream()
.map(dp -> Pair.of(new Path(basePath, dp).getParent().toString(), new Path(basePath, dp).toString()))
.collect(Collectors.groupingBy(Pair::getKey));
// Ensure all files in the delete list are actually present. This is mandatory for an eventually consistent FS;
// otherwise, we may miss deleting such files. If files are not found even after retries, fail the commit.
if (consistencyCheckEnabled) {
// This ensures all files to be deleted are present.
waitForAllFiles(context, invalidPathsByPartition, FileVisibility.APPEAR);
}
// Now delete partially written files
context.setJobStatus(this.getClass().getSimpleName(), "Delete all partially written files: " + config.getTableName());
deleteInvalidFilesByPartitions(context, invalidPathsByPartition);
// Now ensure the deleted files disappear
if (consistencyCheckEnabled) {
// This ensures all files to be deleted are absent.
waitForAllFiles(context, invalidPathsByPartition, FileVisibility.DISAPPEAR);
}
}
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
}
/**
* Ensures all files passed either appear or disappear.
*
* @param context HoodieEngineContext
* @param groupByPartition Files grouped by partition
* @param visibility Appear/Disappear
*/
private void waitForAllFiles(HoodieEngineContext context, Map<String, List<Pair<String, String>>> groupByPartition, FileVisibility visibility) {
// This ensures all the given files either appear or disappear, per the requested visibility.
context.setJobStatus(this.getClass().getSimpleName(), "Wait for all files to appear/disappear: " + config.getTableName());
boolean checkPassed =
context.map(new ArrayList<>(groupByPartition.entrySet()), partitionWithFileList -> waitForCondition(partitionWithFileList.getKey(),
partitionWithFileList.getValue().stream(), visibility), config.getFinalizeWriteParallelism())
.stream().allMatch(x -> x);
if (!checkPassed) {
throw new HoodieIOException("Consistency check failed to ensure all files " + visibility);
}
}
private boolean waitForCondition(String partitionPath, Stream<Pair<String, String>> partitionFilePaths, FileVisibility visibility) {
final FileSystem fileSystem = metaClient.getRawFs();
List<String> fileList = partitionFilePaths.map(Pair::getValue).collect(Collectors.toList());
try {
getConsistencyGuard(fileSystem, config.getConsistencyGuardConfig()).waitTill(partitionPath, fileList, visibility);
} catch (IOException | TimeoutException ioe) {
LOG.error("Got exception while waiting for files to show up", ioe);
return false;
}
return true;
}
/**
* Instantiate {@link ConsistencyGuard} based on configs.
* <p>
* Default consistencyGuard class is {@link OptimisticConsistencyGuard}.
*/
public static ConsistencyGuard getConsistencyGuard(FileSystem fs, ConsistencyGuardConfig consistencyGuardConfig) throws IOException {
try {
return consistencyGuardConfig.shouldEnableOptimisticConsistencyGuard()
? new OptimisticConsistencyGuard(fs, consistencyGuardConfig) : new FailSafeConsistencyGuard(fs, consistencyGuardConfig);
} catch (Throwable e) {
throw new IOException("Could not load ConsistencyGuard ", e);
}
}
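// Illustrative consistency-guard sketch mirroring waitForCondition above (the partition path and
// file name are assumptions for illustration; waitTill throws IOException/TimeoutException):
//
//   ConsistencyGuard guard = getConsistencyGuard(metaClient.getRawFs(), config.getConsistencyGuardConfig());
//   guard.waitTill("2023/01/01", Collections.singletonList("some-base-file.parquet"), FileVisibility.APPEAR);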
public TaskContextSupplier getTaskContextSupplier() {
return taskContextSupplier;
}
/**
* Ensure that the current writerSchema is compatible with the latest schema of this dataset.
*
* When inserting/updating data, we read records using the last used schema and convert them to
* GenericRecords with the writerSchema. Hence, we need to ensure that this conversion can take place without errors.
*/
private void validateSchema() throws HoodieUpsertException, HoodieInsertException {
boolean shouldValidate = config.shouldValidateAvroSchema();
boolean allowProjection = config.shouldAllowAutoEvolutionColumnDrop();
if ((!shouldValidate && allowProjection)
|| getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()
|| StringUtils.isNullOrEmpty(config.getSchema())
) {
// Check not required
return;
}
try {
TableSchemaResolver schemaResolver = new TableSchemaResolver(getMetaClient());
Schema writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
Schema tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaResolver.getTableAvroSchema(false));
AvroSchemaUtils.checkSchemaCompatible(tableSchema, writerSchema, shouldValidate, allowProjection, getDropPartitionColNames());
} catch (Exception e) {
throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e);
}
}
public void validateUpsertSchema() throws HoodieUpsertException {
if (isMetadataTable) {
return;
}
// validate only for data table.
try {
validateSchema();
} catch (HoodieException e) {
throw new HoodieUpsertException("Failed upsert schema compatibility check", e);
}
}
public void validateInsertSchema() throws HoodieInsertException {
if (isMetadataTable) {
return;
}
// validate only for data table
try {
validateSchema();
} catch (HoodieException e) {
throw new HoodieInsertException("Failed insert schema compatibility check", e);
}
}
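// Illustrative validation sketch (hypothetical caller; write clients invoke the matching validation
// before handing records to the corresponding write operation):
//
//   table.validateInsertSchema(); // throws HoodieInsertException on incompatible writer schema
//   HoodieWriteMetadata<O> result = table.insert(context, instantTime, records);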
public HoodieFileFormat getBaseFileFormat() {
return metaClient.getTableConfig().getBaseFileFormat();
}
public HoodieFileFormat getLogFileFormat() {
return metaClient.getTableConfig().getLogFileFormat();
}
public Option<HoodieFileFormat> getPartitionMetafileFormat() {
return metaClient.getTableConfig().getPartitionMetafileFormat();
}
public String getBaseFileExtension() {
return getBaseFileFormat().getFileExtension();
}
public boolean requireSortedRecords() {
return getBaseFileFormat() == HoodieFileFormat.HFILE;
}
public HoodieEngineContext getContext() {
// This handles scenarios where this is called from executor tasks, which do not have access
// to the engine context, so it ends up being null (as it's not serializable and is marked transient here).
return context == null ? new HoodieLocalEngineContext(hadoopConfiguration.get()) : context;
}
/**
* Get Table metadata writer.
*
* @param triggeringInstantTimestamp - The instant that is triggering this metadata write
* @return instance of {@link HoodieTableMetadataWriter}
*/
public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp) {
return getMetadataWriter(triggeringInstantTimestamp, EAGER);
}
/**
* Gets the metadata writer for async indexer.
*
* @param triggeringInstantTimestamp The instant that is triggering this metadata write.
* @return An instance of {@link HoodieTableMetadataWriter}.
*/
public Option<HoodieTableMetadataWriter> getIndexingMetadataWriter(String triggeringInstantTimestamp) {
return getMetadataWriter(triggeringInstantTimestamp, LAZY);
}
/**
* Gets the metadata writer for regular writes.
* <p>
* If the metadata table doesn't exist, this will trigger the creation of the table and the
* initial bootstrapping. Since this call is under the transaction lock, other concurrent
* writers are blocked from doing a similar initial metadata table creation and bootstrapping.
*
* @param triggeringInstantTimestamp The instant that is triggering this metadata write
* @param failedWritesCleaningPolicy Cleaning policy on failed writes
* @return instance of {@link HoodieTableMetadataWriter}
*/
protected Option<HoodieTableMetadataWriter> getMetadataWriter(
String triggeringInstantTimestamp,
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy) {
// Each engine is expected to override this and
// provide the actual metadata writer, if enabled.
return Option.empty();
}
/**
* Deletes the metadata table if the writer disables the metadata table with hoodie.metadata.enable=false.
*/
public void maybeDeleteMetadataTable() {
if (shouldExecuteMetadataTableDeletion()) {
try {
LOG.info("Deleting metadata table because it is disabled in writer.");
deleteMetadataTable(config.getBasePath(), context);
} catch (HoodieMetadataException e) {
throw new HoodieException("Failed to delete metadata table.", e);
}
}
}
/**
* Deletes the metadata partition if the writer disables any metadata index.
*/
public void deleteMetadataIndexIfNecessary() {
Stream.of(MetadataPartitionType.values()).forEach(partitionType -> {
if (shouldDeleteMetadataPartition(partitionType)) {
try {
LOG.info("Deleting metadata partition because it is disabled in writer: " + partitionType.name());
if (metadataPartitionExists(metaClient.getBasePath(), context, partitionType)) {
deleteMetadataPartition(metaClient.getBasePath(), context, partitionType);
}
clearMetadataTablePartitionsConfig(Option.of(partitionType), false);
} catch (HoodieMetadataException e) {
throw new HoodieException("Failed to delete metadata partition: " + partitionType.name(), e);
}
}
});
}
private boolean shouldDeleteMetadataPartition(MetadataPartitionType partitionType) {
// Only delete metadata table partition when all the following conditions are met:
// (1) This is data table.
// (2) Index corresponding to this metadata partition is disabled in HoodieWriteConfig.
// (3) The completed metadata partitions in table config contains this partition.
// NOTE: Inflight metadata partitions are not considered as they could have been inflight due to async indexer.
if (isMetadataTable() || !config.isMetadataTableEnabled()) {
return false;
}
boolean metadataIndexDisabled;
switch (partitionType) {
// NOTE: FILES partition type is always considered in sync with hoodie.metadata.enable.
// It cannot be the case that metadata is enabled but FILES is disabled.
case COLUMN_STATS:
metadataIndexDisabled = !config.isMetadataColumnStatsIndexEnabled();
break;
case BLOOM_FILTERS:
metadataIndexDisabled = !config.isMetadataBloomFilterIndexEnabled();
break;
case RECORD_INDEX:
metadataIndexDisabled = !config.isRecordIndexEnabled();
break;
default:
LOG.debug("Not a valid metadata partition type: " + partitionType.name());
return false;
}
return metadataIndexDisabled
&& metaClient.getTableConfig().getMetadataPartitions().contains(partitionType.getPartitionPath());
}
private boolean shouldExecuteMetadataTableDeletion() {
// Only execute metadata table deletion when all the following conditions are met
// (1) This is data table
// (2) Metadata table is disabled in HoodieWriteConfig for the writer
return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath())
&& !config.isMetadataTableEnabled();
}
/**
* Clears hoodie.table.metadata.partitions in hoodie.properties.
*
* @param partitionType the metadata partition to remove from the table config, if present
* @param clearAll when true, clears all metadata partitions from the table config
*/
private void clearMetadataTablePartitionsConfig(Option<MetadataPartitionType> partitionType, boolean clearAll) {
Set<String> partitions = metaClient.getTableConfig().getMetadataPartitions();
if (clearAll && partitions.size() > 0) {
LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties");
metaClient.getTableConfig().setValue(TABLE_METADATA_PARTITIONS.key(), EMPTY_STRING);
HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
} else if (partitionType.isPresent() && partitions.remove(partitionType.get().getPartitionPath())) {
metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", partitions));
HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
}
}
public HoodieTableMetadata getMetadataTable() {
return this.metadata;
}
/**
* When {@link HoodieTableConfig#POPULATE_META_FIELDS} is enabled,
* we need to track written records within WriteStatus in two cases:
* <ol>
* <li> When the HoodieIndex being used is not implicit with storage
* <li> If any of the metadata table partitions that require written-record tracking (e.g., record index) are enabled
* </ol>
*/
public boolean shouldTrackSuccessRecords() {
return config.populateMetaFields()
&& (!getIndex().isImplicitWithStorage()
|| getMetadataPartitionsNeedingWriteStatusTracking(config.getMetadataConfig(), getMetaClient()));
}
public Runnable getPreExecuteRunnable() {
return Functions.noop();
}
private Set<String> getDropPartitionColNames() {
boolean shouldDropPartitionColumns = metaClient.getTableConfig().shouldDropPartitionColumns();
if (!shouldDropPartitionColumns) {
return Collections.emptySet();
}
Option<String[]> partitionFields = metaClient.getTableConfig().getPartitionFields();
if (!partitionFields.isPresent()) {
return Collections.emptySet();
}
return new HashSet<>(Arrays.asList(partitionFields.get()));
}
}