/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.io;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
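
/**
 * Unit tests for {@link HoodieTimelineArchiveLog}: archival of commits, cleans,
 * compactions and replacecommits, plus the safety rules around savepoints and
 * pending compactions.
 */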
public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
private Configuration hadoopConf;
private HoodieWrapperFileSystem wrapperFs;
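
  /**
   * Sets up the Spark context, meta client and wrapper filesystem shared by the tests.
   */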
@BeforeEach
public void init() throws Exception {
initPath();
initSparkContexts();
initMetaClient();
hadoopConf = context.getHadoopConf().get();
metaClient.getFs().mkdirs(new Path(basePath));
metaClient = HoodieTestUtils.init(hadoopConf, basePath);
wrapperFs = metaClient.getFs();
hadoopConf.addResource(wrapperFs.getConf());
}
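
  /**
   * Releases the resources created in {@link #init()}.
   */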
@AfterEach
public void clean() throws IOException {
cleanupResources();
}
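
  /**
   * Archival on a table with no commits should succeed as a no-op.
   */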
@Test
public void testArchiveEmptyTable() throws IOException {
HoodieWriteConfig cfg =
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).forTable("test-trip-table").build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
boolean result = archiveLog.archiveIfRequired(context);
assertTrue(result);
}
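
  /**
   * Creates six commits (each with requested/inflight compaction metadata), six clean
   * actions and two pending cleans, then verifies that archival trims the active
   * timeline, deletes the eligible auxiliary compaction files, and that the archived
   * timeline contains exactly the instants removed from the active timeline.
   */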
@Test
public void testArchiveTableWithArchival() throws IOException {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 4).build())
.forTable("test-trip-table").build();
HoodieTestUtils.init(hadoopConf, basePath);
    // For each of the instants 100..105, create requested and inflight compaction
    // metadata in the auxiliary folder, plus a completed commit file.
    for (int i = 100; i <= 105; i++) {
      String instantTime = String.valueOf(i);
      HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
          new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime), wrapperFs.getConf());
      HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
          new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, instantTime), wrapperFs.getConf());
      HoodieTestDataGenerator.createCommitFile(basePath, instantTime, wrapperFs.getConf());
    }
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
    // one completed clean action per commit
    for (int i = 100; i <= 105; i++) {
      HoodieTestUtils.createCleanFiles(metaClient, basePath, String.valueOf(i), wrapperFs.getConf());
    }
HoodieTestUtils.createPendingCleanFiles(metaClient, "106", "107");
    // reload the timeline and get all the commits before archive
timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
List<HoodieInstant> originalCommits = timeline.getInstants().collect(Collectors.toList());
    assertEquals(12, timeline.countInstants(), "Loaded 6 commits and 6 clean actions, and the count should match");
// verify in-flight instants before archive
verifyInflightInstants(metaClient, 2);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
assertTrue(archiveLog.archiveIfRequired(context));
    // reload the timeline and subtract the remaining commits, leaving only the archived instants in originalCommits
timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
originalCommits.removeAll(timeline.getInstants().collect(Collectors.toList()));
// Check compaction instants
List<HoodieInstant> instants = metaClient.scanHoodieInstantsFromFileSystem(
new Path(metaClient.getMetaAuxiliaryPath()), HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE, false);
assertEquals(4, instants.size(), "Should delete all compaction instants < 104");
    for (int i = 100; i <= 103; i++) {
      String instantTime = String.valueOf(i);
      assertFalse(instants.contains(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime)),
          "Requested Compaction must be absent for " + instantTime);
      assertFalse(instants.contains(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, instantTime)),
          "Inflight Compaction must be absent for " + instantTime);
    }
    for (int i = 104; i <= 105; i++) {
      String instantTime = String.valueOf(i);
      assertTrue(instants.contains(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime)),
          "Requested Compaction must be present for " + instantTime);
      assertTrue(instants.contains(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, instantTime)),
          "Inflight Compaction must be present for " + instantTime);
    }
    // read back the archived timeline
    HoodieArchivedTimeline archivedTimeline = new HoodieArchivedTimeline(metaClient);
    assertEquals(24, archivedTimeline.countInstants(),
        "Total archived records and total read records should match");
    // make sure the archived commits are the same as (original commits - commits left on the active timeline)
    Set<String> readCommits =
        archivedTimeline.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
    assertEquals(originalCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), readCommits,
        "Commits read from the archived timeline should match the commits removed from the active timeline");
// verify in-flight instants after archive
verifyInflightInstants(metaClient, 2);
}
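
  /**
   * Verifies that once replacecommits are archived, the file groups they replaced
   * are deleted from storage while the replacing file groups survive.
   */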
@Test
public void testArchiveTableWithReplacedFiles() throws Exception {
HoodieTestUtils.init(hadoopConf, basePath);
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.build();
int numCommits = 4;
int commitInstant = 100;
for (int i = 0; i < numCommits; i++) {
createReplaceMetadata(commitInstant);
commitInstant += 100;
}
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
assertEquals(4, timeline.countInstants(), "Loaded 4 commits and the count should match");
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
boolean result = archiveLog.archiveIfRequired(context);
assertTrue(result);
FileStatus[] allFiles = metaClient.getFs().listStatus(new Path(basePath + "/" + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH));
Set<String> allFileIds = Arrays.stream(allFiles).map(fs -> FSUtils.getFileIdFromFilePath(fs.getPath())).collect(Collectors.toSet());
    // verify that file-100-1 and file-200-1, replaced by archived replacecommits, are deleted by archival
assertFalse(allFileIds.contains("file-100-1"));
assertFalse(allFileIds.contains("file-200-1"));
assertTrue(allFileIds.contains("file-100-2"));
assertTrue(allFileIds.contains("file-200-2"));
assertTrue(allFileIds.contains("file-300-1"));
assertTrue(allFileIds.contains("file-400-1"));
}
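
  /**
   * With only 4 commits and maxCommitsToKeep set to 5, archival should neither trim
   * the active timeline nor delete any auxiliary compaction files.
   */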
@Test
public void testArchiveTableWithNoArchival() throws IOException {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
    // For each of the instants 100..103, create requested and inflight compaction
    // metadata in the auxiliary folder, plus a completed commit file.
    for (int i = 100; i <= 103; i++) {
      String instantTime = String.valueOf(i);
      HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
          new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime), wrapperFs.getConf());
      HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
          new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, instantTime), wrapperFs.getConf());
      HoodieTestDataGenerator.createCommitFile(basePath, instantTime, wrapperFs.getConf());
    }
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals(4, timeline.countInstants(), "Loaded 4 commits and the count should match");
boolean result = archiveLog.archiveIfRequired(context);
assertTrue(result);
timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
assertEquals(4, timeline.countInstants(), "Should not archive commits when maxCommitsToKeep is 5");
List<HoodieInstant> instants = metaClient.scanHoodieInstantsFromFileSystem(
new Path(metaClient.getMetaAuxiliaryPath()), HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE, false);
assertEquals(8, instants.size(), "Should not delete any aux compaction files when maxCommitsToKeep is 5");
    for (int i = 100; i <= 103; i++) {
      String instantTime = String.valueOf(i);
      assertTrue(instants.contains(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime)),
          "Requested Compaction must be present for " + instantTime);
      assertTrue(instants.contains(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, instantTime)),
          "Inflight Compaction must be present for " + instantTime);
    }
}
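
  /**
   * Archived commits must remain resolvable via containsOrBeforeTimelineStarts.
   */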
@Test
public void testArchiveCommitSafety() throws IOException {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
HoodieTestDataGenerator.createCommitFile(basePath, "100", wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101", wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "102", wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "103", wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "104", wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "105", wrapperFs.getConf());
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
boolean result = archiveLog.archiveIfRequired(context);
assertTrue(result);
timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
assertTrue(timeline.containsOrBeforeTimelineStarts("100"), "Archived commits should always be safe");
assertTrue(timeline.containsOrBeforeTimelineStarts("101"), "Archived commits should always be safe");
assertTrue(timeline.containsOrBeforeTimelineStarts("102"), "Archived commits should always be safe");
assertTrue(timeline.containsOrBeforeTimelineStarts("103"), "Archived commits should always be safe");
}
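
  /**
   * A savepoint at 101 must block archival of 101 and everything after it,
   * so only commit 100 gets archived and no hole is left in the timeline.
   */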
@Test
public void testArchiveCommitSavepointNoHole() throws IOException {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
HoodieTestDataGenerator.createCommitFile(basePath, "100", wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101", wrapperFs.getConf());
HoodieTestDataGenerator.createSavepointFile(basePath, "101", wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "102", wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "103", wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "104", wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "105", wrapperFs.getConf());
HoodieTable table = HoodieSparkTable.create(cfg, context);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
assertTrue(archiveLog.archiveIfRequired(context));
timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
assertEquals(5, timeline.countInstants(),
"Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)");
    assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "101")),
        "Commits at or after the savepoint must remain on the active timeline");
    assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "102")),
        "Commits at or after the savepoint must remain on the active timeline");
    assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "103")),
        "Commits at or after the savepoint must remain on the active timeline");
}
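
  /**
   * A pending compaction at 101 must block archival of 101 and everything after it,
   * so only commit 100 gets archived and no hole is left in the timeline.
   */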
@Test
public void testArchiveCommitCompactionNoHole() throws IOException {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
HoodieTestDataGenerator.createCommitFile(basePath, "100", wrapperFs.getConf());
HoodieTestDataGenerator.createCompactionRequestedFile(basePath, "101", wrapperFs.getConf());
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "101"), wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "102", wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "103", wrapperFs.getConf());
HoodieTestDataGenerator.createCompactionRequestedFile(basePath, "104", wrapperFs.getConf());
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "104"), wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "105", wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "106", wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "107", wrapperFs.getConf());
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline();
    assertEquals(8, timeline.countInstants(), "Loaded 6 commits and 2 compaction requests, and the count should match");
boolean result = archiveLog.archiveIfRequired(context);
assertTrue(result);
timeline = metaClient.getActiveTimeline().reload().getCommitsAndCompactionTimeline();
assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "100")),
"Instants before oldest pending compaction can be removed");
assertEquals(7, timeline.countInstants(),
"Since we have a pending compaction at 101, we should never archive any commit "
+ "after 101 (we only archive 100)");
assertTrue(timeline.containsInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "101")),
"Requested Compaction must still be present");
assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "102")),
"Instants greater than oldest pending compaction must be present");
assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "103")),
"Instants greater than oldest pending compaction must be present");
assertTrue(timeline.containsInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "104")),
"Instants greater than oldest pending compaction must be present");
assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "105")),
"Instants greater than oldest pending compaction must be present");
assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "106")),
"Instants greater than oldest pending compaction must be present");
assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "107")),
"Instants greater than oldest pending compaction must be present");
}
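
  /**
   * Verifies that archived instants are readable from the archived timeline and
   * that the marker directory of an archived commit is cleaned up.
   */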
@Test
public void testArchiveCommitTimeline() throws IOException {
HoodieWriteConfig cfg =
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).forTable("test-trip-table")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "1", wrapperFs.getConf());
HoodieInstant instant1 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "1");
HoodieTestDataGenerator.createCommitFile(basePath, "2", wrapperFs.getConf());
Path markerPath = new Path(metaClient.getMarkerFolderPath("2"));
wrapperFs.mkdirs(markerPath);
HoodieInstant instant2 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "2");
HoodieTestDataGenerator.createCommitFile(basePath, "3", wrapperFs.getConf());
HoodieInstant instant3 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "3");
    // add 2 more instants to pass the filter criteria set in the compaction config above
HoodieTestDataGenerator.createCommitFile(basePath, "4", wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "5", wrapperFs.getConf());
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
boolean result = archiveLog.archiveIfRequired(context);
assertTrue(result);
HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
List<HoodieInstant> archivedInstants = Arrays.asList(instant1, instant2, instant3);
assertEquals(new HashSet<>(archivedInstants), archivedTimeline.getInstants().collect(Collectors.toSet()));
assertFalse(wrapperFs.exists(markerPath));
}
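
  /**
   * Asserts the number of inflight clean actions on the active timeline.
   */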
private void verifyInflightInstants(HoodieTableMetaClient metaClient, int expectedTotalInstants) {
HoodieTimeline timeline = metaClient.getActiveTimeline().reload()
.getTimelineOfActions(Collections.singleton(HoodieTimeline.CLEAN_ACTION)).filterInflights();
assertEquals(expectedTotalInstants, timeline.countInstants(),
"Loaded inflight clean actions and the count should match");
}
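
  /**
   * Converting a HoodieCommitMetadata to its Avro form should preserve the operation type.
   */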
@Test
public void testConvertCommitMetadata() {
HoodieCommitMetadata hoodieCommitMetadata = new HoodieCommitMetadata();
hoodieCommitMetadata.setOperationType(WriteOperationType.INSERT);
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-commitMetadata-converter")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
    org.apache.hudi.avro.model.HoodieCommitMetadata convertedCommitMetadata = archiveLog.convertCommitMetadata(hoodieCommitMetadata);
    assertEquals(WriteOperationType.INSERT.toString(), convertedCommitMetadata.getOperationType());
}
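
  /**
   * Creates a replacecommit at the given instant marking fileId1 as replaced, and
   * creates base files for both fileId1 and fileId2 in the first test partition.
   */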
private void createReplaceMetadata(int commitInstant) throws Exception {
    String commitTime = String.valueOf(commitInstant);
String fileId1 = "file-" + commitInstant + "-1";
String fileId2 = "file-" + commitInstant + "-2";
// create replace instant to mark fileId1 as deleted
HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata();
replaceMetadata.addReplaceFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1);
replaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE);
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
testTable.addReplaceCommit(commitTime, replaceMetadata);
testTable.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
}
}