/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.table.data.RowData;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test cases for stream write.
*/
public class TestWriteCopyOnWrite {
protected static final Map<String, String> EXPECTED1 = new HashMap<>();
protected static final Map<String, String> EXPECTED2 = new HashMap<>();
protected static final Map<String, String> EXPECTED3 = new HashMap<>();
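// The expected results below are keyed by partition path. Each value is the
// string form of the rows written to that partition: record key and partition
// path first, then the data fields (uuid, name, age, ts, partition).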
static {
EXPECTED1.put("par1", "[id1,par1,id1,Danny,23,1,par1, id2,par1,id2,Stephen,33,2,par1]");
EXPECTED1.put("par2", "[id3,par2,id3,Julian,53,3,par2, id4,par2,id4,Fabian,31,4,par2]");
EXPECTED1.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3]");
EXPECTED1.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
EXPECTED2.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
EXPECTED2.put("par2", "[id3,par2,id3,Julian,54,3,par2, id4,par2,id4,Fabian,32,4,par2]");
EXPECTED2.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3, "
+ "id9,par3,id9,Jane,19,6,par3]");
EXPECTED2.put("par4", "[id10,par4,id10,Ella,38,7,par4, id11,par4,id11,Phoebe,52,8,par4, "
+ "id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
EXPECTED3.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
}
protected Configuration conf;
protected StreamWriteFunctionWrapper<RowData> funcWrapper;
@TempDir
File tempFile;
@BeforeEach
public void before() throws Exception {
final String basePath = tempFile.getAbsolutePath();
conf = TestConfigurations.getDefaultConf(basePath);
conf.setString(FlinkOptions.TABLE_TYPE, getTableType().name());
setUp(conf);
this.funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
}
/**
* Override to apply custom configuration.
*/
protected void setUp(Configuration conf) {
// for sub-class extension
}
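// A sketch of how a subclass might customize the configuration; the concrete
// option a MERGE_ON_READ variant would tune here is an assumption, not taken
// from this file:
//
//   @Override
//   protected void setUp(Configuration conf) {
//     conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
//   }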
@AfterEach
public void after() throws Exception {
funcWrapper.close();
}
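// The tests below drive the pipeline by hand: checkpointFunction(id) flushes
// the buffered records and emits a WriteMetadataEvent, handleEventFromOperator
// delivers that event to the coordinator, and checkpointComplete(id) lets the
// coordinator commit the inflight instant.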
@Test
public void testCheckpoint() throws Exception {
// open the function and ingest data
funcWrapper.openFunction();
for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData);
}
// no checkpoint, so the coordinator does not accept any events
assertTrue(
funcWrapper.getEventBuffer().length == 1
&& funcWrapper.getEventBuffer()[0] == null, "The coordinator event buffer is expected to be empty");
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
String instant = funcWrapper.getWriteClient().getLastPendingInstant(getTableType());
final OperatorEvent nextEvent = funcWrapper.getNextEvent();
MatcherAssert.assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
List<WriteStatus> writeStatuses = ((WriteMetadataEvent) nextEvent).getWriteStatuses();
assertNotNull(writeStatuses);
MatcherAssert.assertThat(writeStatuses.size(), is(4)); // write 4 partition files
assertThat(writeStatuses.stream()
.map(WriteStatus::getPartitionPath).sorted(Comparator.naturalOrder())
.collect(Collectors.joining(",")),
is("par1,par2,par3,par4"));
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
funcWrapper.checkpointComplete(1);
// the coordinator checkpoint commits the inflight instant.
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
// checkpoint for the next round: there is no data input, so the flush
// should produce empty write statuses
// this triggers the (empty) write and the event send
funcWrapper.checkpointFunction(2);
String instant2 = funcWrapper.getWriteClient()
.getLastPendingInstant(getTableType());
assertNotEquals(instant, instant2);
final OperatorEvent nextEvent2 = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent2, instanceOf(WriteMetadataEvent.class));
List<WriteStatus> writeStatuses2 = ((WriteMetadataEvent) nextEvent2).getWriteStatuses();
assertNotNull(writeStatuses2);
assertThat(writeStatuses2.size(), is(0)); // write empty statuses
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent2);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
funcWrapper.checkpointComplete(2);
// started a new instant already
checkInflightInstant(funcWrapper.getWriteClient());
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
}
@Test
public void testCheckpointFails() throws Exception {
// open the function and ingest data
funcWrapper.openFunction();
// no data is written; when the checkpoint fails,
// the started instant should be reverted
// trigger the (empty) write and the event send
funcWrapper.checkpointFunction(1);
String instant = funcWrapper.getWriteClient()
.getLastPendingInstant(getTableType());
assertNotNull(instant);
final OperatorEvent nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
List<WriteStatus> writeStatuses = ((WriteMetadataEvent) nextEvent).getWriteStatuses();
assertNotNull(writeStatuses);
assertThat(writeStatuses.size(), is(0)); // no data write
// fails the checkpoint
funcWrapper.checkpointFails(1);
assertFalse(funcWrapper.getCoordinatorContext().isJobFailed(),
"The last checkpoint was aborted, ignore the events");
// the instant metadata should be reused
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, null);
for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData);
}
// this throws because there is no inflight instant; the wait times out immediately
assertThrows(HoodieException.class,
() -> funcWrapper.checkpointFunction(2),
"Timeout(0ms) while waiting for");
// do not send the write event and fail the checkpoint;
// it behaves as if the last checkpoint were successful.
funcWrapper.checkpointFails(2);
}
@Test
public void testSubtaskFails() throws Exception {
// open the function and ingest data
funcWrapper.openFunction();
// trigger a checkpoint with no data written, then fail the subtask;
// the pending instant should be rolled back
// this triggers the (empty) write and the event send
funcWrapper.checkpointFunction(1);
funcWrapper.getNextEvent();
String instant1 = funcWrapper.getWriteClient().getLastPendingInstant(getTableType());
assertNotNull(instant1);
// fails the subtask
funcWrapper.subTaskFails(0);
String instant2 = funcWrapper.getWriteClient().getLastPendingInstant(getTableType());
assertNotEquals(instant2, instant1, "The previous instant should be rolled back when starting a new instant");
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, null);
}
@Test
public void testInsert() throws Exception {
// open the function and ingest data
funcWrapper.openFunction();
for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData);
}
assertEmptyDataFiles();
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
String instant = funcWrapper.getWriteClient()
.getLastPendingInstant(getTableType());
final OperatorEvent nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
funcWrapper.checkpointComplete(1);
checkWrittenData(tempFile, EXPECTED1);
// the coordinator checkpoint commits the inflight instant.
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
checkWrittenData(tempFile, EXPECTED1);
}
@Test
public void testInsertDuplicates() throws Exception {
// reset the config option
conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// open the function and ingest data
funcWrapper.openFunction();
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
funcWrapper.invoke(rowData);
}
assertEmptyDataFiles();
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
OperatorEvent nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
funcWrapper.checkpointComplete(1);
checkWrittenData(tempFile, EXPECTED3, 1);
// insert duplicates again
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
funcWrapper.invoke(rowData);
}
funcWrapper.checkpointFunction(2);
nextEvent = funcWrapper.getNextEvent();
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
funcWrapper.checkpointComplete(2);
checkWrittenData(tempFile, EXPECTED3, 1);
}
@Test
public void testUpsert() throws Exception {
// open the function and ingest data
funcWrapper.openFunction();
for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData);
}
assertEmptyDataFiles();
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
OperatorEvent nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
funcWrapper.checkpointComplete(1);
// upsert another data buffer
for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) {
funcWrapper.invoke(rowData);
}
// the data is not flushed yet
checkWrittenData(tempFile, EXPECTED1);
// this triggers the data write and event send
funcWrapper.checkpointFunction(2);
String instant = funcWrapper.getWriteClient()
.getLastPendingInstant(getTableType());
nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
funcWrapper.checkpointComplete(2);
// the coordinator checkpoint commits the inflight instant.
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
checkWrittenData(tempFile, EXPECTED2);
}
@Test
public void testUpsertWithDelete() throws Exception {
// open the function and ingest data
funcWrapper.openFunction();
for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData);
}
assertEmptyDataFiles();
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
OperatorEvent nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
funcWrapper.checkpointComplete(1);
// upsert another data buffer
for (RowData rowData : TestData.DATA_SET_UPDATE_DELETE) {
funcWrapper.invoke(rowData);
}
// the data is not flushed yet
checkWrittenData(tempFile, EXPECTED1);
// this triggers the data write and event send
funcWrapper.checkpointFunction(2);
String instant = funcWrapper.getWriteClient()
.getLastPendingInstant(getTableType());
nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
funcWrapper.checkpointComplete(2);
// the coordinator checkpoint commits the inflight instant.
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
Map<String, String> expected = new HashMap<>();
// id3, id5 were deleted and id9 is ignored
expected.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
expected.put("par2", "[id4,par2,id4,Fabian,31,4,par2]");
expected.put("par3", "[id6,par3,id6,Emma,20,6,par3]");
expected.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
checkWrittenData(tempFile, expected);
}
@Test
public void testInsertWithMiniBatches() throws Exception {
// reset the config option
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// open the function and ingest data
funcWrapper.openFunction();
// Each record is 208 bytes, so the 4th record is expected to trigger a mini-batch flush
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
funcWrapper.invoke(rowData);
}
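// 5 duplicate records are ingested; with a 630-byte batch size and 208 bytes
// per record, a mini-batch of 3 records flushes out, leaving 2 in the buffer
// (as asserted below)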
Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
assertThat("2 records expect to flush out as a mini-batch",
dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
is(2));
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
dataBuffer = funcWrapper.getDataBuffer();
assertThat("All data should be flushed out", dataBuffer.size(), is(0));
final OperatorEvent event1 = funcWrapper.getNextEvent(); // consume the mini-batch event first
final OperatorEvent event2 = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
String instant = funcWrapper.getWriteClient()
.getLastPendingInstant(getTableType());
funcWrapper.checkpointComplete(1);
Map<String, String> expected = getMiniBatchExpected();
checkWrittenData(tempFile, expected, 1);
// started a new instant already
checkInflightInstant(funcWrapper.getWriteClient());
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
// insert duplicates again
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
funcWrapper.invoke(rowData);
}
funcWrapper.checkpointFunction(2);
final OperatorEvent event3 = funcWrapper.getNextEvent(); // consume the mini-batch event first
final OperatorEvent event4 = funcWrapper.getNextEvent();
funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
funcWrapper.checkpointComplete(2);
// Same as the original base file content.
checkWrittenData(tempFile, expected, 1);
}
@Test
public void testInsertWithDeduplication() throws Exception {
// reset the config option
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// open the function and ingest data
funcWrapper.openFunction();
// Each record is 208 bytes, so the 4th record is expected to trigger a mini-batch flush
for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
funcWrapper.invoke(rowData);
}
Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
assertThat("2 records expect to flush out as a mini-batch",
dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
is(2));
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
dataBuffer = funcWrapper.getDataBuffer();
assertThat("All data should be flushed out", dataBuffer.size(), is(0));
final OperatorEvent event1 = funcWrapper.getNextEvent(); // consume the mini-batch event first
final OperatorEvent event2 = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
String instant = funcWrapper.getWriteClient()
.getLastPendingInstant(getTableType());
funcWrapper.checkpointComplete(1);
Map<String, String> expected = new HashMap<>();
expected.put("par1", "[id1,par1,id1,Danny,23,4,par1]");
checkWrittenData(tempFile, expected, 1);
// started a new instant already
checkInflightInstant(funcWrapper.getWriteClient());
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
// insert duplicates again
for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
funcWrapper.invoke(rowData);
}
funcWrapper.checkpointFunction(2);
final OperatorEvent event3 = funcWrapper.getNextEvent(); // consume the mini-batch event first
final OperatorEvent event4 = funcWrapper.getNextEvent();
funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
funcWrapper.checkpointComplete(2);
// Same as the original base file content.
checkWrittenData(tempFile, expected, 1);
}
@Test
public void testInsertAllowsDuplication() throws Exception {
// reset the config option
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// open the function and ingest data
funcWrapper.openFunction();
// Each record is 208 bytes, so the 4th record is expected to trigger a mini-batch flush
for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
funcWrapper.invoke(rowData);
}
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
assertThat("All data should be flushed out", dataBuffer.size(), is(0));
final OperatorEvent event1 = funcWrapper.getNextEvent(); // consume the mini-batch event first
final OperatorEvent event2 = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
String instant = funcWrapper.getWriteClient()
.getLastPendingInstant(getTableType());
funcWrapper.checkpointComplete(1);
Map<String, String> expected = new HashMap<>();
expected.put("par1", "["
+ "id1,par1,id1,Danny,23,0,par1, "
+ "id1,par1,id1,Danny,23,1,par1, "
+ "id1,par1,id1,Danny,23,2,par1, "
+ "id1,par1,id1,Danny,23,3,par1, "
+ "id1,par1,id1,Danny,23,4,par1]");
TestData.checkWrittenAllData(tempFile, expected, 1);
// started a new instant already
checkInflightInstant(funcWrapper.getWriteClient());
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
// insert duplicates again
for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
funcWrapper.invoke(rowData);
}
funcWrapper.checkpointFunction(2);
final OperatorEvent event3 = funcWrapper.getNextEvent(); // consume the mini-batch event first
final OperatorEvent event4 = funcWrapper.getNextEvent();
funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
funcWrapper.checkpointComplete(2);
// same as the original base file content.
expected.put("par1", "["
+ "id1,par1,id1,Danny,23,0,par1, "
+ "id1,par1,id1,Danny,23,0,par1, "
+ "id1,par1,id1,Danny,23,1,par1, "
+ "id1,par1,id1,Danny,23,1,par1, "
+ "id1,par1,id1,Danny,23,2,par1, "
+ "id1,par1,id1,Danny,23,2,par1, "
+ "id1,par1,id1,Danny,23,3,par1, "
+ "id1,par1,id1,Danny,23,3,par1, "
+ "id1,par1,id1,Danny,23,4,par1, "
+ "id1,par1,id1,Danny,23,4,par1]");
TestData.checkWrittenAllData(tempFile, expected, 1);
}
@Test
public void testInsertWithSmallBufferSize() throws Exception {
// reset the config option
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// open the function and ingest data
funcWrapper.openFunction();
// each record is 208 bytes, so the 4th record is expected to trigger a buffer flush:
// the largest bucket is flushed one at a time.
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
funcWrapper.invoke(rowData);
}
Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
assertThat("2 records expect to flush out as a mini-batch",
dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
is(2));
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
dataBuffer = funcWrapper.getDataBuffer();
assertThat("All data should be flushed out", dataBuffer.size(), is(0));
for (int i = 0; i < 2; i++) {
final OperatorEvent event = funcWrapper.getNextEvent(); // consume the events in order
assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, event);
}
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
String instant = funcWrapper.getWriteClient()
.getLastPendingInstant(getTableType());
funcWrapper.checkpointComplete(1);
Map<String, String> expected = getMiniBatchExpected();
checkWrittenData(tempFile, expected, 1);
// started a new instant already
checkInflightInstant(funcWrapper.getWriteClient());
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
// insert duplicates again
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
funcWrapper.invoke(rowData);
}
funcWrapper.checkpointFunction(2);
for (int i = 0; i < 2; i++) {
final OperatorEvent event = funcWrapper.getNextEvent(); // consume the events in order
funcWrapper.getCoordinator().handleEventFromOperator(0, event);
}
funcWrapper.checkpointComplete(2);
// Same as the original base file content.
checkWrittenData(tempFile, expected, 1);
}
protected Map<String, String> getMiniBatchExpected() {
Map<String, String> expected = new HashMap<>();
// the last 2 lines are merged
expected.put("par1", "["
+ "id1,par1,id1,Danny,23,1,par1, "
+ "id1,par1,id1,Danny,23,1,par1, "
+ "id1,par1,id1,Danny,23,1,par1]");
return expected;
}
protected Map<String, String> getExpectedBeforeCheckpointComplete() {
return EXPECTED2;
}
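// for COPY_ON_WRITE the upserted data is already readable before the
// coordinator commits the instant, so this matches EXPECTED2; subclasses
// may override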
@Test
public void testIndexStateBootstrap() throws Exception {
// open the function and ingest data
funcWrapper.openFunction();
for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData);
}
assertEmptyDataFiles();
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
OperatorEvent nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
funcWrapper.checkpointComplete(1);
// the first commit is complete and the data is readable
checkWrittenData(tempFile, EXPECTED1);
// reset the config option
conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// upsert another data buffer
funcWrapper.openFunction();
for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) {
funcWrapper.invoke(rowData);
}
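// with index bootstrap enabled, the first record triggers loading of the
// index state for every record written by the previous commit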
assertTrue(funcWrapper.isAlreadyBootstrap());
checkIndexLoaded(
new HoodieKey("id1", "par1"),
new HoodieKey("id2", "par1"),
new HoodieKey("id3", "par2"),
new HoodieKey("id4", "par2"),
new HoodieKey("id5", "par3"),
new HoodieKey("id6", "par3"),
new HoodieKey("id7", "par4"),
new HoodieKey("id8", "par4"),
new HoodieKey("id9", "par3"),
new HoodieKey("id10", "par4"),
new HoodieKey("id11", "par4"));
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
String instant = funcWrapper.getWriteClient()
.getLastPendingInstant(getTableType());
nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
Map<String, String> expected = getExpectedBeforeCheckpointComplete();
checkWrittenData(tempFile, expected);
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
funcWrapper.checkpointComplete(1);
// the coordinator checkpoint commits the inflight instant.
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
checkWrittenData(tempFile, EXPECTED2);
}
@Test
public void testWriteExactlyOnce() throws Exception {
// reset the config option
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 3);
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
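// a very small commit-ack timeout plus a small buffer force an eager flush
// that then blocks waiting for the instant commit ack, exercising the
// timeout path asserted at the end of the test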
// open the function and ingest data
funcWrapper.openFunction();
for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData);
}
// no checkpoint, so the coordinator does not accept any events
assertTrue(
funcWrapper.getEventBuffer().length == 1
&& funcWrapper.getEventBuffer()[0] == null, "The coordinator event buffer is expected to be empty");
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
assertTrue(funcWrapper.isConforming(), "The write function should be waiting for the instant to commit");
for (int i = 0; i < 2; i++) {
final OperatorEvent event = funcWrapper.getNextEvent(); // consume the events in order
assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, event);
}
funcWrapper.checkpointComplete(1);
for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData);
}
assertFalse(funcWrapper.isConforming(), "The write function should finish waiting for the instant to commit");
// checkpoint for the next round: when an eager flush happens while the write
// task is still waiting for the instant commit ack, it should throw on timeout.
funcWrapper.checkpointFunction(2);
assertThrows(HoodieException.class, () -> {
for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData);
}
}, "Timeout(500ms) while waiting for instant");
}
@Test
public void testReuseEmbeddedServer() {
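// the first write client starts the embedded timeline server; a second client
// is expected to reuse it (same remote view server port) instead of starting
// a new one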
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
FileSystemViewStorageConfig viewStorageConfig = writeClient.getConfig().getViewStorageConfig();
assertSame(viewStorageConfig.getStorageType(), FileSystemViewStorageType.REMOTE_FIRST);
// get another write client
writeClient = StreamerUtil.createWriteClient(conf, null);
assertSame(writeClient.getConfig().getViewStorageConfig().getStorageType(), FileSystemViewStorageType.REMOTE_FIRST);
assertEquals(viewStorageConfig.getRemoteViewServerPort(), writeClient.getConfig().getViewStorageConfig().getRemoteViewServerPort());
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
@SuppressWarnings("rawtypes")
private void checkInflightInstant(HoodieFlinkWriteClient writeClient) {
final String instant = writeClient.getLastPendingInstant(getTableType());
assertNotNull(instant);
}
@SuppressWarnings("rawtypes")
private void checkInstantState(
HoodieFlinkWriteClient writeClient,
HoodieInstant.State state,
String instantStr) {
final String instant;
switch (state) {
case REQUESTED:
instant = writeClient.getLastPendingInstant(getTableType());
break;
case COMPLETED:
instant = writeClient.getLastCompletedInstant(getTableType());
break;
default:
throw new AssertionError("Unexpected state");
}
assertThat(instant, is(instantStr));
}
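// the table type under test; a MERGE_ON_READ subclass would override this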
protected HoodieTableType getTableType() {
return HoodieTableType.COPY_ON_WRITE;
}
protected void checkWrittenData(File baseFile, Map<String, String> expected) throws Exception {
checkWrittenData(baseFile, expected, 4);
}
protected void checkWrittenData(File baseFile, Map<String, String> expected, int partitions) throws Exception {
TestData.checkWrittenData(baseFile, expected, partitions);
}
/**
* Asserts the data files are empty.
*/
protected void assertEmptyDataFiles() {
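// ignore hidden files such as the .hoodie metadata folder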
File[] dataFiles = tempFile.listFiles(file -> !file.getName().startsWith("."));
assertNotNull(dataFiles);
assertThat(dataFiles.length, is(0));
}
private void checkIndexLoaded(HoodieKey... keys) {
for (HoodieKey key : keys) {
assertTrue(funcWrapper.isKeyInState(key),
"Key: " + key + " assumes to be in the index state");
}
}
}