| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hudi.sink; |
| |
| import org.apache.hudi.client.WriteStatus; |
| import org.apache.hudi.common.fs.FSUtils; |
| import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; |
| import org.apache.hudi.common.model.HoodieWriteStat; |
| import org.apache.hudi.common.model.WriteConcurrencyMode; |
| import org.apache.hudi.common.table.HoodieTableMetaClient; |
| import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; |
| import org.apache.hudi.common.table.timeline.HoodieInstant; |
| import org.apache.hudi.common.table.timeline.HoodieTimeline; |
| import org.apache.hudi.config.HoodieCleanConfig; |
| import org.apache.hudi.config.HoodieWriteConfig; |
| import org.apache.hudi.configuration.FlinkOptions; |
| import org.apache.hudi.configuration.HadoopConfigurations; |
| import org.apache.hudi.metadata.HoodieTableMetadata; |
| import org.apache.hudi.sink.event.WriteMetadataEvent; |
| import org.apache.hudi.sink.utils.MockCoordinatorExecutor; |
| import org.apache.hudi.sink.utils.NonThrownExecutor; |
| import org.apache.hudi.util.StreamerUtil; |
| import org.apache.hudi.utils.TestConfigurations; |
| import org.apache.hudi.utils.TestUtils; |
| |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.runtime.jobgraph.OperatorID; |
| import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; |
| import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; |
| import org.apache.flink.runtime.operators.coordination.OperatorEvent; |
| import org.apache.flink.util.FileUtils; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.junit.jupiter.api.AfterEach; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.api.io.TempDir; |
| import org.mockito.Mockito; |
| import org.slf4j.Logger; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.TimeUnit; |
| |
| import static org.hamcrest.CoreMatchers.instanceOf; |
| import static org.hamcrest.CoreMatchers.is; |
| import static org.hamcrest.CoreMatchers.startsWith; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; |
| import static org.junit.jupiter.api.Assertions.assertNotEquals; |
| import static org.junit.jupiter.api.Assertions.assertNull; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| |
| /** |
|  * Test cases for {@link StreamWriteOperatorCoordinator}. |
|  */ |
| public class TestStreamWriteOperatorCoordinator { |
| private StreamWriteOperatorCoordinator coordinator; |
| |
| @TempDir |
| File tempFile; |
| |
| @BeforeEach |
| public void before() throws Exception { |
| OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 2); |
| coordinator = new StreamWriteOperatorCoordinator( |
| TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), context); |
| coordinator.start(); |
| coordinator.setExecutor(new MockCoordinatorExecutor(context)); |
| |
| coordinator.handleEventFromOperator(0, WriteMetadataEvent.emptyBootstrap(0)); |
| coordinator.handleEventFromOperator(1, WriteMetadataEvent.emptyBootstrap(1)); |
| } |
| |
| @AfterEach |
| public void after() throws Exception { |
| coordinator.close(); |
| } |
| |
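|   /** |
|    * A completed checkpoint should commit the current instant and start a new, different inflight instant. |
|    */ |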
| @Test |
| void testInstantState() { |
| String instant = coordinator.getInstant(); |
| assertNotEquals("", instant); |
| |
| OperatorEvent event0 = createOperatorEvent(0, instant, "par1", true, 0.1); |
| OperatorEvent event1 = createOperatorEvent(1, instant, "par2", false, 0.2); |
| coordinator.handleEventFromOperator(0, event0); |
| coordinator.handleEventFromOperator(1, event1); |
| |
| coordinator.notifyCheckpointComplete(1); |
| String inflight = TestUtils.getLastPendingInstant(tempFile.getAbsolutePath()); |
| String lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()); |
| assertThat("Instant should be complete", lastCompleted, is(instant)); |
| assertNotEquals("", inflight, "Should start a new instant"); |
| assertNotEquals(instant, inflight, "Should start a new instant"); |
| } |
| |
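|   /** |
|    * The hoodie table should be initialized when the coordinator starts: the metadata folder must exist under the base path. |
|    */ |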
| @Test |
| public void testTableInitialized() throws IOException { |
| final org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(new Configuration()); |
| String basePath = tempFile.getAbsolutePath(); |
| try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) { |
| assertTrue(fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))); |
| } |
| } |
| |
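|   /** |
|    * The coordinator state written by #checkpointCoordinator should be restorable through #resetToCheckpoint. |
|    */ |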
| @Test |
| public void testCheckpointAndRestore() throws Exception { |
| CompletableFuture<byte[]> future = new CompletableFuture<>(); |
| coordinator.checkpointCoordinator(1, future); |
| coordinator.resetToCheckpoint(1, future.get()); |
| } |
| |
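|   /** |
|    * An event whose instant time does not match the coordinator's current instant should fail the job. |
|    */ |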
| @Test |
| public void testReceiveInvalidEvent() { |
| CompletableFuture<byte[]> future = new CompletableFuture<>(); |
| coordinator.checkpointCoordinator(1, future); |
| OperatorEvent event = WriteMetadataEvent.builder() |
| .taskID(0) |
| .instantTime("abc") |
| .writeStatus(Collections.emptyList()) |
| .build(); |
| |
| assertError(() -> coordinator.handleEventFromOperator(0, event), |
| "Receive an unexpected event for instant abc from task 0"); |
| } |
| |
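|   /** |
|    * A checkpoint that only received empty write results returns early without committing; |
|    * once one subtask reports real write statuses, the instant commits even with partial events. |
|    */ |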
| @Test |
| public void testCheckpointCompleteWithPartialEvents() { |
| final CompletableFuture<byte[]> future = new CompletableFuture<>(); |
| coordinator.checkpointCoordinator(1, future); |
| String instant = coordinator.getInstant(); |
| OperatorEvent event = WriteMetadataEvent.builder() |
| .taskID(0) |
| .instantTime(instant) |
| .writeStatus(Collections.emptyList()) |
| .build(); |
| coordinator.handleEventFromOperator(0, event); |
| |
| assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1), |
| "Returns early for empty write results"); |
| String lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()); |
| assertNull(lastCompleted, "Returns early for empty write results"); |
| assertNull(coordinator.getEventBuffer()[0]); |
| |
| OperatorEvent event1 = createOperatorEvent(1, instant, "par2", false, 0.2); |
| coordinator.handleEventFromOperator(1, event1); |
| assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(2), |
| "Commits the instant with partial events anyway"); |
| lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()); |
| assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant)); |
| } |
| |
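|   /** |
|    * Bootstrap events that carry uncommitted write statuses should trigger a recommit of the pending instant. |
|    */ |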
| @Test |
| public void testRecommitWithPartialUncommittedEvents() { |
| final CompletableFuture<byte[]> future = new CompletableFuture<>(); |
| coordinator.checkpointCoordinator(1, future); |
| String instant = coordinator.getInstant(); |
| String lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()); |
|     assertNull(lastCompleted, "Should have no completed instant yet"); |
| WriteMetadataEvent event1 = createOperatorEvent(0, instant, "par1", false, 0.2); |
| event1.setBootstrap(true); |
| WriteMetadataEvent event2 = WriteMetadataEvent.emptyBootstrap(1); |
| coordinator.handleEventFromOperator(0, event1); |
| coordinator.handleEventFromOperator(1, event2); |
| lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()); |
| assertThat("Recommits the instant with partial uncommitted events", lastCompleted, is(instant)); |
| } |
| |
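|   /** |
|    * The recommit of a pending instant should also work with the LAZY failed-writes cleaning policy. |
|    */ |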
| @Test |
| public void testRecommitWithLazyFailedWritesCleanPolicy() { |
| coordinator.getWriteClient().getConfig().setValue(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY, HoodieFailedWritesCleaningPolicy.LAZY.name()); |
| assertTrue(coordinator.getWriteClient().getConfig().getFailedWritesCleanPolicy().isLazy()); |
| final CompletableFuture<byte[]> future = new CompletableFuture<>(); |
| coordinator.checkpointCoordinator(1, future); |
| String instant = coordinator.getInstant(); |
| WriteMetadataEvent event1 = createOperatorEvent(0, instant, "par1", false, 0.2); |
| event1.setBootstrap(true); |
| WriteMetadataEvent event2 = WriteMetadataEvent.emptyBootstrap(1); |
| coordinator.handleEventFromOperator(0, event1); |
| coordinator.handleEventFromOperator(1, event2); |
| assertThat("Recommits the instant with lazy failed writes clean policy", TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()), is(instant)); |
| } |
| |
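|   /** |
|    * With hive sync enabled, committing the instant on checkpoint completion should not throw. |
|    */ |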
| @Test |
| public void testHiveSyncInvoked() throws Exception { |
| // reset |
| reset(); |
| // override the default configuration |
| Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); |
| conf.setBoolean(FlinkOptions.HIVE_SYNC_ENABLED, true); |
| OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1); |
| coordinator = new StreamWriteOperatorCoordinator(conf, context); |
| coordinator.start(); |
| coordinator.setExecutor(new MockCoordinatorExecutor(context)); |
| |
| final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0); |
| |
| coordinator.handleEventFromOperator(0, event0); |
| |
| String instant = mockWriteWithMetadata(); |
| assertNotEquals("", instant); |
| |
|     // hive synchronization should never throw now |
| assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1)); |
| } |
| |
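|   /** |
|    * Each data table commit should sync a deltacommit to the metadata table; with |
|    * METADATA_COMPACTION_DELTA_COMMITS set to 5, every 5 deltacommits trigger a metadata table compaction. |
|    */ |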
| @Test |
| void testSyncMetadataTable() throws Exception { |
| // reset |
| reset(); |
| // override the default configuration |
| Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); |
| conf.setBoolean(FlinkOptions.METADATA_ENABLED, true); |
| conf.setInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 5); |
| OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1); |
| coordinator = new StreamWriteOperatorCoordinator(conf, context); |
| coordinator.start(); |
| coordinator.setExecutor(new MockCoordinatorExecutor(context)); |
| |
| final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0); |
| |
| coordinator.handleEventFromOperator(0, event0); |
| |
| String instant = coordinator.getInstant(); |
| assertNotEquals("", instant); |
| |
| final String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath()); |
| HoodieTableMetaClient metadataTableMetaClient = StreamerUtil.createMetaClient(metadataTableBasePath, HadoopConfigurations.getHadoopConf(conf)); |
| HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants(); |
| assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(1)); |
| assertThat(completedTimeline.lastInstant().get().getTimestamp(), startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP)); |
| |
| // test metadata table compaction |
| // write another 4 commits |
| for (int i = 1; i < 5; i++) { |
| instant = mockWriteWithMetadata(); |
| metadataTableMetaClient.reloadActiveTimeline(); |
| completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants(); |
| assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(i + 1)); |
| assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant)); |
| } |
| // the 5th commit triggers the compaction |
| mockWriteWithMetadata(); |
|     completedTimeline = metadataTableMetaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants(); |
|     assertThat("The new instants should sync to the metadata table", completedTimeline.countInstants(), is(7)); |
| assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(), is(instant + "001")); |
| assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(), is(HoodieTimeline.COMMIT_ACTION)); |
|     // write another commit |
| for (int i = 7; i < 8; i++) { |
| instant = mockWriteWithMetadata(); |
| metadataTableMetaClient.reloadActiveTimeline(); |
| completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants(); |
| assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(i + 1)); |
| assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant)); |
| } |
| |
| // write another commit to trigger clean |
| instant = mockWriteWithMetadata(); |
| metadataTableMetaClient.reloadActiveTimeline(); |
| completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); |
| assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(9)); |
| |
| // write another commit |
| mockWriteWithMetadata(); |
| |
| // write another commit |
| instant = mockWriteWithMetadata(); |
| // write another commit to trigger compaction |
| mockWriteWithMetadata(); |
| metadataTableMetaClient.reloadActiveTimeline(); |
| completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); |
| assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(13)); |
| assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(), is(instant + "001")); |
| assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(), is(HoodieTimeline.COMMIT_ACTION)); |
| } |
| |
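|   /** |
|    * With log compaction enabled on the metadata table, the 6th commit should trigger a log |
|    * compaction, which appears as another deltacommit on the metadata timeline. |
|    */ |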
| @Test |
| void testSyncMetadataTableWithLogCompaction() throws Exception { |
| // reset |
| reset(); |
| // override the default configuration |
| Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); |
| conf.setBoolean(FlinkOptions.METADATA_ENABLED, true); |
| conf.setInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 20); |
| conf.setString("hoodie.metadata.log.compaction.enable", "true"); |
| OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1); |
| coordinator = new StreamWriteOperatorCoordinator(conf, context); |
| coordinator.start(); |
| coordinator.setExecutor(new MockCoordinatorExecutor(context)); |
| |
| final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0); |
| |
| coordinator.handleEventFromOperator(0, event0); |
| |
| String instant = coordinator.getInstant(); |
| assertNotEquals("", instant); |
| |
| final String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath()); |
| HoodieTableMetaClient metadataTableMetaClient = StreamerUtil.createMetaClient(metadataTableBasePath, HadoopConfigurations.getHadoopConf(conf)); |
| HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants(); |
| assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(1)); |
| assertThat(completedTimeline.lastInstant().get().getTimestamp(), startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP)); |
| |
| // test metadata table log compaction |
| // write another 5 commits |
| for (int i = 1; i < 6; i++) { |
| instant = mockWriteWithMetadata(); |
| metadataTableMetaClient.reloadActiveTimeline(); |
| completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants(); |
| assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(i + 1)); |
| assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant)); |
| } |
| // the 6th commit triggers the log compaction |
| mockWriteWithMetadata(); |
|     completedTimeline = metadataTableMetaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants(); |
|     assertThat("The new instants should sync to the metadata table", completedTimeline.countInstants(), is(8)); |
| assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(), is(instant + "005")); |
| // log compaction is another delta commit |
| assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(), is(HoodieTimeline.DELTA_COMMIT_ACTION)); |
| } |
| |
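|   /** |
|    * A dangling inflight deltacommit on the metadata timeline should be rolled back before the next commit is synced. |
|    */ |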
| @Test |
| void testSyncMetadataTableWithRollback() throws Exception { |
| // reset |
| reset(); |
| // override the default configuration |
| Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); |
| conf.setBoolean(FlinkOptions.METADATA_ENABLED, true); |
| OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1); |
| coordinator = new StreamWriteOperatorCoordinator(conf, context); |
| coordinator.start(); |
| coordinator.setExecutor(new MockCoordinatorExecutor(context)); |
| |
| final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0); |
| |
| coordinator.handleEventFromOperator(0, event0); |
| |
| String instant = coordinator.getInstant(); |
| assertNotEquals("", instant); |
| |
| final String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath()); |
| HoodieTableMetaClient metadataTableMetaClient = StreamerUtil.createMetaClient(metadataTableBasePath, HadoopConfigurations.getHadoopConf(conf)); |
| HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants(); |
| assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(1)); |
| assertThat(completedTimeline.lastInstant().get().getTimestamp(), startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP)); |
| |
| // writes a normal commit |
| mockWriteWithMetadata(); |
| instant = coordinator.getInstant(); |
| // creates an inflight commit on the metadata timeline |
| metadataTableMetaClient.getActiveTimeline() |
| .createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieActiveTimeline.DELTA_COMMIT_ACTION, instant)); |
| metadataTableMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instant); |
| metadataTableMetaClient.reloadActiveTimeline(); |
| |
| // write another commit with existing instant on the metadata timeline |
| mockWriteWithMetadata(); |
| metadataTableMetaClient.reloadActiveTimeline(); |
| |
| completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants(); |
| assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(4)); |
| assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(), is(instant)); |
| assertThat("The pending instant should be rolled back first", |
| completedTimeline.lastInstant().get().getAction(), is(HoodieTimeline.ROLLBACK_ACTION)); |
| } |
| |
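|   /** |
|    * No buffered event should survive the end-input event: the event buffer must be empty once it is handled. |
|    */ |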
| @Test |
| public void testEndInputIsTheLastEvent() throws Exception { |
| Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); |
| MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1); |
|     Logger logger = Mockito.mock(Logger.class); // avoid excessive logging from the executor |
| NonThrownExecutor executor = NonThrownExecutor.builder(logger).waitForTasksFinish(true).build(); |
| |
| try (StreamWriteOperatorCoordinator coordinator = new StreamWriteOperatorCoordinator(conf, context)) { |
| coordinator.start(); |
| coordinator.setExecutor(executor); |
| coordinator.handleEventFromOperator(0, WriteMetadataEvent.emptyBootstrap(0)); |
|       TimeUnit.SECONDS.sleep(5); // wait for the bootstrap event to be handled |
| |
| int eventCount = 20_000; // big enough to fill executor's queue |
| for (int i = 0; i < eventCount; i++) { |
| coordinator.handleEventFromOperator(0, createOperatorEvent(0, coordinator.getInstant(), "par1", true, 0.1)); |
| } |
| |
| WriteMetadataEvent endInput = WriteMetadataEvent.builder() |
| .taskID(0) |
| .instantTime(coordinator.getInstant()) |
| .writeStatus(Collections.emptyList()) |
| .endInput(true) |
| .build(); |
| coordinator.handleEventFromOperator(0, endInput); |
| |
|       // wait for the submitted events to complete |
| executor.close(); |
| |
| // there should be no events after endInput |
| assertNull(coordinator.getEventBuffer()[0]); |
| } |
| } |
| |
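|   /** |
|    * Metadata table sync should still succeed when optimistic concurrency control (and thus locking) is enabled. |
|    */ |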
| @Test |
| void testLockForMetadataTable() throws Exception { |
| // reset |
| reset(); |
| // override the default configuration |
| Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); |
| conf.setBoolean(FlinkOptions.METADATA_ENABLED, true); |
| |
| conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()); |
| conf.setInteger("hoodie.write.lock.client.num_retries", 1); |
| |
| OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1); |
| coordinator = new StreamWriteOperatorCoordinator(conf, context); |
| coordinator.start(); |
| coordinator.setExecutor(new MockCoordinatorExecutor(context)); |
| |
| final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0); |
| |
| coordinator.handleEventFromOperator(0, event0); |
| |
| String instant = coordinator.getInstant(); |
| assertNotEquals("", instant); |
| |
| final String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath()); |
| HoodieTableMetaClient metadataTableMetaClient = StreamerUtil.createMetaClient(metadataTableBasePath, HadoopConfigurations.getHadoopConf(conf)); |
| HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants(); |
| assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(1)); |
| assertThat(completedTimeline.lastInstant().get().getTimestamp(), startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP)); |
| |
| instant = mockWriteWithMetadata(); |
| metadataTableMetaClient.reloadActiveTimeline(); |
| completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants(); |
| assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(2)); |
| assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant)); |
| } |
| |
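|   /** |
|    * With ALLOW_EMPTY_COMMIT enabled, a checkpoint whose subtasks all report empty write results should still commit the instant. |
|    */ |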
| @Test |
| public void testCommitOnEmptyBatch() throws Exception { |
| Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); |
| conf.setBoolean(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), true); |
| MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 2); |
| NonThrownExecutor executor = new MockCoordinatorExecutor(context); |
| try (StreamWriteOperatorCoordinator coordinator = new StreamWriteOperatorCoordinator(conf, context)) { |
| coordinator.start(); |
| coordinator.setExecutor(executor); |
| coordinator.handleEventFromOperator(0, WriteMetadataEvent.emptyBootstrap(0)); |
| coordinator.handleEventFromOperator(1, WriteMetadataEvent.emptyBootstrap(1)); |
| |
|       // Coordinator starts the instant |
| String instant = coordinator.getInstant(); |
| |
| OperatorEvent event1 = WriteMetadataEvent.builder() |
| .taskID(0) |
| .instantTime(instant) |
| .writeStatus(Collections.emptyList()) |
| .lastBatch(true) |
| .build(); |
| OperatorEvent event2 = WriteMetadataEvent.builder() |
| .taskID(1) |
| .instantTime(instant) |
| .writeStatus(Collections.emptyList()) |
| .lastBatch(true) |
| .build(); |
| coordinator.handleEventFromOperator(0, event1); |
| coordinator.handleEventFromOperator(1, event2); |
| |
| assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1), |
| "Commit the instant"); |
| String lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()); |
| assertThat("Commits the instant with empty batch anyway", lastCompleted, is(instant)); |
| assertNull(coordinator.getEventBuffer()[0]); |
| } |
| } |
| |
| // ------------------------------------------------------------------------- |
| // Utilities |
| // ------------------------------------------------------------------------- |
| |
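|   /** |
|    * Sends a successful write event for the current instant and completes a checkpoint, |
|    * committing the instant (and syncing it to the metadata table when enabled). |
|    * |
|    * @return the committed instant time |
|    */ |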
| private String mockWriteWithMetadata() { |
| final String instant = coordinator.getInstant(); |
| OperatorEvent event = createOperatorEvent(0, instant, "par1", true, 0.1); |
| |
| coordinator.handleEventFromOperator(0, event); |
| coordinator.notifyCheckpointComplete(0); |
| return instant; |
| } |
| |
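|   /** |
|    * Builds a last-batch {@link WriteMetadataEvent} that carries a single mocked {@link WriteStatus} |
|    * for the given partition. |
|    */ |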
| private static WriteMetadataEvent createOperatorEvent( |
| int taskId, |
| String instant, |
| String partitionPath, |
| boolean trackSuccessRecords, |
| double failureFraction) { |
| final WriteStatus writeStatus = new WriteStatus(trackSuccessRecords, failureFraction); |
| writeStatus.setPartitionPath(partitionPath); |
| |
| HoodieWriteStat writeStat = new HoodieWriteStat(); |
| writeStat.setPartitionPath(partitionPath); |
| writeStat.setFileId("fileId123"); |
| writeStat.setPath("path123"); |
| writeStat.setFileSizeInBytes(123); |
| writeStat.setTotalWriteBytes(123); |
| writeStat.setNumWrites(1); |
| |
| writeStatus.setStat(writeStat); |
| |
| return WriteMetadataEvent.builder() |
| .taskID(taskId) |
| .instantTime(instant) |
| .writeStatus(Collections.singletonList(writeStatus)) |
| .lastBatch(true) |
| .build(); |
| } |
| |
| private void reset() throws Exception { |
| FileUtils.cleanDirectory(tempFile); |
| } |
| |
| private void assertError(Runnable runnable, String message) { |
| runnable.run(); |
|     // the mock executor runs tasks synchronously, so the failure is visible right away |
| assertThat(coordinator.getContext(), instanceOf(MockOperatorCoordinatorContext.class)); |
| MockOperatorCoordinatorContext context = (MockOperatorCoordinatorContext) coordinator.getContext(); |
| assertTrue(context.isJobFailed(), message); |
| } |
| } |