/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.functional;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieMetricsConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@Tag("functional")
public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
private static final Logger LOG = LogManager.getLogger(TestHoodieBackedMetadata.class);
@TempDir
public java.nio.file.Path tempFolder;
private String metadataTableBasePath;
private HoodieTableType tableType;
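/**
* Initializes the test resources (Spark contexts, file system, meta client and test data generator) for the given table type.
*/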
public void init(HoodieTableType tableType) throws IOException {
this.tableType = tableType;
initPath();
initSparkContexts("TestHoodieMetadata");
initFileSystem();
fs.mkdirs(new Path(basePath));
initMetaClient(tableType);
initTestDataGenerator();
metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
}
@AfterEach
public void clean() throws IOException {
cleanupResources();
}
/**
* Metadata Table bootstrap scenarios.
*/
@Test
public void testMetadataTableBootstrap() throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
// Metadata table should not exist until created for the first time
assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist");
assertThrows(TableNotFoundException.class, () -> HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build());
// Metadata table is not created if disabled by config
String firstCommitTime = HoodieActiveTimeline.createNewInstantTime();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
client.startCommitWithTime(firstCommitTime);
client.insert(jsc.parallelize(dataGen.generateInserts(firstCommitTime, 5)), firstCommitTime);
assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not be created");
assertThrows(TableNotFoundException.class, () -> HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build());
}
// Metadata table should not be created if any non-complete instants are present
String secondCommitTime = HoodieActiveTimeline.createNewInstantTime();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(false, true), true)) {
client.startCommitWithTime(secondCommitTime);
client.insert(jsc.parallelize(dataGen.generateUpdates(secondCommitTime, 2)), secondCommitTime);
// AutoCommit is false so no bootstrap
client.syncTableMetadata();
assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not be created");
assertThrows(TableNotFoundException.class, () -> HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build());
// rollback this commit
client.rollback(secondCommitTime);
}
// Metadata table created when enabled by config & sync is called
secondCommitTime = HoodieActiveTimeline.createNewInstantTime();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
client.startCommitWithTime(secondCommitTime);
client.insert(jsc.parallelize(dataGen.generateUpdates(secondCommitTime, 2)), secondCommitTime);
client.syncTableMetadata();
assertTrue(fs.exists(new Path(metadataTableBasePath)));
validateMetadata(client);
}
// Delete all existing instants on the dataset to simulate archiving. This should trigger a re-bootstrap of the metadata
// table as the last synced instant has been "archived".
final String metadataTableMetaPath = metadataTableBasePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME;
assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(secondCommitTime))));
Arrays.stream(fs.listStatus(new Path(metaClient.getMetaPath()))).filter(status -> status.getPath().getName().matches("^\\d+\\..*"))
.forEach(status -> {
try {
fs.delete(status.getPath(), false);
} catch (IOException e) {
LOG.warn("Error when deleting instant " + status + ": " + e);
}
});
String thirdCommitTime = HoodieActiveTimeline.createNewInstantTime();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
client.startCommitWithTime(thirdCommitTime);
client.insert(jsc.parallelize(dataGen.generateUpdates(thirdCommitTime, 2)), thirdCommitTime);
client.syncTableMetadata();
assertTrue(fs.exists(new Path(metadataTableBasePath)));
validateMetadata(client);
// Metadata Table should not have previous delta-commits as it was re-bootstrapped
assertFalse(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(firstCommitTime))));
assertFalse(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(secondCommitTime))));
assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(thirdCommitTime))));
}
}
/**
* Only valid partition directories are added to the metadata.
*/
@Test
public void testOnlyValidPartitionsAdded() throws Exception {
// This test requires local file system
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
// Create an empty directory which is not a partition directory (lacks partition metadata)
final String nonPartitionDirectory = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0] + "-nonpartition";
Files.createDirectories(Paths.get(basePath, nonPartitionDirectory));
// Three directories which are partitions but will be ignored due to filter
final String filterDirRegex = ".*-filterDir\\d|\\..*";
final String filteredDirectoryOne = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0] + "-filterDir1";
final String filteredDirectoryTwo = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0] + "-filterDir2";
final String filteredDirectoryThree = ".backups";
// Create some commits
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
testTable.withPartitionMetaFiles("p1", "p2", filteredDirectoryOne, filteredDirectoryTwo, filteredDirectoryThree)
.addCommit("001").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10)
.addCommit("002").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10, 10);
final HoodieWriteConfig writeConfig =
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.NEVER, true, true, false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withDirectoryFilterRegex(filterDirRegex).build()).build();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
client.startCommitWithTime("005");
client.insert(jsc.emptyRDD(), "005");
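// The metadata table should list only the valid partitions and exclude the filtered and non-partition directories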
List<String> partitions = metadataWriter(client).metadata().getAllPartitionPaths();
assertFalse(partitions.contains(nonPartitionDirectory),
"Must not contain the non-partition " + nonPartitionDirectory);
assertTrue(partitions.contains("p1"), "Must contain partition p1");
assertTrue(partitions.contains("p2"), "Must contain partition p2");
assertFalse(partitions.contains(filteredDirectoryOne),
"Must not contain the filtered directory " + filteredDirectoryOne);
assertFalse(partitions.contains(filteredDirectoryTwo),
"Must not contain the filtered directory " + filteredDirectoryTwo);
assertFalse(partitions.contains(filteredDirectoryThree),
"Must not contain the filtered directory " + filteredDirectoryThree);
FileStatus[] statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p1"));
assertEquals(2, statuses.length);
statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p2"));
assertEquals(5, statuses.length);
Map<String, FileStatus[]> partitionsToFilesMap = metadata(client).getAllFilesInPartitions(
Arrays.asList(basePath + "/p1", basePath + "/p2"));
assertEquals(2, partitionsToFilesMap.size());
assertEquals(2, partitionsToFilesMap.get(basePath + "/p1").length);
assertEquals(5, partitionsToFilesMap.get(basePath + "/p2").length);
}
}
/**
* Test that various table operations sync to the Metadata Table correctly.
*/
@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testTableOperations(HoodieTableType tableType) throws Exception {
init(tableType);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
// Write 1 (Bulk insert)
String newCommitTime = "001";
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
// Write 2 (inserts)
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
validateMetadata(client);
records = dataGen.generateInserts(newCommitTime, 20);
writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
// Write 3 (updates)
newCommitTime = "003";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUniqueUpdates(newCommitTime, 10);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
// Write 4 (updates and inserts)
newCommitTime = "004";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 10);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
// Compaction
if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
newCommitTime = "005";
client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
client.compact(newCommitTime);
validateMetadata(client);
}
// Write 5 (updates and inserts)
newCommitTime = "006";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 5);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
// Compaction
if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
newCommitTime = "007";
client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
client.compact(newCommitTime);
validateMetadata(client);
}
// Deletes
newCommitTime = "008";
records = dataGen.generateDeletes(newCommitTime, 10);
JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
client.startCommitWithTime(newCommitTime);
client.delete(deleteKeys, newCommitTime);
validateMetadata(client);
// Clean
newCommitTime = "009";
client.clean(newCommitTime);
validateMetadata(client);
// Restore
client.restoreToInstant("006");
validateMetadata(client);
}
}
/**
* Test that rollbacks of various table operations sync to the Metadata Table correctly.
*/
@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testRollbackOperations(HoodieTableType tableType) throws Exception {
init(tableType);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
// Write 1 (Bulk insert)
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
// Write 2 (inserts) + Rollback of inserts
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateInserts(newCommitTime, 20);
writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
client.rollback(newCommitTime);
client.syncTableMetadata();
validateMetadata(client);
// Write 3 (updates) + Rollback of updates
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUniqueUpdates(newCommitTime, 20);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
client.rollback(newCommitTime);
client.syncTableMetadata();
validateMetadata(client);
// Rollback of updates and inserts
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 10);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
client.rollback(newCommitTime);
client.syncTableMetadata();
validateMetadata(client);
// Rollback of Compaction
if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
client.compact(newCommitTime);
validateMetadata(client);
}
// Rollback of Deletes
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
records = dataGen.generateDeletes(newCommitTime, 10);
JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
client.startCommitWithTime(newCommitTime);
writeStatuses = client.delete(deleteKeys, newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
client.rollback(newCommitTime);
client.syncTableMetadata();
validateMetadata(client);
// Rollback of Clean
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.clean(newCommitTime);
validateMetadata(client);
client.rollback(newCommitTime);
client.syncTableMetadata();
validateMetadata(client);
}
// Rollback of partial commits
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
getWriteConfigBuilder(false, true, false).withRollbackUsingMarkers(false).build())) {
// Write updates and inserts
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
client.rollback(newCommitTime);
client.syncTableMetadata();
validateMetadata(client);
}
// Marker based rollback of partial commits
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
getWriteConfigBuilder(false, true, false).withRollbackUsingMarkers(true).build())) {
// Write updates and inserts
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
client.rollback(newCommitTime);
client.syncTableMetadata();
validateMetadata(client);
}
}
/**
* Test that syncing a rollback is essentially a no-op on the metadata table when the commit being rolled back was never synced to it.
* Once an explicit sync is called, the metadata should match the dataset.
*/
@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testRollbackUnsyncedCommit(HoodieTableType tableType) throws Exception {
init(tableType);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
// Initialize table with metadata
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
}
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
// Commit with metadata disabled
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
client.rollback(newCommitTime);
}
try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(engineContext, getWriteConfig(true, true))) {
assertFalse(metadata(client).isInSync());
client.syncTableMetadata();
validateMetadata(client);
}
}
/**
* Test sync of table operations.
*/
@ParameterizedTest
@EnumSource(HoodieTableType.class)
@Disabled
public void testSync(HoodieTableType tableType) throws Exception {
init(tableType);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
String newCommitTime;
List<HoodieRecord> records;
List<WriteStatus> writeStatuses;
// Initial commits without metadata table enabled
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
records = dataGen.generateInserts(newCommitTime, 5);
client.startCommitWithTime(newCommitTime);
writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
records = dataGen.generateInserts(newCommitTime, 5);
client.startCommitWithTime(newCommitTime);
writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
}
// Enable the metadata table so that it is initialized by listing the file system
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
// inserts
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateInserts(newCommitTime, 5);
writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
assertTrue(metadata(client).isInSync());
}
// Various table operations without metadata table enabled
String restoreToInstant;
String inflightActionTimestamp;
String beforeInflightActionTimestamp;
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
// updates
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUniqueUpdates(newCommitTime, 5);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
assertTrue(metadata(client).isInSync());
// updates and inserts
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 10);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
assertTrue(metadata(client).isInSync());
// Compaction
if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
client.compact(newCommitTime);
assertTrue(metadata(client).isInSync());
}
// Savepoint
restoreToInstant = newCommitTime;
if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
client.savepoint("hoodie", "metadata test");
assertTrue(metadata(client).isInSync());
}
// Record a timestamp for creating an inflight instant for sync testing
inflightActionTimestamp = HoodieActiveTimeline.createNewInstantTime();
beforeInflightActionTimestamp = newCommitTime;
// Deletes
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
records = dataGen.generateDeletes(newCommitTime, 5);
JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
client.startCommitWithTime(newCommitTime);
client.delete(deleteKeys, newCommitTime);
assertTrue(metadata(client).isInSync());
// Clean
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.clean(newCommitTime);
assertTrue(metadata(client).isInSync());
// updates
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUniqueUpdates(newCommitTime, 10);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
assertTrue(metadata(client).isInSync());
// insert overwrite to test replacecommit
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
records = dataGen.generateInserts(newCommitTime, 5);
HoodieWriteResult replaceResult = client.insertOverwrite(jsc.parallelize(records, 1), newCommitTime);
writeStatuses = replaceResult.getWriteStatuses().collect();
assertNoWriteErrors(writeStatuses);
assertTrue(metadata(client).isInSync());
}
// If there is an incomplete operation, the Metadata Table is not updated beyond that operation, but the
// in-memory merge should consider all the completed operations.
Path inflightCleanPath = new Path(metaClient.getMetaPath(), HoodieTimeline.makeInflightCleanerFileName(inflightActionTimestamp));
fs.create(inflightCleanPath).close();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
// Restore cannot be done until the metadata table is in sync. See HUDI-1502 for details
client.syncTableMetadata();
// Table should sync only before the inflightActionTimestamp
HoodieBackedTableMetadataWriter writer =
(HoodieBackedTableMetadataWriter) SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context);
assertEquals(writer.getLatestSyncedInstantTime().get(), beforeInflightActionTimestamp);
// Reader should sync to all the completed instants
HoodieTableMetadata metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(),
client.getConfig().getBasePath(), FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue());
assertEquals(metadata.getSyncedInstantTime().get(), newCommitTime);
// Remove the inflight instant holding back table sync
fs.delete(inflightCleanPath, false);
client.syncTableMetadata();
writer =
(HoodieBackedTableMetadataWriter)SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context);
assertEquals(writer.getLatestSyncedInstantTime().get(), newCommitTime);
// Reader should sync to all the completed instants
metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(),
client.getConfig().getBasePath(), FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue());
assertEquals(metadata.getSyncedInstantTime().get(), newCommitTime);
}
// Enable metadata table and ensure it is synced
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
client.restoreToInstant(restoreToInstant);
assertFalse(metadata(client).isInSync());
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
client.syncTableMetadata();
validateMetadata(client);
assertTrue(metadata(client).isInSync());
}
}
/**
* Instants on Metadata Table should be archived as per config. Metadata Table should be automatically compacted as per config.
*/
@Test
public void testCleaningArchivingAndCompaction() throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
final int maxDeltaCommitsBeforeCompaction = 4;
HoodieWriteConfig config = getWriteConfigBuilder(true, true, false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
.archiveCommitsWith(6, 8).retainCommits(1)
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build())
// don't archive the data timeline at all.
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(Integer.MAX_VALUE - 1, Integer.MAX_VALUE)
.retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(true).build())
.build();
List<HoodieRecord> records;
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config)) {
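// Write enough commits to cross the metadata table's compaction and archival thresholds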
for (int i = 1; i < 10; ++i) {
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
if (i == 1) {
records = dataGen.generateInserts(newCommitTime, 5);
} else {
records = dataGen.generateUpdates(newCommitTime, 2);
}
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
}
}
HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
HoodieTableMetaClient datasetMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(config.getBasePath()).build();
HoodieActiveTimeline metadataTimeline = metadataMetaClient.getActiveTimeline();
// check that there are compactions.
assertTrue(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants() > 0);
// check that cleaning has happened, once after each compaction.
assertTrue(metadataTimeline.getCleanerTimeline().filterCompletedInstants().countInstants() > 0);
// ensure archiving has happened
long numDataCompletedInstants = datasetMetaClient.getActiveTimeline().filterCompletedInstants().countInstants();
long numDeltaCommits = metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants();
assertTrue(numDeltaCommits < numDataCompletedInstants, "Must have less delta commits than total completed instants on data timeline.");
}
/**
* Test various error scenarios.
*/
@Test
public void testErrorCases() throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
// TESTCASE: If commit on the metadata table succeeds but fails on the dataset, then on next init the metadata table
// should be rolled back to last valid commit.
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateInserts(newCommitTime, 5);
writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
// There is no way to simulate a failed commit on the main dataset, hence we simply delete the completed
// instant so that only the inflight is left over.
String commitInstantFileName = HoodieTimeline.makeCommitFileName(newCommitTime);
assertTrue(fs.delete(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME,
commitInstantFileName), false));
}
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
String newCommitTime = client.startCommit();
// Next insert
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 5);
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
// Post rollback commit and metadata should be valid
validateMetadata(client);
}
}
/**
* Test non-partitioned datasets.
*/
//@Test
public void testNonPartitioned() throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
HoodieTestDataGenerator nonPartitionedGenerator = new HoodieTestDataGenerator(new String[] {""});
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
// Write 1 (Bulk insert)
String newCommitTime = "001";
List<HoodieRecord> records = nonPartitionedGenerator.generateInserts(newCommitTime, 10);
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
validateMetadata(client);
List<String> metadataPartitions = metadata(client).getAllPartitionPaths();
assertTrue(metadataPartitions.contains(""), "Must contain empty partition");
}
}
/**
* Test various metrics published by the metadata table.
*/
@Test
public void testMetadataMetrics() throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfigBuilder(true, true, true).build())) {
// Write
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
Registry metricsRegistry = Registry.getRegistry("HoodieMetadata");
assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".count"));
assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".totalDuration"));
assertTrue(metricsRegistry.getAllCounts().get(HoodieMetadataMetrics.INITIALIZE_STR + ".count") >= 1L);
assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.size"));
assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.size"));
assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.count"));
assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.count"));
}
}
/**
* Test that reading from a metadata table which is out of sync with the dataset still yields consistent results.
*/
@Test
public void testMetadataOutOfSync() throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
SparkRDDWriteClient unsyncedClient = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true));
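// This client's metadata reader will fall out of sync as subsequent commits are performed with the metadata table disabled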
// Enable metadata so table is initialized
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
// Perform Bulk Insert
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
}
// Perform commit operations with metadata disabled
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
// Perform Insert
String newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
// Perform Upsert
newCommitTime = "003";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUniqueUpdates(newCommitTime, 20);
client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
// Compaction
if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
newCommitTime = "004";
client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
client.compact(newCommitTime);
}
}
assertFalse(metadata(unsyncedClient).isInSync());
validateMetadata(unsyncedClient);
// Perform clean operation with metadata disabled
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
// One more commit is needed to trigger the clean, so upsert and compact
String newCommitTime = "005";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 20);
client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
newCommitTime = "006";
client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
client.compact(newCommitTime);
}
// Clean
newCommitTime = "007";
client.clean(newCommitTime);
}
assertFalse(metadata(unsyncedClient).isInSync());
validateMetadata(unsyncedClient);
// Perform restore with metadata disabled
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
client.restoreToInstant("004");
}
assertFalse(metadata(unsyncedClient).isInSync());
validateMetadata(unsyncedClient);
}
/**
* Validate the metadata table's contents to ensure they match what is on the file system.
*/
private void validateMetadata(SparkRDDWriteClient testClient) throws IOException {
HoodieWriteConfig config = testClient.getConfig();
SparkRDDWriteClient client;
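// Re-create the write client when the embedded timeline server is enabled; otherwise validate with the client under test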
if (config.isEmbeddedTimelineServerEnabled()) {
testClient.close();
client = new SparkRDDWriteClient(testClient.getEngineContext(), testClient.getConfig());
} else {
client = testClient;
}
HoodieTableMetadata tableMetadata = metadata(client);
assertNotNull(tableMetadata, "MetadataReader should have been initialized");
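// Nothing more to validate if file-listing metadata is not enabled for this write config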
if (!config.useFileListingMetadata()) {
return;
}
HoodieTimer timer = new HoodieTimer().startTimer();
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
// Partitions should match
FileSystemBackedTableMetadata fsBackedTableMetadata = new FileSystemBackedTableMetadata(engineContext,
new SerializableConfiguration(hadoopConf), config.getBasePath(), config.shouldAssumeDatePartitioning());
List<String> fsPartitions = fsBackedTableMetadata.getAllPartitionPaths();
List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();
Collections.sort(fsPartitions);
Collections.sort(metadataPartitions);
assertEquals(fsPartitions.size(), metadataPartitions.size(), "Partitions should match");
assertTrue(fsPartitions.equals(metadataPartitions), "Partitions should match");
// Files within each partition should match
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(config, engineContext);
TableFileSystemView tableView = table.getHoodieView();
List<String> fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList());
Map<String, FileStatus[]> partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
assertEquals(fsPartitions.size(), partitionToFilesMap.size());
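// For each partition, the raw file-system listing, the metadata table listing and the file-system view should agree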
fsPartitions.forEach(partition -> {
try {
Path partitionPath;
if (partition.equals("")) {
// Should be the non-partitioned case
partitionPath = new Path(basePath);
} else {
partitionPath = new Path(basePath, partition);
}
FileStatus[] fsStatuses = FSUtils.getAllDataFilesInPartition(fs, partitionPath);
FileStatus[] metaStatuses = tableMetadata.getAllFilesInPartition(partitionPath);
List<String> fsFileNames = Arrays.stream(fsStatuses)
.map(s -> s.getPath().getName()).collect(Collectors.toList());
List<String> metadataFilenames = Arrays.stream(metaStatuses)
.map(s -> s.getPath().getName()).collect(Collectors.toList());
Collections.sort(fsFileNames);
Collections.sort(metadataFilenames);
assertEquals(fsStatuses.length, partitionToFilesMap.get(basePath + "/" + partition).length);
// File sizes should be valid
Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getLen() > 0));
// Block sizes should be valid
Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getBlockSize() > 0));
List<Long> fsBlockSizes = Arrays.stream(fsStatuses).map(FileStatus::getBlockSize).collect(Collectors.toList());
Collections.sort(fsBlockSizes);
List<Long> metadataBlockSizes = Arrays.stream(metaStatuses).map(FileStatus::getBlockSize).collect(Collectors.toList());
Collections.sort(metadataBlockSizes);
assertEquals(fsBlockSizes, metadataBlockSizes);
if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) {
LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray()));
LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray()));
for (String fileName : fsFileNames) {
if (!metadataFilenames.contains(fileName)) {
LOG.error(partition + "FsFilename " + fileName + " not found in Meta data");
}
}
for (String fileName : metadataFilenames) {
if (!fsFileNames.contains(fileName)) {
LOG.error(partition + "Metadata file " + fileName + " not found in original FS");
}
}
}
assertEquals(fsFileNames.size(), metadataFilenames.size(), "Files within partition " + partition + " should match");
assertTrue(fsFileNames.equals(metadataFilenames), "Files within partition " + partition + " should match");
// FileSystemView should expose the same data
List<HoodieFileGroup> fileGroups = tableView.getAllFileGroups(partition).collect(Collectors.toList());
fileGroups.addAll(tableView.getAllReplacedFileGroups(partition).collect(Collectors.toList()));
fileGroups.forEach(g -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(g));
fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(b)));
fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(s)));
long numFiles = fileGroups.stream()
.mapToLong(g -> g.getAllBaseFiles().count() + g.getAllFileSlices().mapToLong(s -> s.getLogFiles().count()).sum())
.sum();
assertEquals(metadataFilenames.size(), numFiles);
} catch (IOException e) {
e.printStackTrace();
fail("Exception should not be raised: " + e);
}
});
HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client);
assertNotNull(metadataWriter, "MetadataWriter should have been initialized");
// Validate write config for metadata table
HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig();
assertFalse(metadataWriteConfig.useFileListingMetadata(), "No metadata table for metadata table");
assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "No verify for metadata table");
// Metadata table should be in sync with the dataset
assertTrue(metadata(client).isInSync());
HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
// Metadata table is MOR
assertEquals(HoodieTableType.MERGE_ON_READ, metadataMetaClient.getTableType(), "Metadata Table should be MOR");
// Metadata table is HFile format
assertEquals(HoodieFileFormat.HFILE, metadataMetaClient.getTableConfig().getBaseFileFormat(),
"Metadata Table base file format should be HFile");
// Metadata table has a fixed number of partitions
// Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as that function filters out all directories
// in the .hoodie folder.
List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, HoodieTableMetadata.getMetadataTableBasePath(basePath),
false, false, false);
Assertions.assertEquals(MetadataPartitionType.values().length, metadataTablePartitions.size());
// Metadata table should automatically compact and clean
// versions are +1 as auto-clean / compaction happens at the end of commits
int numFileVersions = metadataWriteConfig.getCleanerFileVersionsRetained() + 1;
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline());
metadataTablePartitions.forEach(partition -> {
List<FileSlice> latestSlices = fsView.getLatestFileSlices(partition).collect(Collectors.toList());
assertTrue(latestSlices.stream().map(FileSlice::getBaseFile).count() <= 1, "Should have a single latest base file");
assertTrue(latestSlices.size() <= 1, "Should have a single latest file slice");
assertTrue(latestSlices.size() <= numFileVersions, "Should limit file slice to "
+ numFileVersions + " but was " + latestSlices.size());
});
LOG.info("Validation time=" + timer.endTimer());
}
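/**
* Returns a metadata table writer for the dataset backing the given write client.
*/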
private HoodieBackedTableMetadataWriter metadataWriter(SparkRDDWriteClient client) {
return (HoodieBackedTableMetadataWriter) SparkHoodieBackedTableMetadataWriter
.create(hadoopConf, client.getConfig(), new HoodieSparkEngineContext(jsc));
}
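/**
* Returns a metadata table reader for the dataset backing the given write client.
*/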
private HoodieTableMetadata metadata(SparkRDDWriteClient client) {
HoodieWriteConfig clientConfig = client.getConfig();
return HoodieTableMetadata.create(client.getEngineContext(), clientConfig.getMetadataConfig(), clientConfig.getBasePath(),
clientConfig.getSpillableMapBasePath());
}
// TODO: this can be moved to TestHarness after merge from master
private void assertNoWriteErrors(List<WriteStatus> statuses) {
// Verify there are no errors
for (WriteStatus status : statuses) {
assertFalse(status.hasErrors(), "Errors found in write of " + status.getFileId());
}
}
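/**
* Builds a write config using the EAGER failed-writes cleaning policy, with metrics disabled and the metadata table toggled per the given flag.
*/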
private HoodieWriteConfig getWriteConfig(boolean autoCommit, boolean useFileListingMetadata) {
return getWriteConfigBuilder(autoCommit, useFileListingMetadata, false).build();
}
private HoodieWriteConfig.Builder getWriteConfigBuilder(boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) {
return getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, autoCommit, useFileListingMetadata, enableMetrics);
}
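/**
* Common builder for the test write configs: small parallelism, inline compaction disabled, cleaner retaining a single commit and
* file version, embedded timeline server enabled, BLOOM index, and the metadata table / metrics toggled per the given flags.
*/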
private HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2)
.withAutoCommit(autoCommit)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1)
.withFailedWritesCleaningPolicy(policy)
.withAutoClean(false).retainCommits(1).retainFileVersions(1).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).build())
.withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
.withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
.withEnableBackupForRemoteFileSystemView(false).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(useFileListingMetadata)
.enableMetrics(enableMetrics).build())
.withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
.withExecutorMetrics(true).usePrefix("unit-test").build());
}
@Override
protected HoodieTableType getTableType() {
return tableType;
}
}