/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.FlinkHoodieIndex;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.FlinkAppendHandle;
import org.apache.hudi.io.FlinkCreateHandle;
import org.apache.hudi.io.FlinkMergeAndReplaceHandle;
import org.apache.hudi.io.FlinkMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.MiniBatchHandle;
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.FlinkCompactHelpers;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngrade;
import org.apache.hudi.util.FlinkClientUtil;
import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.ParseException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
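/**
 * Hoodie write client for the Flink engine.
 *
 * <p>Writes are buffered as mini-batches per file group and the write handles are tracked for a
 * whole checkpoint interval, so callers are expected to invoke {@link #cleanHandles()} (or
 * {@link #cleanHandlesGracefully()} on failure) once the checkpoint completes.
 *
 * <p>A minimal usage sketch (illustrative only; {@code context}, {@code writeConfig},
 * {@code records} and {@code instant} are supplied by the caller and are not part of this class):
 * <pre>{@code
 *   HoodieFlinkWriteClient<T> client = new HoodieFlinkWriteClient<>(context, writeConfig);
 *   List<WriteStatus> statuses = client.upsert(records, instant);
 *   client.commit(instant, statuses);
 *   client.cleanHandles(); // at checkpoint completion
 * }</pre>
 */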
@SuppressWarnings("checkstyle:LineLength")
public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
AbstractHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
private static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkWriteClient.class);
/**
* Mapping from file ID to write handle. It records the write handle for each file group
* so that the mini-batch data buffers can be appended incrementally.
*/
private Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles;
public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
this(context, clientConfig, false);
}
@Deprecated
public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) {
super(context, writeConfig);
this.bucketToHandles = new HashMap<>();
}
@Deprecated
public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending,
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService);
}
@Override
protected HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> createIndex(HoodieWriteConfig writeConfig) {
return FlinkHoodieIndex.createIndex((HoodieFlinkEngineContext) context, config);
}
/**
 * Complete changes performed at the given instantTime marker with the specified action.
 */
@Override
public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
List<HoodieWriteStat> writeStats = writeStatuses.parallelStream().map(WriteStatus::getStat).collect(Collectors.toList());
return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
}
@Override
protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
}
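/**
 * Filters out the records whose keys already exist in the table, based on an index lookup.
 *
 * <p>An illustrative use (assumption: {@code incoming} is a caller-supplied batch, not part of
 * this class) is de-duplicating records before an insert:
 * <pre>{@code
 *   List<HoodieRecord<T>> fresh = writeClient.filterExists(incoming);
 *   writeClient.insert(fresh, instantTime);
 * }</pre>
 */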
@Override
public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> hoodieRecords) {
// Create a Hoodie table which encapsulates the commits and files visible
HoodieFlinkTable<T> table = getHoodieTable();
Timer.Context indexTimer = metrics.getIndexCtx();
List<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(hoodieRecords, context, table);
metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
return recordsWithLocation.stream().filter(v1 -> !v1.isCurrentLocationKnown()).collect(Collectors.toList());
}
@Override
public void bootstrap(Option<Map<String, String>> extraMetadata) {
throw new HoodieNotSupportedException("Bootstrap operation is not supported yet");
}
@Override
public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
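// The incoming records are expected to belong to a single bucket (file group): the first
// record supplies the current location (partition path and file id) used to look up or
// create the write handle below.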
final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
instantTime, table, records.listIterator());
HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).upsert(context, writeHandle, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
}
return postWrite(result, instantTime, table);
}
@Override
public List<WriteStatus> upsertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime) {
// Only used for the metadata table; the upsert happens in a single thread.
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient());
final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(preppedRecords.get(0), getConfig(),
instantTime, table, preppedRecords.listIterator());
HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).upsertPrepped(context, writeHandle, instantTime, preppedRecords);
return postWrite(result, instantTime, table);
}
@Override
public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
// Create the write handle if it does not exist yet
final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
instantTime, table, records.listIterator());
HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).insert(context, writeHandle, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
}
return postWrite(result, instantTime, table);
}
/**
* Removes all existing records from the affected partitions and inserts the given HoodieRecords into the table.
*
* @param records HoodieRecords to insert
* @param instantTime Instant time of the commit
* @return list of WriteStatus to inspect errors and counts
*/
public List<WriteStatus> insertOverwrite(
List<HoodieRecord<T>> records, final String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime);
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, table.getMetaClient());
// Create the write handle if it does not exist yet
final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
instantTime, table, records.listIterator());
HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).insertOverwrite(context, writeHandle, instantTime, records);
return postWrite(result, instantTime, table);
}
/**
* Removes all existing records of the Hoodie table and inserts the given HoodieRecords into the table.
*
* @param records HoodieRecords to insert
* @param instantTime Instant time of the commit
* @return list of WriteStatus to inspect errors and counts
*/
public List<WriteStatus> insertOverwriteTable(
List<HoodieRecord<T>> records, final String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE_TABLE, instantTime);
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, table.getMetaClient());
// Create the write handle if it does not exist yet
final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
instantTime, table, records.listIterator());
HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).insertOverwriteTable(context, writeHandle, instantTime, records);
return postWrite(result, instantTime, table);
}
@Override
public List<WriteStatus> insertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime) {
throw new HoodieNotSupportedException("InsertPrepped operation is not supported yet");
}
@Override
public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records, String instantTime) {
throw new HoodieNotSupportedException("BulkInsert operation is not supported yet");
}
@Override
public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records, String instantTime, Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
throw new HoodieNotSupportedException("BulkInsert operation is not supported yet");
}
@Override
public List<WriteStatus> bulkInsertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime, Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
throw new HoodieNotSupportedException("BulkInsertPrepped operation is not supported yet");
}
@Override
public List<WriteStatus> delete(List<HoodieKey> keys, String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.DELETE, instantTime);
preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient());
HoodieWriteMetadata<List<WriteStatus>> result = table.delete(context, instantTime, keys);
return postWrite(result, instantTime, table);
}
@Override
protected void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
// Note: the code that reads the commit metadata is not thread-safe for JSON deserialization,
// so the table metadata sync and the async cleaning performed by the base implementation
// are skipped here; they are triggered explicitly via syncTableMetadata() and the async
// cleaning hooks below.
}
/**
* Starts async cleaning service for finished commits.
*
* <p>The Flink write client writes the data set as buckets, but the cleaning action should
* only trigger after all the write actions within a checkpoint have finished.
*/
public void startAsyncCleaning() {
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
}
/**
* Blocks and wait for the async cleaning service to finish.
*
* <p>The Flink write client writes the data set as buckets, but the cleaning action should
* only trigger after all the write actions within a checkpoint have finished.
*/
public void waitForCleaningFinish() {
if (this.asyncCleanerService != null) {
LOG.info("Cleaner has been spawned already. Waiting for it to finish");
AsyncCleanerService.waitForCompletion(asyncCleanerService);
LOG.info("Cleaner has finished");
}
}
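// Illustrative ordering of the cleaning hooks within one checkpoint (assumption: the calls are
// driven by the Flink write coordinator, which lives outside this class):
//   writeClient.startAsyncCleaning();     // right after a commit succeeds
//   // ... write tasks flush the next mini-batches ...
//   writeClient.waitForCleaningFinish();  // before committing the next instant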
@Override
protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result,
String instantTime,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
if (result.getIndexUpdateDuration().isPresent()) {
metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
}
return result.getWriteStatuses();
}
/**
* Post-commit actions, invoked after a successful commit.
*
* <p>The Flink write client writes the data set as buckets, but the cleaning action should
* only trigger after all the write actions within a checkpoint have finished.
*
* @param table Table to commit on
* @param metadata Commit Metadata corresponding to committed instant
* @param instantTime Instant Time
* @param extraMetadata Additional Metadata passed by user
*/
@Override
protected void postCommit(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
HoodieCommitMetadata metadata,
String instantTime,
Option<Map<String, String>> extraMetadata) {
try {
// Delete the marker directory for the instant.
new MarkerFiles(createTable(config, hadoopConf), instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
// We cannot have unbounded commit files. Archive commits if we have to archive
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
archiveLog.archiveIfRequired(context);
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
} finally {
this.heartbeatClient.stop(instantTime);
}
}
@Override
public void commitCompaction(
String compactionInstantTime,
List<WriteStatus> writeStatuses,
Option<Map<String, String>> extraMetadata) throws IOException {
HoodieFlinkTable<T> table = getHoodieTable();
HoodieCommitMetadata metadata = FlinkCompactHelpers.newInstance().createCompactionMetadata(
table, compactionInstantTime, writeStatuses, config.getSchema());
extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
completeCompaction(metadata, writeStatuses, table, compactionInstantTime);
}
@Override
public void completeCompaction(
HoodieCommitMetadata metadata,
List<WriteStatus> writeStatuses,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String compactionCommitTime) {
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
finalizeWrite(table, compactionCommitTime, writeStats);
LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
FlinkCompactHelpers.newInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
if (compactionTimer != null) {
long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
try {
metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
} catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
+ config.getBasePath() + " at time " + compactionCommitTime, e);
}
}
LOG.info("Compacted successfully on commit " + compactionCommitTime);
}
@Override
protected List<WriteStatus> compact(String compactionInstantTime, boolean shouldComplete) {
// Only used for the metadata table; the compaction happens in a single thread.
try {
List<WriteStatus> writeStatuses = FlinkCompactHelpers.compact(compactionInstantTime, this);
commitCompaction(compactionInstantTime, writeStatuses, Option.empty());
return writeStatuses;
} catch (IOException e) {
throw new HoodieException("Error while compacting instant: " + compactionInstantTime);
}
}
@Override
public HoodieWriteMetadata<List<WriteStatus>> cluster(final String clusteringInstant, final boolean shouldComplete) {
throw new HoodieNotSupportedException("Clustering is not supported yet");
}
@Override
protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
HoodieTableMetaClient metaClient = createMetaClient(true);
new FlinkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
return getTableAndInitCtx(metaClient, operationType);
}
@Override
public void syncTableMetadata() {
// Open up the metadata table again, for syncing
try (HoodieTableMetadataWriter writer = FlinkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context)) {
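// Assumption: constructing the metadata writer bootstraps and syncs the metadata table as a
// side effect, which is why no explicit sync call is needed inside this block.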
LOG.info("Successfully synced to metadata table");
} catch (Exception e) {
throw new HoodieMetadataException("Error syncing to metadata table.", e);
}
}
/**
* Clean the write handles within a checkpoint interval.
* All the handles should have been closed already.
*/
public void cleanHandles() {
this.bucketToHandles.clear();
}
/**
* Clean the write handles within a checkpoint interval. This operation closes the underlying
* file handles; if any error happens, the corrupted data file is cleaned up.
*/
public void cleanHandlesGracefully() {
this.bucketToHandles.values()
.forEach(handle -> ((MiniBatchHandle) handle).closeGracefully());
this.bucketToHandles.clear();
}
/**
* Get the existing write handle for the record's file group, or create a new one, so that
* file handles can be reused across mini-batches within a checkpoint.
*
* @param record The first record in the bucket
* @param config Write config
* @param instantTime The instant time
* @param table The table
* @param recordItr Record iterator
* @return The existing write handle, or a newly created one
*/
private HoodieWriteHandle<?, ?, ?, ?> getOrCreateWriteHandle(
HoodieRecord<T> record,
HoodieWriteConfig config,
String instantTime,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
Iterator<HoodieRecord<T>> recordItr) {
final HoodieRecordLocation loc = record.getCurrentLocation();
final String fileID = loc.getFileId();
final String partitionPath = record.getPartitionPath();
// Always use FlinkCreateHandle when duplicate inserts are allowed
if (config.allowDuplicateInserts()) {
return new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
fileID, table.getTaskContextSupplier());
}
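// A handle was already recorded for this file group within the current checkpoint: when it
// signals that its data file should be replaced, roll over to a merge-and-replace handle that
// rewrites the file previously written at lastHandle.getWritePath(); otherwise fall through
// and open a fresh handle below.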
if (bucketToHandles.containsKey(fileID)) {
MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID);
if (lastHandle.shouldReplace()) {
HoodieWriteHandle<?, ?, ?, ?> writeHandle = new FlinkMergeAndReplaceHandle<>(
config, instantTime, table, recordItr, partitionPath, fileID, table.getTaskContextSupplier(),
lastHandle.getWritePath());
this.bucketToHandles.put(fileID, writeHandle); // override with new replace handle
return writeHandle;
}
}
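// Open a fresh handle: MERGE_ON_READ tables append to log files; otherwise an instant time of
// "I" marks a new file group that should be created from scratch, while any other location
// means the incoming records are merged with the existing base file.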
final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ);
final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
if (isDelta) {
writeHandle = new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr,
table.getTaskContextSupplier());
} else if (loc.getInstantTime().equals("I")) {
writeHandle = new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
fileID, table.getTaskContextSupplier());
} else {
writeHandle = new FlinkMergeHandle<>(config, instantTime, table, recordItr, partitionPath,
fileID, table.getTaskContextSupplier());
}
this.bucketToHandles.put(fileID, writeHandle);
return writeHandle;
}
private HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) {
if (operationType == WriteOperationType.DELETE) {
setWriteSchemaForDeletes(metaClient);
}
// Create a Hoodie table which encapsulates the commits and files visible
HoodieFlinkTable<T> table = getHoodieTable();
if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) {
writeTimer = metrics.getCommitCtx();
} else {
writeTimer = metrics.getDeltaCommitCtx();
}
return table;
}
public String getLastPendingInstant(HoodieTableType tableType) {
final String actionType = CommitUtils.getCommitActionType(tableType);
return getLastPendingInstant(actionType);
}
public String getLastPendingInstant(String actionType) {
HoodieTimeline unCompletedTimeline = FlinkClientUtil.createMetaClient(basePath)
.getCommitsTimeline().filterInflightsAndRequested();
return unCompletedTimeline.getInstants()
.filter(x -> x.getAction().equals(actionType) && x.isInflight())
.map(HoodieInstant::getTimestamp)
.max(Comparator.naturalOrder())
.orElse(null);
}
public String getLastCompletedInstant(HoodieTableType tableType) {
final String commitType = CommitUtils.getCommitActionType(tableType);
HoodieTimeline completedTimeline = FlinkClientUtil.createMetaClient(basePath)
.getCommitsTimeline().filterCompletedInstants();
return completedTimeline.getInstants()
.filter(x -> x.getAction().equals(commitType))
.map(HoodieInstant::getTimestamp)
.max(Comparator.naturalOrder())
.orElse(null);
}
public void transitionRequestedToInflight(String commitType, String inFlightInstant) {
HoodieActiveTimeline activeTimeline = FlinkClientUtil.createMetaClient(basePath).getActiveTimeline();
HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, inFlightInstant);
activeTimeline.transitionRequestedToInflight(requested, Option.empty(),
config.shouldAllowMultiWriteOnSameInstant());
}
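// Illustrative use (assumption: invoked by the write coordinator after an instant has been
// requested but before any write task starts flushing data):
//   writeClient.transitionRequestedToInflight(HoodieTimeline.DELTA_COMMIT_ACTION, instantTime);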
public HoodieFlinkTable<T> getHoodieTable() {
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
}
public Map<String, List<String>> getPartitionToReplacedFileIds(
WriteOperationType writeOperationType,
List<WriteStatus> writeStatuses) {
HoodieFlinkTable<T> table = getHoodieTable();
switch (writeOperationType) {
case INSERT_OVERWRITE:
return writeStatuses.stream().map(status -> status.getStat().getPartitionPath()).distinct()
.collect(
Collectors.toMap(
partition -> partition,
partitionPath -> getAllExistingFileIds(table, partitionPath)));
case INSERT_OVERWRITE_TABLE:
Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
List<String> partitionPaths =
FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
if (partitionPaths != null && partitionPaths.size() > 0) {
context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions");
partitionToExistingFileIds = partitionPaths.stream().parallel()
.collect(
Collectors.toMap(
partition -> partition,
partition -> getAllExistingFileIds(table, partition)));
}
return partitionToExistingFileIds;
default:
throw new AssertionError("Unexpected write operation type: " + writeOperationType);
}
}
private List<String> getAllExistingFileIds(HoodieFlinkTable<T> table, String partitionPath) {
// Because the new commit is not complete yet, it is safe to mark all existing file IDs as old files
return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
}
}