/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.upgrade;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Unit tests for {@link SparkUpgradeDowngrade}.
*/
public class TestUpgradeDowngrade extends HoodieClientTestBase {
private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with deletePartialMarkerFiles={0} and TableType = {1}";
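/**
 * Params for the parameterized tests: every combination of deletePartialMarkerFiles (true/false)
 * and table type (COPY_ON_WRITE / MERGE_ON_READ).
 */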
public static Stream<Arguments> configParams() {
Object[][] data = new Object[][] {
{true, HoodieTableType.COPY_ON_WRITE}, {false, HoodieTableType.COPY_ON_WRITE},
{true, HoodieTableType.MERGE_ON_READ}, {false, HoodieTableType.MERGE_ON_READ}
};
return Stream.of(data).map(Arguments::of);
}
@BeforeEach
public void setUp() throws Exception {
initSparkContexts();
initDFS();
initTestDataGenerator();
initDFSMetaClient();
}
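/**
 * Verifies that a leftover updated-properties file from a previous incomplete upgrade is cleaned up
 * when the upgrade runs again.
 */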
@Test
public void testLeftOverUpdatedPropFileCleanup() throws IOException {
testUpgradeInternal(true, true, HoodieTableType.MERGE_ON_READ);
}
@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
@MethodSource("configParams")
public void testUpgrade(boolean deletePartialMarkerFiles, HoodieTableType tableType) throws IOException {
testUpgradeInternal(false, deletePartialMarkerFiles, tableType);
}
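/**
 * Upgrade flow: make 2 commits with the 2nd left pending, optionally delete a partial marker file,
 * reset the table version to ZERO, run the upgrade to ONE, then assert the re-created marker files,
 * the upgraded table version, and that the pending 2nd commit is rolled back by a subsequent commit
 * with marker-based rollback enabled.
 */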
public void testUpgradeInternal(boolean induceResiduesFromPrevUpgrade, boolean deletePartialMarkerFiles, HoodieTableType tableType) throws IOException {
// init config, table and client.
Map<String, String> params = new HashMap<>();
if (tableType == HoodieTableType.MERGE_ON_READ) {
params.put(HOODIE_TABLE_TYPE_PROP_NAME, HoodieTableType.MERGE_ON_READ.name());
metaClient = HoodieTestUtils.init(dfs.getConf(), dfsBasePath, HoodieTableType.MERGE_ON_READ);
}
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false).withProps(params).build();
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
// prepare data. Make 2 commits, of which the 2nd is left uncommitted.
List<FileSlice> firstPartitionCommit2FileSlices = new ArrayList<>();
List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
Pair<List<HoodieRecord>, List<HoodieRecord>> inputRecords = twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, client, false);
HoodieTable table = this.getHoodieTable(metaClient, cfg);
HoodieInstant commitInstant = table.getPendingCommitTimeline().lastInstant().get();
// delete one of the marker files in 2nd commit if need be.
MarkerFiles markerFiles = new MarkerFiles(table, commitInstant.getTimestamp());
List<String> markerPaths = markerFiles.allMarkerFilePaths();
if (deletePartialMarkerFiles) {
String toDeleteMarkerFile = markerPaths.get(0);
table.getMetaClient().getFs().delete(new Path(table.getMetaClient().getTempFolderPath() + "/" + commitInstant.getTimestamp() + "/" + toDeleteMarkerFile));
markerPaths.remove(toDeleteMarkerFile);
}
// set hoodie.table.version to 0 in hoodie.properties file
metaClient.getTableConfig().setTableVersion(HoodieTableVersion.ZERO);
if (induceResiduesFromPrevUpgrade) {
createResidualFile();
}
// should re-create marker files for the 2nd commit since it's pending.
new SparkUpgradeDowngrade(metaClient, cfg, context).run(metaClient, HoodieTableVersion.ONE, cfg, context, null);
// assert marker files
assertMarkerFilesForUpgrade(table, commitInstant, firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices);
// verify hoodie.table.version got upgraded
assertEquals(HoodieTableVersion.ONE.versionCode(), metaClient.getTableConfig().getTableVersion().versionCode());
assertTableVersionFromPropertyFile(HoodieTableVersion.ONE);
// trigger 3rd commit with marker based rollback enabled.
List<HoodieRecord> thirdBatch = triggerCommit("003", tableType, true);
// Check the entire dataset has records only from the 1st and 3rd commits, since the 2nd is expected to be rolled back.
assertRows(inputRecords.getKey(), thirdBatch);
if (induceResiduesFromPrevUpgrade) {
assertFalse(dfs.exists(new Path(metaClient.getMetaPath(), SparkUpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE)));
}
}
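/**
 * Downgrade flow: make 2 commits with the 2nd left pending, optionally delete a partial marker file,
 * set the table version to ONE, run the downgrade to ZERO, then assert that marker files are removed,
 * the table version is downgraded, and the pending 2nd commit is rolled back by a subsequent commit
 * with marker-based rollback disabled.
 */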
@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
@MethodSource("configParams")
public void testDowngrade(boolean deletePartialMarkerFiles, HoodieTableType tableType) throws IOException {
// init config, table and client.
Map<String, String> params = new HashMap<>();
if (tableType == HoodieTableType.MERGE_ON_READ) {
params.put(HOODIE_TABLE_TYPE_PROP_NAME, HoodieTableType.MERGE_ON_READ.name());
metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
}
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false).withProps(params).build();
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
// prepare data. Make 2 commits, of which the 2nd is left uncommitted.
List<FileSlice> firstPartitionCommit2FileSlices = new ArrayList<>();
List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
Pair<List<HoodieRecord>, List<HoodieRecord>> inputRecords = twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, client, false);
HoodieTable table = this.getHoodieTable(metaClient, cfg);
HoodieInstant commitInstant = table.getPendingCommitTimeline().lastInstant().get();
// delete one of the marker files in 2nd commit if need be.
MarkerFiles markerFiles = new MarkerFiles(table, commitInstant.getTimestamp());
List<String> markerPaths = markerFiles.allMarkerFilePaths();
if (deletePartialMarkerFiles) {
String toDeleteMarkerFile = markerPaths.get(0);
table.getMetaClient().getFs().delete(new Path(table.getMetaClient().getTempFolderPath() + "/" + commitInstant.getTimestamp() + "/" + toDeleteMarkerFile));
markerPaths.remove(toDeleteMarkerFile);
}
// set hoodie.table.version to 1 in hoodie.properties file
prepForDowngrade();
// downgrade should be performed; all marker files should be deleted.
new SparkUpgradeDowngrade(metaClient, cfg, context).run(metaClient, HoodieTableVersion.ZERO, cfg, context, null);
// assert marker files
assertMarkerFilesForDowngrade(table, commitInstant);
// verify hoodie.table.version got downgraded
assertEquals(HoodieTableVersion.ZERO.versionCode(), metaClient.getTableConfig().getTableVersion().versionCode());
assertTableVersionFromPropertyFile(HoodieTableVersion.ZERO);
// trigger 3rd commit with marker based rollback disabled.
List<HoodieRecord> thirdBatch = triggerCommit("003", tableType, false);
// Check the entire dataset has records only from the 1st and 3rd commits, since the 2nd is expected to be rolled back.
assertRows(inputRecords.getKey(), thirdBatch);
}
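/**
 * After a downgrade, the marker directory for the pending commit should no longer exist.
 */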
private void assertMarkerFilesForDowngrade(HoodieTable table, HoodieInstant commitInstant) throws IOException {
// Verify recreated marker files are as expected
MarkerFiles markerFiles = new MarkerFiles(table, commitInstant.getTimestamp());
assertFalse(markerFiles.doesMarkerDirExist());
}
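/**
 * After an upgrade, marker files should be re-created for the pending commit: base file markers are asserted on
 * their full relative paths, log file markers on partition path, fileId and base commit time (the write token
 * cannot be asserted).
 */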
private void assertMarkerFilesForUpgrade(HoodieTable table, HoodieInstant commitInstant, List<FileSlice> firstPartitionCommit2FileSlices,
List<FileSlice> secondPartitionCommit2FileSlices) throws IOException {
// Verify recreated marker files are as expected
MarkerFiles markerFiles = new MarkerFiles(table, commitInstant.getTimestamp());
assertTrue(markerFiles.doesMarkerDirExist());
List<String> files = markerFiles.allMarkerFilePaths();
assertEquals(2, files.size());
List<String> actualFiles = new ArrayList<>();
for (String file : files) {
String fileName = MarkerFiles.stripMarkerSuffix(file);
actualFiles.add(fileName);
}
List<FileSlice> expectedFileSlices = new ArrayList<>();
expectedFileSlices.addAll(firstPartitionCommit2FileSlices);
expectedFileSlices.addAll(secondPartitionCommit2FileSlices);
List<String> expectedPaths = new ArrayList<>();
List<Pair<String, String>> expectedLogFilePaths = new ArrayList<>();
for (FileSlice fileSlice : expectedFileSlices) {
String partitionPath = fileSlice.getPartitionPath();
if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
for (HoodieLogFile logFile : fileSlice.getLogFiles().collect(Collectors.toList())) {
// the log file name can't be matched as is, since the write token can't be asserted. Hence asserting on partition path, fileId and base commit time.
String logBaseCommitTime = logFile.getBaseCommitTime();
expectedLogFilePaths.add(Pair.of(partitionPath + "/" + logFile.getFileId(), logBaseCommitTime));
}
}
if (fileSlice.getBaseInstantTime().equals(commitInstant.getTimestamp())) {
String path = fileSlice.getBaseFile().get().getPath();
// for base files, path can be asserted as is.
expectedPaths.add(path.substring(path.indexOf(partitionPath)));
}
}
// Trim log file paths only
List<String> trimmedActualFiles = new ArrayList<>();
for (String actualFile : actualFiles) {
if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
trimmedActualFiles.add(actualFile.substring(0, actualFile.lastIndexOf('.')));
} else {
trimmedActualFiles.add(actualFile);
}
}
// remove expected base file paths; any remaining entries should be log file markers.
for (String expected : expectedPaths) {
trimmedActualFiles.remove(expected);
}
if (!expectedLogFilePaths.isEmpty()) {
// assert for log files
List<Pair<String, String>> actualLogFiles = new ArrayList<>();
for (String actual : trimmedActualFiles) {
actualLogFiles.add(Pair.of(actual.substring(0, actual.indexOf('_')), actual.substring(actual.lastIndexOf('_') + 1)));
}
assertEquals(expectedLogFilePaths.size(), actualLogFiles.size());
for (Pair<String, String> entry : expectedLogFilePaths) {
assertTrue(actualLogFiles.contains(entry));
}
} else {
assertTrue(trimmedActualFiles.isEmpty());
}
}
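/**
 * Triggers a full commit (start, upsert, commit) for the given commit time, with marker-based rollback
 * enabled or disabled per the flag.
 */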
private List<HoodieRecord> triggerCommit(String newCommitTime, HoodieTableType tableType, boolean enableMarkerBasedRollback) {
Map<String, String> params = new HashMap<>();
if (tableType == HoodieTableType.MERGE_ON_READ) {
params.put(HOODIE_TABLE_TYPE_PROP_NAME, HoodieTableType.MERGE_ON_READ.name());
}
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(enableMarkerBasedRollback).withProps(params).build();
SparkRDDWriteClient client = getHoodieWriteClient(cfg, true);
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
Assertions.assertNoWriteErrors(statuses.collect());
client.commit(newCommitTime, statuses);
return records;
}
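/**
 * Reads back the whole dataset and asserts that it contains exactly the record keys of the two given batches.
 */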
private void assertRows(List<HoodieRecord> firstBatch, List<HoodieRecord> secondBatch) {
String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
}
Dataset<Row> rows = HoodieClientTestUtils.read(jsc, metaClient.getBasePath(), sqlContext, metaClient.getFs(), fullPartitionPaths);
List<String> expectedRecordKeys = new ArrayList<>();
for (HoodieRecord rec : firstBatch) {
expectedRecordKeys.add(rec.getRecordKey());
}
for (HoodieRecord rec : secondBatch) {
expectedRecordKeys.add(rec.getRecordKey());
}
List<Row> rowList = rows.collectAsList();
assertEquals(expectedRecordKeys.size(), rows.count());
List<String> actualRecordKeys = new ArrayList<>();
for (Row row : rowList) {
actualRecordKeys.add(row.getAs("_row_key"));
}
for (String expectedRecordKey : expectedRecordKeys) {
assertTrue(actualRecordKeys.contains(expectedRecordKey));
}
}
/**
* Create two upsert commits, where the 2nd may or may not be committed.
*
* @param firstPartitionCommit2FileSlices list to hold file slices in first partition.
* @param secondPartitionCommit2FileSlices list to hold file slices in second partition.
* @param cfg instance of {@link HoodieWriteConfig}
* @param client instance of {@link SparkRDDWriteClient} to use.
* @param commitSecondUpsert true if 2nd commit needs to be committed. false otherwise.
* @return a pair of list of records from 1st and 2nd batch.
*/
private Pair<List<HoodieRecord>, List<HoodieRecord>> twoUpsertCommitDataWithTwoPartitions(List<FileSlice> firstPartitionCommit2FileSlices,
List<FileSlice> secondPartitionCommit2FileSlices,
HoodieWriteConfig cfg, SparkRDDWriteClient client,
boolean commitSecondUpsert) throws IOException {
// just generate two partitions
dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
// 1. prepare data
HoodieTestDataGenerator.writePartitionMetadata(dfs, new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, dfsBasePath);
/**
* Write 1 (only inserts)
*/
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
Assertions.assertNoWriteErrors(statuses.collect());
client.commit(newCommitTime, statuses);
/**
* Write 2 (updates)
*/
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records2 = dataGen.generateUpdates(newCommitTime, records);
statuses = client.upsert(jsc.parallelize(records2, 1), newCommitTime);
Assertions.assertNoWriteErrors(statuses.collect());
if (commitSecondUpsert) {
client.commit(newCommitTime, statuses);
}
// 2. assert file group and get the first partition file slices
HoodieTable table = this.getHoodieTable(metaClient, cfg);
SyncableFileSystemView fsView = getFileSystemViewWithUnCommittedSlices(table.getMetaClient());
List<HoodieFileGroup> firstPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
assertEquals(1, firstPartitionCommit2FileGroups.size());
firstPartitionCommit2FileSlices.addAll(firstPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
// 3. assert file group and get the second partition file slices
List<HoodieFileGroup> secondPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
assertEquals(1, secondPartitionCommit2FileGroups.size());
secondPartitionCommit2FileSlices.addAll(secondPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
// 4. assert file slice counts
HoodieTableType tableType = metaClient.getTableType();
if (tableType.equals(HoodieTableType.COPY_ON_WRITE)) {
assertEquals(2, firstPartitionCommit2FileSlices.size());
assertEquals(2, secondPartitionCommit2FileSlices.size());
} else {
assertEquals(1, firstPartitionCommit2FileSlices.size());
assertEquals(1, secondPartitionCommit2FileSlices.size());
}
return Pair.of(records, records2);
}
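/**
 * Sets hoodie.table.version to ONE and rewrites hoodie.properties, so the table appears to already be on the
 * upgraded version before the downgrade runs.
 */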
private void prepForDowngrade() throws IOException {
metaClient.getTableConfig().setTableVersion(HoodieTableVersion.ONE);
Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE);
try (FSDataOutputStream os = metaClient.getFs().create(propertyFile)) {
metaClient.getTableConfig().getProperties().store(os, "");
}
}
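/**
 * Copies hoodie.properties to the updated-properties file to simulate the residue left behind by a previous
 * incomplete upgrade.
 */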
private void createResidualFile() throws IOException {
Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE);
Path updatedPropertyFile = new Path(metaClient.getMetaPath() + "/" + SparkUpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE);
// Copy hoodie.properties to the residual updated property file
FileUtil.copy(metaClient.getFs(), propertyFile, metaClient.getFs(), updatedPropertyFile,
false, metaClient.getHadoopConf());
}
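/**
 * Reads hoodie.properties directly from storage and asserts that hoodie.table.version matches the expected version.
 */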
private void assertTableVersionFromPropertyFile(HoodieTableVersion expectedVersion) throws IOException {
Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE);
// Load the properties and verify
Properties prop = new Properties();
try (FSDataInputStream fsDataInputStream = metaClient.getFs().open(propertyFile)) {
prop.load(fsDataInputStream);
}
assertEquals(Integer.toString(expectedVersion.versionCode()), prop.getProperty(HoodieTableConfig.HOODIE_TABLE_VERSION_PROP_NAME));
}
}