[optimization] add config option to disable 2PC (#34)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index cc4b203..2daf5e1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -45,13 +45,16 @@
private final Boolean enableDelete;
+    private final boolean enable2PC;
public DorisExecutionOptions(int checkInterval,
int maxRetries,
int bufferSize,
int bufferCount,
String labelPrefix,
- Properties streamLoadProp, Boolean enableDelete) {
+ Properties streamLoadProp,
+ Boolean enableDelete,
+ Boolean enable2PC) {
Preconditions.checkArgument(maxRetries >= 0);
this.checkInterval = checkInterval;
this.maxRetries = maxRetries;
@@ -60,6 +63,7 @@
this.labelPrefix = labelPrefix;
this.streamLoadProp = streamLoadProp;
this.enableDelete = enableDelete;
+ this.enable2PC = enable2PC;
}
public static Builder builder() {
@@ -105,6 +109,10 @@
return enableDelete;
}
+    /** Returns whether stream loads are sent with two-phase commit. */
+    public boolean enabled2PC() {
+        return enable2PC;
+    }
/**
* Builder of {@link DorisExecutionOptions}.
*/
@@ -117,6 +125,8 @@
private Properties streamLoadProp = new Properties();
private boolean enableDelete = false;
+ private boolean enable2PC = true;
+
public Builder setCheckInterval(Integer checkInterval) {
this.checkInterval = checkInterval;
return this;
@@ -152,8 +162,14 @@
return this;
}
+        /** Disables two-phase commit for stream loads; 2PC is enabled unless this is called. */
+        public Builder disable2PC() {
+            this.enable2PC = false;
+            return this;
+        }
+
public DorisExecutionOptions build() {
- return new DorisExecutionOptions(checkInterval, maxRetries, bufferSize, bufferCount, labelPrefix, streamLoadProp, enableDelete);
+ return new DorisExecutionOptions(checkInterval, maxRetries, bufferSize, bufferCount, labelPrefix, streamLoadProp, enableDelete, enable2PC);
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index b468cc8..dfb9cb7 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -61,7 +61,7 @@
public class DorisStreamLoad implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private final String labelSuffix;
+ private final LabelGenerator labelGenerator;
private final byte[] lineDelimiter;
private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc";
@@ -74,6 +74,7 @@
private final String passwd;
private final String db;
private final String table;
+ private final boolean enable2PC;
private final Properties streamLoadProp;
private final RecordStream recordStream;
private Future<CloseableHttpResponse> pendingLoadFuture;
@@ -84,7 +85,7 @@
public DorisStreamLoad(String hostPort,
DorisOptions dorisOptions,
DorisExecutionOptions executionOptions,
- String labelSuffix,
+ LabelGenerator labelGenerator,
CloseableHttpClient httpClient) {
this.hostPort = hostPort;
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
@@ -92,9 +93,10 @@
this.table = tableInfo[1];
this.user = dorisOptions.getUsername();
this.passwd = dorisOptions.getPassword();
- this.labelSuffix = labelSuffix;
+ this.labelGenerator = labelGenerator;
this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, db, table);
this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db);
+ this.enable2PC = executionOptions.enabled2PC();
this.streamLoadProp = executionOptions.getStreamLoadProp();
this.httpClient = httpClient;
this.executorService = new ThreadPoolExecutor(1, 1,
@@ -133,7 +135,7 @@
LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID);
while (true) {
try {
- String label = labelSuffix + "_" + startChkID;
+ String label = labelGenerator.generateLabel(startChkID);
HttpPutBuilder builder = new HttpPutBuilder();
builder.setUrl(loadUrlStr)
.baseAuth(user, passwd)
@@ -218,12 +220,11 @@
/**
* start write data for new checkpoint.
- * @param chkID
+ * @param label label to set on the stream load request started by this call
* @throws IOException
*/
- public void startLoad(long chkID) throws IOException{
+ public void startLoad(String label) throws IOException{
loadBatchFirstRecord = true;
- String label = labelSuffix + "_" + chkID;
HttpPutBuilder putBuilder = new HttpPutBuilder();
recordStream.startInput();
LOG.info("stream load started for {}", label);
@@ -232,10 +233,12 @@
putBuilder.setUrl(loadUrlStr)
.baseAuth(user, passwd)
.addCommonHeader()
- .enable2PC()
.setLabel(label)
.setEntity(entity)
.addProperties(streamLoadProp);
+ if (enable2PC) {
+ putBuilder.enable2PC();
+ }
pendingLoadFuture = executorService.submit(() -> {
LOG.info("start execute load");
return httpClient.execute(putBuilder.build());
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 026f54e..86ed9ed 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -66,6 +66,7 @@
private final DorisReadOptions dorisReadOptions;
private final DorisExecutionOptions executionOptions;
private final String labelPrefix;
+ private final LabelGenerator labelGenerator;
private final int intervalTime;
private final DorisWriterState dorisWriterState;
private final DorisRecordSerializer<IN> serializer;
@@ -87,6 +88,7 @@
LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
this.dorisWriterState = new DorisWriterState(executionOptions.getLabelPrefix());
this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId();
+ this.labelGenerator = new LabelGenerator(labelPrefix, executionOptions.enabled2PC());
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("stream-load-check"));
this.serializer = serializer;
this.dorisOptions = dorisOptions;
@@ -98,20 +100,22 @@
public void initializeLoad(List<DorisWriterState> state) throws IOException {
try {
- this.dorisStreamLoad = new DorisStreamLoad(
+ this.dorisStreamLoad = new DorisStreamLoad(
RestService.getBackend(dorisOptions, dorisReadOptions, LOG),
dorisOptions,
executionOptions,
- labelPrefix, new HttpUtil().getHttpClient());
+ labelGenerator, new HttpUtil().getHttpClient());
// TODO: we need check and abort all pending transaction.
// Discard transactions that may cause the job to fail.
- dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1);
+ if(executionOptions.enabled2PC()) {
+ dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1);
+ }
} catch (Exception e) {
throw new DorisRuntimeException(e);
}
// get main work thread.
executorThread = Thread.currentThread();
- dorisStreamLoad.startLoad(lastCheckpointId + 1);
+ dorisStreamLoad.startLoad(labelGenerator.generateLabel(lastCheckpointId + 1));
// when uploading data in streaming mode, we need to regularly detect whether there are exceptions.
scheduledExecutorService.scheduleWithFixedDelay(this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS);
}
@@ -124,6 +128,7 @@
@Override
public List<DorisCommittable> prepareCommit(boolean flush) throws IOException {
+ // Disable the exception checker (loading = false) before stopping the current load.
loading = false;
Preconditions.checkState(dorisStreamLoad != null);
RespContent respContent = dorisStreamLoad.stopLoad();
@@ -131,7 +136,9 @@
String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL());
throw new DorisRuntimeException(errMsg);
}
-
+ if (!executionOptions.enabled2PC()) {
+ return Collections.emptyList();
+ }
long txnId = respContent.getTxnId();
return ImmutableList.of(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId));
@@ -140,7 +147,7 @@
@Override
public List<DorisWriterState> snapshotState(long checkpointId) throws IOException {
Preconditions.checkState(dorisStreamLoad != null);
- this.dorisStreamLoad.startLoad(checkpointId + 1);
+ this.dorisStreamLoad.startLoad(labelGenerator.generateLabel(checkpointId + 1));
this.loading = true;
return Collections.singletonList(dorisWriterState);
}
@@ -173,7 +180,7 @@
private void checkLoadException() {
if (loadException != null) {
- throw new RuntimeException("error while load exception.", loadException);
+ throw new RuntimeException("error while loading data.", loadException);
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
new file mode 100644
index 0000000..436d709
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.sink.writer;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Generates labels for Doris stream load requests.
+ *
+ * <p>With 2PC enabled the label is {@code <labelPrefix>_<chkId>}. It is deterministic, so the
+ * label of an earlier checkpoint can be regenerated (e.g. when aborting pre-committed loads).
+ * With 2PC disabled the checkpoint id is ignored and a millisecond timestamp is used as the
+ * suffix. Timestamps are forced to be strictly increasing per instance, so two loads started
+ * within the same millisecond never share a label.
+ *
+ * <p>Serializable because it is held by {@code DorisStreamLoad}, which is {@code Serializable}.
+ */
+public class LabelGenerator implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String labelPrefix;
+    private final boolean enable2PC;
+    /** Last timestamp suffix handed out when 2PC is disabled. */
+    private final AtomicLong lastTimestamp = new AtomicLong();
+
+    /**
+     * @param labelPrefix prefix of every generated label
+     * @param enable2PC whether labels are derived from the checkpoint id
+     */
+    public LabelGenerator(String labelPrefix, boolean enable2PC) {
+        this.labelPrefix = labelPrefix;
+        this.enable2PC = enable2PC;
+    }
+
+    /**
+     * Generates the label for a new stream load.
+     *
+     * @param chkId checkpoint id the load belongs to; only used when 2PC is enabled
+     * @return {@code labelPrefix + "_" + chkId} with 2PC enabled, otherwise
+     *     {@code labelPrefix + "_" + <strictly increasing millisecond timestamp>}
+     */
+    public String generateLabel(long chkId) {
+        if (enable2PC) {
+            return labelPrefix + "_" + chkId;
+        }
+        // Plain currentTimeMillis() repeats if two loads start in the same millisecond;
+        // bump to previous + 1 in that case so labels stay unique.
+        long now = System.currentTimeMillis();
+        long timestamp = lastTimestamp.updateAndGet(prev -> Math.max(prev + 1, now));
+        return labelPrefix + "_" + timestamp;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 7d6455e..be00cff 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -118,6 +118,12 @@
.defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
.withDescription("");
// flink write config options
+ private static final ConfigOption<Boolean> SINK_ENABLE_2PC = ConfigOptions
+ .key("sink.enable-2pc")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("enable 2PC while loading");
+
private static final ConfigOption<Integer> SINK_CHECK_INTERVAL = ConfigOptions
.key("sink.check-interval")
.intType()
@@ -195,6 +201,7 @@
options.add(DORIS_EXEC_MEM_LIMIT);
options.add(SINK_CHECK_INTERVAL);
+ options.add(SINK_ENABLE_2PC);
options.add(SINK_MAX_RETRIES);
options.add(SINK_BUFFER_FLUSH_INTERVAL);
options.add(SINK_ENABLE_DELETE);
@@ -262,6 +269,9 @@
builder.setLabelPrefix(readableConfig.get(SINK_LABEL_PREFIX));
builder.setStreamLoadProp(streamLoadProp);
builder.setDeletable(readableConfig.get(SINK_ENABLE_DELETE));
+ if (!readableConfig.get(SINK_ENABLE_2PC)) {
+ builder.disable2PC();
+ }
return builder.build();
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
index b401a5b..b4532ee 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
@@ -57,7 +57,7 @@
CloseableHttpResponse abortSuccessResponse = HttpTestUtil.getResponse(HttpTestUtil.ABORT_SUCCESS_RESPONSE, true);
CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
when(httpClient.execute(any())).thenReturn(existLabelResponse, abortSuccessResponse, preCommitResponse);
- DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, "", httpClient);
+ DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001_0", true), httpClient);
dorisStreamLoad.abortPreCommit("test001_0", 1);
}
@@ -67,8 +67,8 @@
CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
when(httpClient.execute(any())).thenReturn(preCommitResponse);
byte[] writeBuffer = "test".getBytes(StandardCharsets.UTF_8);
- DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, "", httpClient);
- dorisStreamLoad.startLoad(1);
+ DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
+ dorisStreamLoad.startLoad("1");
dorisStreamLoad.writeRecord(writeBuffer);
dorisStreamLoad.stopLoad();
byte[] buff = new byte[4];
@@ -86,8 +86,8 @@
CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
when(httpClient.execute(any())).thenReturn(preCommitResponse);
byte[] writeBuffer = "test".getBytes(StandardCharsets.UTF_8);
- DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, "", httpClient);
- dorisStreamLoad.startLoad(1);
+ DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
+ dorisStreamLoad.startLoad("1");
dorisStreamLoad.writeRecord(writeBuffer);
dorisStreamLoad.writeRecord(writeBuffer);
dorisStreamLoad.stopLoad();
@@ -111,8 +111,8 @@
CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
when(httpClient.execute(any())).thenReturn(preCommitResponse);
- DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, "", httpClient);
- dorisStreamLoad.startLoad(1);
+ DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
+ dorisStreamLoad.startLoad("1");
dorisStreamLoad.writeRecord("{\"id\": 1}".getBytes(StandardCharsets.UTF_8));
dorisStreamLoad.writeRecord("{\"id\": 2}".getBytes(StandardCharsets.UTF_8));
dorisStreamLoad.stopLoad();
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
index 094e8a4..3bf32b4 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
@@ -62,8 +62,8 @@
CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
when(httpClient.execute(any())).thenReturn(preCommitResponse);
- DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", dorisOptions, executionOptions, "", httpClient);
- dorisStreamLoad.startLoad(1);
+ DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
+ dorisStreamLoad.startLoad("1");
Sink.InitContext initContext = mock(Sink.InitContext.class);
when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1));
DorisWriter<String> dorisWriter = new DorisWriter<String>(initContext, Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, readOptions, executionOptions);
@@ -83,7 +83,7 @@
CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
when(httpClient.execute(any())).thenReturn(preCommitResponse);
- DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", dorisOptions, executionOptions, "", httpClient);
+ DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
Sink.InitContext initContext = mock(Sink.InitContext.class);
when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1));
DorisWriter<String> dorisWriter = new DorisWriter<String>(initContext, Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, readOptions, executionOptions);