/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client;
import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
import org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy;
import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy;
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.marker.SimpleDirectMarkerBasedDetectionStrategy;
import org.apache.hudi.table.marker.SimpleTransactionDirectMarkerBasedDetectionStrategy;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.timeline.service.handlers.marker.AsyncTimelineServerBasedDetectionStrategy;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
import static org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP_KEY;
import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP_KEY;
import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP_KEY;
import static org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP_KEY;
import static org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP_KEY;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
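/**
 * Tests multi-writer scenarios for {@link SparkRDDWriteClient}: optimistic concurrency control
 * with different lock providers and conflict resolution strategies, early conflict detection,
 * and concurrent table services such as cleaning, compaction, and clustering.
 */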
public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
private Properties lockProperties = null;
  /**
   * Note: the base-class implementation is not thread-safe, so this override
   * always returns a fresh client.
   */
@Override
public SparkRDDWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
return new SparkRDDWriteClient(context, cfg);
}
@BeforeEach
public void setup() throws IOException {
    if (lockProperties == null) {
      lockProperties = new Properties();
      lockProperties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
      lockProperties.setProperty(FILESYSTEM_LOCK_EXPIRE_PROP_KEY, "1");
      lockProperties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
      lockProperties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "1000");
      lockProperties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3");
    }
}
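  /**
   * Re-initializes the test harness with a MERGE_ON_READ table, since the default setup
   * creates a COPY_ON_WRITE table.
   */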
public void setUpMORTestTable() throws IOException {
cleanupResources();
initPath();
initSparkContexts();
initTestDataGenerator();
initFileSystem();
fs.mkdirs(new Path(basePath));
metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, HoodieFileFormat.PARQUET);
}
@AfterEach
public void clean() throws IOException {
cleanupResources();
}
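  // Lock providers and conflict resolution strategies exercised by the parameterized tests below.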
private static final List<Class> LOCK_PROVIDER_CLASSES = Arrays.asList(
InProcessLockProvider.class,
FileSystemBasedLockProvider.class);
private static final List<ConflictResolutionStrategy> CONFLICT_RESOLUTION_STRATEGY_CLASSES = Arrays.asList(
new SimpleConcurrentFileWritesConflictResolutionStrategy(),
new PreferWriterConflictResolutionStrategy());
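  /**
   * Builds the cartesian product of table type, lock provider, and conflict resolution strategy
   * used as arguments for the parameterized multi-writer tests.
   */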
private static Iterable<Object[]> providerClassResolutionStrategyAndTableType() {
List<Object[]> opts = new ArrayList<>();
for (Object providerClass : LOCK_PROVIDER_CLASSES) {
for (ConflictResolutionStrategy resolutionStrategy : CONFLICT_RESOLUTION_STRATEGY_CLASSES) {
opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass, resolutionStrategy});
opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass, resolutionStrategy});
}
}
return opts;
}
  /**
   * Tests multi-writers with early conflict detection enabled, covering:
   * 1. MOR + direct markers
   * 2. COW + direct markers
   * 3. MOR + timeline-server-based markers
   * 4. COW + timeline-server-based markers
   * <p>
   * Timeline of the test:
   * <pre>
   *                                |---------------- 003 heartbeat expired ----------------|
   * ---|---------|-----------------|--------------------|----------------------|-----------|------> time
   *  init       001           002 starts           003 starts but          002 commits  004 writes
   *                           writing              conflicts with 002      successfully successfully
   *                                                and fails fast
   * </pre>
   *
   * @param tableType                      table type, COPY_ON_WRITE or MERGE_ON_READ
   * @param markerType                     marker type, DIRECT or TIMELINE_SERVER_BASED
   * @param earlyConflictDetectionStrategy fully qualified class name of the detection strategy
   * @throws Exception
   */
@ParameterizedTest
@MethodSource("configParams")
public void testHoodieClientBasicMultiWriterWithEarlyConflictDetection(String tableType, String markerType, String earlyConflictDetectionStrategy) throws Exception {
if (tableType.equalsIgnoreCase(HoodieTableType.MERGE_ON_READ.name())) {
setUpMORTestTable();
}
int heartBeatIntervalForCommit4 = 10 * 1000;
HoodieWriteConfig writeConfig;
TestingServer server = null;
if (earlyConflictDetectionStrategy.equalsIgnoreCase(SimpleTransactionDirectMarkerBasedDetectionStrategy.class.getName())) {
      // Need to set up a ZooKeeper environment here, because SimpleTransactionDirectMarkerBasedDetectionStrategy only supports the ZooKeeper-based lock for now.
server = new TestingServer();
Properties properties = new Properties();
      properties.setProperty(ZK_CONNECT_URL_PROP_KEY, server.getConnectString());
      properties.setProperty(ZK_BASE_PATH_PROP_KEY, server.getTempDirectory().getAbsolutePath());
properties.setProperty(ZK_SESSION_TIMEOUT_MS_PROP_KEY, "10000");
properties.setProperty(ZK_CONNECTION_TIMEOUT_MS_PROP_KEY, "10000");
properties.setProperty(ZK_LOCK_KEY_PROP_KEY, "key");
properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
writeConfig = buildWriteConfigForEarlyConflictDetect(markerType, properties, ZookeeperBasedLockProvider.class, earlyConflictDetectionStrategy);
} else {
Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
writeConfig = buildWriteConfigForEarlyConflictDetect(markerType, properties, InProcessLockProvider.class, earlyConflictDetectionStrategy);
}
final SparkRDDWriteClient client1 = getHoodieWriteClient(writeConfig);
// Create the first commit
final String nextCommitTime1 = "001";
createCommitWithInserts(writeConfig, client1, "000", nextCommitTime1, 200, true);
final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig);
final SparkRDDWriteClient client3 = getHoodieWriteClient(writeConfig);
final String nextCommitTime2 = "002";
    // Start writing commit 002
final JavaRDD<WriteStatus> writeStatusList2 = startCommitForUpdate(writeConfig, client2, nextCommitTime2, 100);
    // Start writing commit 003.
    // Commit 003 fails fast because early conflict detection kicks in before the marker is created.
final String nextCommitTime3 = "003";
assertThrows(SparkException.class, () -> {
final JavaRDD<WriteStatus> writeStatusList3 = startCommitForUpdate(writeConfig, client3, nextCommitTime3, 100);
client3.commit(nextCommitTime3, writeStatusList3);
}, "Early conflict detected but cannot resolve conflicts for overlapping writes");
    // Committing 002 should succeed
assertDoesNotThrow(() -> {
client2.commit(nextCommitTime2, writeStatusList2);
});
HoodieWriteConfig config4 = HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).withHeartbeatIntervalInMs(heartBeatIntervalForCommit4).build();
final SparkRDDWriteClient client4 = getHoodieWriteClient(config4);
Path heartbeatFilePath = new Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + Path.SEPARATOR + nextCommitTime3);
fs.create(heartbeatFilePath, true);
    // Wait for the heartbeat of the failed commit 003 to expire;
    // otherwise commit 004 would still see a conflict with the failed write 003.
Thread.sleep(heartBeatIntervalForCommit4 * 2);
final String nextCommitTime4 = "004";
assertDoesNotThrow(() -> {
final JavaRDD<WriteStatus> writeStatusList4 = startCommitForUpdate(writeConfig, client4, nextCommitTime4, 100);
client4.commit(nextCommitTime4, writeStatusList4);
});
List<String> completedInstant = metaClient.reloadActiveTimeline().getCommitsTimeline()
.filterCompletedInstants().getInstants().stream()
.map(HoodieInstant::getTimestamp).collect(Collectors.toList());
assertEquals(3, completedInstant.size());
assertTrue(completedInstant.contains(nextCommitTime1));
assertTrue(completedInstant.contains(nextCommitTime2));
assertTrue(completedInstant.contains(nextCommitTime4));
FileIOUtils.deleteDirectory(new File(basePath));
if (server != null) {
server.close();
}
client1.close();
client2.close();
client3.close();
client4.close();
}
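  /**
   * Tests two writers performing overlapping updates concurrently: the writer that commits first
   * wins, and the other writer's commit fails with {@link HoodieWriteConflictException}.
   */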
@ParameterizedTest
@MethodSource("providerClassResolutionStrategyAndTableType")
public void testHoodieClientBasicMultiWriter(HoodieTableType tableType, Class providerClass,
ConflictResolutionStrategy resolutionStrategy) throws Exception {
if (tableType == HoodieTableType.MERGE_ON_READ) {
setUpMORTestTable();
}
lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
HoodieWriteConfig writeConfig = getConfigBuilder()
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withAutoClean(false).build())
.withArchivalConfig(HoodieArchivalConfig.newBuilder()
.withAutoArchive(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
// Timeline-server-based markers are not used for multi-writer tests
.withMarkersType(MarkerType.DIRECT.name())
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(providerClass)
.withConflictResolutionStrategy(resolutionStrategy)
.build()).withAutoCommit(false).withProperties(lockProperties).build();
// Create the first commit
createCommitWithInserts(writeConfig, getHoodieWriteClient(writeConfig), "000", "001", 200, true);
final int threadCount = 2;
    final ExecutorService executors = Executors.newFixedThreadPool(threadCount);
final SparkRDDWriteClient client1 = getHoodieWriteClient(writeConfig);
final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig);
final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
final AtomicBoolean writer1Completed = new AtomicBoolean(false);
final AtomicBoolean writer2Completed = new AtomicBoolean(false);
Future future1 = executors.submit(() -> {
try {
final String nextCommitTime = HoodieActiveTimeline.createNewInstantTime();
final JavaRDD<WriteStatus> writeStatusList = startCommitForUpdate(writeConfig, client1, nextCommitTime, 100);
// Wait for the 2nd writer to start the commit
cyclicBarrier.await(60, TimeUnit.SECONDS);
// Commit the update before the 2nd writer
assertDoesNotThrow(() -> {
client1.commit(nextCommitTime, writeStatusList);
});
        // Signal the 2nd writer to go ahead with its commit
cyclicBarrier.await(60, TimeUnit.SECONDS);
writer1Completed.set(true);
} catch (Exception e) {
writer1Completed.set(false);
}
});
Future future2 = executors.submit(() -> {
try {
final String nextCommitTime = HoodieActiveTimeline.createNewInstantTime();
// Wait for the 1st writer to make progress with the commit
cyclicBarrier.await(60, TimeUnit.SECONDS);
final JavaRDD<WriteStatus> writeStatusList = startCommitForUpdate(writeConfig, client2, nextCommitTime, 100);
// Wait for the 1st writer to complete the commit
cyclicBarrier.await(60, TimeUnit.SECONDS);
assertThrows(HoodieWriteConflictException.class, () -> {
client2.commit(nextCommitTime, writeStatusList);
});
writer2Completed.set(true);
} catch (Exception e) {
writer2Completed.set(false);
}
});
future1.get();
future2.get();
    // Both writers should have completed successfully; the expected conflict for writer 2 is already asserted above.
assertTrue(writer1Completed.get() && writer2Completed.get());
client1.close();
client2.close();
}
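  /**
   * Tests that multiple writers inserting into distinct partitions do not conflict with each
   * other and can all commit successfully.
   */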
@ParameterizedTest
@EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"})
public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType tableType) throws Exception {
if (tableType == HoodieTableType.MERGE_ON_READ) {
setUpMORTestTable();
}
lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "3000");
lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "20");
HoodieWriteConfig cfg = getConfigBuilder()
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withInlineCompaction(false)
.withMaxNumDeltaCommitsBeforeCompaction(2)
.build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
.withLockConfig(HoodieLockConfig.newBuilder()
.withLockProvider(InProcessLockProvider.class)
.build())
.withAutoCommit(false)
.withEmbeddedTimelineServerEnabled(false)
// Timeline-server-based markers are not used for multi-writer tests
.withMarkersType(MarkerType.DIRECT.name())
.withProperties(lockProperties)
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withStorageType(FileSystemViewStorageType.MEMORY)
.build())
.build();
// Create the first commit
SparkRDDWriteClient<?> client = getHoodieWriteClient(cfg);
createCommitWithInsertsForPartition(cfg, client, "000", "001", 100, "2016/03/01");
client.close();
int numConcurrentWriters = 5;
ExecutorService executors = Executors.newFixedThreadPool(numConcurrentWriters);
List<Future<?>> futures = new ArrayList<>(numConcurrentWriters);
for (int loop = 0; loop < numConcurrentWriters; loop++) {
String newCommitTime = "00" + (loop + 2);
String partition = "2016/03/0" + (loop + 2);
futures.add(executors.submit(() -> {
try {
SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg);
createCommitWithInsertsForPartition(cfg, writeClient, "001", newCommitTime, 100, partition);
writeClient.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}));
}
futures.forEach(f -> {
try {
f.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
  /**
   * Counts down the latch and waits for the remaining threads to catch up.
   *
   * @param latch          countdown latch shared by the participating threads
   * @param waitTimeMillis maximum time to wait, in milliseconds
   */
private void latchCountDownAndWait(CountDownLatch latch, long waitTimeMillis) {
latch.countDown();
try {
latch.await(waitTimeMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // Ignore interruption; proceed once the wait ends.
    }
}
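  /**
   * Tests three clients racing each other: one writer ingesting data, one scheduling and running
   * compaction (MOR only), and one scheduling and running cleaning. For MOR, the upsert racing
   * with an already-scheduled compaction is expected to fail; for COW it should pass. Finally
   * verifies that all completed instants on the timeline are among the expected ones.
   */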
@ParameterizedTest
@MethodSource("providerClassResolutionStrategyAndTableType")
public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType, Class providerClass,
ConflictResolutionStrategy resolutionStrategy) throws Exception {
// create inserts X 1
if (tableType == HoodieTableType.MERGE_ON_READ) {
setUpMORTestTable();
}
    // Disable the embedded timeline server; it does not work with multiple writers
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withAutoClean(false)
.withAsyncClean(true)
.retainCommits(0)
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withInlineCompaction(false)
.withMaxNumDeltaCommitsBeforeCompaction(2).build())
.withEmbeddedTimelineServerEnabled(false)
// Timeline-server-based markers are not used for multi-writer tests
.withMarkersType(MarkerType.DIRECT.name())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
FileSystemViewStorageType.MEMORY).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(providerClass)
.withConflictResolutionStrategy(resolutionStrategy)
.build()).withAutoCommit(false).withProperties(lockProperties);
Set<String> validInstants = new HashSet<>();
// Create the first commit with inserts
HoodieWriteConfig cfg = writeConfigBuilder.build();
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
String firstCommitTime = HoodieActiveTimeline.createNewInstantTime();
createCommitWithInserts(cfg, client, "000", firstCommitTime, 200, true);
validInstants.add(firstCommitTime);
// Create 2 commits with upserts
String secondCommitTime = HoodieActiveTimeline.createNewInstantTime();
createCommitWithUpserts(cfg, client, firstCommitTime, "000", secondCommitTime, 100);
String thirdCommitTime = HoodieActiveTimeline.createNewInstantTime();
createCommitWithUpserts(cfg, client, secondCommitTime, "000", thirdCommitTime, 100);
validInstants.add(secondCommitTime);
validInstants.add(thirdCommitTime);
// Three clients running actions in parallel
final int threadCount = 3;
final CountDownLatch scheduleCountDownLatch = new CountDownLatch(threadCount);
final ExecutorService executors = Executors.newFixedThreadPool(threadCount);
// Write config with clustering enabled
final HoodieWriteConfig cfg2 = writeConfigBuilder
.withClusteringConfig(HoodieClusteringConfig.newBuilder()
.withInlineClustering(true)
.withInlineClusteringNumCommits(1)
.build())
.build();
final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);
// Create upserts, schedule cleaning, schedule compaction in parallel
Future future1 = executors.submit(() -> {
final String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
final int numRecords = 100;
final String commitTimeBetweenPrevAndNew = secondCommitTime;
      // We want the upsert to start only after the compaction and cleaning
      // have been scheduled, so wait on the latch here.
      latchCountDownAndWait(scheduleCountDownLatch, 30000);
if (tableType == HoodieTableType.MERGE_ON_READ) {
        // Since the compaction has already been scheduled, this upsert
        // has to fail
assertThrows(IllegalArgumentException.class, () -> {
createCommitWithUpserts(cfg, client1, thirdCommitTime, commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
});
} else {
        // There is no compaction for COW, so this upsert
        // has to pass
assertDoesNotThrow(() -> {
createCommitWithUpserts(cfg, client1, thirdCommitTime, commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
});
validInstants.add(newCommitTime);
}
});
Future future2 = executors.submit(() -> {
if (tableType == HoodieTableType.MERGE_ON_READ) {
assertDoesNotThrow(() -> {
String compactionTimeStamp = HoodieActiveTimeline.createNewInstantTime();
client2.scheduleTableService(compactionTimeStamp, Option.empty(), TableServiceType.COMPACT);
});
}
latchCountDownAndWait(scheduleCountDownLatch, 30000);
});
Future future3 = executors.submit(() -> {
assertDoesNotThrow(() -> {
latchCountDownAndWait(scheduleCountDownLatch, 30000);
String cleanCommitTime = HoodieActiveTimeline.createNewInstantTime();
client3.scheduleTableService(cleanCommitTime, Option.empty(), TableServiceType.CLEAN);
});
});
future1.get();
future2.get();
future3.get();
String pendingCompactionTime = (tableType == HoodieTableType.MERGE_ON_READ)
? metaClient.reloadActiveTimeline().filterPendingCompactionTimeline()
.firstInstant().get().getTimestamp()
: "";
Option<HoodieInstant> pendingCleanInstantOp = metaClient.reloadActiveTimeline().getCleanerTimeline().filterInflightsAndRequested()
.firstInstant();
String pendingCleanTime = pendingCleanInstantOp.isPresent()
? pendingCleanInstantOp.get().getTimestamp()
: HoodieActiveTimeline.createNewInstantTime();
CountDownLatch runCountDownLatch = new CountDownLatch(threadCount);
// Create inserts, run cleaning, run compaction in parallel
future1 = executors.submit(() -> {
final String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
final int numRecords = 100;
latchCountDownAndWait(runCountDownLatch, 30000);
assertDoesNotThrow(() -> {
createCommitWithInserts(cfg, client1, thirdCommitTime, newCommitTime, numRecords, true);
validInstants.add(newCommitTime);
});
});
future2 = executors.submit(() -> {
latchCountDownAndWait(runCountDownLatch, 30000);
if (tableType == HoodieTableType.MERGE_ON_READ) {
assertDoesNotThrow(() -> {
HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client2.compact(pendingCompactionTime);
client2.commitCompaction(pendingCompactionTime, compactionMetadata.getCommitMetadata().get(), Option.empty());
validInstants.add(pendingCompactionTime);
});
}
});
future3 = executors.submit(() -> {
latchCountDownAndWait(runCountDownLatch, 30000);
assertDoesNotThrow(() -> {
client3.clean(pendingCleanTime, false);
validInstants.add(pendingCleanTime);
});
});
future1.get();
future2.get();
future3.get();
validInstants.addAll(
metaClient.reloadActiveTimeline().getCompletedReplaceTimeline()
.filterCompletedInstants().getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()));
Set<String> completedInstants = metaClient.reloadActiveTimeline().getCommitsTimeline()
.filterCompletedInstants().getInstantsAsStream().map(HoodieInstant::getTimestamp)
.collect(Collectors.toSet());
assertTrue(validInstants.containsAll(completedInstants));
client.close();
client1.close();
client2.close();
client3.close();
}
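  /**
   * Tests that with the LAZY failed-writes cleaning policy, only the inflight instant whose
   * heartbeat was stopped (commit 003) is selected for rollback, while the instant that is
   * being committed concurrently (commit 002) is left untouched.
   */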
@ParameterizedTest
@EnumSource(value = HoodieTableType.class, names = {"MERGE_ON_READ", "COPY_ON_WRITE"})
public void testMultiWriterWithAsyncLazyCleanRollback(HoodieTableType tableType) throws Exception {
// create inserts X 1
if (tableType == HoodieTableType.MERGE_ON_READ) {
setUpMORTestTable();
}
    // Disable the embedded timeline server; it does not work with multiple writers
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withAutoClean(false)
.withAsyncClean(true)
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withInlineCompaction(false)
.withMaxNumDeltaCommitsBeforeCompaction(2).build())
.withEmbeddedTimelineServerEnabled(false)
// Timeline-server-based markers are not used for multi-writer tests
.withMarkersType(MarkerType.DIRECT.name())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
FileSystemViewStorageType.MEMORY).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
    // Set the config so that the heartbeat expires in 1 second without an update
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
.build()).withAutoCommit(false).withProperties(lockProperties);
Set<String> validInstants = new HashSet<>();
// Create the first commit with inserts
HoodieWriteConfig cfg = writeConfigBuilder.build();
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
createCommitWithInserts(cfg, client, "000", "001", 200, true);
validInstants.add("001");
// Three clients running actions in parallel
final int threadCount = 3;
final ExecutorService executor = Executors.newFixedThreadPool(threadCount);
final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
final String commitTime2 = "002";
final String commitTime3 = "003";
AtomicReference<Object> writeStatus1 = new AtomicReference<>(null);
AtomicReference<Object> writeStatus2 = new AtomicReference<>(null);
Future future1 = executor.submit(() -> {
final int numRecords = 100;
assertDoesNotThrow(() -> {
writeStatus1.set(createCommitWithInserts(cfg, client1, "001", commitTime2, numRecords, false));
});
});
Future future2 = executor.submit(() -> {
final int numRecords = 100;
assertDoesNotThrow(() -> {
writeStatus2.set(createCommitWithInserts(cfg, client2, "001", commitTime3, numRecords, false));
client2.getHeartbeatClient().stop(commitTime3);
});
});
future1.get();
future2.get();
final CountDownLatch commitCountDownLatch = new CountDownLatch(1);
HoodieTableMetaClient tableMetaClient = client.getTableServiceClient().createMetaClient(true);
    // Commit the instants and fetch the instants to roll back, in parallel
future1 = executor.submit(() -> {
client1.commit(commitTime2, writeStatus1.get());
commitCountDownLatch.countDown();
});
Future future3 = executor.submit(() -> {
try {
commitCountDownLatch.await(30000, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        // Ignore interruption; proceed to check the rollback candidates.
      }
List<String> instantsToRollback =
client.getTableServiceClient().getInstantsToRollback(tableMetaClient, HoodieFailedWritesCleaningPolicy.LAZY, Option.empty());
      // Only commit 003 should be rolled back, even though commit 002 is in the inflight timeline and has no heartbeat file
assertEquals(1, instantsToRollback.size());
assertEquals(commitTime3, instantsToRollback.get(0));
});
future1.get();
future3.get();
client.close();
client1.close();
client2.close();
}
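  /**
   * Tests that an inflight writer conflicting with a concurrently completed commit and a
   * scheduled clustering plan fails when it finally attempts to commit.
   */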
@ParameterizedTest
@EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"})
public void testHoodieClientMultiWriterWithClustering(HoodieTableType tableType) throws Exception {
if (tableType == HoodieTableType.MERGE_ON_READ) {
setUpMORTestTable();
}
Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withAutoClean(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
// Timeline-server-based markers are not used for multi-writer tests
.withMarkersType(MarkerType.DIRECT.name())
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
.withConflictResolutionStrategy(new SimpleConcurrentFileWritesConflictResolutionStrategy())
.build()).withAutoCommit(false).withProperties(properties);
HoodieWriteConfig cfg = writeConfigBuilder.build();
HoodieWriteConfig cfg2 = writeConfigBuilder.build();
HoodieWriteConfig cfg3 = writeConfigBuilder
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(1).build())
.build();
// Create the first commit
createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200, true);
// Start another inflight commit
String newCommitTime = "003";
int numRecords = 100;
SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
String commitTimeBetweenPrevAndNew = "002";
JavaRDD<WriteStatus> result1 = updateBatch(cfg, client1, newCommitTime, "001",
Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000", numRecords, SparkRDDWriteClient::upsert, false, false,
numRecords, 200, 2);
// Start and finish another commit while the previous writer for commit 003 is running
newCommitTime = "004";
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
JavaRDD<WriteStatus> result2 = updateBatch(cfg2, client2, newCommitTime, "001",
Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000", numRecords, SparkRDDWriteClient::upsert, false, false,
numRecords, 200, 2);
client2.commit(newCommitTime, result2);
// Schedule and run clustering while previous writer for commit 003 is running
SparkRDDWriteClient client3 = getHoodieWriteClient(cfg3);
// schedule clustering
Option<String> clusterInstant = client3.scheduleTableService(Option.empty(), TableServiceType.CLUSTER);
assertTrue(clusterInstant.isPresent());
// Attempt to commit the inflight commit 003
try {
client1.commit("003", result1);
fail("Should have thrown a concurrent conflict exception");
} catch (Exception e) {
// Expected
}
client1.close();
client2.close();
client3.close();
}
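  /**
   * Tests two auto-commit writers upserting the same records concurrently: exactly one of them
   * is expected to succeed and the other to fail with a conflict.
   */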
@Test
public void testHoodieClientMultiWriterAutoCommitForConflict() throws Exception {
lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "100");
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withAutoClean(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
// Timeline-server-based markers are not used for multi-writer tests
.withMarkersType(MarkerType.DIRECT.name())
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
.build()).withAutoCommit(true).withProperties(lockProperties);
HoodieWriteConfig cfg = writeConfigBuilder.build();
HoodieWriteConfig cfg2 = writeConfigBuilder.build();
// Create the first commit
createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 5000, false);
// Start another inflight commit
String newCommitTime1 = "003";
String newCommitTime2 = "004";
SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg2);
List<HoodieRecord> updates1 = dataGen.generateUpdates(newCommitTime1, 5000);
List<HoodieRecord> updates2 = dataGen.generateUpdates(newCommitTime2, 5000);
JavaRDD<HoodieRecord> writeRecords1 = jsc.parallelize(updates1, 4);
JavaRDD<HoodieRecord> writeRecords2 = jsc.parallelize(updates2, 4);
runConcurrentAndAssert(writeRecords1, writeRecords2, client1, client2, SparkRDDWriteClient::upsert, true);
client1.close();
client2.close();
}
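  /**
   * Runs the two given write functions concurrently using commit times "003" and "004". When
   * {@code assertForConflict} is true, asserts that exactly one writer succeeds; otherwise
   * asserts that both writers succeed.
   */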
private void runConcurrentAndAssert(JavaRDD<HoodieRecord> writeRecords1, JavaRDD<HoodieRecord> writeRecords2,
SparkRDDWriteClient client1, SparkRDDWriteClient client2,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
boolean assertForConflict) throws ExecutionException, InterruptedException {
CountDownLatch runCountDownLatch = new CountDownLatch(2);
final ExecutorService executors = Executors.newFixedThreadPool(2);
String newCommitTime1 = "003";
String newCommitTime2 = "004";
AtomicBoolean client1Succeeded = new AtomicBoolean(true);
AtomicBoolean client2Succeeded = new AtomicBoolean(true);
Future future1 = executors.submit(() -> {
try {
ingestBatch(writeFn, client1, newCommitTime1, writeRecords1, runCountDownLatch);
} catch (IOException e) {
LOG.error("IOException thrown " + e.getMessage());
} catch (InterruptedException e) {
LOG.error("Interrupted Exception thrown " + e.getMessage());
} catch (Exception e) {
client1Succeeded.set(false);
}
}
);
Future future2 = executors.submit(() -> {
try {
ingestBatch(writeFn, client2, newCommitTime2, writeRecords2, runCountDownLatch);
} catch (IOException e) {
LOG.error("IOException thrown " + e.getMessage());
} catch (InterruptedException e) {
LOG.error("Interrupted Exception thrown " + e.getMessage());
} catch (Exception e) {
client2Succeeded.set(false);
}
}
);
future1.get();
future2.get();
if (assertForConflict) {
assertFalse(client1Succeeded.get() && client2Succeeded.get());
assertTrue(client1Succeeded.get() || client2Succeeded.get());
} else {
assertTrue(client2Succeeded.get() && client1Succeeded.get());
}
}
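  /**
   * Tests two auto-commit writers bulk-inserting disjoint record batches concurrently: with no
   * overlapping files, both writers are expected to succeed.
   */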
@Test
public void testHoodieClientMultiWriterAutoCommitNonConflict() throws Exception {
Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "100");
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withAutoClean(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
// Timeline-server-based markers are not used for multi-writer tests
.withMarkersType(MarkerType.DIRECT.name())
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
.build()).withAutoCommit(true).withProperties(properties);
HoodieWriteConfig cfg = writeConfigBuilder.build();
HoodieWriteConfig cfg2 = writeConfigBuilder.build();
// Create the first commit
createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200, false);
// Start another inflight commit
String newCommitTime1 = "003";
String newCommitTime2 = "004";
SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg2);
List<HoodieRecord> updates1 = dataGen.generateInserts(newCommitTime1, 200);
List<HoodieRecord> updates2 = dataGen.generateInserts(newCommitTime2, 200);
JavaRDD<HoodieRecord> writeRecords1 = jsc.parallelize(updates1, 1);
JavaRDD<HoodieRecord> writeRecords2 = jsc.parallelize(updates2, 1);
runConcurrentAndAssert(writeRecords1, writeRecords2, client1, client2, SparkRDDWriteClient::bulkInsert, false);
client1.close();
client2.close();
}
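  /**
   * Starts a commit with the given time and writes the record batch, using the latch to line up
   * the concurrent writers before the actual write begins.
   */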
private void ingestBatch(Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
SparkRDDWriteClient writeClient, String commitTime, JavaRDD<HoodieRecord> records,
CountDownLatch countDownLatch) throws IOException, InterruptedException {
writeClient.startCommitWithTime(commitTime);
countDownLatch.countDown();
countDownLatch.await();
JavaRDD<WriteStatus> statusJavaRDD = writeFn.apply(writeClient, records, commitTime);
statusJavaRDD.collect();
}
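  /**
   * Inserts a batch of records into the given partition and commits it, asserting that the
   * commit succeeds.
   */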
private void createCommitWithInsertsForPartition(HoodieWriteConfig cfg, SparkRDDWriteClient client,
String prevCommitTime, String newCommitTime, int numRecords,
String partition) throws Exception {
JavaRDD<WriteStatus> result = insertBatch(cfg, client, newCommitTime, prevCommitTime, numRecords, SparkRDDWriteClient::insert,
false, false, numRecords, numRecords, 1, Option.of(partition));
assertTrue(client.commit(newCommitTime, result), "Commit should succeed");
}
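  /**
   * Bulk-inserts a batch of records and, if {@code doCommit} is set, commits it, asserting that
   * the commit succeeds.
   *
   * @return RDD of write statuses from the insert
   */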
private JavaRDD<WriteStatus> createCommitWithInserts(HoodieWriteConfig cfg, SparkRDDWriteClient client,
String prevCommitTime, String newCommitTime, int numRecords,
boolean doCommit) throws Exception {
// Finish first base commit
JavaRDD<WriteStatus> result = insertFirstBatch(cfg, client, newCommitTime, prevCommitTime, numRecords, SparkRDDWriteClient::bulkInsert,
false, false, numRecords);
if (doCommit) {
assertTrue(client.commit(newCommitTime, result), "Commit should succeed");
}
return result;
}
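  /**
   * Upserts a batch of records with the given commit time and commits the result.
   */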
private void createCommitWithUpserts(HoodieWriteConfig cfg, SparkRDDWriteClient client, String prevCommit,
String commitTimeBetweenPrevAndNew, String newCommitTime, int numRecords)
throws Exception {
JavaRDD<WriteStatus> result = updateBatch(cfg, client, newCommitTime, prevCommit,
Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000", numRecords, SparkRDDWriteClient::upsert, false, false,
numRecords, 200, 2);
client.commit(newCommitTime, result);
}
/**
   * Starts the commit for an update operation with the given number of records.
*
* @param writeConfig - Write config
* @param writeClient - Write client for starting the commit
* @param newCommitTime - Commit time for the update
* @param numRecords - Number of records to update
* @return RDD of write status from the update
* @throws Exception
*/
private JavaRDD<WriteStatus> startCommitForUpdate(HoodieWriteConfig writeConfig, SparkRDDWriteClient writeClient,
String newCommitTime, int numRecords) throws Exception {
// Start the new commit
writeClient.startCommitWithTime(newCommitTime);
// Prepare update records
final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
generateWrapRecordsFn(false, writeConfig, dataGen::generateUniqueUpdates);
final List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecords);
final JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
// Write updates
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn = SparkRDDWriteClient::upsert;
JavaRDD<WriteStatus> result = writeFn.apply(writeClient, writeRecords, newCommitTime);
List<WriteStatus> statuses = result.collect();
assertNoWriteErrors(statuses);
return result;
}
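  /**
   * Parameters for the early conflict detection test: table type, marker type, and the fully
   * qualified class name of the early conflict detection strategy.
   */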
public static Stream<Arguments> configParams() {
Object[][] data =
new Object[][] {
{"COPY_ON_WRITE", MarkerType.TIMELINE_SERVER_BASED.name(), AsyncTimelineServerBasedDetectionStrategy.class.getName()},
{"MERGE_ON_READ", MarkerType.TIMELINE_SERVER_BASED.name(), AsyncTimelineServerBasedDetectionStrategy.class.getName()},
{"MERGE_ON_READ", MarkerType.DIRECT.name(), SimpleDirectMarkerBasedDetectionStrategy.class.getName()},
{"COPY_ON_WRITE", MarkerType.DIRECT.name(), SimpleDirectMarkerBasedDetectionStrategy.class.getName()},
{"COPY_ON_WRITE", MarkerType.DIRECT.name(), SimpleTransactionDirectMarkerBasedDetectionStrategy.class.getName()}
};
return Stream.of(data).map(Arguments::of);
}
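  /**
   * Builds a write config with early conflict detection enabled, using either direct markers or
   * timeline-server-based markers depending on the given marker type.
   */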
private HoodieWriteConfig buildWriteConfigForEarlyConflictDetect(String markerType, Properties properties,
Class lockProvider, String earlyConflictDetectionStrategy) {
if (markerType.equalsIgnoreCase(MarkerType.DIRECT.name())) {
return getConfigBuilder()
.withHeartbeatIntervalInMs(60 * 1000)
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withStorageType(FileSystemViewStorageType.MEMORY)
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withAutoClean(false).build())
.withArchivalConfig(HoodieArchivalConfig.newBuilder()
.withAutoArchive(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
.withMarkersType(MarkerType.DIRECT.name())
.withEarlyConflictDetectionEnable(true)
.withEarlyConflictDetectionStrategy(earlyConflictDetectionStrategy)
.withAsyncConflictDetectorInitialDelayMs(0)
.withAsyncConflictDetectorPeriodMs(100)
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(lockProvider).build())
.withAutoCommit(false).withProperties(properties).build();
} else {
return getConfigBuilder()
.withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(20 * 1024).build())
.withHeartbeatIntervalInMs(60 * 1000)
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withStorageType(FileSystemViewStorageType.MEMORY)
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withAutoClean(false).build())
.withArchivalConfig(HoodieArchivalConfig.newBuilder()
.withAutoArchive(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
.withMarkersType(MarkerType.TIMELINE_SERVER_BASED.name())
// Set the batch processing interval for marker requests to be larger than
// the running interval of the async conflict detector so that the conflict can
// be detected before the marker requests are processed at the timeline server
// in the test.
.withMarkersTimelineServerBasedBatchIntervalMs(1000)
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(lockProvider).build())
.withEarlyConflictDetectionEnable(true)
.withEarlyConflictDetectionStrategy(earlyConflictDetectionStrategy)
.withAsyncConflictDetectorInitialDelayMs(0)
.withAsyncConflictDetectorPeriodMs(100)
.withAutoCommit(false).withProperties(properties).build();
}
}
}