| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hudi.table; |
| |
| import org.apache.hudi.avro.model.HoodieActionInstant; |
| import org.apache.hudi.avro.model.HoodieCleanMetadata; |
| import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata; |
| import org.apache.hudi.avro.model.HoodieCleanerPlan; |
| import org.apache.hudi.avro.model.HoodieClusteringGroup; |
| import org.apache.hudi.avro.model.HoodieClusteringPlan; |
| import org.apache.hudi.avro.model.HoodieClusteringStrategy; |
| import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; |
| import org.apache.hudi.avro.model.HoodieRollbackMetadata; |
| import org.apache.hudi.avro.model.HoodieSliceInfo; |
| import org.apache.hudi.client.SparkRDDWriteClient; |
| import org.apache.hudi.client.WriteStatus; |
| import org.apache.hudi.client.common.HoodieSparkEngineContext; |
| import org.apache.hudi.common.HoodieCleanStat; |
| import org.apache.hudi.common.bootstrap.TestBootstrapIndex; |
| import org.apache.hudi.common.config.HoodieMetadataConfig; |
| import org.apache.hudi.common.fs.ConsistencyGuardConfig; |
| import org.apache.hudi.common.fs.FSUtils; |
| import org.apache.hudi.common.model.BootstrapFileMapping; |
| import org.apache.hudi.common.model.FileSlice; |
| import org.apache.hudi.common.model.HoodieCleaningPolicy; |
| import org.apache.hudi.common.model.HoodieCommitMetadata; |
| import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; |
| import org.apache.hudi.common.model.HoodieRecord; |
| import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; |
| import org.apache.hudi.common.model.HoodieTableType; |
| import org.apache.hudi.common.model.HoodieWriteStat; |
| import org.apache.hudi.common.model.IOType; |
| import org.apache.hudi.common.model.WriteOperationType; |
| import org.apache.hudi.common.table.HoodieTableMetaClient; |
| import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; |
| import org.apache.hudi.common.table.timeline.HoodieInstant; |
| import org.apache.hudi.common.table.timeline.HoodieInstant.State; |
| import org.apache.hudi.common.table.timeline.HoodieTimeline; |
| import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; |
| import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigrator; |
| import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanMigrator; |
| import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1MigrationHandler; |
| import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; |
| import org.apache.hudi.common.testutils.HoodieMetadataTestTable; |
| import org.apache.hudi.common.testutils.HoodieTestTable; |
| import org.apache.hudi.common.testutils.HoodieTestUtils; |
| import org.apache.hudi.common.util.CleanerUtils; |
| import org.apache.hudi.common.util.CollectionUtils; |
| import org.apache.hudi.common.util.Option; |
| import org.apache.hudi.common.util.StringUtils; |
| import org.apache.hudi.common.util.collection.Pair; |
| import org.apache.hudi.config.HoodieCleanConfig; |
| import org.apache.hudi.config.HoodieCompactionConfig; |
| import org.apache.hudi.config.HoodieWriteConfig; |
| import org.apache.hudi.exception.HoodieIOException; |
| import org.apache.hudi.index.HoodieIndex; |
| import org.apache.hudi.index.SparkHoodieIndexFactory; |
| import org.apache.hudi.metadata.HoodieTableMetadata; |
| import org.apache.hudi.metadata.HoodieTableMetadataWriter; |
| import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; |
| import org.apache.hudi.table.action.clean.CleanPlanner; |
| import org.apache.hudi.testutils.HoodieClientTestBase; |
| |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.spark.api.java.JavaRDD; |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.params.ParameterizedTest; |
| import org.junit.jupiter.params.provider.ValueSource; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.charset.StandardCharsets; |
| import java.nio.file.Paths; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Predicate; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| import scala.Tuple3; |
| |
| import static org.apache.hudi.HoodieTestCommitGenerator.getBaseFilename; |
| import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime; |
| import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS; |
| import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; |
| import static org.awaitility.Awaitility.await; |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertFalse; |
| import static org.junit.jupiter.api.Assertions.assertNotNull; |
| import static org.junit.jupiter.api.Assertions.assertNull; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| |
| /** |
| * Test Cleaning related logic. |
| */ |
| public class TestCleaner extends HoodieClientTestBase { |
| |
// Number of records requested from the record generator for each "big batch" insert helper in this class.
private static final int BIG_BATCH_INSERT_SIZE = 500;
// Number of Spark partitions used by insertFirstBigBatchForClientCleanerTest when parallelizing generated records.
private static final int PARALLELISM = 10;
| |
| /** |
| * Helper method to do first batch of insert for clean by versions/commits tests. |
| * |
| * @param context Spark engine context |
| * @param metaClient Hoodie table meta client |
| * @param client Hoodie Client |
| * @param recordGenFunction Function to generate records for insertion |
| * @param insertFn Insertion API for testing |
| * @throws Exception in case of error |
| */ |
| public static Pair<String, JavaRDD<WriteStatus>> insertFirstBigBatchForClientCleanerTest( |
| HoodieSparkEngineContext context, |
| HoodieTableMetaClient metaClient, |
| SparkRDDWriteClient client, |
| Function2<List<HoodieRecord>, String, Integer> recordGenFunction, |
| Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn) throws Exception { |
| |
| /* |
| * do a big insert (this is basically same as insert part of upsert, just adding it here so we can catch breakages |
| * in insert(), if the implementation diverges.) |
| */ |
| String newCommitTime = client.startCommit(); |
| |
| List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, BIG_BATCH_INSERT_SIZE); |
| JavaRDD<HoodieRecord> writeRecords = context.getJavaSparkContext().parallelize(records, PARALLELISM); |
| |
| JavaRDD<WriteStatus> statuses = insertFn.apply(client, writeRecords, newCommitTime); |
| // Verify there are no errors |
| assertNoWriteErrors(statuses.collect()); |
| // verify that there is a commit |
| metaClient = HoodieTableMetaClient.reload(metaClient); |
| HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); |
| assertEquals(1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting a single commit."); |
| // Should have 100 records in table (check using Index), all in locations marked at commit |
| HoodieTable table = HoodieSparkTable.create(client.getConfig(), context, metaClient); |
| |
| if (client.getConfig().shouldAutoCommit()) { |
| assertFalse(table.getCompletedCommitsTimeline().empty()); |
| } |
| // We no longer write empty cleaner plans when there is nothing to be cleaned. |
| assertTrue(table.getCompletedCleanTimeline().empty()); |
| |
| if (client.getConfig().shouldAutoCommit()) { |
| HoodieIndex index = SparkHoodieIndexFactory.createIndex(client.getConfig()); |
| List<HoodieRecord> taggedRecords = tagLocation(index, context, context.getJavaSparkContext().parallelize(records, PARALLELISM), table).collect(); |
| checkTaggedRecords(taggedRecords, newCommitTime); |
| } |
| return Pair.of(newCommitTime, statuses); |
| } |
| |
| /** |
| * Helper method to do first batch of insert for clean by versions/commits tests. |
| * |
| * @param context Spark engine context |
| * @param client Hoodie Client |
| * @param recordGenFunction Function to generate records for insertion |
| * @param insertFn Insertion API for testing |
| * @throws Exception in case of error |
| */ |
| public static Pair<String, JavaRDD<WriteStatus>> insertFirstFailedBigBatchForClientCleanerTest( |
| HoodieSparkEngineContext context, |
| SparkRDDWriteClient client, |
| Function2<List<HoodieRecord>, String, Integer> recordGenFunction, |
| Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn) throws Exception { |
| |
| /* |
| * do a big insert (this is basically same as insert part of upsert, just adding it here so we can catch breakages |
| * in insert(), if the implementation diverges.) |
| */ |
| String newCommitTime = client.startCommit(); |
| |
| List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, BIG_BATCH_INSERT_SIZE); |
| JavaRDD<HoodieRecord> writeRecords = context.getJavaSparkContext().parallelize(records, 5); |
| |
| JavaRDD<WriteStatus> statuses = insertFn.apply(client, writeRecords, newCommitTime); |
| // Verify there are no errors |
| assertNoWriteErrors(statuses.collect()); |
| // Don't invoke commit to simulate failed write |
| client.getHeartbeatClient().stop(newCommitTime); |
| return Pair.of(newCommitTime, statuses); |
| } |
| |
| /** |
| * Test Clean-Failed-Writes when Cleaning policy is by VERSIONS using insert/upsert API. |
| */ |
| @Test |
| public void testInsertAndCleanFailedWritesByVersions() throws Exception { |
| testInsertAndCleanFailedWritesByVersions(SparkRDDWriteClient::insert, false); |
| } |
| |
| /** |
| * Test Helper for cleaning failed writes by versions logic from HoodieWriteClient API perspective. |
| * |
| * @param insertFn Insert API to be tested |
| * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during |
| * record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs) |
| * @throws Exception in case of errors |
| */ |
| private void testInsertAndCleanFailedWritesByVersions( |
| Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn, boolean isPreppedAPI) |
| throws Exception { |
| int maxVersions = 3; // keep upto 3 versions for each file |
| HoodieWriteConfig cfg = getConfigBuilder() |
| .withAutoCommit(false) |
| .withHeartbeatIntervalInMs(3000) |
| .withCleanConfig(HoodieCleanConfig.newBuilder() |
| .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) |
| .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(maxVersions).build()) |
| .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1) |
| .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) |
| .build(); |
| try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) { |
| |
| final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction = |
| generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts); |
| |
| Pair<String, JavaRDD<WriteStatus>> result = insertFirstBigBatchForClientCleanerTest(context, metaClient, client, recordInsertGenWrappedFunction, insertFn); |
| |
| client.commit(result.getLeft(), result.getRight()); |
| |
| HoodieTable table = HoodieSparkTable.create(client.getConfig(), context, metaClient); |
| |
| assertTrue(table.getCompletedCleanTimeline().empty()); |
| |
| insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn); |
| |
| insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn); |
| |
| Pair<String, JavaRDD<WriteStatus>> ret = |
| insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn); |
| |
| // Await till enough time passes such that the last failed commits heartbeats are expired |
| await().atMost(10, TimeUnit.SECONDS).until(() -> client.getHeartbeatClient() |
| .isHeartbeatExpired(ret.getLeft())); |
| |
| List<HoodieCleanStat> cleanStats = runCleaner(cfg); |
| assertEquals(0, cleanStats.size(), "Must not clean any files"); |
| HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline(); |
| assertTrue(timeline.getTimelineOfActions( |
| CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().countInstants() == 3); |
| Option<HoodieInstant> rollBackInstantForFailedCommit = timeline.getTimelineOfActions( |
| CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().lastInstant(); |
| HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeAvroMetadata( |
| timeline.getInstantDetails(rollBackInstantForFailedCommit.get()).get(), HoodieRollbackMetadata.class); |
| // Rollback of one of the failed writes should have deleted 3 files |
| assertEquals(3, rollbackMetadata.getTotalFilesDeleted()); |
| } |
| } |
| |
| /** |
| * Tests no more than 1 clean is scheduled if hoodie.clean.allow.multiple config is set to false. |
| */ |
| @Test |
| public void testMultiClean() { |
| HoodieWriteConfig writeConfig = getConfigBuilder() |
| .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder() |
| .withEnableBackupForRemoteFileSystemView(false).build()) |
| .withCleanConfig(HoodieCleanConfig.newBuilder() |
| .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) |
| .allowMultipleCleans(false) |
| .withAutoClean(false).retainCommits(1).retainFileVersions(1) |
| .build()) |
| .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024) |
| .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1) |
| .build()) |
| .withEmbeddedTimelineServerEnabled(false).build(); |
| |
| int index = 0; |
| String cleanInstantTime; |
| final String partition = "2015/03/16"; |
| try (SparkRDDWriteClient client = new SparkRDDWriteClient(context, writeConfig)) { |
| // Three writes so we can initiate a clean |
| for (; index < 3; ++index) { |
| String newCommitTime = "00" + index; |
| List<HoodieRecord> records = dataGen.generateInsertsForPartition(newCommitTime, 1, partition); |
| client.startCommitWithTime(newCommitTime); |
| client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); |
| } |
| } |
| |
| // mimic failed/leftover clean by scheduling a clean but not performing it |
| cleanInstantTime = "00" + index++; |
| HoodieTable table = HoodieSparkTable.create(writeConfig, context); |
| Option<HoodieCleanerPlan> cleanPlan = table.scheduleCleaning(context, cleanInstantTime, Option.empty()); |
| assertEquals(cleanPlan.get().getFilePathsToBeDeletedPerPartition().get(partition).size(), 1); |
| assertEquals(metaClient.reloadActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().countInstants(), 1); |
| |
| try (SparkRDDWriteClient client = new SparkRDDWriteClient(context, writeConfig)) { |
| // Next commit. This is required so that there is an additional file version to clean. |
| String newCommitTime = "00" + index++; |
| List<HoodieRecord> records = dataGen.generateInsertsForPartition(newCommitTime, 1, partition); |
| client.startCommitWithTime(newCommitTime); |
| client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); |
| |
| // Try to schedule another clean |
| String newCleanInstantTime = "00" + index++; |
| HoodieCleanMetadata cleanMetadata = client.clean(newCleanInstantTime); |
| // When hoodie.clean.allow.multiple is set to false, a new clean action should not be scheduled. |
| // The existing requested clean should complete execution. |
| assertNotNull(cleanMetadata); |
| assertTrue(metaClient.reloadActiveTimeline().getCleanerTimeline() |
| .filterCompletedInstants().containsInstant(cleanInstantTime)); |
| assertFalse(metaClient.getActiveTimeline().getCleanerTimeline() |
| .containsInstant(newCleanInstantTime)); |
| |
| // 1 file cleaned |
| assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getSuccessDeleteFiles().size(), 1); |
| assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getFailedDeleteFiles().size(), 0); |
| assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getDeletePathPatterns().size(), 1); |
| |
| // Now that there is no requested or inflight clean instant, a new clean action can be scheduled |
| cleanMetadata = client.clean(newCleanInstantTime); |
| assertNotNull(cleanMetadata); |
| assertTrue(metaClient.reloadActiveTimeline().getCleanerTimeline() |
| .containsInstant(newCleanInstantTime)); |
| |
| // 1 file cleaned |
| assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getSuccessDeleteFiles().size(), 1); |
| assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getFailedDeleteFiles().size(), 0); |
| assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getDeletePathPatterns().size(), 1); |
| } |
| } |
| |
| /** |
| * Test Clean-By-Commits using insert/upsert API. |
| */ |
| @Test |
| public void testFailedInsertAndCleanByCommits() throws Exception { |
| testFailedInsertAndCleanByCommits(SparkRDDWriteClient::insert, false); |
| } |
| |
| /** |
| * Test Helper for Cleaning failed commits by commits logic from HoodieWriteClient API perspective. |
| * |
| * @param insertFn Insert API to be tested |
| * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during |
| * record generation to also tag the regards (de-dupe is implicit as we use uniq record-gen APIs) |
| * @throws Exception in case of errors |
| */ |
| private void testFailedInsertAndCleanByCommits( |
| Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn, boolean isPreppedAPI) |
| throws Exception { |
| int maxCommits = 3; // keep upto 3 commits from the past |
| HoodieWriteConfig cfg = getConfigBuilder() |
| .withAutoCommit(false) |
| .withHeartbeatIntervalInMs(3000) |
| .withCleanConfig(HoodieCleanConfig.newBuilder() |
| .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) |
| .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build()) |
| .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1) |
| .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) |
| .build(); |
| SparkRDDWriteClient client = getHoodieWriteClient(cfg); |
| |
| final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction = |
| generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts); |
| |
| Pair<String, JavaRDD<WriteStatus>> result = insertFirstBigBatchForClientCleanerTest(context, metaClient, client, recordInsertGenWrappedFunction, insertFn); |
| client.commit(result.getLeft(), result.getRight()); |
| |
| HoodieTable table = HoodieSparkTable.create(client.getConfig(), context, metaClient); |
| assertTrue(table.getCompletedCleanTimeline().empty()); |
| |
| insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn); |
| |
| insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn); |
| |
| Pair<String, JavaRDD<WriteStatus>> ret = |
| insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn); |
| // Await till enough time passes such that the last failed commits heartbeats are expired |
| await().atMost(10, TimeUnit.SECONDS).until(() -> client.getHeartbeatClient() |
| .isHeartbeatExpired(ret.getLeft())); |
| List<HoodieCleanStat> cleanStats = runCleaner(cfg); |
| assertEquals(0, cleanStats.size(), "Must not clean any files"); |
| HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline(); |
| assertTrue(timeline.getTimelineOfActions( |
| CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().countInstants() == 3); |
| Option<HoodieInstant> rollBackInstantForFailedCommit = timeline.getTimelineOfActions( |
| CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().lastInstant(); |
| HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeAvroMetadata( |
| timeline.getInstantDetails(rollBackInstantForFailedCommit.get()).get(), HoodieRollbackMetadata.class); |
| // Rollback of one of the failed writes should have deleted 3 files |
| assertEquals(3, rollbackMetadata.getTotalFilesDeleted()); |
| } |
| |
| /** |
| * Helper to run cleaner and collect Clean Stats. |
| * |
| * @param config HoodieWriteConfig |
| */ |
| protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config) throws IOException { |
| return runCleaner(config, false, false, 1, false); |
| } |
| |
| protected List<HoodieCleanStat> runCleanerWithInstantFormat(HoodieWriteConfig config, boolean needInstantInHudiFormat) throws IOException { |
| return runCleaner(config, false, false, 1, needInstantInHudiFormat); |
| } |
| |
| protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, int firstCommitSequence, boolean needInstantInHudiFormat) throws IOException { |
| return runCleaner(config, false, false, firstCommitSequence, needInstantInHudiFormat); |
| } |
| |
| protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure) throws IOException { |
| return runCleaner(config, simulateRetryFailure, false, 1, false); |
| } |
| |
| protected List<HoodieCleanStat> runCleaner( |
| HoodieWriteConfig config, boolean simulateRetryFailure, boolean simulateMetadataFailure) throws IOException { |
| return runCleaner(config, simulateRetryFailure, simulateMetadataFailure, 1, false); |
| } |
| |
| /** |
| * Helper to run cleaner and collect Clean Stats. |
| * |
| * @param config HoodieWriteConfig |
| */ |
| protected List<HoodieCleanStat> runCleaner( |
| HoodieWriteConfig config, boolean simulateRetryFailure, boolean simulateMetadataFailure, |
| Integer firstCommitSequence, boolean needInstantInHudiFormat) throws IOException { |
| SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(config); |
| String cleanInstantTs = needInstantInHudiFormat ? makeNewCommitTime(firstCommitSequence, "%014d") : makeNewCommitTime(firstCommitSequence, "%09d"); |
| HoodieCleanMetadata cleanMetadata1 = writeClient.clean(cleanInstantTs); |
| |
| if (null == cleanMetadata1) { |
| return new ArrayList<>(); |
| } |
| |
| if (simulateRetryFailure) { |
| HoodieInstant completedCleanInstant = new HoodieInstant(State.COMPLETED, HoodieTimeline.CLEAN_ACTION, cleanInstantTs); |
| HoodieCleanMetadata metadata = CleanerUtils.getCleanerMetadata(metaClient, completedCleanInstant); |
| metadata.getPartitionMetadata().values().forEach(p -> { |
| String dirPath = metaClient.getBasePath() + "/" + p.getPartitionPath(); |
| p.getSuccessDeleteFiles().forEach(p2 -> { |
| try { |
| metaClient.getFs().create(new Path(dirPath, p2), true).close(); |
| } catch (IOException e) { |
| throw new HoodieIOException(e.getMessage(), e); |
| } |
| }); |
| }); |
| metaClient.reloadActiveTimeline().revertToInflight(completedCleanInstant); |
| |
| if (config.isMetadataTableEnabled() && simulateMetadataFailure) { |
| // Simulate the failure of corresponding instant in the metadata table |
| HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder() |
| .setBasePath(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())) |
| .setConf(metaClient.getHadoopConf()) |
| .build(); |
| HoodieInstant deltaCommit = new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, cleanInstantTs); |
| metadataMetaClient.reloadActiveTimeline().revertToInflight(deltaCommit); |
| } |
| |
| // retry clean operation again |
| writeClient.clean(); |
| final HoodieCleanMetadata retriedCleanMetadata = CleanerUtils.getCleanerMetadata(HoodieTableMetaClient.reload(metaClient), completedCleanInstant); |
| cleanMetadata1.getPartitionMetadata().keySet().forEach(k -> { |
| HoodieCleanPartitionMetadata p1 = cleanMetadata1.getPartitionMetadata().get(k); |
| HoodieCleanPartitionMetadata p2 = retriedCleanMetadata.getPartitionMetadata().get(k); |
| assertEquals(p1.getDeletePathPatterns(), p2.getDeletePathPatterns()); |
| assertEquals(p1.getSuccessDeleteFiles(), p2.getSuccessDeleteFiles()); |
| assertEquals(p1.getFailedDeleteFiles(), p2.getFailedDeleteFiles()); |
| assertEquals(p1.getPartitionPath(), p2.getPartitionPath()); |
| assertEquals(k, p1.getPartitionPath()); |
| }); |
| } |
| |
| Map<String, HoodieCleanStat> cleanStatMap = cleanMetadata1.getPartitionMetadata().values().stream() |
| .map(x -> new HoodieCleanStat.Builder().withPartitionPath(x.getPartitionPath()) |
| .withFailedDeletes(x.getFailedDeleteFiles()).withSuccessfulDeletes(x.getSuccessDeleteFiles()) |
| .withPolicy(HoodieCleaningPolicy.valueOf(x.getPolicy())).withDeletePathPattern(x.getDeletePathPatterns()) |
| .withEarliestCommitRetained(Option.ofNullable(cleanMetadata1.getEarliestCommitToRetain() != null |
| ? new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "000") |
| : null)) |
| .build()) |
| .collect(Collectors.toMap(HoodieCleanStat::getPartitionPath, x -> x)); |
| cleanMetadata1.getBootstrapPartitionMetadata().values().forEach(x -> { |
| HoodieCleanStat s = cleanStatMap.get(x.getPartitionPath()); |
| cleanStatMap.put(x.getPartitionPath(), new HoodieCleanStat.Builder().withPartitionPath(x.getPartitionPath()) |
| .withFailedDeletes(s.getFailedDeleteFiles()).withSuccessfulDeletes(s.getSuccessDeleteFiles()) |
| .withPolicy(HoodieCleaningPolicy.valueOf(x.getPolicy())).withDeletePathPattern(s.getDeletePathPatterns()) |
| .withEarliestCommitRetained(Option.ofNullable(s.getEarliestCommitToRetain()) |
| .map(y -> new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, y))) |
| .withSuccessfulDeleteBootstrapBaseFiles(x.getSuccessDeleteFiles()) |
| .withFailedDeleteBootstrapBaseFiles(x.getFailedDeleteFiles()) |
| .withDeleteBootstrapBasePathPatterns(x.getDeletePathPatterns()).build()); |
| }); |
| return new ArrayList<>(cleanStatMap.values()); |
| } |
| |
| @Test |
| public void testCleanEmptyInstants() throws Exception { |
| HoodieWriteConfig config = |
| HoodieWriteConfig.newBuilder() |
| .withPath(basePath) |
| .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) |
| .withCleanConfig(HoodieCleanConfig.newBuilder() |
| .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).build()) |
| .build(); |
| metaClient = HoodieTableMetaClient.reload(metaClient); |
| |
| int commitCount = 20; |
| int cleanCount = 20; |
| |
| int startInstant = 1; |
| |
| for (int i = 0; i < cleanCount; i++, startInstant++) { |
| String commitTime = makeNewCommitTime(startInstant, "%09d"); |
| createEmptyCleanMetadata(commitTime + "", false); |
| } |
| |
| int instantClean = startInstant; |
| |
| for (int i = 0; i < commitCount; i++, startInstant++) { |
| String commitTime = makeNewCommitTime(startInstant, "%09d"); |
| HoodieTestTable.of(metaClient).addCommit(commitTime); |
| } |
| |
| List<HoodieCleanStat> cleanStats = runCleaner(config); |
| HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline(); |
| |
| assertEquals(0, cleanStats.size(), "Must not clean any files"); |
| assertEquals(1, timeline.getTimelineOfActions( |
| CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflightsAndRequested().countInstants()); |
| assertEquals(0, timeline.getTimelineOfActions( |
| CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflights().countInstants()); |
| assertEquals(--cleanCount, timeline.getTimelineOfActions( |
| CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants().countInstants()); |
| assertTrue(timeline.getTimelineOfActions( |
| CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflightsAndRequested().containsInstant(makeNewCommitTime(--instantClean, "%09d"))); |
| |
| cleanStats = runCleaner(config); |
| timeline = metaClient.reloadActiveTimeline(); |
| |
| assertEquals(0, cleanStats.size(), "Must not clean any files"); |
| assertEquals(1, timeline.getTimelineOfActions( |
| CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflightsAndRequested().countInstants()); |
| assertEquals(0, timeline.getTimelineOfActions( |
| CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflights().countInstants()); |
| assertEquals(--cleanCount, timeline.getTimelineOfActions( |
| CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants().countInstants()); |
| assertTrue(timeline.getTimelineOfActions( |
| CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflightsAndRequested().containsInstant(makeNewCommitTime(--instantClean, "%09d"))); |
| } |
| |
| @Test |
| public void testCleanWithReplaceCommits() throws Exception { |
| HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) |
| .withMetadataConfig(HoodieMetadataConfig.newBuilder() |
| .withMaxNumDeltaCommitsBeforeCompaction(1) |
| .withAssumeDatePartitioning(true).build()) |
| .withCleanConfig(HoodieCleanConfig.newBuilder() |
| .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) |
| .retainCommits(2).build()) |
| .build(); |
| |
| HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context); |
| HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter); |
| String p0 = "2020/01/01"; |
| String p1 = "2020/01/02"; |
| |
| // make 1 commit, with 1 file per partition |
| String file1P0C0 = UUID.randomUUID().toString(); |
| String file1P1C0 = UUID.randomUUID().toString(); |
| testTable.addInflightCommit("00000000000001").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0); |
| |
| HoodieCommitMetadata commitMetadata = generateCommitMetadata("00000000000001", |
| Collections.unmodifiableMap(new HashMap<String, List<String>>() { |
| { |
| put(p0, CollectionUtils.createImmutableList(file1P0C0)); |
| put(p1, CollectionUtils.createImmutableList(file1P1C0)); |
| } |
| }) |
| ); |
| metadataWriter.update(commitMetadata, "00000000000001", false); |
| metaClient.getActiveTimeline().saveAsComplete( |
| new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000001"), |
| Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); |
| |
| metaClient = HoodieTableMetaClient.reload(metaClient); |
| |
| List<HoodieCleanStat> hoodieCleanStatsOne = runCleanerWithInstantFormat(config, true); |
| assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files"); |
| assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); |
| assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); |
| |
| // make next replacecommit, with 1 clustering operation. logically delete p0. No change to p1 |
| // notice that clustering generates empty inflight commit files |
| Map<String, String> partitionAndFileId002 = testTable.forReplaceCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0); |
| String file2P0C1 = partitionAndFileId002.get(p0); |
| Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> replaceMetadata = |
| generateReplaceCommitMetadata("00000000000002", p0, file1P0C0, file2P0C1); |
| testTable.addReplaceCommit("00000000000002", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue()); |
| |
| // run cleaner |
| List<HoodieCleanStat> hoodieCleanStatsTwo = runCleanerWithInstantFormat(config, true); |
| assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files"); |
| assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); |
| assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); |
| assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); |
| |
| // make next replacecommit, with 1 clustering operation. Replace data in p1. No change to p0 |
| // notice that clustering generates empty inflight commit files |
| Map<String, String> partitionAndFileId003 = testTable.forReplaceCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p1); |
| String file3P1C2 = partitionAndFileId003.get(p1); |
| replaceMetadata = generateReplaceCommitMetadata("00000000000003", p1, file1P1C0, file3P1C2); |
| testTable.addReplaceCommit("00000000000003", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue()); |
| |
| // run cleaner |
| List<HoodieCleanStat> hoodieCleanStatsThree = runCleanerWithInstantFormat(config, true); |
| assertEquals(0, hoodieCleanStatsThree.size(), "Must not scan any partitions and clean any files"); |
| assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); |
| assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); |
| assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2)); |
| assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); |
| |
| // make next replacecommit, with 1 clustering operation. Replace data in p0 again |
| // notice that clustering generates empty inflight commit files |
| Map<String, String> partitionAndFileId004 = testTable.forReplaceCommit("00000000000004").getFileIdsWithBaseFilesInPartitions(p0); |
| String file4P0C3 = partitionAndFileId004.get(p0); |
| replaceMetadata = generateReplaceCommitMetadata("00000000000004", p0, file2P0C1, file4P0C3); |
| testTable.addReplaceCommit("00000000000004", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue()); |
| |
| // run cleaner |
| List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config, 5, true); |
| assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3)); |
| assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); |
| assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2)); |
| assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); |
| //file1P1C0 still stays because its not replaced until 3 and its the only version available |
| assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); |
| |
| // make next replacecommit, with 1 clustering operation. Replace all data in p1. no new files created |
| // notice that clustering generates empty inflight commit files |
| Map<String, String> partitionAndFileId005 = testTable.forReplaceCommit("00000000000006").getFileIdsWithBaseFilesInPartitions(p1); |
| String file4P1C4 = partitionAndFileId005.get(p1); |
| replaceMetadata = generateReplaceCommitMetadata("00000000000006", p0, file3P1C2, file4P1C4); |
| testTable.addReplaceCommit("00000000000006", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue()); |
| |
| List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, 7, true); |
| assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3)); |
| assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); |
| assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2)); |
| assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); |
| assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); |
| } |
| |
| private Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> generateReplaceCommitMetadata( |
| String instantTime, String partition, String replacedFileId, String newFileId) { |
| HoodieRequestedReplaceMetadata requestedReplaceMetadata = new HoodieRequestedReplaceMetadata(); |
| requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.toString()); |
| requestedReplaceMetadata.setVersion(1); |
| HoodieSliceInfo sliceInfo = HoodieSliceInfo.newBuilder().setFileId(replacedFileId).build(); |
| List<HoodieClusteringGroup> clusteringGroups = new ArrayList<>(); |
| clusteringGroups.add(HoodieClusteringGroup.newBuilder() |
| .setVersion(1).setNumOutputFileGroups(1).setMetrics(Collections.emptyMap()) |
| .setSlices(Collections.singletonList(sliceInfo)).build()); |
| requestedReplaceMetadata.setExtraMetadata(Collections.emptyMap()); |
| requestedReplaceMetadata.setClusteringPlan(HoodieClusteringPlan.newBuilder() |
| .setVersion(1).setExtraMetadata(Collections.emptyMap()) |
| .setStrategy(HoodieClusteringStrategy.newBuilder().setStrategyClassName("").setVersion(1).build()) |
| .setInputGroups(clusteringGroups).build()); |
| |
| HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata(); |
| replaceMetadata.addReplaceFileId(partition, replacedFileId); |
| replaceMetadata.setOperationType(WriteOperationType.CLUSTER); |
| if (!StringUtils.isNullOrEmpty(newFileId)) { |
| HoodieWriteStat writeStat = new HoodieWriteStat(); |
| writeStat.setPartitionPath(partition); |
| writeStat.setPath(partition + "/" + getBaseFilename(instantTime, newFileId)); |
| writeStat.setFileId(newFileId); |
| writeStat.setTotalWriteBytes(1); |
| writeStat.setFileSizeInBytes(1); |
| replaceMetadata.addWriteStat(partition, writeStat); |
| } |
| return Pair.of(requestedReplaceMetadata, replaceMetadata); |
| } |
| |
| @Test |
| public void testCleanMetadataUpgradeDowngrade() { |
| String instantTime = "000"; |
| |
| String partition1 = DEFAULT_PARTITION_PATHS[0]; |
| String partition2 = DEFAULT_PARTITION_PATHS[1]; |
| |
| String extension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); |
| String fileName1 = "data1_1_000" + extension; |
| String fileName2 = "data2_1_000" + extension; |
| |
| String filePath1 = metaClient.getBasePath() + "/" + partition1 + "/" + fileName1; |
| String filePath2 = metaClient.getBasePath() + "/" + partition1 + "/" + fileName2; |
| |
| List<String> deletePathPatterns1 = Arrays.asList(filePath1, filePath2); |
| List<String> successDeleteFiles1 = Collections.singletonList(filePath1); |
| List<String> failedDeleteFiles1 = Collections.singletonList(filePath2); |
| |
| // create partition1 clean stat. |
| HoodieCleanStat cleanStat1 = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, |
| partition1, deletePathPatterns1, successDeleteFiles1, |
| failedDeleteFiles1, instantTime, ""); |
| |
| List<String> deletePathPatterns2 = new ArrayList<>(); |
| List<String> successDeleteFiles2 = new ArrayList<>(); |
| List<String> failedDeleteFiles2 = new ArrayList<>(); |
| |
| // create partition2 empty clean stat. |
| HoodieCleanStat cleanStat2 = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_COMMITS, |
| partition2, deletePathPatterns2, successDeleteFiles2, |
| failedDeleteFiles2, instantTime, ""); |
| |
| // map with absolute file path. |
| Map<String, Tuple3> oldExpected = new HashMap<>(); |
| oldExpected.put(partition1, new Tuple3<>(deletePathPatterns1, successDeleteFiles1, failedDeleteFiles1)); |
| oldExpected.put(partition2, new Tuple3<>(deletePathPatterns2, successDeleteFiles2, failedDeleteFiles2)); |
| |
| // map with relative path. |
| Map<String, Tuple3> newExpected = new HashMap<>(); |
| newExpected.put(partition1, new Tuple3<>(Arrays.asList(fileName1, fileName2), Collections.singletonList(fileName1), |
| Collections.singletonList(fileName2))); |
| newExpected.put(partition2, new Tuple3<>(deletePathPatterns2, successDeleteFiles2, failedDeleteFiles2)); |
| |
| HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata( |
| instantTime, |
| Option.of(0L), |
| Arrays.asList(cleanStat1, cleanStat2) |
| ); |
| metadata.setVersion(CleanerUtils.CLEAN_METADATA_VERSION_1); |
| |
| // NOw upgrade and check |
| CleanMetadataMigrator metadataMigrator = new CleanMetadataMigrator(metaClient); |
| metadata = metadataMigrator.upgradeToLatest(metadata, metadata.getVersion()); |
| assertCleanMetadataPathEquals(newExpected, metadata); |
| |
| CleanMetadataMigrator migrator = new CleanMetadataMigrator(metaClient); |
| HoodieCleanMetadata oldMetadata = |
| migrator.migrateToVersion(metadata, metadata.getVersion(), CleanerUtils.CLEAN_METADATA_VERSION_1); |
| assertEquals(CleanerUtils.CLEAN_METADATA_VERSION_1, oldMetadata.getVersion()); |
| assertCleanMetadataEquals(metadata, oldMetadata); |
| assertCleanMetadataPathEquals(oldExpected, oldMetadata); |
| |
| HoodieCleanMetadata newMetadata = migrator.upgradeToLatest(oldMetadata, oldMetadata.getVersion()); |
| assertEquals(CleanerUtils.LATEST_CLEAN_METADATA_VERSION, newMetadata.getVersion()); |
| assertCleanMetadataEquals(oldMetadata, newMetadata); |
| assertCleanMetadataPathEquals(newExpected, newMetadata); |
| assertCleanMetadataPathEquals(oldExpected, oldMetadata); |
| } |
| |
| private static void assertCleanMetadataEquals(HoodieCleanMetadata expected, HoodieCleanMetadata actual) { |
| assertEquals(expected.getEarliestCommitToRetain(), actual.getEarliestCommitToRetain()); |
| assertEquals(expected.getStartCleanTime(), actual.getStartCleanTime()); |
| assertEquals(expected.getTimeTakenInMillis(), actual.getTimeTakenInMillis()); |
| assertEquals(expected.getTotalFilesDeleted(), actual.getTotalFilesDeleted()); |
| |
| Map<String, HoodieCleanPartitionMetadata> map1 = expected.getPartitionMetadata(); |
| Map<String, HoodieCleanPartitionMetadata> map2 = actual.getPartitionMetadata(); |
| |
| assertEquals(map1.keySet(), map2.keySet()); |
| |
| List<String> partitions1 = map1.values().stream().map(HoodieCleanPartitionMetadata::getPartitionPath).collect( |
| Collectors.toList()); |
| List<String> partitions2 = map2.values().stream().map(HoodieCleanPartitionMetadata::getPartitionPath).collect( |
| Collectors.toList()); |
| assertEquals(partitions1, partitions2); |
| |
| List<String> policies1 = map1.values().stream().map(HoodieCleanPartitionMetadata::getPolicy).collect(Collectors.toList()); |
| List<String> policies2 = map2.values().stream().map(HoodieCleanPartitionMetadata::getPolicy).collect(Collectors.toList()); |
| assertEquals(policies1, policies2); |
| } |
| |
| @Test |
| public void testCleanPlanUpgradeDowngrade() { |
| String instantTime = "000"; |
| |
| String partition1 = DEFAULT_PARTITION_PATHS[0]; |
| String partition2 = DEFAULT_PARTITION_PATHS[1]; |
| |
| String extension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); |
| String fileName1 = "data1_1_000" + extension; |
| String fileName2 = "data2_1_000" + extension; |
| |
| Map<String, List<String>> filesToBeCleanedPerPartition = new HashMap<>(); |
| filesToBeCleanedPerPartition.put(partition1, Arrays.asList(fileName1)); |
| filesToBeCleanedPerPartition.put(partition2, Arrays.asList(fileName2)); |
| |
| HoodieCleanerPlan version1Plan = |
| HoodieCleanerPlan.newBuilder().setEarliestInstantToRetain(HoodieActionInstant.newBuilder() |
| .setAction(HoodieTimeline.COMMIT_ACTION) |
| .setTimestamp(instantTime).setState(State.COMPLETED.name()).build()) |
| .setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()) |
| .setFilesToBeDeletedPerPartition(filesToBeCleanedPerPartition) |
| .setVersion(CleanPlanV1MigrationHandler.VERSION) |
| .build(); |
| |
| // Upgrade and Verify version 2 plan |
| HoodieCleanerPlan version2Plan = |
| new CleanPlanMigrator(metaClient).upgradeToLatest(version1Plan, version1Plan.getVersion()); |
| assertEquals(version1Plan.getEarliestInstantToRetain(), version2Plan.getEarliestInstantToRetain()); |
| assertEquals(version1Plan.getPolicy(), version2Plan.getPolicy()); |
| assertEquals(CleanPlanner.LATEST_CLEAN_PLAN_VERSION, version2Plan.getVersion()); |
| // Deprecated Field is not used. |
| assertEquals(0, version2Plan.getFilesToBeDeletedPerPartition().size()); |
| assertEquals(version1Plan.getFilesToBeDeletedPerPartition().size(), |
| version2Plan.getFilePathsToBeDeletedPerPartition().size()); |
| assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition1).size(), |
| version2Plan.getFilePathsToBeDeletedPerPartition().get(partition1).size()); |
| assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition2).size(), |
| version2Plan.getFilePathsToBeDeletedPerPartition().get(partition2).size()); |
| assertEquals(new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), partition1), fileName1).toString(), |
| version2Plan.getFilePathsToBeDeletedPerPartition().get(partition1).get(0).getFilePath()); |
| assertEquals(new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), partition2), fileName2).toString(), |
| version2Plan.getFilePathsToBeDeletedPerPartition().get(partition2).get(0).getFilePath()); |
| |
| // Downgrade and verify version 1 plan |
| HoodieCleanerPlan gotVersion1Plan = new CleanPlanMigrator(metaClient).migrateToVersion(version2Plan, |
| version2Plan.getVersion(), version1Plan.getVersion()); |
| assertEquals(version1Plan.getEarliestInstantToRetain(), gotVersion1Plan.getEarliestInstantToRetain()); |
| assertEquals(version1Plan.getPolicy(), version2Plan.getPolicy()); |
| assertEquals(version1Plan.getVersion(), gotVersion1Plan.getVersion()); |
| assertEquals(version1Plan.getFilesToBeDeletedPerPartition().size(), |
| gotVersion1Plan.getFilesToBeDeletedPerPartition().size()); |
| assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition1).size(), |
| gotVersion1Plan.getFilesToBeDeletedPerPartition().get(partition1).size()); |
| assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition2).size(), |
| gotVersion1Plan.getFilesToBeDeletedPerPartition().get(partition2).size()); |
| assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition1).get(0), |
| gotVersion1Plan.getFilesToBeDeletedPerPartition().get(partition1).get(0)); |
| assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition2).get(0), |
| gotVersion1Plan.getFilesToBeDeletedPerPartition().get(partition2).get(0)); |
| assertTrue(gotVersion1Plan.getFilePathsToBeDeletedPerPartition().isEmpty()); |
| assertNull(version1Plan.getFilePathsToBeDeletedPerPartition()); |
| } |
| |
| private static void assertCleanMetadataPathEquals(Map<String, Tuple3> expected, HoodieCleanMetadata actual) { |
| |
| Map<String, HoodieCleanPartitionMetadata> partitionMetadataMap = actual.getPartitionMetadata(); |
| |
| for (Map.Entry<String, HoodieCleanPartitionMetadata> entry : partitionMetadataMap.entrySet()) { |
| String partitionPath = entry.getKey(); |
| HoodieCleanPartitionMetadata partitionMetadata = entry.getValue(); |
| |
| assertEquals(expected.get(partitionPath)._1(), partitionMetadata.getDeletePathPatterns()); |
| assertEquals(expected.get(partitionPath)._2(), partitionMetadata.getSuccessDeleteFiles()); |
| assertEquals(expected.get(partitionPath)._3(), partitionMetadata.getFailedDeleteFiles()); |
| } |
| } |
| |
| /** |
| * Generate Bootstrap index, bootstrap base file and corresponding metaClient. |
| * @return Partition to BootstrapFileMapping Map |
| * @throws IOException |
| */ |
| protected Map<String, List<BootstrapFileMapping>> generateBootstrapIndexAndSourceData(String... partitions) throws IOException { |
| // create bootstrap source data path |
| java.nio.file.Path sourcePath = tempDir.resolve("data"); |
| java.nio.file.Files.createDirectories(sourcePath); |
| assertTrue(new File(sourcePath.toString()).exists()); |
| |
| // recreate metaClient with Bootstrap base path |
| metaClient = HoodieTestUtils.init(basePath, getTableType(), sourcePath.toString(), true); |
| |
| // generate bootstrap index |
| Map<String, List<BootstrapFileMapping>> bootstrapMapping = TestBootstrapIndex.generateBootstrapIndex(metaClient, sourcePath.toString(), |
| partitions, 1); |
| |
| for (Map.Entry<String, List<BootstrapFileMapping>> entry : bootstrapMapping.entrySet()) { |
| new File(sourcePath.toString() + "/" + entry.getKey()).mkdirs(); |
| assertTrue(new File(entry.getValue().get(0).getBootstrapFileStatus().getPath().getUri()).createNewFile()); |
| } |
| return bootstrapMapping; |
| } |
| |
| /** |
| * Test Cleaning functionality of table.rollback() API. |
| */ |
| @Test |
| public void testCleanMarkerDataFilesOnRollback() throws Exception { |
| HoodieTestTable testTable = HoodieTestTable.of(metaClient) |
| .addRequestedCommit("001") |
| .withMarkerFiles("default", 10, IOType.MERGE); |
| final int numTempFilesBefore = testTable.listAllFilesInTempFolder().length; |
| assertEquals(10, numTempFilesBefore, "Some marker files are created."); |
| |
| HoodieWriteConfig config = HoodieWriteConfig.newBuilder() |
| .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() |
| .withRemoteServerPort(timelineServicePort).build()) |
| .withPath(basePath).build(); |
| metaClient = HoodieTableMetaClient.reload(metaClient); |
| HoodieTable table = HoodieSparkTable.create(config, context, metaClient); |
| table.getActiveTimeline().transitionRequestedToInflight( |
| new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "001"), Option.empty()); |
| metaClient.reloadActiveTimeline(); |
| HoodieInstant rollbackInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"); |
| table.scheduleRollback(context, "002", rollbackInstant, false, config.shouldRollbackUsingMarkers()); |
| table.rollback(context, "002", rollbackInstant, true, false); |
| final int numTempFilesAfter = testTable.listAllFilesInTempFolder().length; |
| assertEquals(0, numTempFilesAfter, "All temp files are deleted."); |
| } |
| |
| /** |
| * Test CLeaner Stat when there are no partition paths. |
| */ |
| @Test |
| public void testCleaningWithZeroPartitionPaths() throws Exception { |
| HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) |
| .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) |
| .withCleanConfig(HoodieCleanConfig.newBuilder() |
| .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()) |
| .build(); |
| |
| // Make a commit, although there are no partitionPaths. |
| // Example use-case of this is when a client wants to create a table |
| // with just some commit metadata, but no data/partitionPaths. |
| HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context); |
| HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter); |
| testTable.doWriteOperation("001", WriteOperationType.INSERT, Collections.emptyList(), 1); |
| |
| metaClient = HoodieTableMetaClient.reload(metaClient); |
| |
| List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config); |
| assertTrue(hoodieCleanStatsOne.isEmpty(), "HoodieCleanStats should be empty for a table with empty partitionPaths"); |
| } |
| |
| /** |
| * Test Keep Latest Commits when there are pending compactions. |
| */ |
| @ParameterizedTest |
| @ValueSource(booleans = {true, false}) |
| public void testKeepLatestCommitsWithPendingCompactions(boolean isAsync) throws Exception { |
| HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) |
| .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) |
| .withCleanConfig(HoodieCleanConfig.newBuilder() |
| .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).withAsyncClean(isAsync).retainCommits(2).build()) |
| .build(); |
| // Deletions: |
| // . FileId Base Logs Total Retained Commits |
| // FileId7 5 10 15 009, 011 |
| // FileId6 5 10 15 009 |
| // FileId5 3 6 9 005 |
| // FileId4 2 4 6 003 |
| // FileId3 1 2 3 001 |
| // FileId2 0 0 0 000 |
| // FileId1 0 0 0 000 |
| testPendingCompactions(config, 48, 18, false); |
| } |
| |
| /** |
| * Test Keep Latest Versions when there are pending compactions. |
| */ |
| @ParameterizedTest |
| @ValueSource(booleans = {false, true}) |
| public void testKeepLatestVersionsWithPendingCompactions(boolean retryFailure) throws Exception { |
| HoodieWriteConfig config = |
| HoodieWriteConfig.newBuilder().withPath(basePath) |
| .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) |
| .withCleanConfig(HoodieCleanConfig.newBuilder() |
| .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(2).build()) |
| .build(); |
| // Deletions: |
| // . FileId Base Logs Total Retained Commits |
| // FileId7 5 10 15 009, 011 |
| // FileId6 4 8 12 007, 009 |
| // FileId5 2 4 6 003 005 |
| // FileId4 1 2 3 001, 003 |
| // FileId3 0 0 0 000, 001 |
| // FileId2 0 0 0 000 |
| // FileId1 0 0 0 000 |
| testPendingCompactions(config, 36, 9, retryFailure); |
| } |
| |
| /** |
| * Test clean previous corrupted cleanFiles. |
| */ |
| @Test |
| public void testCleanPreviousCorruptedCleanFiles() throws IOException { |
| HoodieWriteConfig config = |
| HoodieWriteConfig.newBuilder() |
| .withPath(basePath) |
| .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) |
| .withCleanConfig(HoodieCleanConfig.newBuilder() |
| .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) |
| .build(); |
| |
| String commitTime = makeNewCommitTime(1, "%09d"); |
| List<String> cleanerFileNames = Arrays.asList( |
| HoodieTimeline.makeRequestedCleanerFileName(commitTime), |
| HoodieTimeline.makeInflightCleanerFileName(commitTime)); |
| for (String f : cleanerFileNames) { |
| Path commitFile = new Path(Paths |
| .get(metaClient.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME, f).toString()); |
| try (FSDataOutputStream os = metaClient.getFs().create(commitFile, true)) { |
| // Write empty clean metadata |
| os.write(new byte[0]); |
| } |
| } |
| metaClient = HoodieTableMetaClient.reload(metaClient); |
| |
| List<HoodieCleanStat> cleanStats = runCleaner(config); |
| assertEquals(0, cleanStats.size(), "Must not clean any files"); |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = {true, false}) |
| public void testRerunFailedClean(boolean simulateMetadataFailure) throws Exception { |
| HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) |
| .withMetadataConfig(HoodieMetadataConfig.newBuilder() |
| .withMaxNumDeltaCommitsBeforeCompaction(1) |
| .withAssumeDatePartitioning(true).build()) |
| .withCleanConfig(HoodieCleanConfig.newBuilder() |
| .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()) |
| .build(); |
| |
| HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context); |
| HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter); |
| String p0 = "2020/01/01"; |
| String p1 = "2020/01/02"; |
| |
| // make 1 commit, with 1 file per partition |
| String file1P0C0 = UUID.randomUUID().toString(); |
| String file1P1C0 = UUID.randomUUID().toString(); |
| testTable.addInflightCommit("00000000000001").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0); |
| |
| HoodieCommitMetadata commitMetadata = generateCommitMetadata("00000000000001", |
| Collections.unmodifiableMap(new HashMap<String, List<String>>() { |
| { |
| put(p0, CollectionUtils.createImmutableList(file1P0C0)); |
| put(p1, CollectionUtils.createImmutableList(file1P1C0)); |
| } |
| }) |
| ); |
| metadataWriter.update(commitMetadata, "00000000000001", false); |
| metaClient.getActiveTimeline().saveAsComplete( |
| new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000001"), |
| Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); |
| |
| metaClient = HoodieTableMetaClient.reload(metaClient); |
| |
| // make next replacecommit, with 1 clustering operation. logically delete p0. No change to p1 |
| // notice that clustering generates empty inflight commit files |
| Map<String, String> partitionAndFileId002 = testTable.forReplaceCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0); |
| String file2P0C1 = partitionAndFileId002.get(p0); |
| Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> replaceMetadata = |
| generateReplaceCommitMetadata("00000000000002", p0, file1P0C0, file2P0C1); |
| testTable.addReplaceCommit("00000000000002", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue()); |
| |
| // make next replacecommit, with 1 clustering operation. Replace data in p1. No change to p0 |
| // notice that clustering generates empty inflight commit files |
| Map<String, String> partitionAndFileId003 = testTable.forReplaceCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p1); |
| String file3P1C2 = partitionAndFileId003.get(p1); |
| replaceMetadata = generateReplaceCommitMetadata("00000000000003", p1, file1P1C0, file3P1C2); |
| testTable.addReplaceCommit("00000000000003", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue()); |
| |
| // make next replacecommit, with 1 clustering operation. Replace data in p0 again |
| // notice that clustering generates empty inflight commit files |
| Map<String, String> partitionAndFileId004 = testTable.forReplaceCommit("00000000000004").getFileIdsWithBaseFilesInPartitions(p0); |
| String file4P0C3 = partitionAndFileId004.get(p0); |
| replaceMetadata = generateReplaceCommitMetadata("00000000000004", p0, file2P0C1, file4P0C3); |
| testTable.addReplaceCommit("00000000000004", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue()); |
| |
| // run cleaner with failures |
| List<HoodieCleanStat> hoodieCleanStats = runCleaner(config, true, simulateMetadataFailure, 5, true); |
| assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3)); |
| assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); |
| assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2)); |
| assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); |
| //file1P1C0 still stays because its not replaced until 3 and its the only version available |
| assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); |
| } |
| |
| /** |
| * Common test method for validating pending compactions. |
| * |
| * @param config Hoodie Write Config |
| * @param expNumFilesDeleted Number of files deleted |
| */ |
| private void testPendingCompactions(HoodieWriteConfig config, int expNumFilesDeleted, |
| int expNumFilesUnderCompactionDeleted, boolean retryFailure) throws Exception { |
| HoodieTableMetaClient metaClient = |
| HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); |
| final String partition = "2016/03/15"; |
| Map<String, String> expFileIdToPendingCompaction = new HashMap<String, String>() { |
| { |
| put("fileId2", "004"); |
| put("fileId3", "006"); |
| put("fileId4", "008"); |
| put("fileId5", "010"); |
| } |
| }; |
| Map<String, String> fileIdToLatestInstantBeforeCompaction = new HashMap<String, String>() { |
| { |
| put("fileId1", "000"); |
| put("fileId2", "000"); |
| put("fileId3", "001"); |
| put("fileId4", "003"); |
| put("fileId5", "005"); |
| put("fileId6", "009"); |
| put("fileId7", "011"); |
| } |
| }; |
| |
| // Generate 7 file-groups. First one has only one slice and no pending compaction. File Slices (2 - 5) has |
| // multiple versions with pending compaction. File Slices (6 - 7) have multiple file-slices but not under |
| // compactions |
| // FileIds 2-5 will be under compaction |
| HoodieTestTable.of(metaClient) |
| .addCommit("000") |
| .withBaseFilesInPartition(partition, "fileId1", "fileId2", "fileId3", "fileId4", "fileId5", "fileId6", "fileId7") |
| .withLogFile(partition, "fileId1", 1, 2) |
| .withLogFile(partition, "fileId2", 1, 2) |
| .withLogFile(partition, "fileId3", 1, 2) |
| .withLogFile(partition, "fileId4", 1, 2) |
| .withLogFile(partition, "fileId5", 1, 2) |
| .withLogFile(partition, "fileId6", 1, 2) |
| .withLogFile(partition, "fileId7", 1, 2) |
| .addCommit("001") |
| .withBaseFilesInPartition(partition, "fileId3", "fileId4", "fileId5", "fileId6", "fileId7") |
| .withLogFile(partition, "fileId3", 1, 2) |
| .withLogFile(partition, "fileId4", 1, 2) |
| .withLogFile(partition, "fileId5", 1, 2) |
| .withLogFile(partition, "fileId6", 1, 2) |
| .withLogFile(partition, "fileId7", 1, 2) |
| .addCommit("003") |
| .withBaseFilesInPartition(partition, "fileId4", "fileId5", "fileId6", "fileId7") |
| .withLogFile(partition, "fileId4", 1, 2) |
| .withLogFile(partition, "fileId5", 1, 2) |
| .withLogFile(partition, "fileId6", 1, 2) |
| .withLogFile(partition, "fileId7", 1, 2) |
| .addRequestedCompaction("004", new FileSlice(partition, "000", "fileId2")) |
| .withLogFile(partition, "fileId2", 1, 2) |
| .addCommit("005") |
| .withBaseFilesInPartition(partition, "fileId5", "fileId6", "fileId7") |
| .withLogFile(partition, "fileId5", 1, 2) |
| .withLogFile(partition, "fileId6", 1, 2) |
| .withLogFile(partition, "fileId7", 1, 2) |
| .addRequestedCompaction("006", new FileSlice(partition, "001", "fileId3")) |
| .withLogFile(partition, "fileId3", 1, 2) |
| .addCommit("007") |
| .withBaseFilesInPartition(partition, "fileId6", "fileId7") |
| .withLogFile(partition, "fileId6", 1, 2) |
| .withLogFile(partition, "fileId7", 1, 2) |
| .addRequestedCompaction("008", new FileSlice(partition, "003", "fileId4")) |
| .withLogFile(partition, "fileId4", 1, 2) |
| .addCommit("009") |
| .withBaseFilesInPartition(partition, "fileId6", "fileId7") |
| .withLogFile(partition, "fileId6", 1, 2) |
| .withLogFile(partition, "fileId7", 1, 2) |
| .addRequestedCompaction("010", new FileSlice(partition, "005", "fileId5")) |
| .withLogFile(partition, "fileId5", 1, 2) |
| .addCommit("011") |
| .withBaseFilesInPartition(partition, "fileId7") |
| .withLogFile(partition, "fileId7", 1, 2) |
| .addCommit("013"); |
| |
| // Clean now |
| metaClient = HoodieTableMetaClient.reload(metaClient); |
| List<HoodieCleanStat> hoodieCleanStats = runCleaner(config, retryFailure); |
| |
| // Test for safety |
| final HoodieTableMetaClient newMetaClient = HoodieTableMetaClient.reload(metaClient); |
| final HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient); |
| |
| expFileIdToPendingCompaction.forEach((fileId, value) -> { |
| String baseInstantForCompaction = fileIdToLatestInstantBeforeCompaction.get(fileId); |
| Option<FileSlice> fileSliceForCompaction = Option.fromJavaOptional(hoodieTable.getSliceView() |
| .getLatestFileSlicesBeforeOrOn(partition, baseInstantForCompaction, |
| true) |
| .filter(fs -> fs.getFileId().equals(fileId)).findFirst()); |
| assertTrue(fileSliceForCompaction.isPresent(), "Base Instant for Compaction must be preserved"); |
| assertTrue(fileSliceForCompaction.get().getBaseFile().isPresent(), "FileSlice has data-file"); |
| assertEquals(2, fileSliceForCompaction.get().getLogFiles().count(), "FileSlice has log-files"); |
| }); |
| |
| // Test for progress (Did we clean some files ?) |
| long numFilesUnderCompactionDeleted = hoodieCleanStats.stream() |
| .flatMap(cleanStat -> convertPathToFileIdWithCommitTime(newMetaClient, cleanStat.getDeletePathPatterns()) |
| .map(fileIdWithCommitTime -> { |
| if (expFileIdToPendingCompaction.containsKey(fileIdWithCommitTime.getKey())) { |
| assertTrue(HoodieTimeline.compareTimestamps( |
| fileIdToLatestInstantBeforeCompaction.get(fileIdWithCommitTime.getKey()), |
| HoodieTimeline.GREATER_THAN, fileIdWithCommitTime.getValue()), |
| "Deleted instant time must be less than pending compaction"); |
| return true; |
| } |
| return false; |
| })).filter(x -> x).count(); |
| long numDeleted = |
| hoodieCleanStats.stream().mapToLong(cleanStat -> cleanStat.getDeletePathPatterns().size()).sum(); |
| // Tighter check for regression |
| assertEquals(expNumFilesDeleted, numDeleted, "Correct number of files deleted"); |
| assertEquals(expNumFilesUnderCompactionDeleted, numFilesUnderCompactionDeleted, |
| "Correct number of files under compaction deleted"); |
| } |
| |
| private Stream<Pair<String, String>> convertPathToFileIdWithCommitTime(final HoodieTableMetaClient metaClient, |
| List<String> paths) { |
| Predicate<String> roFilePredicate = |
| path -> path.contains(metaClient.getTableConfig().getBaseFileFormat().getFileExtension()); |
| Predicate<String> rtFilePredicate = |
| path -> path.contains(metaClient.getTableConfig().getLogFileFormat().getFileExtension()); |
| Stream<Pair<String, String>> stream1 = paths.stream().filter(roFilePredicate).map(fullPath -> { |
| String fileName = Paths.get(fullPath).getFileName().toString(); |
| return Pair.of(FSUtils.getFileId(fileName), FSUtils.getCommitTime(fileName)); |
| }); |
| Stream<Pair<String, String>> stream2 = paths.stream().filter(rtFilePredicate).map(path -> { |
| return Pair.of(FSUtils.getFileIdFromLogPath(new Path(path)), |
| FSUtils.getBaseCommitTimeFromLogPath(new Path(path))); |
| }); |
| return Stream.concat(stream1, stream2); |
| } |
| |
| protected static HoodieCommitMetadata generateCommitMetadata( |
| String instantTime, Map<String, List<String>> partitionToFilePaths) { |
| HoodieCommitMetadata metadata = new HoodieCommitMetadata(); |
| metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.PHONY_TABLE_SCHEMA); |
| partitionToFilePaths.forEach((partitionPath, fileList) -> fileList.forEach(f -> { |
| HoodieWriteStat writeStat = new HoodieWriteStat(); |
| writeStat.setPartitionPath(partitionPath); |
| writeStat.setPath(partitionPath + "/" + getBaseFilename(instantTime, f)); |
| writeStat.setFileId(f); |
| writeStat.setTotalWriteBytes(1); |
| writeStat.setFileSizeInBytes(1); |
| metadata.addWriteStat(partitionPath, writeStat); |
| })); |
| return metadata; |
| } |
| } |