/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestUtils;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.FileUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
import org.slf4j.Logger;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Test cases for {@link StreamWriteOperatorCoordinator}.
 */
public class TestStreamWriteOperatorCoordinator {

private StreamWriteOperatorCoordinator coordinator;

@TempDir
File tempFile;

@BeforeEach
public void before() throws Exception {
OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 2);
coordinator = new StreamWriteOperatorCoordinator(
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), context);
coordinator.start();
coordinator.setExecutor(new MockCoordinatorExecutor(context));
coordinator.handleEventFromOperator(0, WriteMetadataEvent.emptyBootstrap(0));
coordinator.handleEventFromOperator(1, WriteMetadataEvent.emptyBootstrap(1));
}

@AfterEach
public void after() throws Exception {
coordinator.close();
}
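
/**
 * A completed checkpoint should commit the current inflight instant and start a new one.
 */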
@Test
void testInstantState() {
String instant = coordinator.getInstant();
assertNotEquals("", instant);
OperatorEvent event0 = createOperatorEvent(0, instant, "par1", true, 0.1);
OperatorEvent event1 = createOperatorEvent(1, instant, "par2", false, 0.2);
coordinator.handleEventFromOperator(0, event0);
coordinator.handleEventFromOperator(1, event1);
coordinator.notifyCheckpointComplete(1);
String inflight = TestUtils.getLastPendingInstant(tempFile.getAbsolutePath());
String lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
assertThat("Instant should be complete", lastCompleted, is(instant));
assertNotEquals("", inflight, "Should start a new instant");
assertNotEquals(instant, inflight, "Should start a new instant");
}
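
/**
 * Starting the coordinator should bootstrap the hoodie table under the base path.
 */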
@Test
public void testTableInitialized() throws IOException {
final org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(new Configuration());
String basePath = tempFile.getAbsolutePath();
try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) {
assertTrue(fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)));
}
}
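
/**
 * The coordinator state should survive a checkpoint and restore round trip.
 */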
@Test
public void testCheckpointAndRestore() throws Exception {
CompletableFuture<byte[]> future = new CompletableFuture<>();
coordinator.checkpointCoordinator(1, future);
coordinator.resetToCheckpoint(1, future.get());
}
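
/**
 * An event carrying an unknown instant time should fail the job.
 */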
@Test
public void testReceiveInvalidEvent() {
CompletableFuture<byte[]> future = new CompletableFuture<>();
coordinator.checkpointCoordinator(1, future);
OperatorEvent event = WriteMetadataEvent.builder()
.taskID(0)
.instantTime("abc")
.writeStatus(Collections.emptyList())
.build();
assertError(() -> coordinator.handleEventFromOperator(0, event),
"Receive an unexpected event for instant abc from task 0");
}
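
/**
 * Empty write results should return early, while a later event with real write
 * statuses still commits the instant even if other tasks sent nothing.
 */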
@Test
public void testCheckpointCompleteWithPartialEvents() {
final CompletableFuture<byte[]> future = new CompletableFuture<>();
coordinator.checkpointCoordinator(1, future);
String instant = coordinator.getInstant();
OperatorEvent event = WriteMetadataEvent.builder()
.taskID(0)
.instantTime(instant)
.writeStatus(Collections.emptyList())
.build();
coordinator.handleEventFromOperator(0, event);
assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1),
"Returns early for empty write results");
String lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
assertNull(lastCompleted, "Returns early for empty write results");
assertNull(coordinator.getEventBuffer()[0]);
OperatorEvent event1 = createOperatorEvent(1, instant, "par2", false, 0.2);
coordinator.handleEventFromOperator(1, event1);
assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(2),
"Commits the instant with partial events anyway");
lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant));
}
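
/**
 * A bootstrap event that carries uncommitted write metadata should trigger a recommit of the instant.
 */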
@Test
public void testRecommitWithPartialUncommittedEvents() {
final CompletableFuture<byte[]> future = new CompletableFuture<>();
coordinator.checkpointCoordinator(1, future);
String instant = coordinator.getInstant();
String lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
assertNull(lastCompleted, "There should be no completed instant yet");
WriteMetadataEvent event1 = createOperatorEvent(0, instant, "par1", false, 0.2);
event1.setBootstrap(true);
WriteMetadataEvent event2 = WriteMetadataEvent.emptyBootstrap(1);
coordinator.handleEventFromOperator(0, event1);
coordinator.handleEventFromOperator(1, event2);
lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
assertThat("Recommits the instant with partial uncommitted events", lastCompleted, is(instant));
}
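
/**
 * The recommit should also succeed when the failed-writes cleaning policy is LAZY.
 */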
@Test
public void testRecommitWithLazyFailedWritesCleanPolicy() {
coordinator.getWriteClient().getConfig().setValue(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY, HoodieFailedWritesCleaningPolicy.LAZY.name());
assertTrue(coordinator.getWriteClient().getConfig().getFailedWritesCleanPolicy().isLazy());
final CompletableFuture<byte[]> future = new CompletableFuture<>();
coordinator.checkpointCoordinator(1, future);
String instant = coordinator.getInstant();
WriteMetadataEvent event1 = createOperatorEvent(0, instant, "par1", false, 0.2);
event1.setBootstrap(true);
WriteMetadataEvent event2 = WriteMetadataEvent.emptyBootstrap(1);
coordinator.handleEventFromOperator(0, event1);
coordinator.handleEventFromOperator(1, event2);
assertThat("Recommits the instant with lazy failed writes clean policy", TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()), is(instant));
}
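
/**
 * Hive synchronization should be invoked on checkpoint completion without failing the job.
 */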
@Test
public void testHiveSyncInvoked() throws Exception {
// reset
reset();
// override the default configuration
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setBoolean(FlinkOptions.HIVE_SYNC_ENABLED, true);
OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
coordinator = new StreamWriteOperatorCoordinator(conf, context);
coordinator.start();
coordinator.setExecutor(new MockCoordinatorExecutor(context));
final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0);
coordinator.handleEventFromOperator(0, event0);
String instant = mockWriteWithMetadata();
assertNotEquals("", instant);
// hive synchronization errors no longer throw, so checkpoint completion should not fail
assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1));
}
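
/**
 * Every commit should sync to the metadata table, and enough delta commits should
 * trigger compaction and cleaning on the metadata timeline.
 */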
@Test
void testSyncMetadataTable() throws Exception {
// reset
reset();
// override the default configuration
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
conf.setInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 5);
OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
coordinator = new StreamWriteOperatorCoordinator(conf, context);
coordinator.start();
coordinator.setExecutor(new MockCoordinatorExecutor(context));
final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0);
coordinator.handleEventFromOperator(0, event0);
String instant = coordinator.getInstant();
assertNotEquals("", instant);
final String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
HoodieTableMetaClient metadataTableMetaClient = StreamerUtil.createMetaClient(metadataTableBasePath, HadoopConfigurations.getHadoopConf(conf));
HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(1));
assertThat(completedTimeline.lastInstant().get().getTimestamp(), startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
// test metadata table compaction
// write another 4 commits
for (int i = 1; i < 5; i++) {
instant = mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(i + 1));
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant));
}
// the 5th commit triggers the compaction
mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline = metadataTableMetaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(7));
assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(), is(instant + "001"));
assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(), is(HoodieTimeline.COMMIT_ACTION));
// write another commit
for (int i = 7; i < 8; i++) {
instant = mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(i + 1));
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant));
}
// write another commit to trigger clean
instant = mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(9));
// write another commit
mockWriteWithMetadata();
// write another commit
instant = mockWriteWithMetadata();
// write another commit to trigger compaction
mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(13));
assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(), is(instant + "001"));
assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(), is(HoodieTimeline.COMMIT_ACTION));
}
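
/**
 * With log compaction enabled on the metadata table, enough delta commits should
 * trigger a log compaction, which shows up as another delta commit.
 */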
@Test
void testSyncMetadataTableWithLogCompaction() throws Exception {
// reset
reset();
// override the default configuration
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
conf.setInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 20);
conf.setString("hoodie.metadata.log.compaction.enable", "true");
OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
coordinator = new StreamWriteOperatorCoordinator(conf, context);
coordinator.start();
coordinator.setExecutor(new MockCoordinatorExecutor(context));
final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0);
coordinator.handleEventFromOperator(0, event0);
String instant = coordinator.getInstant();
assertNotEquals("", instant);
final String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
HoodieTableMetaClient metadataTableMetaClient = StreamerUtil.createMetaClient(metadataTableBasePath, HadoopConfigurations.getHadoopConf(conf));
HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(1));
assertThat(completedTimeline.lastInstant().get().getTimestamp(), startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
// test metadata table log compaction
// write another 5 commits
for (int i = 1; i < 6; i++) {
instant = mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(i + 1));
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant));
}
// the 6th commit triggers the log compaction
mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline = metadataTableMetaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(8));
assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(), is(instant + "005"));
// log compaction is another delta commit
assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(), is(HoodieTimeline.DELTA_COMMIT_ACTION));
}
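
/**
 * A dangling inflight instant on the metadata timeline should be rolled back before the next sync.
 */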
@Test
void testSyncMetadataTableWithRollback() throws Exception {
// reset
reset();
// override the default configuration
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
coordinator = new StreamWriteOperatorCoordinator(conf, context);
coordinator.start();
coordinator.setExecutor(new MockCoordinatorExecutor(context));
final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0);
coordinator.handleEventFromOperator(0, event0);
String instant = coordinator.getInstant();
assertNotEquals("", instant);
final String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
HoodieTableMetaClient metadataTableMetaClient = StreamerUtil.createMetaClient(metadataTableBasePath, HadoopConfigurations.getHadoopConf(conf));
HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(1));
assertThat(completedTimeline.lastInstant().get().getTimestamp(), startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
// writes a normal commit
mockWriteWithMetadata();
instant = coordinator.getInstant();
// creates an inflight commit on the metadata timeline
metadataTableMetaClient.getActiveTimeline()
.createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieActiveTimeline.DELTA_COMMIT_ACTION, instant));
metadataTableMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instant);
metadataTableMetaClient.reloadActiveTimeline();
// write another commit while an inflight instant already exists on the metadata timeline
mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(4));
assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(), is(instant));
assertThat("The pending instant should be rolled back first",
completedTimeline.lastInstant().get().getAction(), is(HoodieTimeline.ROLLBACK_ACTION));
}
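
/**
 * The end-input event should be the last event handled; nothing remains buffered after it.
 */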
@Test
public void testEndInputIsTheLastEvent() throws Exception {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
Logger logger = Mockito.mock(Logger.class); // mock the logger to avoid excessive logging from the executor
NonThrownExecutor executor = NonThrownExecutor.builder(logger).waitForTasksFinish(true).build();
try (StreamWriteOperatorCoordinator coordinator = new StreamWriteOperatorCoordinator(conf, context)) {
coordinator.start();
coordinator.setExecutor(executor);
coordinator.handleEventFromOperator(0, WriteMetadataEvent.emptyBootstrap(0));
TimeUnit.SECONDS.sleep(5); // wait for the bootstrap event to be handled
int eventCount = 20_000; // big enough to fill executor's queue
for (int i = 0; i < eventCount; i++) {
coordinator.handleEventFromOperator(0, createOperatorEvent(0, coordinator.getInstant(), "par1", true, 0.1));
}
WriteMetadataEvent endInput = WriteMetadataEvent.builder()
.taskID(0)
.instantTime(coordinator.getInstant())
.writeStatus(Collections.emptyList())
.endInput(true)
.build();
coordinator.handleEventFromOperator(0, endInput);
// wait for the submitted events to complete
executor.close();
// there should be no events after endInput
assertNull(coordinator.getEventBuffer()[0]);
}
}
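
/**
 * The metadata table sync should still work when optimistic concurrency control (locking) is enabled.
 */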
@Test
void testLockForMetadataTable() throws Exception {
// reset
reset();
// override the default configuration
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
conf.setInteger("hoodie.write.lock.client.num_retries", 1);
OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
coordinator = new StreamWriteOperatorCoordinator(conf, context);
coordinator.start();
coordinator.setExecutor(new MockCoordinatorExecutor(context));
final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0);
coordinator.handleEventFromOperator(0, event0);
String instant = coordinator.getInstant();
assertNotEquals("", instant);
final String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
HoodieTableMetaClient metadataTableMetaClient = StreamerUtil.createMetaClient(metadataTableBasePath, HadoopConfigurations.getHadoopConf(conf));
HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(1));
assertThat(completedTimeline.lastInstant().get().getTimestamp(), startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
instant = mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(2));
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant));
}
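
/**
 * With {@code ALLOW_EMPTY_COMMIT} enabled, an empty batch should still complete the instant.
 */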
@Test
public void testCommitOnEmptyBatch() throws Exception {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setBoolean(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), true);
MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 2);
NonThrownExecutor executor = new MockCoordinatorExecutor(context);
try (StreamWriteOperatorCoordinator coordinator = new StreamWriteOperatorCoordinator(conf, context)) {
coordinator.start();
coordinator.setExecutor(executor);
coordinator.handleEventFromOperator(0, WriteMetadataEvent.emptyBootstrap(0));
coordinator.handleEventFromOperator(1, WriteMetadataEvent.emptyBootstrap(1));
// the coordinator has started an instant
String instant = coordinator.getInstant();
OperatorEvent event1 = WriteMetadataEvent.builder()
.taskID(0)
.instantTime(instant)
.writeStatus(Collections.emptyList())
.lastBatch(true)
.build();
OperatorEvent event2 = WriteMetadataEvent.builder()
.taskID(1)
.instantTime(instant)
.writeStatus(Collections.emptyList())
.lastBatch(true)
.build();
coordinator.handleEventFromOperator(0, event1);
coordinator.handleEventFromOperator(1, event2);
assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1),
"Commit the instant");
String lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
assertThat("Commits the instant with empty batch anyway", lastCompleted, is(instant));
assertNull(coordinator.getEventBuffer()[0]);
}
}

// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
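
/**
 * Sends a mocked successful write event for the current instant and completes a checkpoint,
 * then returns the instant time that was committed.
 */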
private String mockWriteWithMetadata() {
final String instant = coordinator.getInstant();
OperatorEvent event = createOperatorEvent(0, instant, "par1", true, 0.1);
coordinator.handleEventFromOperator(0, event);
coordinator.notifyCheckpointComplete(0);
return instant;
}
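
/**
 * Builds a {@link WriteMetadataEvent} whose single {@link WriteStatus} carries a mocked
 * write stat for the given partition.
 */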
private static WriteMetadataEvent createOperatorEvent(
int taskId,
String instant,
String partitionPath,
boolean trackSuccessRecords,
double failureFraction) {
final WriteStatus writeStatus = new WriteStatus(trackSuccessRecords, failureFraction);
writeStatus.setPartitionPath(partitionPath);
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setPartitionPath(partitionPath);
writeStat.setFileId("fileId123");
writeStat.setPath("path123");
writeStat.setFileSizeInBytes(123);
writeStat.setTotalWriteBytes(123);
writeStat.setNumWrites(1);
writeStatus.setStat(writeStat);
return WriteMetadataEvent.builder()
.taskID(taskId)
.instantTime(instant)
.writeStatus(Collections.singletonList(writeStatus))
.lastBatch(true)
.build();
}
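
/**
 * Cleans up the temporary working directory so that a fresh table can be bootstrapped.
 */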
private void reset() throws Exception {
FileUtils.cleanDirectory(tempFile);
}
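
/**
 * Runs the given task and asserts that the coordinator context marked the job as failed.
 */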
private void assertError(Runnable runnable, String message) {
runnable.run();
// the mock executor runs tasks synchronously, so the failure should already be recorded
assertThat(coordinator.getContext(), instanceOf(MockOperatorCoordinatorContext.class));
MockOperatorCoordinatorContext context = (MockOperatorCoordinatorContext) coordinator.getContext();
assertTrue(context.isJobFailed(), message);
}
}