/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.action.commit.SparkWriteHelper;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieWriteableTestTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_0;
import static org.apache.hudi.common.testutils.FileCreateUtils.getBaseFileCountsForPaths;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.NULL_SCHEMA;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.apache.hudi.common.testutils.Transformations.randomSelectAsHoodieKeys;
import static org.apache.hudi.common.testutils.Transformations.recordsToRecordKeySet;
import static org.apache.hudi.common.util.ParquetUtils.readRowKeysFromParquet;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@SuppressWarnings("unchecked")
public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
private static final Logger LOG = LogManager.getLogger(TestHoodieClientOnCopyOnWriteStorage.class);
private HoodieTestTable testTable;
@BeforeEach
public void setUpTestTable() {
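// Writeable test table backed by the same meta client; tests below use it to assert on commit files on the file system.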
testTable = HoodieWriteableTestTable.of(metaClient);
}
/**
* Test Auto Commit behavior for HoodieWriteClient insert API.
*/
@Test
public void testAutoCommitOnInsert() throws Exception {
testAutoCommit(SparkRDDWriteClient::insert, false);
}
/**
* Test Auto Commit behavior for HoodieWriteClient insertPrepped API.
*/
@Test
public void testAutoCommitOnInsertPrepped() throws Exception {
testAutoCommit(SparkRDDWriteClient::insertPreppedRecords, true);
}
/**
* Test Auto Commit behavior for HoodieWriteClient upsert API.
*/
@Test
public void testAutoCommitOnUpsert() throws Exception {
testAutoCommit(SparkRDDWriteClient::upsert, false);
}
/**
* Test Auto Commit behavior for HoodieWriteClient upsert Prepped API.
*/
@Test
public void testAutoCommitOnUpsertPrepped() throws Exception {
testAutoCommit(SparkRDDWriteClient::upsertPreppedRecords, true);
}
/**
* Test Auto Commit behavior for HoodieWriteClient bulk-insert API.
*/
@Test
public void testAutoCommitOnBulkInsert() throws Exception {
testAutoCommit(SparkRDDWriteClient::bulkInsert, false);
}
/**
* Test Auto Commit behavior for HoodieWriteClient bulk-insert prepped API.
*/
@Test
public void testAutoCommitOnBulkInsertPrepped() throws Exception {
testAutoCommit((writeClient, recordRDD, instantTime) -> writeClient.bulkInsertPreppedRecords(recordRDD, instantTime,
Option.empty()), true);
}
/**
* Test auto-commit by applying write function.
*
* @param writeFn One of the HoodieWriteClient write APIs
* @param isPrepped Whether the write function operates on already prepped records
* @throws Exception in case of failure
*/
private void testAutoCommit(Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
boolean isPrepped) throws Exception {
// Set autoCommit false
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
String prevCommitTime = "000";
String newCommitTime = "001";
int numRecords = 200;
JavaRDD<WriteStatus> result = insertFirstBatch(cfg, client, newCommitTime, prevCommitTime, numRecords, writeFn,
isPrepped, false, numRecords);
assertFalse(testTable.commitExists(newCommitTime),
"If Autocommit is false, then commit should not be made automatically");
assertTrue(client.commit(newCommitTime, result), "Commit should succeed");
assertTrue(testTable.commitExists(newCommitTime),
"After explicit commit, commit file should be created");
}
}
/**
* Test De-duplication behavior for HoodieWriteClient insert API.
*/
@Test
public void testDeduplicationOnInsert() throws Exception {
testDeduplication(SparkRDDWriteClient::insert);
}
/**
* Test De-duplication behavior for HoodieWriteClient bulk-insert API.
*/
@Test
public void testDeduplicationOnBulkInsert() throws Exception {
testDeduplication(SparkRDDWriteClient::bulkInsert);
}
/**
* Test De-duplication behavior for HoodieWriteClient upsert API.
*/
@Test
public void testDeduplicationOnUpsert() throws Exception {
testDeduplication(SparkRDDWriteClient::upsert);
}
/**
* Test Deduplication Logic for write function.
*
* @param writeFn One of the HoodieWriteClient non-prepped write APIs
* @throws Exception in case of failure
*/
private void testDeduplication(
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn) throws Exception {
String newCommitTime = "001";
String recordKey = UUID.randomUUID().toString();
HoodieKey keyOne = new HoodieKey(recordKey, "2018-01-01");
HoodieRecord<RawTripTestPayload> recordOne =
new HoodieRecord(keyOne, dataGen.generateRandomValue(keyOne, newCommitTime));
HoodieKey keyTwo = new HoodieKey(recordKey, "2018-02-01");
HoodieRecord recordTwo =
new HoodieRecord(keyTwo, dataGen.generateRandomValue(keyTwo, newCommitTime));
// Same key and partition as keyTwo
HoodieRecord recordThree =
new HoodieRecord(keyTwo, dataGen.generateRandomValue(keyTwo, newCommitTime));
JavaRDD<HoodieRecord<RawTripTestPayload>> records =
jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
// Global dedup should be done based on recordKey only
HoodieIndex index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(true);
List<HoodieRecord<RawTripTestPayload>> dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, 1).collect();
assertEquals(1, dedupedRecs.size());
assertNodupesWithinPartition(dedupedRecs);
// non-Global dedup should be done based on both recordKey and partitionPath
index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(false);
dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, 1).collect();
assertEquals(2, dedupedRecs.size());
assertNodupesWithinPartition(dedupedRecs);
// Perform write-action and check
JavaRDD<HoodieRecord> recordList = jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
try (SparkRDDWriteClient client = getHoodieWriteClient(getConfigBuilder().combineInput(true, true).build(), false);) {
client.startCommitWithTime(newCommitTime);
List<WriteStatus> statuses = writeFn.apply(client, recordList, newCommitTime).collect();
assertNoWriteErrors(statuses);
assertEquals(2, statuses.size());
assertNodupesInPartition(statuses.stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
.collect(Collectors.toList()));
}
}
/**
* Assert that there is no duplicate key at the partition level.
*
* @param records List of Hoodie records
*/
void assertNodupesInPartition(List<HoodieRecord> records) {
Map<String, Set<String>> partitionToKeys = new HashMap<>();
for (HoodieRecord r : records) {
String key = r.getRecordKey();
String partitionPath = r.getPartitionPath();
if (!partitionToKeys.containsKey(partitionPath)) {
partitionToKeys.put(partitionPath, new HashSet<>());
}
assertFalse(partitionToKeys.get(partitionPath).contains(key), "key " + key + " is duplicate within partition " + partitionPath);
partitionToKeys.get(partitionPath).add(key);
}
}
/**
* Test Upsert API.
*/
@Test
public void testUpserts() throws Exception {
testUpsertsInternal(getConfig(), SparkRDDWriteClient::upsert, false);
}
/**
* Test UpsertPrepped API.
*/
@Test
public void testUpsertsPrepped() throws Exception {
testUpsertsInternal(getConfig(), SparkRDDWriteClient::upsertPreppedRecords, true);
}
/**
* Test one of HoodieWriteClient upsert(Prepped) APIs.
*
* @param config Write Config
* @param writeFn One of the Hoodie write function APIs
* @param isPrepped Whether the write function operates on already prepped records
* @throws Exception in case of error
*/
private void testUpsertsInternal(HoodieWriteConfig config,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPrepped)
throws Exception {
// Force using older timeline layout
HoodieWriteConfig hoodieWriteConfig = getConfigBuilder().withProps(config.getProps()).withTimelineLayoutVersion(
VERSION_0).build();
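// Re-initialize the table with the older (VERSION_0) timeline layout, so the restore later in this test exercises the layout upgrade path.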
HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
metaClient.getTableConfig().getPayloadClass(), VERSION_0);
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
// Write 1 (only inserts)
String newCommitTime = "001";
String initCommitTime = "000";
int numRecords = 200;
insertFirstBatch(hoodieWriteConfig, client, newCommitTime, initCommitTime, numRecords, SparkRDDWriteClient::insert,
isPrepped, true, numRecords);
// Write 2 (updates)
String prevCommitTime = newCommitTime;
newCommitTime = "004";
numRecords = 100;
String commitTimeBetweenPrevAndNew = "002";
updateBatch(hoodieWriteConfig, client, newCommitTime, prevCommitTime,
Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), initCommitTime, numRecords, writeFn, isPrepped, true,
numRecords, 200, 2);
// Delete 1
prevCommitTime = newCommitTime;
newCommitTime = "005";
numRecords = 50;
deleteBatch(hoodieWriteConfig, client, newCommitTime, prevCommitTime,
initCommitTime, numRecords, SparkRDDWriteClient::delete, isPrepped, true,
0, 150);
// Now simulate an upgrade and perform a restore operation
HoodieWriteConfig newConfig = getConfigBuilder().withProps(config.getProps()).withTimelineLayoutVersion(
TimelineLayoutVersion.CURR_VERSION).build();
client = getHoodieWriteClient(newConfig, false);
client.restoreToInstant("004");
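// Restoring to "004" rolls back the delete at "005", so all 200 records written by "001"/"004" should be readable again.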
// Check the entire dataset has all records still
String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
}
assertEquals(200, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
"Must contain " + 200 + " records");
// Perform Delete again on upgraded dataset.
prevCommitTime = newCommitTime;
newCommitTime = "006";
numRecords = 50;
deleteBatch(newConfig, client, newCommitTime, prevCommitTime,
initCommitTime, numRecords, SparkRDDWriteClient::delete, isPrepped, true,
0, 150);
HoodieActiveTimeline activeTimeline = new HoodieActiveTimeline(metaClient, false);
List<HoodieInstant> instants = activeTimeline.getCommitTimeline().getInstants().collect(Collectors.toList());
assertEquals(5, instants.size());
assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "001"),
instants.get(0));
assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "004"),
instants.get(1));
// New Format should have all states of instants
assertEquals(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "006"),
instants.get(2));
assertEquals(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "006"),
instants.get(3));
assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "006"),
instants.get(4));
}
/**
* Test deletion of records.
*/
@Test
public void testDeletes() throws Exception {
SparkRDDWriteClient client = getHoodieWriteClient(getConfig(), false);
/**
* Write 1 (inserts and deletes): write 200 actual insert records and issue 100 deletes for records that were never inserted (these should be ignored).
*/
String initCommitTime = "000";
String newCommitTime = "001";
final List<HoodieRecord> recordsInFirstBatch = new ArrayList<>();
Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
(String instantTime, Integer numRecordsInThisCommit) -> {
List<HoodieRecord> fewRecordsForInsert = dataGen.generateInserts(instantTime, 200);
List<HoodieRecord> fewRecordsForDelete = dataGen.generateDeletes(instantTime, 100);
recordsInFirstBatch.addAll(fewRecordsForInsert);
recordsInFirstBatch.addAll(fewRecordsForDelete);
return recordsInFirstBatch;
};
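// The 100 delete records above target keys that were never inserted, so only the 200 inserts should actually be written in this batch.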
writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime,
// unused as genFn uses hard-coded number of inserts/updates/deletes
-1, recordGenFunction, SparkRDDWriteClient::upsert, true, 200, 200, 1);
/**
* Write 2 (deletes+writes).
*/
String prevCommitTime = newCommitTime;
newCommitTime = "004";
final List<HoodieRecord> recordsInSecondBatch = new ArrayList<>();
recordGenFunction = (String instantTime, Integer numRecordsInThisCommit) -> {
List<HoodieRecord> fewRecordsForDelete = recordsInFirstBatch.subList(0, 50);
List<HoodieRecord> fewRecordsForUpdate = recordsInFirstBatch.subList(50, 100);
recordsInSecondBatch.addAll(dataGen.generateDeletesFromExistingRecords(fewRecordsForDelete));
recordsInSecondBatch.addAll(fewRecordsForUpdate);
return recordsInSecondBatch;
};
writeBatch(client, newCommitTime, prevCommitTime, Option.empty(), initCommitTime, 100, recordGenFunction,
SparkRDDWriteClient::upsert, true, 50, 150, 2);
}
/**
* When records being inserted are also deleted in the same write batch, Hudi should delete those records and they
* should not be visible on the read path.
* @throws Exception
*/
@Test
public void testDeletesForInsertsInSameBatch() throws Exception {
SparkRDDWriteClient client = getHoodieWriteClient(getConfig(), false);
/**
* Write 200 inserts and issue deletes for a subset (50) of those inserts.
*/
String initCommitTime = "000";
String newCommitTime = "001";
final List<HoodieRecord> recordsInFirstBatch = new ArrayList<>();
Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
(String instantTime, Integer numRecordsInThisCommit) -> {
List<HoodieRecord> fewRecordsForInsert = dataGen.generateInserts(instantTime, 200);
List<HoodieRecord> fewRecordsForDelete = fewRecordsForInsert.subList(40, 90);
recordsInFirstBatch.addAll(fewRecordsForInsert);
recordsInFirstBatch.addAll(dataGen.generateDeletesFromExistingRecords(fewRecordsForDelete));
return recordsInFirstBatch;
};
writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime,
-1, recordGenFunction, SparkRDDWriteClient::upsert, true, 150, 150, 1);
}
/**
* Test update of a record to different partition with Global Index.
*/
@ParameterizedTest
@EnumSource(value = IndexType.class, names = {"GLOBAL_BLOOM", "GLOBAL_SIMPLE"})
public void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType) throws Exception {
testUpsertsUpdatePartitionPath(indexType, getConfig(), SparkRDDWriteClient::upsert);
}
/**
* This test ensures that, when update-partition-path is enabled in the index config, an incoming record whose
* partition path differs from what is already in storage is handled correctly: the old record is deleted from the
* old partition and the new one is inserted into the new partition.
* Test structure:
* 1. Insert a first batch.
* 2. Insert a second batch with more records so that new file groups are created in the partitions.
* 3. Issue upserts to records from batch 1 with different partition paths. Records from batch 1 should be deleted
* and the new records upserted to the new partitions.
*
* @param indexType index type to be tested for
* @param config instance of {@link HoodieWriteConfig} to use
* @param writeFn write function to be used for testing
*/
private void testUpsertsUpdatePartitionPath(IndexType indexType, HoodieWriteConfig config,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn)
throws Exception {
// instantiate client
HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
.withProps(config.getProps())
.withCompactionConfig(
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
.withBloomIndexUpdatePartitionPath(true)
.withGlobalSimpleIndexUpdatePartitionPath(true)
.build()).withTimelineLayoutVersion(VERSION_0).build();
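// The tiny compactionSmallFileSize effectively disables small-file packing, so the larger second batch below lands in new file groups.
// The update-partition-path flags make the global index delete the old record and re-insert it when an upsert arrives with a different partition path.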
HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(),
metaClient.getTableType(), metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
metaClient.getTableConfig().getPayloadClass(), VERSION_0);
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
// Write 1
String newCommitTime = "001";
int numRecords = 10;
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
Set<Pair<String, String>> expectedPartitionPathRecKeyPairs = new HashSet<>();
// populate expected partition path and record keys
for (HoodieRecord rec : records) {
expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
}
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
result.collect();
// Check the entire dataset has all records
String[] fullPartitionPaths = getFullPartitionPaths();
assertPartitionPathRecordKeys(expectedPartitionPathRecKeyPairs, fullPartitionPaths);
// verify one basefile per partition
String[] fullExpectedPartitionPaths = getFullPartitionPaths(expectedPartitionPathRecKeyPairs.stream().map(Pair::getLeft).toArray(String[]::new));
Map<String, Long> baseFileCounts = getBaseFileCountsForPaths(basePath, fs, fullExpectedPartitionPaths);
for (Map.Entry<String, Long> entry : baseFileCounts.entrySet()) {
assertEquals(1, entry.getValue());
}
assertTrue(baseFileCounts.entrySet().stream().allMatch(entry -> entry.getValue() == 1));
// Write 2
newCommitTime = "002";
numRecords = 20; // so that a new file id is created
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> recordsSecondBatch = dataGen.generateInserts(newCommitTime, numRecords);
// populate expected partition path and record keys
for (HoodieRecord rec : recordsSecondBatch) {
expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
}
writeRecords = jsc.parallelize(recordsSecondBatch, 1);
result = writeFn.apply(client, writeRecords, newCommitTime);
result.collect();
// Check the entire dataset has all records
fullPartitionPaths = getFullPartitionPaths();
assertPartitionPathRecordKeys(expectedPartitionPathRecKeyPairs, fullPartitionPaths);
// verify that at least one partition has more than one base file
// we can't guarantee how records are distributed across partitions, so only assert that some partition has more than 1 base file.
baseFileCounts = getBaseFileCountsForPaths(basePath, fs, fullPartitionPaths);
assertTrue(baseFileCounts.entrySet().stream().filter(entry -> entry.getValue() > 1).count() >= 1,
"At least one partition should have more than 1 base file after 2nd batch of writes");
// Write 3 (upserts to records from batch 1 with diff partition path)
newCommitTime = "003";
// update to diff partition paths
List<HoodieRecord> recordsToUpsert = new ArrayList<>();
for (HoodieRecord rec : records) {
// remove older entry from expected partition path record key pairs
expectedPartitionPathRecKeyPairs
.remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
String partitionPath = rec.getPartitionPath();
String newPartitionPath = null;
if (partitionPath.equalsIgnoreCase(DEFAULT_FIRST_PARTITION_PATH)) {
newPartitionPath = DEFAULT_SECOND_PARTITION_PATH;
} else if (partitionPath.equalsIgnoreCase(DEFAULT_SECOND_PARTITION_PATH)) {
newPartitionPath = DEFAULT_THIRD_PARTITION_PATH;
} else if (partitionPath.equalsIgnoreCase(DEFAULT_THIRD_PARTITION_PATH)) {
newPartitionPath = DEFAULT_FIRST_PARTITION_PATH;
} else {
throw new IllegalStateException("Unknown partition path " + rec.getPartitionPath());
}
recordsToUpsert.add(
new HoodieRecord(new HoodieKey(rec.getRecordKey(), newPartitionPath),
rec.getData()));
// populate expected partition path and record keys
expectedPartitionPathRecKeyPairs.add(Pair.of(newPartitionPath, rec.getRecordKey()));
}
writeRecords = jsc.parallelize(recordsToUpsert, 1);
result = writeFn.apply(client, writeRecords, newCommitTime);
result.collect();
// Check the entire dataset has all records
fullPartitionPaths = getFullPartitionPaths();
assertPartitionPathRecordKeys(expectedPartitionPathRecKeyPairs, fullPartitionPaths);
}
private void assertPartitionPathRecordKeys(Set<Pair<String, String>> expectedPartitionPathRecKeyPairs, String[] fullPartitionPaths) {
Dataset<Row> rows = getAllRows(fullPartitionPaths);
List<Pair<String, String>> actualPartitionPathRecKeyPairs = getActualPartitionPathAndRecordKeys(rows);
// verify all partitionpath, record key matches
assertActualAndExpectedPartitionPathRecordKeyMatches(expectedPartitionPathRecKeyPairs, actualPartitionPathRecKeyPairs);
}
private List<Pair<String, String>> getActualPartitionPathAndRecordKeys(Dataset<org.apache.spark.sql.Row> rows) {
List<Pair<String, String>> actualPartitionPathRecKeyPairs = new ArrayList<>();
for (Row row : rows.collectAsList()) {
actualPartitionPathRecKeyPairs
.add(Pair.of(row.getAs("_hoodie_partition_path"), row.getAs("_row_key")));
}
return actualPartitionPathRecKeyPairs;
}
private Dataset<org.apache.spark.sql.Row> getAllRows(String[] fullPartitionPaths) {
return HoodieClientTestUtils
.read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
}
private String[] getFullPartitionPaths() {
return getFullPartitionPaths(dataGen.getPartitionPaths());
}
private String[] getFullPartitionPaths(String[] relativePartitionPaths) {
String[] fullPartitionPaths = new String[relativePartitionPaths.length];
for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath, relativePartitionPaths[i]);
}
return fullPartitionPaths;
}
private void assertActualAndExpectedPartitionPathRecordKeyMatches(Set<Pair<String, String>> expectedPartitionPathRecKeyPairs,
List<Pair<String, String>> actualPartitionPathRecKeyPairs) {
// verify all partitionpath, record key matches
assertEquals(expectedPartitionPathRecKeyPairs.size(), actualPartitionPathRecKeyPairs.size());
for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
}
for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
}
}
/**
* Test scenario of new file-group getting added during upsert().
*/
@Test
public void testSmallInsertHandlingForUpserts() throws Exception {
final String testPartitionPath = "2016/09/26";
final int insertSplitLimit = 100;
// setup the small file handling params
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // holds up to 200 records max
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config, false);
// Inserts => will write file1
String commitTime1 = "001";
client.startCommitWithTime(commitTime1);
List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, insertSplitLimit); // this writes ~500kb
Set<String> keys1 = recordsToRecordKeySet(inserts1);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
assertNoWriteErrors(statuses);
assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
String file1 = statuses.get(0).getFileId();
assertEquals(100,
readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath()))
.size(), "file should contain 100 records");
// Update + Inserts such that they just expand file1
String commitTime2 = "002";
client.startCommitWithTime(commitTime2);
List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 40);
Set<String> keys2 = recordsToRecordKeySet(inserts2);
List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>();
insertsAndUpdates2.addAll(inserts2);
insertsAndUpdates2.addAll(dataGen.generateUpdates(commitTime2, inserts1));
JavaRDD<HoodieRecord> insertAndUpdatesRDD2 = jsc.parallelize(insertsAndUpdates2, 1);
statuses = client.upsert(insertAndUpdatesRDD2, commitTime2).collect();
assertNoWriteErrors(statuses);
assertEquals(1, statuses.size(), "Just 1 file needs to be updated.");
assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be expanded");
assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded");
Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
assertEquals(140, readRowKeysFromParquet(hadoopConf, newFile).size(),
"file should contain 140 records");
List<GenericRecord> records = ParquetUtils.readAvroRecords(hadoopConf, newFile);
for (GenericRecord record : records) {
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
assertEquals(commitTime2, record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(), "only expect commit2");
assertTrue(keys2.contains(recordKey) || keys1.contains(recordKey), "key expected to be part of commit2");
}
// update + inserts such that file1 is updated and expanded, a new file2 is created.
String commitTime3 = "003";
client.startCommitWithTime(commitTime3);
List<HoodieRecord> insertsAndUpdates3 = dataGen.generateInserts(commitTime3, 200);
Set<String> keys3 = recordsToRecordKeySet(insertsAndUpdates3);
List<HoodieRecord> updates3 = dataGen.generateUpdates(commitTime3, inserts2);
insertsAndUpdates3.addAll(updates3);
JavaRDD<HoodieRecord> insertAndUpdatesRDD3 = jsc.parallelize(insertsAndUpdates3, 1);
statuses = client.upsert(insertAndUpdatesRDD3, commitTime3).collect();
assertNoWriteErrors(statuses);
assertEquals(2, statuses.size(), "2 files needs to be committed.");
HoodieTableMetaClient metadata = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieTable table = getHoodieTable(metadata, config);
BaseFileOnlyView fileSystemView = table.getBaseFileOnlyView();
List<HoodieBaseFile> files =
fileSystemView.getLatestBaseFilesBeforeOrOn(testPartitionPath, commitTime3).collect(Collectors.toList());
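// Two base files are expected in this partition: file1 expanded with the updates from commit 3, plus a new file for the overflow inserts.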
int numTotalInsertsInCommit3 = 0;
int numTotalUpdatesInCommit3 = 0;
for (HoodieBaseFile file : files) {
if (file.getFileName().contains(file1)) {
assertEquals(commitTime3, file.getCommitTime(), "Existing file should be expanded");
records = ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath()));
for (GenericRecord record : records) {
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
String recordCommitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
if (recordCommitTime.equals(commitTime3)) {
if (keys2.contains(recordKey)) {
keys2.remove(recordKey);
numTotalUpdatesInCommit3++;
} else {
numTotalInsertsInCommit3++;
}
}
}
assertEquals(0, keys2.size(), "All keys added in commit 2 must be updated in commit3 correctly");
} else {
assertEquals(commitTime3, file.getCommitTime(), "New file must be written for commit 3");
records = ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath()));
for (GenericRecord record : records) {
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
assertEquals(commitTime3, record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(),
"only expect commit3");
assertTrue(keys3.contains(recordKey), "key expected to be part of commit3");
}
numTotalInsertsInCommit3 += records.size();
}
}
assertEquals(inserts2.size(), numTotalUpdatesInCommit3, "Total updates in commit3 must add up");
assertEquals(keys3.size(), numTotalInsertsInCommit3, "Total inserts in commit3 must add up");
}
/**
* Test scenario of new file-group getting added during insert().
*/
@Test
public void testSmallInsertHandlingForInserts() throws Exception {
final String testPartitionPath = "2016/09/26";
final int insertSplitLimit = 100;
// setup the small file handling params
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // holds up to 200 records max
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config, false);
// Inserts => will write file1
String commitTime1 = "001";
client.startCommitWithTime(commitTime1);
List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, insertSplitLimit); // this writes ~500kb
Set<String> keys1 = recordsToRecordKeySet(inserts1);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
List<WriteStatus> statuses = client.insert(insertRecordsRDD1, commitTime1).collect();
assertNoWriteErrors(statuses);
assertPartitionMetadata(new String[] {testPartitionPath}, fs);
assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
String file1 = statuses.get(0).getFileId();
assertEquals(100,
readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath()))
.size(), "file should contain 100 records");
// Second set of inserts should just expand file1
String commitTime2 = "002";
client.startCommitWithTime(commitTime2);
List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 40);
Set<String> keys2 = recordsToRecordKeySet(inserts2);
JavaRDD<HoodieRecord> insertRecordsRDD2 = jsc.parallelize(inserts2, 1);
statuses = client.insert(insertRecordsRDD2, commitTime2).collect();
assertNoWriteErrors(statuses);
assertEquals(1, statuses.size(), "Just 1 file needs to be updated.");
assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be expanded");
assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded");
Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
assertEquals(140, readRowKeysFromParquet(hadoopConf, newFile).size(),
"file should contain 140 records");
List<GenericRecord> records = ParquetUtils.readAvroRecords(hadoopConf, newFile);
for (GenericRecord record : records) {
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
String recCommitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
assertTrue(commitTime1.equals(recCommitTime) || commitTime2.equals(recCommitTime),
"Record expected to be part of commit 1 or commit2");
assertTrue(keys2.contains(recordKey) || keys1.contains(recordKey),
"key expected to be part of commit 1 or commit2");
}
// Lots of inserts such that file1 is updated and expanded, a new file2 is created.
String commitTime3 = "003";
client.startCommitWithTime(commitTime3);
List<HoodieRecord> insert3 = dataGen.generateInserts(commitTime3, 200);
JavaRDD<HoodieRecord> insertRecordsRDD3 = jsc.parallelize(insert3, 1);
statuses = client.insert(insertRecordsRDD3, commitTime3).collect();
assertNoWriteErrors(statuses);
assertEquals(2, statuses.size(), "2 files needs to be committed.");
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieTable table = getHoodieTable(metaClient, config);
List<HoodieBaseFile> files = table.getBaseFileOnlyView()
.getLatestBaseFilesBeforeOrOn(testPartitionPath, commitTime3).collect(Collectors.toList());
assertEquals(2, files.size(), "Total of 2 valid data files");
int totalInserts = 0;
for (HoodieBaseFile file : files) {
assertEquals(commitTime3, file.getCommitTime(), "All files must be at commit 3");
records = ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath()));
totalInserts += records.size();
}
assertEquals(inserts1.size() + inserts2.size() + insert3.size(), totalInserts,
"Total number of records must add up");
}
/**
* Test delete with the delete API.
*/
@Test
public void testDeletesWithDeleteApi() throws Exception {
final String testPartitionPath = "2016/09/26";
final int insertSplitLimit = 100;
// setup the small file handling params
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // holds up to 200 records max
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config, false);
// Inserts => will write file1
String commitTime1 = "001";
client.startCommitWithTime(commitTime1);
List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, insertSplitLimit); // this writes ~500kb
Set<String> keys1 = recordsToRecordKeySet(inserts1);
List<String> keysSoFar = new ArrayList<>(keys1);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
assertNoWriteErrors(statuses);
assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
String file1 = statuses.get(0).getFileId();
assertEquals(100,
readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath()))
.size(), "file should contain 100 records");
// Delete 20 among 100 inserted
testDeletes(client, inserts1, 20, file1, "002", 80, keysSoFar);
// Insert and update 40 records
Pair<Set<String>, List<HoodieRecord>> updateBatch2 = testUpdates("003", client, 40, 120);
keysSoFar.addAll(updateBatch2.getLeft());
// Delete 10 records among 40 updated
testDeletes(client, updateBatch2.getRight(), 10, file1, "004", 110, keysSoFar);
// do another batch of updates
Pair<Set<String>, List<HoodieRecord>> updateBatch3 = testUpdates("005", client, 40, 150);
keysSoFar.addAll(updateBatch3.getLeft());
// delete non existent keys
String commitTime6 = "006";
client.startCommitWithTime(commitTime6);
List<HoodieRecord> dummyInserts3 = dataGen.generateInserts(commitTime6, 20);
List<HoodieKey> hoodieKeysToDelete3 = randomSelectAsHoodieKeys(dummyInserts3, 20);
JavaRDD<HoodieKey> deleteKeys3 = jsc.parallelize(hoodieKeysToDelete3, 1);
statuses = client.delete(deleteKeys3, commitTime6).collect();
assertNoWriteErrors(statuses);
assertEquals(0, statuses.size(), "Deletes of non-existent keys should not produce any write statuses.");
// Check the entire dataset has all records still
String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
}
assertEquals(150,
HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
"Must contain " + 150 + " records");
// delete another batch. previous delete commit should have persisted the schema. If not,
// this will throw exception
testDeletes(client, updateBatch3.getRight(), 10, file1, "007", 140, keysSoFar);
}
/**
* Test scenario of writing more file groups than existing number of file groups in partition.
*/
@Test
public void testInsertOverwritePartitionHandlingWithMoreRecords() throws Exception {
verifyInsertOverwritePartitionHandling(1000, 3000);
}
/**
* Test scenario of writing fewer file groups than existing number of file groups in partition.
*/
@Test
public void testInsertOverwritePartitionHandlingWithFewerRecords() throws Exception {
verifyInsertOverwritePartitionHandling(3000, 1000);
}
/**
* Test scenario of writing a similar number of file groups to the existing number of file groups in the partition.
*/
@Test
public void testInsertOverwritePartitionHandlingWithSimilarNumberOfRecords() throws Exception {
verifyInsertOverwritePartitionHandling(3000, 3000);
}
/**
* 1) Do write1 (upsert) with 'batch1RecordsCount' number of records.
* 2) Do write2 (insert overwrite) with 'batch2RecordsCount' number of records.
*
* Verify that all records in step1 are overwritten
*/
private void verifyInsertOverwritePartitionHandling(int batch1RecordsCount, int batch2RecordsCount) throws Exception {
final String testPartitionPath = "americas";
HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false);
SparkRDDWriteClient client = getHoodieWriteClient(config, false);
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
// Do Inserts
String commitTime1 = "001";
client.startCommitWithTime(commitTime1);
List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, batch1RecordsCount);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 2);
List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
assertNoWriteErrors(statuses);
Set<String> batch1Buckets = statuses.stream().map(s -> s.getFileId()).collect(Collectors.toSet());
verifyRecordsWritten(commitTime1, inserts1, statuses);
// Do Insert Overwrite
String commitTime2 = "002";
client.startCommitWithTime(commitTime2, HoodieTimeline.REPLACE_COMMIT_ACTION);
List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, batch2RecordsCount);
List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>();
insertsAndUpdates2.addAll(inserts2);
JavaRDD<HoodieRecord> insertAndUpdatesRDD2 = jsc.parallelize(insertsAndUpdates2, 2);
HoodieWriteResult writeResult = client.insertOverwrite(insertAndUpdatesRDD2, commitTime2);
statuses = writeResult.getWriteStatuses().collect();
assertNoWriteErrors(statuses);
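// Insert overwrite should mark every file group written by commit "001" as replaced.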
assertEquals(batch1Buckets, new HashSet<>(writeResult.getPartitionToReplaceFileIds().get(testPartitionPath)));
verifyRecordsWritten(commitTime2, inserts2, statuses);
}
/**
* Verify data in parquet files matches expected records and commit time.
*/
private void verifyRecordsWritten(String commitTime, List<HoodieRecord> expectedRecords, List<WriteStatus> allStatus) {
List<GenericRecord> records = new ArrayList<>();
for (WriteStatus status : allStatus) {
Path filePath = new Path(basePath, status.getStat().getPath());
records.addAll(ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), filePath));
}
Set<String> expectedKeys = recordsToRecordKeySet(expectedRecords);
assertEquals(expectedKeys.size(), records.size());
for (GenericRecord record : records) {
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
assertEquals(commitTime,
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString());
assertTrue(expectedKeys.contains(recordKey));
}
}
private Pair<Set<String>, List<HoodieRecord>> testUpdates(String instantTime, SparkRDDWriteClient client,
int sizeToInsertAndUpdate, int expectedTotalRecords)
throws IOException {
client.startCommitWithTime(instantTime);
List<HoodieRecord> inserts = dataGen.generateInserts(instantTime, sizeToInsertAndUpdate);
Set<String> keys = recordsToRecordKeySet(inserts);
List<HoodieRecord> insertsAndUpdates = new ArrayList<>();
insertsAndUpdates.addAll(inserts);
insertsAndUpdates.addAll(dataGen.generateUpdates(instantTime, inserts));
JavaRDD<HoodieRecord> insertAndUpdatesRDD = jsc.parallelize(insertsAndUpdates, 1);
List<WriteStatus> statuses = client.upsert(insertAndUpdatesRDD, instantTime).collect();
assertNoWriteErrors(statuses);
// Check the entire dataset has all records still
String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
}
assertEquals(expectedTotalRecords,
HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
"Must contain " + expectedTotalRecords + " records");
return Pair.of(keys, inserts);
}
private void testDeletes(SparkRDDWriteClient client, List<HoodieRecord> previousRecords, int sizeToDelete,
String existingFile, String instantTime, int expectedRecords, List<String> keys) {
client.startCommitWithTime(instantTime);
List<HoodieKey> hoodieKeysToDelete = randomSelectAsHoodieKeys(previousRecords, sizeToDelete);
JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(hoodieKeysToDelete, 1);
List<WriteStatus> statuses = client.delete(deleteKeys, instantTime).collect();
assertNoWriteErrors(statuses);
assertEquals(1, statuses.size(), "Just 1 file needs to be updated.");
assertEquals(existingFile, statuses.get(0).getFileId(), "Existing file should be expanded");
// Check the entire dataset has all records still
String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
}
assertEquals(expectedRecords,
HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
"Must contain " + expectedRecords + " records");
Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
assertEquals(expectedRecords,
readRowKeysFromParquet(hadoopConf, newFile).size(),
"file should contain " + expectedRecords + " records");
List<GenericRecord> records = ParquetUtils.readAvroRecords(hadoopConf, newFile);
for (GenericRecord record : records) {
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
assertTrue(keys.contains(recordKey), "key expected to be part of " + instantTime);
assertFalse(hoodieKeysToDelete.stream().anyMatch(k -> k.getRecordKey().equals(recordKey)), "deleted key " + recordKey + " should not be present");
}
}
/**
* Test the delete API when no records have been inserted; deleting non-existent keys should throw.
*/
@Test
public void testDeletesWithoutInserts() {
final String testPartitionPath = "2016/09/26";
final int insertSplitLimit = 100;
// setup the small file handling params
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, true); // holds up to 200 records max
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config, false);
// delete non existent keys
String commitTime1 = "001";
client.startCommitWithTime(commitTime1);
List<HoodieRecord> dummyInserts = dataGen.generateInserts(commitTime1, 20);
List<HoodieKey> hoodieKeysToDelete = randomSelectAsHoodieKeys(dummyInserts, 20);
JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(hoodieKeysToDelete, 1);
assertThrows(HoodieIOException.class, () -> {
client.delete(deleteKeys, commitTime1).collect();
}, "Should have thrown Exception");
}
/**
* Test to ensure commit metadata points to valid files.
*/
@Test
public void testCommitWritesRelativePaths() throws Exception {
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieSparkTable table = HoodieSparkTable.create(cfg, context, metaClient);
String instantTime = "000";
client.startCommitWithTime(instantTime);
List<HoodieRecord> records = dataGen.generateInserts(instantTime, 200);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, instantTime);
assertTrue(client.commit(instantTime, result), "Commit should succeed");
assertTrue(testTable.commitExists(instantTime),
"After explicit commit, commit file should be created");
// Get parquet file paths from commit metadata
String actionType = metaClient.getCommitActionType();
HoodieInstant commitInstant = new HoodieInstant(false, actionType, instantTime);
HoodieTimeline commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants();
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(commitTimeline.getInstantDetails(commitInstant).get(), HoodieCommitMetadata.class);
String basePath = table.getMetaClient().getBasePath();
Collection<String> commitPathNames = commitMetadata.getFileIdAndFullPaths(basePath).values();
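// getFileIdAndFullPaths resolves the relative paths stored in the commit metadata against the table base path; the commit file on disk must yield the same set of full paths.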
// Read from commit file
try (FSDataInputStream inputStream = fs.open(testTable.getCommitFilePath(instantTime))) {
String everything = FileIOUtils.readAsUTFString(inputStream);
HoodieCommitMetadata metadata = HoodieCommitMetadata.fromJsonString(everything, HoodieCommitMetadata.class);
HashMap<String, String> paths = metadata.getFileIdAndFullPaths(basePath);
// Compare values in both to make sure they are equal.
for (String pathName : paths.values()) {
assertTrue(commitPathNames.contains(pathName));
}
}
}
}
/**
* Test to ensure that insert and update write stats in commit metadata are recorded correctly.
*/
@Test
public void testMetadataStatsOnCommit() throws Exception {
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
String instantTime0 = "000";
client.startCommitWithTime(instantTime0);
List<HoodieRecord> records0 = dataGen.generateInserts(instantTime0, 200);
JavaRDD<HoodieRecord> writeRecords0 = jsc.parallelize(records0, 1);
JavaRDD<WriteStatus> result0 = client.bulkInsert(writeRecords0, instantTime0);
assertTrue(client.commit(instantTime0, result0), "Commit should succeed");
assertTrue(testTable.commitExists(instantTime0),
"After explicit commit, commit file should be created");
// Read from commit file
try (FSDataInputStream inputStream = fs.open(testTable.getCommitFilePath(instantTime0))) {
String everything = FileIOUtils.readAsUTFString(inputStream);
HoodieCommitMetadata metadata =
HoodieCommitMetadata.fromJsonString(everything, HoodieCommitMetadata.class);
int inserts = 0;
for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
for (HoodieWriteStat stat : pstat.getValue()) {
inserts += stat.getNumInserts();
}
}
assertEquals(200, inserts);
}
// Update + Inserts such that they just expand file1
String instantTime1 = "001";
client.startCommitWithTime(instantTime1);
List<HoodieRecord> records1 = dataGen.generateUpdates(instantTime1, records0);
JavaRDD<HoodieRecord> writeRecords1 = jsc.parallelize(records1, 1);
JavaRDD<WriteStatus> result1 = client.upsert(writeRecords1, instantTime1);
assertTrue(client.commit(instantTime1, result1), "Commit should succeed");
assertTrue(testTable.commitExists(instantTime1),
"After explicit commit, commit file should be created");
// Read from commit file
try (FSDataInputStream inputStream = fs.open(testTable.getCommitFilePath(instantTime1))) {
String everything = FileIOUtils.readAsUTFString(inputStream);
HoodieCommitMetadata metadata = HoodieCommitMetadata.fromJsonString(everything, HoodieCommitMetadata.class);
int inserts = 0;
int upserts = 0;
for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
for (HoodieWriteStat stat : pstat.getValue()) {
inserts += stat.getNumInserts();
upserts += stat.getNumUpdateWrites();
}
}
assertEquals(0, inserts);
assertEquals(200, upserts);
}
}
/**
* Tests behavior of committing only when consistency is verified.
*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testConsistencyCheckDuringFinalize(boolean enableOptimisticConsistencyGuard) throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
String instantTime = "000";
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
.withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build();
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
Pair<Path, JavaRDD<WriteStatus>> result = testConsistencyCheck(metaClient, instantTime, enableOptimisticConsistencyGuard);
// Delete orphan marker and commit should succeed
metaClient.getFs().delete(result.getKey(), false);
if (!enableOptimisticConsistencyGuard) {
assertTrue(client.commit(instantTime, result.getRight()), "Commit should succeed");
assertTrue(testTable.commitExists(instantTime),
"After explicit commit, commit file should be created");
// Marker directory must be removed
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
} else {
// with the optimistic consistency guard, the earlier client.commit (inside testConsistencyCheck) should already have succeeded.
assertTrue(testTable.commitExists(instantTime),
"After explicit commit, commit file should be created");
// Marker directory must be removed
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
}
}
private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollbackUsingMarkers, boolean enableOptimisticConsistencyGuard) throws Exception {
String instantTime = "000";
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieWriteConfig cfg = !enableOptimisticConsistencyGuard ? getConfigBuilder().withRollbackUsingMarkers(rollbackUsingMarkers).withAutoCommit(false)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true)
.withMaxConsistencyCheckIntervalMs(1).withInitialConsistencyCheckIntervalMs(1).withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build() :
getConfigBuilder().withRollbackUsingMarkers(rollbackUsingMarkers).withAutoCommit(false)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
.withConsistencyCheckEnabled(true)
.withOptimisticConsistencyGuardSleepTimeMs(1).build()).build();
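// The first config above uses the fail-safe consistency guard with aggressive check intervals; the second uses the optimistic consistency guard.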
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
testConsistencyCheck(metaClient, instantTime, enableOptimisticConsistencyGuard);
if (!enableOptimisticConsistencyGuard) {
// Rollback of this commit should succeed with FailSafeCG
client.rollback(instantTime);
assertFalse(testTable.commitExists(instantTime),
"After explicit rollback, commit file should not be present");
// Marker directory must be removed after rollback
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
} else {
// if optimistic CG is enabled, commit should have succeeded.
assertTrue(testTable.commitExists(instantTime),
"With optimistic CG, first commit should succeed. commit file should be present");
// Marker directory must be removed after rollback
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
if (rollbackUsingMarkers) {
// rollback of a completed commit should fail if marker-based rollback is used.
try {
client.rollback(instantTime);
fail("Rollback of completed commit should throw exception");
} catch (HoodieRollbackException e) {
// ignore
}
} else {
// rollback of a completed commit should succeed if using list based rollback
client.rollback(instantTime);
assertFalse(testTable.commitExists(instantTime),
"After explicit rollback, commit file should not be present");
}
}
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean enableOptimisticConsistencyGuard) throws Exception {
testRollbackAfterConsistencyCheckFailureUsingFileList(false, enableOptimisticConsistencyGuard);
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testRollbackAfterConsistencyCheckFailureUsingMarkers(boolean enableOptimisticConsistencyGuard) throws Exception {
testRollbackAfterConsistencyCheckFailureUsingFileList(true, enableOptimisticConsistencyGuard);
}
private Pair<Path, JavaRDD<WriteStatus>> testConsistencyCheck(HoodieTableMetaClient metaClient, String instantTime, boolean enableOptimisticConsistencyGuard)
throws Exception {
HoodieWriteConfig cfg = !enableOptimisticConsistencyGuard ? (getConfigBuilder().withAutoCommit(false)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true)
.withMaxConsistencyCheckIntervalMs(1).withInitialConsistencyCheckIntervalMs(1).withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build())
.build()) : (getConfigBuilder().withAutoCommit(false)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true)
.withOptimisticConsistencyGuardSleepTimeMs(1).build())
.build());
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
client.startCommitWithTime(instantTime);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(dataGen.generateInserts(instantTime, 200), 1);
JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, instantTime);
result.collect();
// Create a dummy marker file to simulate the case that a marker file was created without data file.
// This should fail the commit
String partitionPath = Arrays
.stream(fs.globStatus(new Path(String.format("%s/*/*/*/*", metaClient.getMarkerFolderPath(instantTime))),
path -> path.toString().contains(HoodieTableMetaClient.MARKER_EXTN)))
.limit(1).map(status -> status.getPath().getParent().toString()).collect(Collectors.toList()).get(0);
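// Pick a partition directory that already contains markers and drop a dummy MERGE marker there with no corresponding data file.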
Path markerFilePath = new MarkerFiles(fs, basePath, metaClient.getMarkerFolderPath(instantTime), instantTime)
.create(partitionPath,
FSUtils.makeDataFileName(instantTime, "1-0-1", UUID.randomUUID().toString()),
IOType.MERGE);
LOG.info("Created a dummy marker path=" + markerFilePath);
if (!enableOptimisticConsistencyGuard) {
Exception e = assertThrows(HoodieCommitException.class, () -> {
client.commit(instantTime, result);
}, "Commit should fail due to consistency check");
assertTrue(e.getCause() instanceof HoodieIOException);
} else {
// with optimistic CG, commit should succeed
client.commit(instantTime, result);
}
return Pair.of(markerFilePath, result);
}
@Test
public void testMultiOperationsPerCommit() throws IOException {
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false)
.withAllowMultiWriteOnSameInstant(true)
.build();
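// withAllowMultiWriteOnSameInstant lets the bulkInsert and upsert issued for "0001" below share a single instant and be committed together.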
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
String firstInstantTime = "0000";
client.startCommitWithTime(firstInstantTime);
int numRecords = 200;
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(dataGen.generateInserts(firstInstantTime, numRecords), 1);
JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, firstInstantTime);
assertTrue(client.commit(firstInstantTime, result), "Commit should succeed");
assertTrue(testTable.commitExists(firstInstantTime),
"After explicit commit, commit file should be created");
// Check the entire dataset has all records still
String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
}
assertEquals(numRecords,
HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
"Must contain " + numRecords + " records");
String nextInstantTime = "0001";
client.startCommitWithTime(nextInstantTime);
JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(dataGen.generateUpdates(nextInstantTime, numRecords), 1);
JavaRDD<HoodieRecord> insertRecords = jsc.parallelize(dataGen.generateInserts(nextInstantTime, numRecords), 1);
JavaRDD<WriteStatus> inserts = client.bulkInsert(insertRecords, nextInstantTime);
JavaRDD<WriteStatus> upserts = client.upsert(updateRecords, nextInstantTime);
assertTrue(client.commit(nextInstantTime, inserts.union(upserts)), "Commit should succeed");
assertTrue(testTable.commitExists(firstInstantTime),
"After explicit commit, commit file should be created");
int totalRecords = 2 * numRecords;
assertEquals(totalRecords, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
"Must contain " + totalRecords + " records");
}
/**
* Build Hoodie Write Config for small data file sizes.
*/
private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize) {
return getSmallInsertWriteConfig(insertSplitSize, false);
}
/**
* Build Hoodie Write Config for small data file sizes.
*/
private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema) {
HoodieWriteConfig.Builder builder = getConfigBuilder(useNullSchema ? NULL_SCHEMA : TRIP_EXAMPLE_SCHEMA);
return builder
.withCompactionConfig(
HoodieCompactionConfig.newBuilder()
.compactionSmallFileSize(dataGen.getEstimatedFileSizeInBytes(150))
.insertSplitSize(insertSplitSize).build())
.withStorageConfig(
HoodieStorageConfig.newBuilder()
.hfileMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200))
.parquetMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build())
.build();
}
}