/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.functional;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.client.AbstractHoodieWriteClient;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.client.validator.SparkPreCommitValidator;
import org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator;
import org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.SparkWriteHelper;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_0;
import static org.apache.hudi.common.testutils.FileCreateUtils.getBaseFileCountsForPaths;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.NULL_SCHEMA;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.apache.hudi.common.testutils.Transformations.randomSelectAsHoodieKeys;
import static org.apache.hudi.common.testutils.Transformations.recordsToRecordKeySet;
import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
import static org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_EXECUTION_STRATEGY_CLASS;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@SuppressWarnings("unchecked")
@Tag("functional")
public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
private static final Logger LOG = LogManager.getLogger(TestHoodieClientOnCopyOnWriteStorage.class);
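// Extra parameters passed to the clustering execution strategy in the clustering tests (sort column: record_key).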
private static final Map<String, String> STRATEGY_PARAMS = new HashMap<String, String>() {
{
put("sortColumn", "record_key");
}
};
private static Stream<Arguments> smallInsertHandlingParams() {
return Arrays.stream(new Boolean[][] {{true}, {false}}).map(Arguments::of);
}
private static Stream<Arguments> populateMetaFieldsParams() {
return Arrays.stream(new Boolean[][] {{true}, {false}}).map(Arguments::of);
}
private static Stream<Arguments> rollbackFailedCommitsParams() {
return Stream.of(
Arguments.of(HoodieFailedWritesCleaningPolicy.LAZY, true),
Arguments.of(HoodieFailedWritesCleaningPolicy.LAZY, false),
Arguments.of(HoodieFailedWritesCleaningPolicy.NEVER, true),
Arguments.of(HoodieFailedWritesCleaningPolicy.NEVER, false)
);
}
private static Stream<Arguments> rollbackAfterConsistencyCheckFailureParams() {
return Stream.of(
Arguments.of(true, true),
Arguments.of(true, false),
Arguments.of(false, true),
Arguments.of(false, false)
);
}
private HoodieTestTable testTable;
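// Count-query template used by the pre-commit validators; <TABLE_NAME> is a placeholder the validator substitutes.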
private static final String COUNT_SQL_QUERY_FOR_VALIDATION = "select count(*) from <TABLE_NAME>";
@BeforeEach
public void setUpTestTable() {
testTable = HoodieSparkWriteableTestTable.of(metaClient);
}
/**
* Test Auto Commit behavior for HoodieWriteClient insert API.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testAutoCommitOnInsert(boolean populateMetaFields) throws Exception {
testAutoCommit(SparkRDDWriteClient::insert, false, populateMetaFields);
}
/**
* Test Auto Commit behavior for HoodieWriteClient insertPrepped API.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testAutoCommitOnInsertPrepped(boolean populateMetaFields) throws Exception {
testAutoCommit(SparkRDDWriteClient::insertPreppedRecords, true, populateMetaFields);
}
/**
* Test Auto Commit behavior for HoodieWriteClient upsert API.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testAutoCommitOnUpsert(boolean populateMetaFields) throws Exception {
testAutoCommit(SparkRDDWriteClient::upsert, false, populateMetaFields);
}
/**
* Test Auto Commit behavior for HoodieWriteClient upsert Prepped API.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testAutoCommitOnUpsertPrepped(boolean populateMetaFields) throws Exception {
testAutoCommit(SparkRDDWriteClient::upsertPreppedRecords, true, populateMetaFields);
}
/**
* Test Auto Commit behavior for HoodieWriteClient bulk-insert API.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testAutoCommitOnBulkInsert(boolean populateMetaFields) throws Exception {
testAutoCommit(SparkRDDWriteClient::bulkInsert, false, populateMetaFields);
}
/**
* Test Auto Commit behavior for HoodieWriteClient bulk-insert prepped API.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testAutoCommitOnBulkInsertPrepped(boolean populateMetaFields) throws Exception {
testAutoCommit((writeClient, recordRDD, instantTime) -> writeClient.bulkInsertPreppedRecords(recordRDD, instantTime,
Option.empty()), true, populateMetaFields);
}
/**
* Test auto-commit by applying the given write function.
*
* @param writeFn One of the HoodieWriteClient write APIs
* @throws Exception in case of failure
*/
private void testAutoCommit(Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
boolean isPrepped, boolean populateMetaFields) throws Exception {
// Set autoCommit false
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withAutoCommit(false);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
try (SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build())) {
String prevCommitTime = "000";
String newCommitTime = "001";
int numRecords = 200;
JavaRDD<WriteStatus> result = insertFirstBatch(cfgBuilder.build(), client, newCommitTime, prevCommitTime, numRecords, writeFn,
isPrepped, false, numRecords);
assertFalse(testTable.commitExists(newCommitTime),
"If Autocommit is false, then commit should not be made automatically");
assertTrue(client.commit(newCommitTime, result), "Commit should succeed");
assertTrue(testTable.commitExists(newCommitTime),
"After explicit commit, commit file should be created");
}
}
@Test
public void testPreCommitValidatorsOnInsert() throws Exception {
int numRecords = 200;
HoodiePreCommitValidatorConfig validatorConfig = HoodiePreCommitValidatorConfig.newBuilder()
.withPreCommitValidator(SqlQuerySingleResultPreCommitValidator.class.getName())
.withPrecommitValidatorSingleResultSqlQueries(COUNT_SQL_QUERY_FOR_VALIDATION + "#" + numRecords)
.build();
HoodieWriteConfig config = getConfigBuilder().withAutoCommit(true)
.withPreCommitValidatorConfig(validatorConfig).build();
try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn = (writeClient, recordRDD, instantTime) ->
writeClient.bulkInsert(recordRDD, instantTime, Option.empty());
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
JavaRDD<WriteStatus> result = insertFirstBatch(config, client, newCommitTime,
"000", numRecords, writeFn, false, false, numRecords);
assertTrue(testTable.commitExists(newCommitTime));
}
}
@Test
public void testPreCommitValidationFailureOnInsert() throws Exception {
int numRecords = 200;
HoodiePreCommitValidatorConfig validatorConfig = HoodiePreCommitValidatorConfig.newBuilder()
.withPreCommitValidator(SqlQuerySingleResultPreCommitValidator.class.getName())
//set wrong value for expected number of rows
.withPrecommitValidatorSingleResultSqlQueries(COUNT_SQL_QUERY_FOR_VALIDATION + "#" + 500)
.build();
HoodieWriteConfig config = getConfigBuilder().withPreCommitValidatorConfig(validatorConfig).build();
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn = (writeClient, recordRDD, instantTime) ->
writeClient.bulkInsert(recordRDD, instantTime, Option.empty());
JavaRDD<WriteStatus> result = insertFirstBatch(config, client, newCommitTime,
"000", numRecords, writeFn, false, false, numRecords);
fail("Expected validation to fail because we only insert 200 rows. Validation is configured to expect 500 rows");
} catch (HoodieInsertException e) {
if (e.getCause() instanceof HoodieValidationException) {
// expected because wrong value passed
} else {
throw e;
}
}
assertFalse(testTable.commitExists(newCommitTime));
}
@Test
public void testPreCommitValidationWithMultipleInflights() throws Exception {
int numRecords = 200;
HoodiePreCommitValidatorConfig validatorConfig = HoodiePreCommitValidatorConfig.newBuilder()
.withPreCommitValidator(SqlQuerySingleResultPreCommitValidator.class.getName())
//set wrong value for expected number of rows
.withPrecommitValidatorSingleResultSqlQueries(COUNT_SQL_QUERY_FOR_VALIDATION + "#" + 500)
.build();
HoodieWriteConfig config = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER).build())
.withPreCommitValidatorConfig(validatorConfig)
.build();
String instant1 = HoodieActiveTimeline.createNewInstantTime();
try {
insertWithConfig(config, numRecords, instant1);
fail("Expected validation to fail because we only insert 200 rows. Validation is configured to expect 500 rows");
} catch (HoodieInsertException e) {
if (e.getCause() instanceof HoodieValidationException) {
// expected because wrong value passed
} else {
throw e;
}
}
assertFalse(testTable.commitExists(instant1));
assertTrue(testTable.inflightCommitExists(instant1));
numRecords = 300;
validatorConfig = HoodiePreCommitValidatorConfig.newBuilder()
.withPreCommitValidator(SqlQuerySingleResultPreCommitValidator.class.getName())
// set the expected number of rows to match the records being inserted so validation passes
.withPrecommitValidatorSingleResultSqlQueries(COUNT_SQL_QUERY_FOR_VALIDATION + "#" + numRecords)
.build();
config = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER).build())
.withPreCommitValidatorConfig(validatorConfig)
.build();
String instant2 = HoodieActiveTimeline.createNewInstantTime();
// expect pre-commit validators to succeed. Note that the validator is expected to exclude the inflight instant1
insertWithConfig(config, numRecords, instant2);
assertTrue(testTable.inflightCommitExists(instant1));
assertTrue(testTable.commitExists(instant2));
}
private void insertWithConfig(HoodieWriteConfig config, int numRecords, String instant) throws Exception {
try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn = (writeClient, recordRDD, instantTime) ->
writeClient.bulkInsert(recordRDD, instantTime, Option.empty());
JavaRDD<WriteStatus> result = insertFirstBatch(config, client, instant,
"000", numRecords, writeFn, false, false, numRecords);
}
}
/**
* Test De-duplication behavior for HoodieWriteClient insert API.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testDeduplicationOnInsert(boolean populateMetaFields) throws Exception {
testDeduplication(SparkRDDWriteClient::insert, populateMetaFields);
}
/**
* Test De-duplication behavior for HoodieWriteClient bulk-insert API.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testDeduplicationOnBulkInsert(boolean populateMetaFields) throws Exception {
testDeduplication(SparkRDDWriteClient::bulkInsert, populateMetaFields);
}
/**
* Test De-duplication behavior for HoodieWriteClient upsert API.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testDeduplicationOnUpsert(boolean populateMetaFields) throws Exception {
testDeduplication(SparkRDDWriteClient::upsert, populateMetaFields);
}
/**
* Test deduplication logic for the given write function.
*
* @param writeFn One of the HoodieWriteClient non-prepped write APIs
* @throws Exception in case of failure
*/
private void testDeduplication(
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean populateMetaFields) throws Exception {
String newCommitTime = "001";
String recordKey = UUID.randomUUID().toString();
HoodieKey keyOne = new HoodieKey(recordKey, "2018-01-01");
HoodieRecord<RawTripTestPayload> recordOne =
new HoodieRecord(keyOne, dataGen.generateRandomValue(keyOne, newCommitTime));
HoodieKey keyTwo = new HoodieKey(recordKey, "2018-02-01");
HoodieRecord recordTwo =
new HoodieRecord(keyTwo, dataGen.generateRandomValue(keyTwo, newCommitTime));
// Same key and partition as keyTwo
HoodieRecord recordThree =
new HoodieRecord(keyTwo, dataGen.generateRandomValue(keyTwo, newCommitTime));
JavaRDD<HoodieRecord<RawTripTestPayload>> records =
jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
// Global dedup should be done based on recordKey only
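// Mock the index so the global/non-global flag can be toggled without building a real index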
HoodieIndex index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(true);
List<HoodieRecord<RawTripTestPayload>> dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, 1).collect();
assertEquals(1, dedupedRecs.size());
assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath());
assertNodupesWithinPartition(dedupedRecs);
// non-Global dedup should be done based on both recordKey and partitionPath
index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(false);
dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, 1).collect();
assertEquals(2, dedupedRecs.size());
assertNodupesWithinPartition(dedupedRecs);
// Perform write-action and check
JavaRDD<HoodieRecord> recordList = jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
.combineInput(true, true);
addConfigsForPopulateMetaFields(configBuilder, populateMetaFields);
try (SparkRDDWriteClient client = getHoodieWriteClient(configBuilder.build())) {
client.startCommitWithTime(newCommitTime);
List<WriteStatus> statuses = writeFn.apply(client, recordList, newCommitTime).collect();
assertNoWriteErrors(statuses);
assertEquals(2, statuses.size());
assertNodupesInPartition(statuses.stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
.collect(Collectors.toList()));
}
}
/**
* Assert that there is no duplicate key at the partition level.
*
* @param records List of Hoodie records
*/
void assertNodupesInPartition(List<HoodieRecord> records) {
Map<String, Set<String>> partitionToKeys = new HashMap<>();
for (HoodieRecord r : records) {
String key = r.getRecordKey();
String partitionPath = r.getPartitionPath();
if (!partitionToKeys.containsKey(partitionPath)) {
partitionToKeys.put(partitionPath, new HashSet<>());
}
assertFalse(partitionToKeys.get(partitionPath).contains(key), "key " + key + " is duplicate within partition " + partitionPath);
partitionToKeys.get(partitionPath).add(key);
}
}
/**
* Test Upsert API.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testUpserts(boolean populateMetaFields) throws Exception {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder();
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
testUpsertsInternal(cfgBuilder.build(), SparkRDDWriteClient::upsert, false);
}
/**
* Test UpsertPrepped API.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testUpsertsPrepped(boolean populateMetaFields) throws Exception {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder();
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
testUpsertsInternal(cfgBuilder.build(), SparkRDDWriteClient::upsertPreppedRecords, true);
}
/**
* Test one of the HoodieWriteClient upsert (prepped) APIs.
*
* @param config Write Config
* @param writeFn One of the HoodieWriteClient write function APIs
* @throws Exception in case of error
*/
private void testUpsertsInternal(HoodieWriteConfig config,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPrepped)
throws Exception {
// Force using older timeline layout
HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
.withProps(config.getProps()).withTimelineLayoutVersion(
VERSION_0).build();
HoodieTableMetaClient.withPropertyBuilder()
.fromMetaClient(metaClient)
.setTimelineLayoutVersion(VERSION_0)
.setPopulateMetaFields(config.populateMetaFields())
.initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
// Write 1 (only inserts)
String newCommitTime = "001";
String initCommitTime = "000";
int numRecords = 200;
insertFirstBatch(hoodieWriteConfig, client, newCommitTime, initCommitTime, numRecords, SparkRDDWriteClient::insert,
isPrepped, true, numRecords, config.populateMetaFields());
// Write 2 (updates)
String prevCommitTime = newCommitTime;
newCommitTime = "004";
numRecords = 100;
String commitTimeBetweenPrevAndNew = "002";
updateBatch(hoodieWriteConfig, client, newCommitTime, prevCommitTime,
Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), initCommitTime, numRecords, writeFn, isPrepped, true,
numRecords, 200, 2, config.populateMetaFields());
// Delete 1
prevCommitTime = newCommitTime;
newCommitTime = "005";
numRecords = 50;
deleteBatch(hoodieWriteConfig, client, newCommitTime, prevCommitTime,
initCommitTime, numRecords, SparkRDDWriteClient::delete, isPrepped, true,
0, 150, config.populateMetaFields());
// Now simulate an upgrade and perform a restore operation
HoodieWriteConfig newConfig = getConfigBuilder().withProps(config.getProps()).withTimelineLayoutVersion(
TimelineLayoutVersion.CURR_VERSION).build();
client = getHoodieWriteClient(newConfig);
client.restoreToInstant("004");
// Check the entire dataset has all records still
String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
}
assertEquals(200, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
"Must contain " + 200 + " records");
// Perform Delete again on upgraded dataset.
prevCommitTime = newCommitTime;
newCommitTime = "006";
numRecords = 50;
deleteBatch(newConfig, client, newCommitTime, prevCommitTime,
initCommitTime, numRecords, SparkRDDWriteClient::delete, isPrepped, true,
0, 150);
HoodieActiveTimeline activeTimeline = new HoodieActiveTimeline(metaClient, false);
List<HoodieInstant> instants = activeTimeline.getCommitTimeline().getInstants().collect(Collectors.toList());
assertEquals(5, instants.size());
assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "001"),
instants.get(0));
assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "004"),
instants.get(1));
// New Format should have all states of instants
assertEquals(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "006"),
instants.get(2));
assertEquals(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "006"),
instants.get(3));
assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "006"),
instants.get(4));
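// Exercise HoodieMergeHandle's merge-data validation inside a Spark task: with the validation config disabled
// no exception is expected; with hoodie.merge.data.validation.enabled=true the mismatching write stat should
// trigger a HoodieCorruptedDataException.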
final HoodieWriteConfig cfg = hoodieWriteConfig;
final String instantTime = "007";
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
String basePathStr = basePath;
HoodieTable table = getHoodieTable(metaClient, cfg);
String extension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
jsc.parallelize(Arrays.asList(1)).map(e -> {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(
metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get()).get(),
HoodieCommitMetadata.class);
String filePath = commitMetadata.getPartitionToWriteStats().values().stream()
.flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(extension)).findAny()
.map(ee -> ee.getPath()).orElse(null);
String partitionPath = commitMetadata.getPartitionToWriteStats().values().stream()
.flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(extension)).findAny()
.map(ee -> ee.getPartitionPath()).orElse(null);
Path baseFilePath = new Path(basePathStr, filePath);
HoodieBaseFile baseFile = new HoodieBaseFile(baseFilePath.toString());
try {
HoodieMergeHandle handle = new HoodieMergeHandle(cfg, instantTime, table, new HashMap<>(),
partitionPath, FSUtils.getFileId(baseFilePath.getName()), baseFile, new SparkTaskContextSupplier(),
config.populateMetaFields() ? Option.empty() :
Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps()))));
WriteStatus writeStatus = new WriteStatus(false, 0.0);
writeStatus.setStat(new HoodieWriteStat());
writeStatus.getStat().setNumWrites(0);
handle.performMergeDataValidationCheck(writeStatus);
} catch (HoodieCorruptedDataException e1) {
fail("Exception not expected because merge validation check is disabled");
}
try {
final String newInstantTime = "006";
cfg.getProps().setProperty("hoodie.merge.data.validation.enabled", "true");
HoodieWriteConfig cfg2 = HoodieWriteConfig.newBuilder().withProps(cfg.getProps()).build();
HoodieMergeHandle handle = new HoodieMergeHandle(cfg2, newInstantTime, table, new HashMap<>(),
partitionPath, FSUtils.getFileId(baseFilePath.getName()), baseFile, new SparkTaskContextSupplier(),
config.populateMetaFields() ? Option.empty() :
Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps()))));
WriteStatus writeStatus = new WriteStatus(false, 0.0);
writeStatus.setStat(new HoodieWriteStat());
writeStatus.getStat().setNumWrites(0);
handle.performMergeDataValidationCheck(writeStatus);
fail("The above line should have thrown an exception");
} catch (HoodieCorruptedDataException e2) {
// expected
}
return true;
}).collect();
}
/**
* Test Insert API for HoodieConcatHandle.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testInsertsWithHoodieConcatHandle(boolean populateMetaFields) throws Exception {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder();
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
testHoodieConcatHandle(cfgBuilder.build(), false);
}
/**
* Test InsertPrepped API for HoodieConcatHandle.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testInsertsPreppedWithHoodieConcatHandle(boolean populateMetaFields) throws Exception {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder();
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
testHoodieConcatHandle(cfgBuilder.build(), true);
}
/**
* Test HoodieConcatHandle with the {@link AbstractHoodieWriteClient#insert(Object, String)} API.
*
* @param config Write Config
* @throws Exception in case of error
*/
private void testHoodieConcatHandle(HoodieWriteConfig config, boolean isPrepped)
throws Exception {
// Force using older timeline layout
HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
.withProps(config.getProps()).withMergeAllowDuplicateOnInserts(true).withTimelineLayoutVersion(
VERSION_0).build();
HoodieTableMetaClient.withPropertyBuilder()
.fromMetaClient(metaClient)
.setTimelineLayoutVersion(VERSION_0)
.initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
// Write 1 (only inserts)
String newCommitTime = "001";
String initCommitTime = "000";
int numRecords = 200;
insertFirstBatch(hoodieWriteConfig, client, newCommitTime, initCommitTime, numRecords, SparkRDDWriteClient::insert,
isPrepped, true, numRecords, config.populateMetaFields());
// Write 2 (updates)
String prevCommitTime = newCommitTime;
newCommitTime = "004";
numRecords = 100;
String commitTimeBetweenPrevAndNew = "002";
final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
generateWrapRecordsFn(isPrepped, hoodieWriteConfig, dataGen::generateUniqueUpdates);
writeBatch(client, newCommitTime, prevCommitTime, Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), initCommitTime,
numRecords, recordGenFunction, SparkRDDWriteClient::insert, true, numRecords, 300,
2, false, config.populateMetaFields());
}
/**
* Tests deletion of records.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testDeletes(boolean populateMetaFields) throws Exception {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build());
/**
* Write 1 (inserts and deletes): write 200 actual insert records and 100 delete records for keys that do not yet exist (the deletes are ignored).
*/
String initCommitTime = "000";
String newCommitTime = "001";
final List<HoodieRecord> recordsInFirstBatch = new ArrayList<>();
Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
(String instantTime, Integer numRecordsInThisCommit) -> {
List<HoodieRecord> fewRecordsForInsert = dataGen.generateInserts(instantTime, 200);
List<HoodieRecord> fewRecordsForDelete = dataGen.generateDeletes(instantTime, 100);
recordsInFirstBatch.addAll(fewRecordsForInsert);
recordsInFirstBatch.addAll(fewRecordsForDelete);
return recordsInFirstBatch;
};
writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime,
// unused as genFn uses hard-coded number of inserts/updates/deletes
-1, recordGenFunction, SparkRDDWriteClient::upsert, true, 200, 200, 1, false,
populateMetaFields);
/**
* Write 2 (deletes+writes).
*/
String prevCommitTime = newCommitTime;
newCommitTime = "004";
final List<HoodieRecord> recordsInSecondBatch = new ArrayList<>();
recordGenFunction = (String instantTime, Integer numRecordsInThisCommit) -> {
List<HoodieRecord> fewRecordsForDelete = recordsInFirstBatch.subList(0, 50);
List<HoodieRecord> fewRecordsForUpdate = recordsInFirstBatch.subList(50, 100);
recordsInSecondBatch.addAll(dataGen.generateDeletesFromExistingRecords(fewRecordsForDelete));
recordsInSecondBatch.addAll(fewRecordsForUpdate);
return recordsInSecondBatch;
};
writeBatch(client, newCommitTime, prevCommitTime, Option.empty(), initCommitTime, 100, recordGenFunction,
SparkRDDWriteClient::upsert, true, 50, 150, 2, false,
populateMetaFields);
}
/**
* When records being inserted are also deleted in the same write batch, Hudi should delete those records and
* they should not be available via the read path.
* @throws Exception
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testDeletesForInsertsInSameBatch(boolean populateMetaFields) throws Exception {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build());
/**
* Write 200 inserts and issue deletes for a subset (50) of the inserts.
*/
String initCommitTime = "000";
String newCommitTime = "001";
final List<HoodieRecord> recordsInFirstBatch = new ArrayList<>();
Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
(String instantTime, Integer numRecordsInThisCommit) -> {
List<HoodieRecord> fewRecordsForInsert = dataGen.generateInserts(instantTime, 200);
List<HoodieRecord> fewRecordsForDelete = fewRecordsForInsert.subList(40, 90);
recordsInFirstBatch.addAll(fewRecordsForInsert);
recordsInFirstBatch.addAll(dataGen.generateDeletesFromExistingRecords(fewRecordsForDelete));
return recordsInFirstBatch;
};
writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime,
-1, recordGenFunction, SparkRDDWriteClient::upsert, true, 150, 150, 1, false,
populateMetaFields);
}
/**
* Test update of a record to a different partition with a global index.
*/
@ParameterizedTest
@EnumSource(value = IndexType.class, names = {"GLOBAL_BLOOM", "GLOBAL_SIMPLE"})
public void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType) throws Exception {
testUpsertsUpdatePartitionPath(indexType, getConfig(), SparkRDDWriteClient::upsert);
}
/**
* This test ensures that with a global index, when update-partition-path is enabled in the config, an incoming record whose partition
* does not match what is in storage is handled correctly, i.e. the old record is deleted from the old partition and a new one is inserted
* into the new partition.
* Test structure:
* 1. insert 1st batch
* 2. insert 2nd batch with a larger number of records so that new file groups are created in the partitions
* 3. issue upserts to records from batch 1 with a different partition path. This should ensure records from batch 1 are deleted and new
* records are upserted to the new partition
*
* @param indexType index type to be tested for
* @param config instance of {@link HoodieWriteConfig} to use
* @param writeFn write function to be used for testing
*/
private void testUpsertsUpdatePartitionPath(IndexType indexType, HoodieWriteConfig config,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn)
throws Exception {
// instantiate client
HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
.withProps(config.getProps())
.withCompactionConfig(
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
.withBloomIndexUpdatePartitionPath(true)
.withGlobalSimpleIndexUpdatePartitionPath(true)
.build()).withTimelineLayoutVersion(VERSION_0).build();
HoodieTableMetaClient.withPropertyBuilder()
.fromMetaClient(metaClient)
.setTimelineLayoutVersion(VERSION_0)
.initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
// Set rollback to LAZY so no inflights are deleted
hoodieWriteConfig.getProps().put(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY_PROP.key(),
HoodieFailedWritesCleaningPolicy.LAZY.name());
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
// Write 1
String newCommitTime = "001";
int numRecords = 10;
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
Set<Pair<String, String>> expectedPartitionPathRecKeyPairs = new HashSet<>();
// populate expected partition path and record keys
for (HoodieRecord rec : records) {
expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
}
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
result.collect();
// Check the entire dataset has all records
String[] fullPartitionPaths = getFullPartitionPaths();
assertPartitionPathRecordKeys(expectedPartitionPathRecKeyPairs, fullPartitionPaths);
// verify one basefile per partition
String[] fullExpectedPartitionPaths = getFullPartitionPaths(expectedPartitionPathRecKeyPairs.stream().map(Pair::getLeft).toArray(String[]::new));
Map<String, Long> baseFileCounts = getBaseFileCountsForPaths(basePath, fs, fullExpectedPartitionPaths);
for (Map.Entry<String, Long> entry : baseFileCounts.entrySet()) {
assertEquals(1, entry.getValue());
}
assertTrue(baseFileCounts.entrySet().stream().allMatch(entry -> entry.getValue() == 1));
// Write 2
newCommitTime = "002";
numRecords = 20; // so that a new file id is created
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> recordsSecondBatch = dataGen.generateInserts(newCommitTime, numRecords);
// populate expected partition path and record keys
for (HoodieRecord rec : recordsSecondBatch) {
expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
}
writeRecords = jsc.parallelize(recordsSecondBatch, 1);
result = writeFn.apply(client, writeRecords, newCommitTime);
result.collect();
// Check the entire dataset has all records
fullPartitionPaths = getFullPartitionPaths();
assertPartitionPathRecordKeys(expectedPartitionPathRecKeyPairs, fullPartitionPaths);
// verify that there is more than 1 base file per partition
// we can't guarantee how records are distributed across partitions, so verify that at least one partition has more than 1 base file.
baseFileCounts = getBaseFileCountsForPaths(basePath, fs, fullPartitionPaths);
assertTrue(baseFileCounts.entrySet().stream().filter(entry -> entry.getValue() > 1).count() >= 1,
"At least one partition should have more than 1 base file after 2nd batch of writes");
// Write 3 (upserts to records from batch 1 with diff partition path)
newCommitTime = "003";
// update to diff partition paths
List<HoodieRecord> recordsToUpsert = new ArrayList<>();
for (HoodieRecord rec : records) {
// remove older entry from expected partition path record key pairs
expectedPartitionPathRecKeyPairs
.remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
String partitionPath = rec.getPartitionPath();
String newPartitionPath = null;
if (partitionPath.equalsIgnoreCase(DEFAULT_FIRST_PARTITION_PATH)) {
newPartitionPath = DEFAULT_SECOND_PARTITION_PATH;
} else if (partitionPath.equalsIgnoreCase(DEFAULT_SECOND_PARTITION_PATH)) {
newPartitionPath = DEFAULT_THIRD_PARTITION_PATH;
} else if (partitionPath.equalsIgnoreCase(DEFAULT_THIRD_PARTITION_PATH)) {
newPartitionPath = DEFAULT_FIRST_PARTITION_PATH;
} else {
throw new IllegalStateException("Unknown partition path " + rec.getPartitionPath());
}
recordsToUpsert.add(
new HoodieRecord(new HoodieKey(rec.getRecordKey(), newPartitionPath),
rec.getData()));
// populate expected partition path and record keys
expectedPartitionPathRecKeyPairs.add(Pair.of(newPartitionPath, rec.getRecordKey()));
}
writeRecords = jsc.parallelize(recordsToUpsert, 1);
result = writeFn.apply(client, writeRecords, newCommitTime);
result.collect();
// Check the entire dataset has all records
fullPartitionPaths = getFullPartitionPaths();
assertPartitionPathRecordKeys(expectedPartitionPathRecKeyPairs, fullPartitionPaths);
}
private void assertPartitionPathRecordKeys(Set<Pair<String, String>> expectedPartitionPathRecKeyPairs, String[] fullPartitionPaths) {
Dataset<Row> rows = getAllRows(fullPartitionPaths);
List<Pair<String, String>> actualPartitionPathRecKeyPairs = getActualPartitionPathAndRecordKeys(rows);
// verify all (partition path, record key) pairs match
assertActualAndExpectedPartitionPathRecordKeyMatches(expectedPartitionPathRecKeyPairs, actualPartitionPathRecKeyPairs);
}
private List<Pair<String, String>> getActualPartitionPathAndRecordKeys(Dataset<org.apache.spark.sql.Row> rows) {
List<Pair<String, String>> actualPartitionPathRecKeyPairs = new ArrayList<>();
for (Row row : rows.collectAsList()) {
actualPartitionPathRecKeyPairs
.add(Pair.of(row.getAs("_hoodie_partition_path"), row.getAs("_row_key")));
}
return actualPartitionPathRecKeyPairs;
}
private Dataset<org.apache.spark.sql.Row> getAllRows(String[] fullPartitionPaths) {
return HoodieClientTestUtils
.read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
}
private String[] getFullPartitionPaths() {
return getFullPartitionPaths(dataGen.getPartitionPaths());
}
private String[] getFullPartitionPaths(String[] relativePartitionPaths) {
String[] fullPartitionPaths = new String[relativePartitionPaths.length];
for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath, relativePartitionPaths[i]);
}
return fullPartitionPaths;
}
private void assertActualAndExpectedPartitionPathRecordKeyMatches(Set<Pair<String, String>> expectedPartitionPathRecKeyPairs,
List<Pair<String, String>> actualPartitionPathRecKeyPairs) {
// verify all (partition path, record key) pairs match
assertEquals(expectedPartitionPathRecKeyPairs.size(), actualPartitionPathRecKeyPairs.size());
for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
}
for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
}
}
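/**
* Upserts {@code recordNum} freshly generated records under {@code commitTime} and asserts the expected
* number of write statuses; returns the write statuses along with the generated records.
*/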
private Pair<List<WriteStatus>, List<HoodieRecord>> insertBatchRecords(SparkRDDWriteClient client, String commitTime,
Integer recordNum, int expectedStatusSize) {
client.startCommitWithTime(commitTime);
List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime, recordNum);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime).collect();
assertNoWriteErrors(statuses);
assertEquals(expectedStatusSize, statuses.size(), "check expected status size.");
return Pair.of(statuses, inserts1);
}
@Test
public void testUpdateRejectForClustering() throws IOException {
final String testPartitionPath = "2016/09/26";
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
Properties props = new Properties();
props.setProperty(ASYNC_CLUSTERING_ENABLE.key(), "true");
HoodieWriteConfig config = getSmallInsertWriteConfig(100,
TRIP_EXAMPLE_SCHEMA, dataGen.getEstimatedFileSizeInBytes(150), true, props);
SparkRDDWriteClient client = getHoodieWriteClient(config);
HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, context, metaClient);
// 1. insert to generate 2 file groups
String commitTime1 = "001";
Pair<List<WriteStatus>, List<HoodieRecord>> upsertResult = insertBatchRecords(client, commitTime1, 600, 2);
List<HoodieRecord> inserts1 = upsertResult.getValue();
List<String> fileGroupIds1 = table.getFileSystemView().getAllFileGroups(testPartitionPath)
.map(fileGroup -> fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
assertEquals(2, fileGroupIds1.size());
// 2. generate clustering plan for fileGroupIds1 file groups
String commitTime2 = "002";
List<List<FileSlice>> firstInsertFileSlicesList = table.getFileSystemView().getAllFileGroups(testPartitionPath)
.map(fileGroup -> fileGroup.getAllFileSlices().collect(Collectors.toList())).collect(Collectors.toList());
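// materialize one List<FileSlice> per file group as an array, which is what createRequestedReplaceInstant expects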
List<FileSlice>[] fileSlices = (List<FileSlice>[])firstInsertFileSlicesList.toArray(new List[firstInsertFileSlicesList.size()]);
createRequestedReplaceInstant(this.metaClient, commitTime2, fileSlices);
// 3. insert one record; no update-reject exception is expected, and instead of merging into the small files a new file group is generated
String commitTime3 = "003";
insertBatchRecords(client, commitTime3, 1, 1);
List<String> fileGroupIds2 = table.getFileSystemView().getAllFileGroups(testPartitionPath)
.map(fileGroup -> fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
assertEquals(3, fileGroupIds2.size());
// 4. update one record in the two file groups pending clustering; an update-reject exception should be thrown
String commitTime4 = "004";
client.startCommitWithTime(commitTime4);
List<HoodieRecord> insertsAndUpdates3 = new ArrayList<>();
insertsAndUpdates3.addAll(dataGen.generateUpdates(commitTime4, inserts1));
String assertMsg = String.format("Not allowed to update the clustering files in partition: %s "
+ "For pending clustering operations, we are not going to support update for now.", testPartitionPath);
assertThrows(HoodieUpsertException.class, () -> {
client.upsert(jsc.parallelize(insertsAndUpdates3, 1), commitTime4).collect();
}, assertMsg);
// 5. insert one record; no update-reject exception is expected, and it is merged into the small file
String commitTime5 = "005";
List<WriteStatus> statuses = insertBatchRecords(client, commitTime5, 1, 1).getKey();
fileGroupIds2.removeAll(fileGroupIds1);
assertEquals(fileGroupIds2.get(0), statuses.get(0).getFileId());
List<String> firstInsertFileGroupIds4 = table.getFileSystemView().getAllFileGroups(testPartitionPath)
.map(fileGroup -> fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
assertEquals(3, firstInsertFileGroupIds4.size());
}
/**
* Test scenario of new file-group getting added during upsert().
*/
@Test
public void testSmallInsertHandlingForUpserts() throws Exception {
final String testPartitionPath = "2016/09/26";
final int insertSplitLimit = 100;
// setup the small file handling params
// hold upto 200 records max
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit,
TRIP_EXAMPLE_SCHEMA, dataGen.getEstimatedFileSizeInBytes(150));
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config);
BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
// Inserts => will write file1
String commitTime1 = "001";
client.startCommitWithTime(commitTime1);
List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, insertSplitLimit); // this writes ~500kb
Set<String> keys1 = recordsToRecordKeySet(inserts1);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
assertNoWriteErrors(statuses);
assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
String file1 = statuses.get(0).getFileId();
assertEquals(100,
fileUtils.readRowKeys(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath()))
.size(), "file should contain 100 records");
// Update + Inserts such that they just expand file1
String commitTime2 = "002";
client.startCommitWithTime(commitTime2);
List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 40);
Set<String> keys2 = recordsToRecordKeySet(inserts2);
List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>();
insertsAndUpdates2.addAll(inserts2);
insertsAndUpdates2.addAll(dataGen.generateUpdates(commitTime2, inserts1));
JavaRDD<HoodieRecord> insertAndUpdatesRDD2 = jsc.parallelize(insertsAndUpdates2, 1);
statuses = client.upsert(insertAndUpdatesRDD2, commitTime2).collect();
assertNoWriteErrors(statuses);
assertEquals(1, statuses.size(), "Just 1 file needs to be updated.");
assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be expanded");
assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded");
Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
assertEquals(140, fileUtils.readRowKeys(hadoopConf, newFile).size(),
"file should contain 140 records");
List<GenericRecord> records = fileUtils.readAvroRecords(hadoopConf, newFile);
for (GenericRecord record : records) {
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
assertEquals(commitTime2, record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(), "only expect commit2");
assertTrue(keys2.contains(recordKey) || keys1.contains(recordKey), "key expected to be part of commit2");
}
// update + inserts such that file1 is updated and expanded, a new file2 is created.
String commitTime3 = "003";
client.startCommitWithTime(commitTime3);
List<HoodieRecord> insertsAndUpdates3 = dataGen.generateInserts(commitTime3, 200);
Set<String> keys3 = recordsToRecordKeySet(insertsAndUpdates3);
List<HoodieRecord> updates3 = dataGen.generateUpdates(commitTime3, inserts2);
insertsAndUpdates3.addAll(updates3);
JavaRDD<HoodieRecord> insertAndUpdatesRDD3 = jsc.parallelize(insertsAndUpdates3, 1);
statuses = client.upsert(insertAndUpdatesRDD3, commitTime3).collect();
assertNoWriteErrors(statuses);
assertEquals(2, statuses.size(), "2 files needs to be committed.");
HoodieTableMetaClient metadata = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
HoodieTable table = getHoodieTable(metadata, config);
BaseFileOnlyView fileSystemView = table.getBaseFileOnlyView();
List<HoodieBaseFile> files =
fileSystemView.getLatestBaseFilesBeforeOrOn(testPartitionPath, commitTime3).collect(Collectors.toList());
int numTotalInsertsInCommit3 = 0;
int numTotalUpdatesInCommit3 = 0;
for (HoodieBaseFile file : files) {
if (file.getFileName().contains(file1)) {
assertEquals(commitTime3, file.getCommitTime(), "Existing file should be expanded");
records = fileUtils.readAvroRecords(hadoopConf, new Path(file.getPath()));
for (GenericRecord record : records) {
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
String recordCommitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
if (recordCommitTime.equals(commitTime3)) {
if (keys2.contains(recordKey)) {
keys2.remove(recordKey);
numTotalUpdatesInCommit3++;
} else {
numTotalInsertsInCommit3++;
}
}
}
assertEquals(0, keys2.size(), "All keys added in commit 2 must be updated in commit3 correctly");
} else {
assertEquals(commitTime3, file.getCommitTime(), "New file must be written for commit 3");
records = fileUtils.readAvroRecords(hadoopConf, new Path(file.getPath()));
for (GenericRecord record : records) {
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
assertEquals(commitTime3, record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(),
"only expect commit3");
assertTrue(keys3.contains(recordKey), "key expected to be part of commit3");
}
numTotalInsertsInCommit3 += records.size();
}
}
assertEquals(numTotalUpdatesInCommit3, inserts2.size(), "Total updates in commit3 must add up");
assertEquals(numTotalInsertsInCommit3, keys3.size(), "Total inserts in commit3 must add up");
}
/**
* Test scenario of new file-group getting added during insert().
*/
@ParameterizedTest
@MethodSource("smallInsertHandlingParams")
public void testSmallInsertHandlingForInserts(boolean mergeAllowDuplicateInserts) throws Exception {
final String testPartitionPath = "2016/09/26";
final int insertSplitLimit = 100;
// setup the small file handling params
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, false, mergeAllowDuplicateInserts); // hold upto 200 records max
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config);
BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
// Inserts => will write file1
String commitTime1 = "001";
client.startCommitWithTime(commitTime1);
List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, insertSplitLimit); // this writes ~500kb
Set<String> keys1 = recordsToRecordKeySet(inserts1);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
List<WriteStatus> statuses = client.insert(insertRecordsRDD1, commitTime1).collect();
assertNoWriteErrors(statuses);
assertPartitionMetadata(new String[] {testPartitionPath}, fs);
assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
String file1 = statuses.get(0).getFileId();
assertEquals(100,
fileUtils.readRowKeys(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath()))
.size(), "file should contain 100 records");
// Second set of inserts should just expand file1
String commitTime2 = "002";
client.startCommitWithTime(commitTime2);
List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 40);
Set<String> keys2 = recordsToRecordKeySet(inserts2);
JavaRDD<HoodieRecord> insertRecordsRDD2 = jsc.parallelize(inserts2, 1);
statuses = client.insert(insertRecordsRDD2, commitTime2).collect();
assertNoWriteErrors(statuses);
assertEquals(1, statuses.size(), "Just 1 file needs to be updated.");
assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be expanded");
assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded");
Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
assertEquals(140, fileUtils.readRowKeys(hadoopConf, newFile).size(),
"file should contain 140 records");
List<GenericRecord> records = fileUtils.readAvroRecords(hadoopConf, newFile);
for (GenericRecord record : records) {
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
String recCommitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
assertTrue(commitTime1.equals(recCommitTime) || commitTime2.equals(recCommitTime),
"Record expected to be part of commit 1 or commit2");
assertTrue(keys2.contains(recordKey) || keys1.contains(recordKey),
"key expected to be part of commit 1 or commit2");
}
// Lots of inserts such that file1 is updated and expanded, a new file2 is created.
String commitTime3 = "003";
client.startCommitWithTime(commitTime3);
List<HoodieRecord> inserts3 = dataGen.generateInserts(commitTime3, 200);
JavaRDD<HoodieRecord> insertRecordsRDD3 = jsc.parallelize(inserts3, 1);
statuses = client.insert(insertRecordsRDD3, commitTime3).collect();
assertNoWriteErrors(statuses);
assertEquals(2, statuses.size(), "2 files needs to be committed.");
assertEquals(340,
fileUtils.readRowKeys(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())).size()
+ fileUtils.readRowKeys(hadoopConf, new Path(basePath, statuses.get(1).getStat().getPath())).size(),
"file should contain 340 records");
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
HoodieTable table = getHoodieTable(metaClient, config);
List<HoodieBaseFile> files = table.getBaseFileOnlyView()
.getLatestBaseFilesBeforeOrOn(testPartitionPath, commitTime3).collect(Collectors.toList());
assertEquals(2, files.size(), "Total of 2 valid data files");
int totalInserts = 0;
for (HoodieBaseFile file : files) {
assertEquals(commitTime3, file.getCommitTime(), "All files must be at commit 3");
totalInserts += fileUtils.readAvroRecords(hadoopConf, new Path(file.getPath())).size();
}
assertEquals(totalInserts, inserts1.size() + inserts2.size() + inserts3.size(), "Total number of records must add up");
}
/**
* Test delete with the delete API.
*/
@Test
public void testDeletesWithDeleteApi() throws Exception {
final String testPartitionPath = "2016/09/26";
final int insertSplitLimit = 100;
// setup the small file handling params
// hold upto 200 records max
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit,
TRIP_EXAMPLE_SCHEMA, dataGen.getEstimatedFileSizeInBytes(150));
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config);
// Inserts => will write file1
String commitTime1 = "001";
client.startCommitWithTime(commitTime1);
List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, insertSplitLimit); // this writes ~500kb
Set<String> keys1 = recordsToRecordKeySet(inserts1);
List<String> keysSoFar = new ArrayList<>(keys1);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
assertNoWriteErrors(statuses);
assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
String file1 = statuses.get(0).getFileId();
assertEquals(100,
BaseFileUtils.getInstance(metaClient).readRowKeys(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath()))
.size(), "file should contain 100 records");
// Delete 20 among 100 inserted
testDeletes(client, inserts1, 20, file1, "002", 80, keysSoFar);
// Insert and update 40 records
Pair<Set<String>, List<HoodieRecord>> updateBatch2 = testUpdates("003", client, 40, 120);
keysSoFar.addAll(updateBatch2.getLeft());
// Delete 10 records among 40 updated
testDeletes(client, updateBatch2.getRight(), 10, file1, "004", 110, keysSoFar);
// do another batch of updates
Pair<Set<String>, List<HoodieRecord>> updateBatch3 = testUpdates("005", client, 40, 150);
keysSoFar.addAll(updateBatch3.getLeft());
// delete non-existent keys
String commitTime6 = "006";
client.startCommitWithTime(commitTime6);
List<HoodieRecord> dummyInserts3 = dataGen.generateInserts(commitTime6, 20);
List<HoodieKey> hoodieKeysToDelete3 = randomSelectAsHoodieKeys(dummyInserts3, 20);
JavaRDD<HoodieKey> deleteKeys3 = jsc.parallelize(hoodieKeysToDelete3, 1);
statuses = client.delete(deleteKeys3, commitTime6).collect();
assertNoWriteErrors(statuses);
assertEquals(0, statuses.size(), "No write statuses should be returned for deletes of non-existent keys.");
// Check the entire dataset has all records still
String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
}
assertEquals(150,
HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
"Must contain " + 150 + " records");
    // Delete another batch. The previous delete commit should have persisted the schema;
    // if not, this will throw an exception.
testDeletes(client, updateBatch3.getRight(), 10, file1, "007", 140, keysSoFar);
}
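  /**
   * Test inline clustering of two insert batches, validating the result with the SQL-query-equality pre-commit validator.
   */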
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testSimpleClustering(boolean populateMetaFields) throws Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
testInsertAndClustering(clusteringConfig, populateMetaFields, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
}
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testClusteringWithSortColumns(boolean populateMetaFields) throws Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringSortColumns(populateMetaFields ? "_hoodie_record_key" : "_row_key")
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
testInsertAndClustering(clusteringConfig, populateMetaFields, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
}
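  /**
   * Test that a pending (uncommitted) clustering instant can be rolled back even after a later commit has completed.
   */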
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testPendingClusteringRollback(boolean populateMetaFields) throws Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
// start clustering, but don't commit
List<HoodieRecord> allRecords = testInsertAndClustering(clusteringConfig, populateMetaFields, false);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
List<Pair<HoodieInstant, HoodieClusteringPlan>> pendingClusteringPlans =
ClusteringUtils.getAllPendingClusteringPlans(metaClient).collect(Collectors.toList());
assertEquals(1, pendingClusteringPlans.size());
HoodieInstant pendingClusteringInstant = pendingClusteringPlans.get(0).getLeft();
// complete another commit after pending clustering
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig config = cfgBuilder.build();
SparkRDDWriteClient client = getHoodieWriteClient(config);
dataGen = new HoodieTestDataGenerator();
String commitTime = HoodieActiveTimeline.createNewInstantTime();
allRecords.addAll(dataGen.generateInserts(commitTime, 200));
writeAndVerifyBatch(client, allRecords, commitTime, populateMetaFields);
// verify pending clustering can be rolled back (even though there is a completed commit greater than pending clustering)
client.rollback(pendingClusteringInstant.getTimestamp());
metaClient.reloadActiveTimeline();
// verify there are no pending clustering instants
assertEquals(0, ClusteringUtils.getAllPendingClusteringPlans(metaClient).count());
}
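  /**
   * Test that clustering fails with HoodieValidationException when the configured pre-commit validator throws.
   */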
@Test
public void testClusteringWithFailingValidator() throws Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringSortColumns("_hoodie_record_key")
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
try {
testInsertAndClustering(clusteringConfig, true, true, FailingPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
fail("expected pre-commit clustering validation to fail");
} catch (HoodieValidationException e) {
// expected
}
}
@Test
public void testClusteringInvalidConfigForSqlQueryValidator() throws Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
try {
testInsertAndClustering(clusteringConfig, false, true, SqlQueryEqualityPreCommitValidator.class.getName(), "", "");
fail("expected pre-commit clustering validation to fail because sql query is not configured");
} catch (HoodieValidationException e) {
// expected
}
}
@Test
public void testClusteringInvalidConfigForSqlQuerySingleResultValidator() throws Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
testInsertAndClustering(clusteringConfig, false, true, SqlQuerySingleResultPreCommitValidator.class.getName(),
"", COUNT_SQL_QUERY_FOR_VALIDATION + "#400");
}
@Test
public void testClusteringInvalidConfigForSqlQuerySingleResultValidatorFailure() throws Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
try {
testInsertAndClustering(clusteringConfig, false, true, SqlQuerySingleResultPreCommitValidator.class.getName(),
"", COUNT_SQL_QUERY_FOR_VALIDATION + "#802");
fail("expected pre-commit clustering validation to fail because of count mismatch. expect 400 rows, not 802");
} catch (HoodieValidationException e) {
// expected
}
}
private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering) throws Exception {
return testInsertAndClustering(clusteringConfig, populateMetaFields, completeClustering, "", "", "");
}
private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields,
boolean completeClustering, String validatorClasses,
String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation) throws Exception {
List<HoodieRecord> allRecords = testInsertTwoBatches(populateMetaFields);
testClustering(clusteringConfig, populateMetaFields, completeClustering, validatorClasses, sqlQueryForEqualityValidation, sqlQueryForSingleResultValidation, allRecords);
return allRecords;
}
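  /**
   * Inserts two batches of 200 records each with small-file handling effectively disabled, so the second batch
   * lands in new file groups, and returns all written records.
   */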
private List<HoodieRecord> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
// create config to not update small files.
HoodieWriteConfig config = getSmallInsertWriteConfig(2000, TRIP_EXAMPLE_SCHEMA, 10, false, populateMetaFields,
populateMetaFields ? new Properties() : getPropertiesForKeyGen());
SparkRDDWriteClient client = getHoodieWriteClient(config);
dataGen = new HoodieTestDataGenerator(new String[] {"2015/03/16"});
String commitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records1 = dataGen.generateInserts(commitTime, 200);
List<WriteStatus> statuses1 = writeAndVerifyBatch(client, records1, commitTime, populateMetaFields);
Set<HoodieFileGroupId> fileIds1 = getFileGroupIdsFromWriteStatus(statuses1);
commitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records2 = dataGen.generateInserts(commitTime, 200);
List<WriteStatus> statuses2 = writeAndVerifyBatch(client, records2, commitTime, populateMetaFields);
Set<HoodieFileGroupId> fileIds2 = getFileGroupIdsFromWriteStatus(statuses2);
// verify new files are created for the 2nd write
Set<HoodieFileGroupId> fileIdIntersection = new HashSet<>(fileIds1);
fileIdIntersection.retainAll(fileIds2);
assertEquals(0, fileIdIntersection.size());
return Stream.concat(records1.stream(), records2.stream()).collect(Collectors.toList());
}
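  /**
   * Schedules and runs clustering with the given config; when clustering is completed, verifies the records written
   * under the replace commit and returns its timestamp (empty string otherwise).
   */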
private String testClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering,
String validatorClasses, String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation,
List<HoodieRecord> allRecords) throws IOException {
HoodieWriteConfig config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(false)
.withClusteringConfig(clusteringConfig)
.withProps(getPropertiesForKeyGen()).build();
HoodieWriteMetadata<JavaRDD<WriteStatus>> clusterMetadata =
performClustering(clusteringConfig, populateMetaFields, completeClustering, validatorClasses, sqlQueryForEqualityValidation, sqlQueryForSingleResultValidation, allRecords);
if (completeClustering) {
String clusteringCommitTime = metaClient.reloadActiveTimeline().getCompletedReplaceTimeline()
.getReverseOrderedInstants().findFirst().get().getTimestamp();
verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords, clusterMetadata.getWriteStatuses().collect(), config);
return clusteringCommitTime;
} else {
return "";
}
}
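  /**
   * Schedules a clustering plan and executes it via the write client, wiring in the configured pre-commit
   * validators, then verifies the written records against the expected record set.
   */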
private HoodieWriteMetadata<JavaRDD<WriteStatus>> performClustering(HoodieClusteringConfig clusteringConfig,
boolean populateMetaFields,
boolean completeClustering,
String validatorClasses,
String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation,
List<HoodieRecord> allRecords) throws IOException {
HoodiePreCommitValidatorConfig validatorConfig = HoodiePreCommitValidatorConfig.newBuilder()
.withPreCommitValidator(StringUtils.nullToEmpty(validatorClasses))
.withPrecommitValidatorEqualitySqlQueries(sqlQueryForEqualityValidation)
.withPrecommitValidatorSingleResultSqlQueries(sqlQueryForSingleResultValidation)
.build();
HoodieWriteConfig config = getConfigBuilder().withAutoCommit(false)
.withPreCommitValidatorConfig(validatorConfig)
.withProps(populateMetaFields ? new Properties() : getPropertiesForKeyGen())
.withClusteringConfig(clusteringConfig).build();
// create client with new config.
SparkRDDWriteClient client = getHoodieWriteClient(config);
String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString();
HoodieWriteMetadata<JavaRDD<WriteStatus>> clusterMetadata = client.cluster(clusteringCommitTime, completeClustering);
verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords, clusterMetadata.getWriteStatuses().collect(), config);
Set<HoodieFileGroupId> replacedFileIds = new HashSet<>();
clusterMetadata.getPartitionToReplaceFileIds().entrySet().forEach(partitionFiles ->
partitionFiles.getValue().stream().forEach(file ->
replacedFileIds.add(new HoodieFileGroupId(partitionFiles.getKey(), file))));
return clusterMetadata;
}
private Set<HoodieFileGroupId> getFileGroupIdsFromWriteStatus(List<WriteStatus> statuses) {
return statuses.stream().map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet());
}
/**
* Test scenario of writing more file groups than the existing number of file groups in the partition.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testInsertOverwritePartitionHandlingWithMoreRecords(boolean populateMetaFields) throws Exception {
verifyInsertOverwritePartitionHandling(1000, 3000, populateMetaFields);
}
/**
* Test scenario of writing fewer file groups than the existing number of file groups in the partition.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testInsertOverwritePartitionHandlingWithFewerRecords(boolean populateMetaFields) throws Exception {
verifyInsertOverwritePartitionHandling(3000, 1000, populateMetaFields);
}
/**
* Test scenario of writing a similar number of file groups in the partition.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testInsertOverwritePartitionHandlingWithSimilarNumberOfRecords(boolean populateMetaFields) throws Exception {
verifyInsertOverwritePartitionHandling(3000, 3000, populateMetaFields);
}
/**
* 1) Do write1 (upsert) with 'batch1RecordsCount' number of records.
* 2) Do write2 (insert overwrite) with 'batch2RecordsCount' number of records.
*
* Verify that all records written in step 1 are overwritten.
*/
private void verifyInsertOverwritePartitionHandling(int batch1RecordsCount, int batch2RecordsCount, boolean populateMetaFields) throws Exception {
final String testPartitionPath = "americas";
HoodieWriteConfig config = getSmallInsertWriteConfig(2000,
TRIP_EXAMPLE_SCHEMA, dataGen.getEstimatedFileSizeInBytes(150), populateMetaFields, populateMetaFields
? new Properties() : getPropertiesForKeyGen());
SparkRDDWriteClient client = getHoodieWriteClient(config);
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
// Do Inserts
String commit1 = "001";
List<WriteStatus> statuses = writeAndVerifyBatch(client, dataGen.generateInserts(commit1, batch1RecordsCount), commit1, populateMetaFields);
Set<String> batch1Buckets = getFileIdsFromWriteStatus(statuses);
// Do Insert Overwrite
String commitTime2 = "002";
client.startCommitWithTime(commitTime2, HoodieTimeline.REPLACE_COMMIT_ACTION);
List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, batch2RecordsCount);
List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>();
insertsAndUpdates2.addAll(inserts2);
JavaRDD<HoodieRecord> insertAndUpdatesRDD2 = jsc.parallelize(insertsAndUpdates2, 2);
HoodieWriteResult writeResult = client.insertOverwrite(insertAndUpdatesRDD2, commitTime2);
statuses = writeResult.getWriteStatuses().collect();
assertNoWriteErrors(statuses);
assertEquals(batch1Buckets, new HashSet<>(writeResult.getPartitionToReplaceFileIds().get(testPartitionPath)));
verifyRecordsWritten(commitTime2, populateMetaFields, inserts2, statuses, config);
}
private Set<String> getFileIdsFromWriteStatus(List<WriteStatus> statuses) {
return statuses.stream().map(s -> s.getFileId()).collect(Collectors.toSet());
}
/**
* Test scenario of writing fewer file groups for the first partition than for the second and third partitions.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void verifyDeletePartitionsHandlingWithFewerRecordsFirstPartition(boolean populateMetaFields) throws Exception {
verifyDeletePartitionsHandling(1000, 3000, 3000, populateMetaFields);
}
/**
* Test scenario of writing a similar number of file groups in each partition.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void verifyDeletePartitionsHandlingWithSimilarNumberOfRecords(boolean populateMetaFields) throws Exception {
verifyDeletePartitionsHandling(3000, 3000, 3000, populateMetaFields);
}
/**
* Test scenario of writing more file groups for the first partition than for the second and third partitions.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
  public void verifyDeletePartitionsHandlingWithFewerRecordsSecondThirdPartition(boolean populateMetaFields) throws Exception {
verifyDeletePartitionsHandling(3000, 1000, 1000, populateMetaFields);
}
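  /**
   * Upserts the given number of records into a single partition under the given commit time and returns the file ids written to.
   */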
private Set<String> insertPartitionRecordsWithCommit(SparkRDDWriteClient client, int recordsCount, String commitTime1, String partitionPath) throws IOException {
client.startCommitWithTime(commitTime1);
List<HoodieRecord> inserts1 = dataGen.generateInsertsForPartition(commitTime1, recordsCount, partitionPath);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 2);
List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
assertNoWriteErrors(statuses);
Set<String> batchBuckets = statuses.stream().map(s -> s.getFileId()).collect(Collectors.toSet());
verifyRecordsWritten(commitTime1, true, inserts1, statuses, client.getConfig());
return batchBuckets;
}
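  /**
   * Issues a delete-partitions replace commit for the given partitions and returns the replaced file ids.
   */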
private Set<String> deletePartitionWithCommit(SparkRDDWriteClient client, String commitTime, List<String> deletePartitionPath) {
client.startCommitWithTime(commitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
HoodieWriteResult writeResult = client.deletePartitions(deletePartitionPath, commitTime);
Set<String> deletePartitionReplaceFileIds =
writeResult.getPartitionToReplaceFileIds().entrySet()
.stream().flatMap(entry -> entry.getValue().stream()).collect(Collectors.toSet());
return deletePartitionReplaceFileIds;
}
/**
* 1) Do write1 (upsert) with 'batch1RecordsCount' number of records for first partition.
* 2) Do write2 (upsert) with 'batch2RecordsCount' number of records for second partition.
* 3) Do write3 (upsert) with 'batch3RecordsCount' number of records for third partition.
* 4) delete first partition and check result.
* 5) delete second and third partition and check result.
*
*/
private void verifyDeletePartitionsHandling(int batch1RecordsCount, int batch2RecordsCount, int batch3RecordsCount,
boolean populateMetaFields) throws Exception {
HoodieWriteConfig config = getSmallInsertWriteConfig(2000,
TRIP_EXAMPLE_SCHEMA, dataGen.getEstimatedFileSizeInBytes(150), populateMetaFields, populateMetaFields
? new Properties() : getPropertiesForKeyGen());
SparkRDDWriteClient client = getHoodieWriteClient(config);
dataGen = new HoodieTestDataGenerator();
// Do Inserts for DEFAULT_FIRST_PARTITION_PATH
String commitTime1 = "001";
Set<String> batch1Buckets =
this.insertPartitionRecordsWithCommit(client, batch1RecordsCount, commitTime1, DEFAULT_FIRST_PARTITION_PATH);
// Do Inserts for DEFAULT_SECOND_PARTITION_PATH
String commitTime2 = "002";
Set<String> batch2Buckets =
this.insertPartitionRecordsWithCommit(client, batch2RecordsCount, commitTime2, DEFAULT_SECOND_PARTITION_PATH);
// Do Inserts for DEFAULT_THIRD_PARTITION_PATH
String commitTime3 = "003";
Set<String> batch3Buckets =
this.insertPartitionRecordsWithCommit(client, batch3RecordsCount, commitTime3, DEFAULT_THIRD_PARTITION_PATH);
// delete DEFAULT_FIRST_PARTITION_PATH
String commitTime4 = "004";
Set<String> deletePartitionReplaceFileIds1 =
deletePartitionWithCommit(client, commitTime4, Arrays.asList(DEFAULT_FIRST_PARTITION_PATH));
assertEquals(batch1Buckets, deletePartitionReplaceFileIds1);
List<HoodieBaseFile> baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
String.format("%s/%s/*", basePath, DEFAULT_FIRST_PARTITION_PATH));
assertEquals(0, baseFiles.size());
baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
String.format("%s/%s/*", basePath, DEFAULT_SECOND_PARTITION_PATH));
assertTrue(baseFiles.size() > 0);
baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
String.format("%s/%s/*", basePath, DEFAULT_THIRD_PARTITION_PATH));
assertTrue(baseFiles.size() > 0);
// delete DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH
String commitTime5 = "005";
Set<String> deletePartitionReplaceFileIds2 =
deletePartitionWithCommit(client, commitTime5, Arrays.asList(DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH));
Set<String> expectedFileId = new HashSet<>();
expectedFileId.addAll(batch2Buckets);
expectedFileId.addAll(batch3Buckets);
assertEquals(expectedFileId, deletePartitionReplaceFileIds2);
baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
String.format("%s/%s/*", basePath, DEFAULT_FIRST_PARTITION_PATH),
String.format("%s/%s/*", basePath, DEFAULT_SECOND_PARTITION_PATH),
String.format("%s/%s/*", basePath, DEFAULT_THIRD_PARTITION_PATH));
assertEquals(0, baseFiles.size());
}
/**
* Verify data in base files matches expected records and commit time.
*/
private void verifyRecordsWritten(String commitTime, boolean populateMetadataField,
List<HoodieRecord> expectedRecords, List<WriteStatus> allStatus, HoodieWriteConfig config) throws IOException {
List<GenericRecord> records = new ArrayList<>();
for (WriteStatus status : allStatus) {
Path filePath = new Path(basePath, status.getStat().getPath());
records.addAll(BaseFileUtils.getInstance(metaClient).readAvroRecords(jsc.hadoopConfiguration(), filePath));
}
Set<String> expectedKeys = recordsToRecordKeySet(expectedRecords);
assertEquals(expectedKeys.size(), records.size());
if (config.populateMetaFields()) {
for (GenericRecord record : records) {
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
assertEquals(commitTime,
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString());
assertTrue(expectedKeys.contains(recordKey));
}
} else {
KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps()));
for (GenericRecord record : records) {
String recordKey = keyGenerator.getKey(record).getRecordKey();
if (!populateMetadataField) {
assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
}
assertTrue(expectedKeys.contains(recordKey));
}
}
}
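  /**
   * Upserts a batch of records under the given commit time and verifies the written base files contain exactly those records.
   */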
private List<WriteStatus> writeAndVerifyBatch(SparkRDDWriteClient client, List<HoodieRecord> inserts, String commitTime, boolean populateMetaFields) throws IOException {
client.startCommitWithTime(commitTime);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts, 2);
List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime).collect();
assertNoWriteErrors(statuses);
verifyRecordsWritten(commitTime, populateMetaFields, inserts, statuses, client.getConfig());
return statuses;
}
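  /**
   * Inserts a batch of new records together with updates generated for them in a single upsert, then verifies the
   * total record count in the dataset. Returns the new keys and the inserted records.
   */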
private Pair<Set<String>, List<HoodieRecord>> testUpdates(String instantTime, SparkRDDWriteClient client,
int sizeToInsertAndUpdate, int expectedTotalRecords)
throws IOException {
client.startCommitWithTime(instantTime);
List<HoodieRecord> inserts = dataGen.generateInserts(instantTime, sizeToInsertAndUpdate);
Set<String> keys = recordsToRecordKeySet(inserts);
List<HoodieRecord> insertsAndUpdates = new ArrayList<>();
insertsAndUpdates.addAll(inserts);
insertsAndUpdates.addAll(dataGen.generateUpdates(instantTime, inserts));
JavaRDD<HoodieRecord> insertAndUpdatesRDD = jsc.parallelize(insertsAndUpdates, 1);
List<WriteStatus> statuses = client.upsert(insertAndUpdatesRDD, instantTime).collect();
assertNoWriteErrors(statuses);
// Check the entire dataset has all records still
String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
}
assertEquals(expectedTotalRecords,
HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
"Must contain " + expectedTotalRecords + " records");
return Pair.of(keys, inserts);
}
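  /**
   * Deletes a random subset of the given records and verifies that the deleted keys are gone while the remaining
   * records stay in the same (expanded) file.
   */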
private void testDeletes(SparkRDDWriteClient client, List<HoodieRecord> previousRecords, int sizeToDelete,
String existingFile, String instantTime, int expectedRecords, List<String> keys) {
client.startCommitWithTime(instantTime);
List<HoodieKey> hoodieKeysToDelete = randomSelectAsHoodieKeys(previousRecords, sizeToDelete);
JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(hoodieKeysToDelete, 1);
List<WriteStatus> statuses = client.delete(deleteKeys, instantTime).collect();
assertNoWriteErrors(statuses);
assertEquals(1, statuses.size(), "Just 1 file should be written.");
assertEquals(existingFile, statuses.get(0).getFileId(), "Existing file should be expanded");
// Check the entire dataset has all records still
String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
}
assertEquals(expectedRecords,
HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
"Must contain " + expectedRecords + " records");
Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
assertEquals(expectedRecords,
BaseFileUtils.getInstance(metaClient).readRowKeys(hadoopConf, newFile).size(),
"file should contain 110 records");
    List<GenericRecord> records = BaseFileUtils.getInstance(metaClient).readAvroRecords(hadoopConf, newFile);
    Set<String> deletedRecordKeys = hoodieKeysToDelete.stream().map(HoodieKey::getRecordKey).collect(Collectors.toSet());
    for (GenericRecord record : records) {
      String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
      assertTrue(keys.contains(recordKey), "key expected to be part of " + instantTime);
      assertFalse(deletedRecordKeys.contains(recordKey), "deleted key should not be present in the written file");
    }
}
  /**
   * Test the delete API when no records have previously been inserted; the delete should fail.
   */
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testDeletesWithoutInserts(boolean populateMetaFields) {
final String testPartitionPath = "2016/09/26";
final int insertSplitLimit = 100;
// setup the small file handling params
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit,
TRIP_EXAMPLE_SCHEMA, dataGen.getEstimatedFileSizeInBytes(150), populateMetaFields, populateMetaFields
? new Properties() : getPropertiesForKeyGen());
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config);
// delete non-existent keys
String commitTime1 = "001";
client.startCommitWithTime(commitTime1);
List<HoodieRecord> dummyInserts = dataGen.generateInserts(commitTime1, 20);
List<HoodieKey> hoodieKeysToDelete = randomSelectAsHoodieKeys(dummyInserts, 20);
JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(hoodieKeysToDelete, 1);
assertThrows(HoodieIOException.class, () -> {
client.delete(deleteKeys, commitTime1).collect();
}, "Should have thrown Exception");
}
/**
* Test to ensure commit metadata points to valid files.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testCommitWritesRelativePaths(boolean populateMetaFields) throws Exception {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withAutoCommit(false);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
try (SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build());) {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
HoodieSparkTable table = HoodieSparkTable.create(cfgBuilder.build(), context, metaClient);
String instantTime = "000";
client.startCommitWithTime(instantTime);
List<HoodieRecord> records = dataGen.generateInserts(instantTime, 200);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, instantTime);
assertTrue(client.commit(instantTime, result), "Commit should succeed");
assertTrue(testTable.commitExists(instantTime),
"After explicit commit, commit file should be created");
// Get base file paths from commit metadata
String actionType = metaClient.getCommitActionType();
HoodieInstant commitInstant = new HoodieInstant(false, actionType, instantTime);
HoodieTimeline commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants();
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(commitTimeline.getInstantDetails(commitInstant).get(), HoodieCommitMetadata.class);
String basePath = table.getMetaClient().getBasePath();
Collection<String> commitPathNames = commitMetadata.getFileIdAndFullPaths(basePath).values();
// Read from commit file
try (FSDataInputStream inputStream = fs.open(testTable.getCommitFilePath(instantTime))) {
String everything = FileIOUtils.readAsUTFString(inputStream);
HoodieCommitMetadata metadata = HoodieCommitMetadata.fromJsonString(everything, HoodieCommitMetadata.class);
HashMap<String, String> paths = metadata.getFileIdAndFullPaths(basePath);
// Compare values in both to make sure they are equal.
for (String pathName : paths.values()) {
assertTrue(commitPathNames.contains(pathName));
}
}
}
}
  /**
   * Test to ensure commit metadata records accurate write stats (inserts and updates).
   */
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testMetadataStatsOnCommit(boolean populateMetaFields) throws Exception {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withAutoCommit(false);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig cfg = cfgBuilder.build();
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
String instantTime0 = "000";
client.startCommitWithTime(instantTime0);
List<HoodieRecord> records0 = dataGen.generateInserts(instantTime0, 200);
JavaRDD<HoodieRecord> writeRecords0 = jsc.parallelize(records0, 1);
JavaRDD<WriteStatus> result0 = client.bulkInsert(writeRecords0, instantTime0);
assertTrue(client.commit(instantTime0, result0), "Commit should succeed");
assertTrue(testTable.commitExists(instantTime0),
"After explicit commit, commit file should be created");
// Read from commit file
try (FSDataInputStream inputStream = fs.open(testTable.getCommitFilePath(instantTime0))) {
String everything = FileIOUtils.readAsUTFString(inputStream);
HoodieCommitMetadata metadata =
HoodieCommitMetadata.fromJsonString(everything, HoodieCommitMetadata.class);
int inserts = 0;
for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
for (HoodieWriteStat stat : pstat.getValue()) {
inserts += stat.getNumInserts();
}
}
assertEquals(200, inserts);
}
// Updates only (no new inserts), so they just expand file1
String instantTime1 = "001";
client.startCommitWithTime(instantTime1);
List<HoodieRecord> records1 = dataGen.generateUpdates(instantTime1, records0);
JavaRDD<HoodieRecord> writeRecords1 = jsc.parallelize(records1, 1);
JavaRDD<WriteStatus> result1 = client.upsert(writeRecords1, instantTime1);
assertTrue(client.commit(instantTime1, result1), "Commit should succeed");
assertTrue(testTable.commitExists(instantTime1),
"After explicit commit, commit file should be created");
// Read from commit file
try (FSDataInputStream inputStream = fs.open(testTable.getCommitFilePath(instantTime1))) {
String everything = FileIOUtils.readAsUTFString(inputStream);
HoodieCommitMetadata metadata = HoodieCommitMetadata.fromJsonString(everything, HoodieCommitMetadata.class);
int inserts = 0;
int upserts = 0;
for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
for (HoodieWriteStat stat : pstat.getValue()) {
inserts += stat.getNumInserts();
upserts += stat.getNumUpdateWrites();
}
}
assertEquals(0, inserts);
assertEquals(200, upserts);
}
}
/**
* Tests behavior of committing only when consistency is verified.
*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testConsistencyCheckDuringFinalize(boolean enableOptimisticConsistencyGuard) throws Exception {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
String instantTime = "000";
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
.withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build();
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
Pair<Path, JavaRDD<WriteStatus>> result = testConsistencyCheck(metaClient, instantTime, enableOptimisticConsistencyGuard);
// Delete orphan marker and commit should succeed
metaClient.getFs().delete(result.getKey(), false);
if (!enableOptimisticConsistencyGuard) {
assertTrue(client.commit(instantTime, result.getRight()), "Commit should succeed");
assertTrue(testTable.commitExists(instantTime),
"After explicit commit, commit file should be created");
// Marker directory must be removed
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
} else {
// with optimistic CG, the first client.commit should have already succeeded.
assertTrue(testTable.commitExists(instantTime),
"After explicit commit, commit file should be created");
// Marker directory must be removed
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
}
}
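  /**
   * Shared flow for rollback-after-consistency-check-failure tests: triggers a consistency check failure via a dummy
   * marker file, then verifies rollback behavior for marker-based vs. listing-based rollback and for both consistency guards.
   */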
private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollbackUsingMarkers, boolean enableOptimisticConsistencyGuard,
boolean populateMetaFields) throws Exception {
String instantTime = "000";
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
Properties properties = new Properties();
if (!populateMetaFields) {
properties = getPropertiesForKeyGen();
}
HoodieWriteConfig cfg = !enableOptimisticConsistencyGuard ? getConfigBuilder().withRollbackUsingMarkers(rollbackUsingMarkers).withAutoCommit(false)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true)
.withMaxConsistencyCheckIntervalMs(1).withInitialConsistencyCheckIntervalMs(1).withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build() :
getConfigBuilder().withRollbackUsingMarkers(rollbackUsingMarkers).withAutoCommit(false)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
.withConsistencyCheckEnabled(true)
.withOptimisticConsistencyGuardSleepTimeMs(1).build())
.withProperties(properties).build();
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
testConsistencyCheck(metaClient, instantTime, enableOptimisticConsistencyGuard);
if (!enableOptimisticConsistencyGuard) {
// Rollback of this commit should succeed with FailSafeCG
client.rollback(instantTime);
assertFalse(testTable.commitExists(instantTime),
"After explicit rollback, commit file should not be present");
// Marker directory must be removed after rollback
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
} else {
// if optimistic CG is enabled, commit should have succeeded.
assertTrue(testTable.commitExists(instantTime),
"With optimistic CG, first commit should succeed. commit file should be present");
// Marker directory must be removed after rollback
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
if (rollbackUsingMarkers) {
// rollback of a completed commit should fail if marker-based rollback is used.
try {
client.rollback(instantTime);
fail("Rollback of completed commit should throw exception");
} catch (HoodieRollbackException e) {
// ignore
}
} else {
// rollback of a completed commit should succeed if using list-based rollback
client.rollback(instantTime);
assertFalse(testTable.commitExists(instantTime),
"After explicit rollback, commit file should not be present");
}
}
}
@ParameterizedTest
@MethodSource("rollbackAfterConsistencyCheckFailureParams")
public void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean enableOptimisticConsistencyGuard, boolean populateMetCols) throws Exception {
testRollbackAfterConsistencyCheckFailureUsingFileList(false, enableOptimisticConsistencyGuard, populateMetCols);
}
@ParameterizedTest
@MethodSource("rollbackAfterConsistencyCheckFailureParams")
public void testRollbackAfterConsistencyCheckFailureUsingMarkers(boolean enableOptimisticConsistencyGuard, boolean populateMetCols) throws Exception {
testRollbackAfterConsistencyCheckFailureUsingFileList(true, enableOptimisticConsistencyGuard, populateMetCols);
}
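  /**
   * Test how failed (inflight) commits are handled by clean under the configured failed-writes cleaning policy,
   * once their heartbeats have expired.
   */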
@ParameterizedTest
@MethodSource("rollbackFailedCommitsParams")
public void testRollbackFailedCommits(HoodieFailedWritesCleaningPolicy cleaningPolicy, boolean populateMetaFields) throws Exception {
HoodieTestUtils.init(hadoopConf, basePath);
// Perform 2 failed writes to table
SparkRDDWriteClient client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
// refresh the data generator to discard records generated by the failed commits
dataGen = new HoodieTestDataGenerator();
// Perform 1 successful write
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, true);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
    assertEquals(0, metaClient.getActiveTimeline().getTimelineOfActions(
        CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants());
    assertEquals(2, metaClient.getActiveTimeline().filterInflights().countInstants());
    assertEquals(1, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants());
// Wait until enough time passes such that the heartbeats of the first 2 failed commits have expired
boolean conditionMet = false;
while (!conditionMet) {
conditionMet = client.getHeartbeatClient().isHeartbeatExpired("200");
Thread.sleep(2000);
}
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
// Perform 1 successful write
writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, true);
client.clean();
HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
if (cleaningPolicy.isLazy()) {
      assertEquals(2,
          timeline.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants());
      // Failed writes are handled via rollbacks (not clean), so there should be no clean action on the timeline
      assertEquals(0,
          timeline.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).countInstants());
      assertEquals(2, timeline.getCommitsTimeline().filterCompletedInstants().countInstants());
} else if (cleaningPolicy.isNever()) {
      assertEquals(0,
          timeline.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants());
      // There should be no clean or rollback action on the timeline
      assertEquals(0,
          timeline.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).countInstants());
      assertEquals(2, timeline.getCommitsTimeline().filterCompletedInstants().countInstants());
}
}
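  /**
   * Test rollback of failed commits while toggling the failed-writes cleaning policy between EAGER and LAZY.
   */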
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testRollbackFailedCommitsToggleCleaningPolicy(boolean populateMetaFields) throws Exception {
HoodieTestUtils.init(hadoopConf, basePath);
HoodieFailedWritesCleaningPolicy cleaningPolicy = HoodieFailedWritesCleaningPolicy.EAGER;
SparkRDDWriteClient client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
// Perform 1 failed write to the table
writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
// Toggle cleaning policy to LAZY
cleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
// Perform 2 failed writes to table
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
// Wait until enough time passes such that the heartbeats of the failed commits have expired
boolean conditionMet = false;
while (!conditionMet) {
conditionMet = client.getHeartbeatClient().isHeartbeatExpired("300");
Thread.sleep(2000);
}
client.clean();
HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
    assertEquals(3, timeline.getTimelineOfActions(
        CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants());
// Perform 2 failed commits
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
writeBatch(client, "500", "400", Option.of(Arrays.asList("500")), "500",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
// Toggle cleaning policy to EAGER
cleaningPolicy = HoodieFailedWritesCleaningPolicy.EAGER;
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
client.startCommit();
timeline = metaClient.getActiveTimeline().reload();
    assertEquals(5, timeline.getTimelineOfActions(
        CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants());
    assertEquals(0, timeline.getCommitsTimeline().filterCompletedInstants().countInstants());
}
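  /**
   * Test that a write and a clean (which lazily rolls back expired failed commits) can run in parallel without interfering.
   */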
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testParallelInsertAndCleanPreviousFailedCommits(boolean populateMetaFields) throws Exception {
HoodieFailedWritesCleaningPolicy cleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
ExecutorService service = Executors.newFixedThreadPool(2);
HoodieTestUtils.init(hadoopConf, basePath);
// Perform 2 failed writes to table
SparkRDDWriteClient client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 100,
0, false);
client.close();
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
writeBatch(client, "200", "200", Option.of(Arrays.asList("200")), "200",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 100,
0, false);
client.close();
// refresh the data generator to discard records generated by the failed commits
dataGen = new HoodieTestDataGenerator();
// Create a successful commit
Future<JavaRDD<WriteStatus>> commit3 = service.submit(() -> writeBatch(new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields)),
"300", "200", Option.of(Arrays.asList("300")), "200", 100, dataGen::generateInserts,
SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, true));
commit3.get();
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
    assertEquals(0, metaClient.getActiveTimeline().getTimelineOfActions(
        CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants());
    assertEquals(2, metaClient.getActiveTimeline().filterInflights().countInstants());
    assertEquals(1, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants());
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
// Wait until enough time passes such that the heartbeats of the first 2 failed commits have expired
boolean conditionMet = false;
while (!conditionMet) {
conditionMet = client.getHeartbeatClient().isHeartbeatExpired("200");
Thread.sleep(2000);
}
Future<JavaRDD<WriteStatus>> commit4 = service.submit(() -> writeBatch(new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields)),
"400", "300", Option.of(Arrays.asList("400")), "400", 100, dataGen::generateInserts,
SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, true));
Future<HoodieCleanMetadata> clean1 = service.submit(() -> new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields)).clean());
commit4.get();
clean1.get();
HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
    assertEquals(2, timeline.getTimelineOfActions(
        CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants());
    // Failed writes are handled via rollbacks (not clean), so there should be no clean action on the timeline
    assertEquals(0, timeline.getTimelineOfActions(
        CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).countInstants());
    assertEquals(2, timeline.getCommitsTimeline().filterCompletedInstants().countInstants());
}
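  /**
   * Bulk-inserts a batch and then creates a dummy marker file without a corresponding data file, so the commit
   * either fails the consistency check (failsafe guard) or succeeds (optimistic guard). Returns the dummy marker
   * path and the write statuses.
   */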
private Pair<Path, JavaRDD<WriteStatus>> testConsistencyCheck(HoodieTableMetaClient metaClient, String instantTime, boolean enableOptimisticConsistencyGuard)
throws Exception {
HoodieWriteConfig cfg = !enableOptimisticConsistencyGuard ? (getConfigBuilder().withAutoCommit(false)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true)
.withMaxConsistencyCheckIntervalMs(1).withInitialConsistencyCheckIntervalMs(1).withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build())
.build()) : (getConfigBuilder().withAutoCommit(false)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true)
.withOptimisticConsistencyGuardSleepTimeMs(1).build())
.build());
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
client.startCommitWithTime(instantTime);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(dataGen.generateInserts(instantTime, 200), 1);
JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, instantTime);
result.collect();
    // Create a dummy marker file to simulate the case where a marker file was created without the corresponding data file.
    // This should fail the commit unless the optimistic consistency guard is enabled.
String partitionPath = Arrays
.stream(fs.globStatus(new Path(String.format("%s/*/*/*/*", metaClient.getMarkerFolderPath(instantTime))),
path -> path.toString().contains(HoodieTableMetaClient.MARKER_EXTN)))
.limit(1).map(status -> status.getPath().getParent().toString()).collect(Collectors.toList()).get(0);
Path markerFilePath = new MarkerFiles(fs, basePath, metaClient.getMarkerFolderPath(instantTime), instantTime)
.create(partitionPath,
FSUtils.makeDataFileName(instantTime, "1-0-1", UUID.randomUUID().toString()),
IOType.MERGE);
LOG.info("Created a dummy marker path=" + markerFilePath);
if (!enableOptimisticConsistencyGuard) {
Exception e = assertThrows(HoodieCommitException.class, () -> {
client.commit(instantTime, result);
}, "Commit should fail due to consistency check");
assertTrue(e.getCause() instanceof HoodieIOException);
} else {
// with optimistic CG, commit should succeed
client.commit(instantTime, result);
}
return Pair.of(markerFilePath, result);
}
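  /**
   * Test that multiple write operations (bulk insert + upsert) can share a single instant when
   * allowMultiWriteOnSameInstant is enabled.
   */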
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testMultiOperationsPerCommit(boolean populateMetaFields) throws IOException {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withAutoCommit(false)
.withAllowMultiWriteOnSameInstant(true);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig cfg = cfgBuilder.build();
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
String firstInstantTime = "0000";
client.startCommitWithTime(firstInstantTime);
int numRecords = 200;
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(dataGen.generateInserts(firstInstantTime, numRecords), 1);
JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, firstInstantTime);
assertTrue(client.commit(firstInstantTime, result), "Commit should succeed");
assertTrue(testTable.commitExists(firstInstantTime),
"After explicit commit, commit file should be created");
// Check the entire dataset has all records still
String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
}
assertEquals(numRecords,
HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
"Must contain " + numRecords + " records");
String nextInstantTime = "0001";
client.startCommitWithTime(nextInstantTime);
JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(dataGen.generateUpdates(nextInstantTime, numRecords), 1);
JavaRDD<HoodieRecord> insertRecords = jsc.parallelize(dataGen.generateInserts(nextInstantTime, numRecords), 1);
JavaRDD<WriteStatus> inserts = client.bulkInsert(insertRecords, nextInstantTime);
JavaRDD<WriteStatus> upserts = client.upsert(updateRecords, nextInstantTime);
assertTrue(client.commit(nextInstantTime, inserts.union(upserts)), "Commit should succeed");
assertTrue(testTable.commitExists(nextInstantTime),
"After explicit commit, commit file should be created");
int totalRecords = 2 * numRecords;
assertEquals(totalRecords, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
"Must contain " + totalRecords + " records");
}
/**
* Build Hoodie Write Config for small data file sizes.
*/
private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize) {
return getSmallInsertWriteConfig(insertSplitSize, false);
}
/**
* Build Hoodie Write Config for small data file sizes.
*/
private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema) {
return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, false);
}
/**
* Build Hoodie Write Config for small data file sizes.
*/
private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, boolean mergeAllowDuplicateInserts) {
return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, dataGen.getEstimatedFileSizeInBytes(150), mergeAllowDuplicateInserts);
}
/**
* Build Hoodie Write Config for specified small file sizes.
*/
private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, long smallFileSize) {
return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, smallFileSize, false);
}
/**
* Build Hoodie Write Config for specified small file sizes.
*/
private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, long smallFileSize, boolean mergeAllowDuplicateInserts) {
String schemaStr = useNullSchema ? NULL_SCHEMA : TRIP_EXAMPLE_SCHEMA;
return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, mergeAllowDuplicateInserts);
}
private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize) {
return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, false);
}
private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeAllowDuplicateInserts) {
return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, mergeAllowDuplicateInserts, true, new Properties());
}
private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean populateMetaFields, Properties props) {
return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, false, populateMetaFields, props);
}
private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeAllowDuplicateInserts,
boolean populateMetaFields, Properties props) {
HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr);
if (!populateMetaFields) {
builder.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.SIMPLE).build());
}
return builder
.withCompactionConfig(
HoodieCompactionConfig.newBuilder()
.compactionSmallFileSize(smallFileSize)
// Set failed-writes cleaning policy to LAZY so inflight commits are not deleted
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.insertSplitSize(insertSplitSize).build())
.withStorageConfig(
HoodieStorageConfig.newBuilder()
.hfileMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200))
.parquetMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build())
.withMergeAllowDuplicateOnInserts(mergeAllowDuplicateInserts)
.withProps(props)
.build();
}
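  /**
   * Creates and saves a requested replace-commit instant carrying a clustering plan built from the given file slices.
   */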
protected HoodieInstant createRequestedReplaceInstant(HoodieTableMetaClient metaClient, String clusterTime, List<FileSlice>[] fileSlices) throws IOException {
HoodieClusteringPlan clusteringPlan =
ClusteringUtils.createClusteringPlan(CLUSTERING_EXECUTION_STRATEGY_CLASS.defaultValue(), STRATEGY_PARAMS, fileSlices, Collections.emptyMap());
HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusterTime);
HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
.setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build();
metaClient.getActiveTimeline().saveToPendingReplaceCommit(clusteringInstant, TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
return clusteringInstant;
}
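  /**
   * Build a write config suitable for concurrent writers: no auto-commit, no auto-clean, a short heartbeat interval,
   * and the given failed-writes cleaning policy.
   */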
private HoodieWriteConfig getParallelWritingWriteConfig(HoodieFailedWritesCleaningPolicy cleaningPolicy, boolean populateMetaFields) {
return getConfigBuilder()
.withEmbeddedTimelineServerEnabled(false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(cleaningPolicy)
.withAutoClean(false).build())
.withTimelineLayoutVersion(1)
.withHeartbeatIntervalInMs(3 * 1000)
.withAutoCommit(false)
.withProperties(populateMetaFields ? new Properties() : getPropertiesForKeyGen()).build();
}
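  /**
   * Pre-commit validator that always throws HoodieValidationException, used to verify validation-failure handling.
   */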
public static class FailingPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends SparkPreCommitValidator<T, I, K, O> {
public FailingPreCommitValidator(HoodieSparkTable table, HoodieEngineContext context, HoodieWriteConfig config) {
super(table, context, config);
}
@Override
protected void validateRecordsBeforeAndAfter(final Dataset<Row> before, final Dataset<Row> after, final Set<String> partitionsAffected) {
throw new HoodieValidationException("simulate failure");
}
}
}