/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client;
import org.apache.hudi.client.CompactionAdminClient.ValidationOpResult;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CompactionTestUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.compact.OperationResult;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.client.CompactionAdminClient.getRenamingActionsToAlignWithCompactionOperation;
import static org.apache.hudi.client.CompactionAdminClient.renameLogFile;
import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
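/**
 * Tests for {@link CompactionAdminClient}: un-scheduling pending compaction plans (as a whole or per file-id)
 * and repairing compaction plans on a MERGE_ON_READ table.
 */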
public class TestCompactionAdminClient extends TestHoodieClientBase {
private static final Logger LOG = LogManager.getLogger(TestCompactionAdminClient.class);
private HoodieTableMetaClient metaClient;
private CompactionAdminClient client;
@Before
public void setUp() throws Exception {
initPath();
initSparkContexts();
metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath, MERGE_ON_READ);
client = new CompactionAdminClient(jsc, basePath);
}
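/**
 * Un-schedule pending compaction plans and verify the expected log-file renames, both when delta-commits exist
 * after the compaction instant and when they do not.
 */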
@Test
public void testUnscheduleCompactionPlan() throws Exception {
int numEntriesPerInstant = 10;
CompactionTestUtils.setupAndValidateCompactionOperations(metaClient, false, numEntriesPerInstant,
numEntriesPerInstant, numEntriesPerInstant, numEntriesPerInstant);
// There are delta-commits after compaction instant
validateUnSchedulePlan(client, "000", "001", numEntriesPerInstant, 2 * numEntriesPerInstant);
// There are delta-commits after compaction instant
validateUnSchedulePlan(client, "002", "003", numEntriesPerInstant, 2 * numEntriesPerInstant);
// There are no delta-commits after compaction instant
validateUnSchedulePlan(client, "004", "005", numEntriesPerInstant, 0);
// There are no delta-commits after compaction instant
validateUnSchedulePlan(client, "006", "007", numEntriesPerInstant, 0);
}
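/**
 * Un-schedule a single file-id from each pending compaction plan and verify the expected log-file renames.
 */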
@Test
public void testUnscheduleCompactionFileId() throws Exception {
int numEntriesPerInstant = 10;
CompactionTestUtils.setupAndValidateCompactionOperations(metaClient, false, numEntriesPerInstant,
numEntriesPerInstant, numEntriesPerInstant, numEntriesPerInstant);
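// For each pending compaction instant, pick the first operation from its plan, keyed by the compaction instant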
Map<String, CompactionOperation> instantsWithOp =
Stream.of("001", "003", "005", "007").map(instant -> {
try {
return Pair.of(instant, CompactionUtils.getCompactionPlan(metaClient, instant));
} catch (IOException ioe) {
throw new HoodieException(ioe);
}
}).map(instantWithPlan -> instantWithPlan.getRight().getOperations().stream()
.map(op -> Pair.of(instantWithPlan.getLeft(), CompactionOperation.convertFromAvroRecordInstance(op)))
.findFirst().get()).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
// There are delta-commits after compaction instant
validateUnScheduleFileId(client, "000", "001", instantsWithOp.get("001"), 2);
// There are delta-commits after compaction instant
validateUnScheduleFileId(client, "002", "003", instantsWithOp.get("003"), 2);
// There are no delta-commits after compaction instant
validateUnScheduleFileId(client, "004", "005", instantsWithOp.get("005"), 0);
// There are no delta-commits after compaction instant
validateUnScheduleFileId(client, "006", "007", instantsWithOp.get("007"), 0);
}
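/**
 * Break pending compaction plans by renaming their log files, verify that validation flags the inconsistency,
 * and check that repairing (undoing the renames) restores a valid plan.
 */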
@Test
public void testRepairCompactionPlan() throws Exception {
int numEntriesPerInstant = 10;
CompactionTestUtils.setupAndValidateCompactionOperations(metaClient, false, numEntriesPerInstant,
numEntriesPerInstant, numEntriesPerInstant, numEntriesPerInstant);
// There are delta-commits after compaction instant
validateRepair("000", "001", numEntriesPerInstant, 2 * numEntriesPerInstant);
// There are delta-commits after compaction instant
validateRepair("002", "003", numEntriesPerInstant, 2 * numEntriesPerInstant);
// There are no delta-commits after compaction instant
validateRepair("004", "005", numEntriesPerInstant, 0);
// There are no delta-commits after compaction instant
validateRepair("006", "007", numEntriesPerInstant, 0);
}
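/**
 * Rename log files (without un-scheduling) to break the compaction plan, run validation, apply the suggested
 * repair renames and ensure the plan validates cleanly afterwards.
 */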
private void validateRepair(String ingestionInstant, String compactionInstant, int numEntriesPerInstant,
int expNumRepairs) throws Exception {
List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles =
validateUnSchedulePlan(client, ingestionInstant, compactionInstant, numEntriesPerInstant, expNumRepairs, true);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
List<ValidationOpResult> result = client.validateCompactionPlan(metaClient, compactionInstant, 1);
if (expNumRepairs > 0) {
Assert.assertTrue("Expect some failures in validation", result.stream().anyMatch(r -> !r.isSuccess()));
}
// Now repair
List<Pair<HoodieLogFile, HoodieLogFile>> undoFiles =
result.stream().flatMap(r -> getRenamingActionsToAlignWithCompactionOperation(metaClient,
compactionInstant, r.getOperation(), Option.empty()).stream()).map(rn -> {
try {
renameLogFile(metaClient, rn.getKey(), rn.getValue());
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
return rn;
}).collect(Collectors.toList());
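// Ensure the undo renames exactly invert the original renames (new path -> old path)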
Map<String, String> renameFilesFromUndo = undoFiles.stream()
.collect(Collectors.toMap(p -> p.getRight().getPath().toString(), x -> x.getLeft().getPath().toString()));
Map<String, String> expRenameFiles = renameFiles.stream()
.collect(Collectors.toMap(p -> p.getLeft().getPath().toString(), x -> x.getRight().getPath().toString()));
if (expNumRepairs > 0) {
Assert.assertFalse("Rename Files must be non-empty", renameFiles.isEmpty());
} else {
Assert.assertTrue("Rename Files must be empty", renameFiles.isEmpty());
}
expRenameFiles.forEach((key, value) -> LOG.info("Key :" + key + " renamed to " + value + " rolled back to "
+ renameFilesFromUndo.get(key)));
Assert.assertEquals("Undo must completely rollback renames", expRenameFiles, renameFilesFromUndo);
// Now expect validation to succeed
result = client.validateCompactionPlan(metaClient, compactionInstant, 1);
Assert.assertTrue("Expect no failures in validation", result.stream().allMatch(OperationResult::isSuccess));
Assert.assertEquals("Expected Num Repairs", expNumRepairs, undoFiles.size());
}
/**
* Ensure the compaction plan is valid.
*
* @param compactionInstant Compaction Instant
*/
private void ensureValidCompactionPlan(String compactionInstant) throws Exception {
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
// Ensure compaction-plan is good to begin with
List<ValidationOpResult> validationResults = client.validateCompactionPlan(metaClient, compactionInstant, 1);
Assert.assertFalse("Some validations failed",
validationResults.stream().anyMatch(v -> !v.isSuccess()));
}
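/**
 * Verify the suggested renames: old and new log-file names must not collide, and each new log file must be
 * re-anchored on the ingestion instant with a non-colliding log version for the same file-id.
 */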
private void validateRenameFiles(List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles, String ingestionInstant,
String compactionInstant, HoodieTableFileSystemView fsView) {
// Ensure new names of log-files are on expected lines
Set<HoodieLogFile> uniqNewLogFiles = new HashSet<>();
Set<HoodieLogFile> uniqOldLogFiles = new HashSet<>();
renameFiles.forEach(lfPair -> {
Assert.assertFalse("Old Log File Names do not collide", uniqOldLogFiles.contains(lfPair.getKey()));
Assert.assertFalse("New Log File Names do not collide", uniqNewLogFiles.contains(lfPair.getValue()));
uniqOldLogFiles.add(lfPair.getKey());
uniqNewLogFiles.add(lfPair.getValue());
});
renameFiles.forEach(lfPair -> {
HoodieLogFile oldLogFile = lfPair.getLeft();
HoodieLogFile newLogFile = lfPair.getRight();
Assert.assertEquals("Base Commit time is expected", ingestionInstant, newLogFile.getBaseCommitTime());
Assert.assertEquals("Base Commit time is expected", compactionInstant, oldLogFile.getBaseCommitTime());
Assert.assertEquals("File Id is expected", oldLogFile.getFileId(), newLogFile.getFileId());
HoodieLogFile lastLogFileBeforeCompaction =
fsView.getLatestMergedFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], ingestionInstant)
.filter(fs -> fs.getFileId().equals(oldLogFile.getFileId())).map(fs -> fs.getLogFiles().findFirst().get())
.findFirst().get();
Assert.assertEquals("Log Version expected",
lastLogFileBeforeCompaction.getLogVersion() + oldLogFile.getLogVersion(), newLogFile.getLogVersion());
Assert.assertTrue("Log version does not collide",
newLogFile.getLogVersion() > lastLogFileBeforeCompaction.getLogVersion());
});
}
/**
* Validate un-scheduling of a complete compaction plan.
*/
private List<Pair<HoodieLogFile, HoodieLogFile>> validateUnSchedulePlan(CompactionAdminClient client,
String ingestionInstant, String compactionInstant, int numEntriesPerInstant, int expNumRenames) throws Exception {
return validateUnSchedulePlan(client, ingestionInstant, compactionInstant, numEntriesPerInstant, expNumRenames,
false);
}
/**
* Validate un-scheduling of a complete compaction plan, optionally performing only the renames (used by repair tests).
*/
private List<Pair<HoodieLogFile, HoodieLogFile>> validateUnSchedulePlan(CompactionAdminClient client,
String ingestionInstant, String compactionInstant, int numEntriesPerInstant, int expNumRenames,
boolean skipUnSchedule) throws Exception {
ensureValidCompactionPlan(compactionInstant);
// Check suggested rename operations
List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles =
client.getRenamingActionsForUnschedulingCompactionPlan(metaClient, compactionInstant, 1, Option.empty(), false);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
// Log files belonging to file-slices created because of compaction request must be renamed
Set<HoodieLogFile> gotLogFilesToBeRenamed = renameFiles.stream().map(Pair::getLeft).collect(Collectors.toSet());
final HoodieTableFileSystemView fsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
Set<HoodieLogFile> expLogFilesToBeRenamed = fsView.getLatestFileSlices(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0])
.filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)).flatMap(FileSlice::getLogFiles)
.collect(Collectors.toSet());
Assert.assertEquals("Log files belonging to file-slices created because of compaction request must be renamed",
expLogFilesToBeRenamed, gotLogFilesToBeRenamed);
if (skipUnSchedule) {
// Do the renaming only but do not touch the compaction plan - Needed for repair tests
renameFiles.forEach(lfPair -> {
try {
renameLogFile(metaClient, lfPair.getLeft(), lfPair.getRight());
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
});
} else {
validateRenameFiles(renameFiles, ingestionInstant, compactionInstant, fsView);
}
Map<String, Long> fileIdToCountsBeforeRenaming =
fsView.getLatestMergedFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant)
.filter(fs -> fs.getBaseInstantTime().equals(ingestionInstant))
.map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().count()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
// Call the main unschedule API
client.unscheduleCompactionPlan(compactionInstant, false, 1, false);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
final HoodieTableFileSystemView newFsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
// Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant, true)
.filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)).forEach(fs -> {
Assert.assertFalse("No Data file must be present", fs.getBaseFile().isPresent());
Assert.assertEquals("No Log Files", 0, fs.getLogFiles().count());
});
// Ensure same number of log-files before and after renaming per fileId
Map<String, Long> fileIdToCountsAfterRenaming =
newFsView.getAllFileGroups(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0]).flatMap(HoodieFileGroup::getAllFileSlices)
.filter(fs -> fs.getBaseInstantTime().equals(ingestionInstant))
.map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().count()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
Assert.assertEquals("Each File Id has same number of log-files", fileIdToCountsBeforeRenaming,
fileIdToCountsAfterRenaming);
Assert.assertEquals("Not Empty", numEntriesPerInstant, fileIdToCountsAfterRenaming.size());
Assert.assertEquals("Expected number of renames", expNumRenames, renameFiles.size());
return renameFiles;
}
/**
* Validate un-scheduling of a single file-id from a compaction plan.
*/
private void validateUnScheduleFileId(CompactionAdminClient client, String ingestionInstant, String compactionInstant,
CompactionOperation op, int expNumRenames) throws Exception {
ensureValidCompactionPlan(compactionInstant);
// Check suggested rename operations
List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles = client
.getRenamingActionsForUnschedulingCompactionOperation(metaClient, compactionInstant, op, Option.empty(), false);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
// Log files belonging to file-slices created because of compaction request must be renamed
Set<HoodieLogFile> gotLogFilesToBeRenamed = renameFiles.stream().map(Pair::getLeft).collect(Collectors.toSet());
final HoodieTableFileSystemView fsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
Set<HoodieLogFile> expLogFilesToBeRenamed = fsView.getLatestFileSlices(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0])
.filter(fs -> fs.getBaseInstantTime().equals(compactionInstant))
.filter(fs -> fs.getFileId().equals(op.getFileId())).flatMap(FileSlice::getLogFiles)
.collect(Collectors.toSet());
Assert.assertEquals("Log files belonging to file-slices created because of compaction request must be renamed",
expLogFilesToBeRenamed, gotLogFilesToBeRenamed);
validateRenameFiles(renameFiles, ingestionInstant, compactionInstant, fsView);
Map<String, Long> fileIdToCountsBeforeRenaming =
fsView.getLatestMergedFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant)
.filter(fs -> fs.getBaseInstantTime().equals(ingestionInstant))
.filter(fs -> fs.getFileId().equals(op.getFileId()))
.map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().count()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
// Call the main unschedule API
client.unscheduleCompactionFileId(op.getFileGroupId(), false, false);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
final HoodieTableFileSystemView newFsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
// Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant, true)
.filter(fs -> fs.getBaseInstantTime().equals(compactionInstant))
.filter(fs -> fs.getFileId().equals(op.getFileId())).forEach(fs -> {
Assert.assertFalse("No Data file must be present", fs.getBaseFile().isPresent());
Assert.assertEquals("No Log Files", 0, fs.getLogFiles().count());
});
// Ensure same number of log-files before and after renaming per fileId
Map<String, Long> fileIdToCountsAfterRenaming =
newFsView.getAllFileGroups(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0]).flatMap(HoodieFileGroup::getAllFileSlices)
.filter(fs -> fs.getBaseInstantTime().equals(ingestionInstant))
.filter(fs -> fs.getFileId().equals(op.getFileId()))
.map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().count()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
Assert.assertEquals("Each File Id has same number of log-files", fileIdToCountsBeforeRenaming,
fileIdToCountsAfterRenaming);
Assert.assertEquals("Not Empty", 1, fileIdToCountsAfterRenaming.size());
Assert.assertEquals("Expected number of renames", expNumRenames, renameFiles.size());
}
}