[optimization] add disable 2pc config (#34)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index cc4b203..2daf5e1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -45,13 +45,16 @@
 
     private final Boolean enableDelete;
 
+    private final Boolean enable2PC;
 
     public DorisExecutionOptions(int checkInterval,
                                  int maxRetries,
                                  int bufferSize,
                                  int bufferCount,
                                  String labelPrefix,
-                                 Properties streamLoadProp, Boolean enableDelete) {
+                                 Properties streamLoadProp,
+                                 Boolean enableDelete,
+                                 Boolean enable2PC) {
         Preconditions.checkArgument(maxRetries >= 0);
         this.checkInterval = checkInterval;
         this.maxRetries = maxRetries;
@@ -60,6 +63,7 @@
         this.labelPrefix = labelPrefix;
         this.streamLoadProp = streamLoadProp;
         this.enableDelete = enableDelete;
+        this.enable2PC = enable2PC;
     }
 
     public static Builder builder() {
@@ -105,6 +109,9 @@
         return enableDelete;
     }
 
+    public Boolean enabled2PC() {
+        return enable2PC; // boxed Boolean; call sites unbox it, so a null passed to the constructor would NPE there
+    }
     /**
      * Builder of {@link DorisExecutionOptions}.
      */
@@ -117,6 +124,8 @@
         private Properties streamLoadProp = new Properties();
         private boolean enableDelete = false;
 
+        private boolean enable2PC = true;
+
         public Builder setCheckInterval(Integer checkInterval) {
             this.checkInterval = checkInterval;
             return this;
@@ -152,8 +161,13 @@
             return this;
         }
 
+        public Builder disable2PC() {
+            this.enable2PC = false; // the builder field starts as true, so 2PC stays on unless this is called
+            return this;
+        }
+
         public DorisExecutionOptions build() {
-            return new DorisExecutionOptions(checkInterval, maxRetries, bufferSize, bufferCount, labelPrefix, streamLoadProp, enableDelete);
+            return new DorisExecutionOptions(checkInterval, maxRetries, bufferSize, bufferCount, labelPrefix, streamLoadProp, enableDelete, enable2PC);
         }
     }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index b468cc8..dfb9cb7 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -61,7 +61,7 @@
 public class DorisStreamLoad implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-    private final String labelSuffix;
+    private final LabelGenerator labelGenerator;
     private final byte[] lineDelimiter;
     private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
     private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc";
@@ -74,6 +74,7 @@
     private final String passwd;
     private final String db;
     private final String table;
+    private final boolean enable2PC;
     private final Properties streamLoadProp;
     private final RecordStream recordStream;
     private Future<CloseableHttpResponse> pendingLoadFuture;
@@ -84,7 +85,7 @@
     public DorisStreamLoad(String hostPort,
                            DorisOptions dorisOptions,
                            DorisExecutionOptions executionOptions,
-                           String labelSuffix,
+                           LabelGenerator labelGenerator,
                            CloseableHttpClient httpClient) {
         this.hostPort = hostPort;
         String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
@@ -92,9 +93,10 @@
         this.table = tableInfo[1];
         this.user = dorisOptions.getUsername();
         this.passwd = dorisOptions.getPassword();
-        this.labelSuffix = labelSuffix;
+        this.labelGenerator = labelGenerator;
         this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, db, table);
         this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db);
+        this.enable2PC = executionOptions.enabled2PC();
         this.streamLoadProp = executionOptions.getStreamLoadProp();
         this.httpClient = httpClient;
         this.executorService = new ThreadPoolExecutor(1, 1,
@@ -133,7 +135,7 @@
         LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID);
         while (true) {
             try {
-                String label = labelSuffix + "_" + startChkID;
+                String label = labelGenerator.generateLabel(startChkID);
                 HttpPutBuilder builder = new HttpPutBuilder();
                 builder.setUrl(loadUrlStr)
                         .baseAuth(user, passwd)
@@ -218,12 +220,11 @@
 
     /**
      * start write data for new checkpoint.
-     * @param chkID
+     * @param label
      * @throws IOException
      */
-    public void startLoad(long chkID) throws IOException{
+    public void startLoad(String label) throws IOException{
         loadBatchFirstRecord = true;
-        String label = labelSuffix + "_" + chkID;
         HttpPutBuilder putBuilder = new HttpPutBuilder();
         recordStream.startInput();
         LOG.info("stream load started for {}", label);
@@ -232,10 +233,12 @@
             putBuilder.setUrl(loadUrlStr)
                     .baseAuth(user, passwd)
                     .addCommonHeader()
-                    .enable2PC()
                     .setLabel(label)
                     .setEntity(entity)
                     .addProperties(streamLoadProp);
+            if (enable2PC) {
+               putBuilder.enable2PC();
+            }
             pendingLoadFuture = executorService.submit(() -> {
                 LOG.info("start execute load");
                 return httpClient.execute(putBuilder.build());
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 026f54e..86ed9ed 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -66,6 +66,7 @@
     private final DorisReadOptions dorisReadOptions;
     private final DorisExecutionOptions executionOptions;
     private final String labelPrefix;
+    private final LabelGenerator labelGenerator;
     private final int intervalTime;
     private final DorisWriterState dorisWriterState;
     private final DorisRecordSerializer<IN> serializer;
@@ -87,6 +88,7 @@
         LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
         this.dorisWriterState = new DorisWriterState(executionOptions.getLabelPrefix());
         this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId();
+        this.labelGenerator = new LabelGenerator(labelPrefix, executionOptions.enabled2PC());
         this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("stream-load-check"));
         this.serializer = serializer;
         this.dorisOptions = dorisOptions;
@@ -98,20 +100,22 @@
 
     public void initializeLoad(List<DorisWriterState> state) throws IOException {
         try {
-            this.dorisStreamLoad  = new DorisStreamLoad(
+            this.dorisStreamLoad = new DorisStreamLoad(
                     RestService.getBackend(dorisOptions, dorisReadOptions, LOG),
                     dorisOptions,
                     executionOptions,
-                    labelPrefix, new HttpUtil().getHttpClient());
+                    labelGenerator, new HttpUtil().getHttpClient());
             // TODO: we need check and abort all pending transaction.
             //  Discard transactions that may cause the job to fail.
-            dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1);
+            if(executionOptions.enabled2PC()) {
+                dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1);
+            }
         } catch (Exception e) {
             throw new DorisRuntimeException(e);
         }
         // get main work thread.
         executorThread = Thread.currentThread();
-        dorisStreamLoad.startLoad(lastCheckpointId + 1);
+        dorisStreamLoad.startLoad(labelGenerator.generateLabel(lastCheckpointId + 1));
         // when uploading data in streaming mode, we need to regularly detect whether there are exceptions.
         scheduledExecutorService.scheduleWithFixedDelay(this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS);
     }
@@ -124,6 +128,7 @@
 
     @Override
     public List<DorisCommittable> prepareCommit(boolean flush) throws IOException {
+        // disable exception checker before stop load.
         loading = false;
         Preconditions.checkState(dorisStreamLoad != null);
         RespContent respContent = dorisStreamLoad.stopLoad();
@@ -131,7 +136,9 @@
             String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL());
             throw new DorisRuntimeException(errMsg);
         }
-
+        if (!executionOptions.enabled2PC()) {
+            return Collections.emptyList();
+        }
         long txnId = respContent.getTxnId();
 
         return ImmutableList.of(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId));
@@ -140,7 +147,7 @@
     @Override
     public List<DorisWriterState> snapshotState(long checkpointId) throws IOException {
         Preconditions.checkState(dorisStreamLoad != null);
-        this.dorisStreamLoad.startLoad(checkpointId + 1);
+        this.dorisStreamLoad.startLoad(labelGenerator.generateLabel(checkpointId + 1));
         this.loading = true;
         return Collections.singletonList(dorisWriterState);
     }
@@ -173,7 +180,7 @@
 
     private void checkLoadException() {
         if (loadException != null) {
-            throw new RuntimeException("error while load exception.", loadException);
+            throw new RuntimeException("error while loading data.", loadException);
         }
     }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
new file mode 100644
index 0000000..436d709
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.sink.writer;
+
+/**
+ * Generates stream load labels: {@code labelPrefix_chkId} with 2PC enabled, else {@code labelPrefix_currentTimeMillis}.
+ */
+public class LabelGenerator {
+    private final String labelPrefix;
+    private final boolean enable2PC;
+
+    public LabelGenerator(String labelPrefix, boolean enable2PC) {
+        this.labelPrefix = labelPrefix;
+        this.enable2PC = enable2PC;
+    }
+    /** Returns the label for {@code chkId}; without 2PC the id is ignored and the wall clock is used instead. */
+    public String generateLabel(long chkId) {
+        return labelPrefix + "_" + (enable2PC ? chkId : System.currentTimeMillis());
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 7d6455e..be00cff 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -118,6 +118,12 @@
             .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
             .withDescription("");
     // flink write config options
+    private static final ConfigOption<Boolean> SINK_ENABLE_2PC = ConfigOptions
+            .key("sink.enable-2pc")
+            .booleanType()
+            .defaultValue(true) // on by default; the factory calls builder.disable2PC() only when this reads false
+            .withDescription("enable 2PC while loading");
+
     private static final ConfigOption<Integer> SINK_CHECK_INTERVAL = ConfigOptions
             .key("sink.check-interval")
             .intType()
@@ -195,6 +201,7 @@
         options.add(DORIS_EXEC_MEM_LIMIT);
 
         options.add(SINK_CHECK_INTERVAL);
+        options.add(SINK_ENABLE_2PC);
         options.add(SINK_MAX_RETRIES);
         options.add(SINK_BUFFER_FLUSH_INTERVAL);
         options.add(SINK_ENABLE_DELETE);
@@ -262,6 +269,9 @@
         builder.setLabelPrefix(readableConfig.get(SINK_LABEL_PREFIX));
         builder.setStreamLoadProp(streamLoadProp);
         builder.setDeletable(readableConfig.get(SINK_ENABLE_DELETE));
+        if (!readableConfig.get(SINK_ENABLE_2PC)) {
+            builder.disable2PC();
+        }
         return builder.build();
     }
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
index b401a5b..b4532ee 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
@@ -57,7 +57,7 @@
         CloseableHttpResponse abortSuccessResponse = HttpTestUtil.getResponse(HttpTestUtil.ABORT_SUCCESS_RESPONSE, true);
         CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
         when(httpClient.execute(any())).thenReturn(existLabelResponse, abortSuccessResponse, preCommitResponse);
-        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, "", httpClient);
+        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001_0", true), httpClient);
         dorisStreamLoad.abortPreCommit("test001_0", 1);
     }
 
@@ -67,8 +67,8 @@
         CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
         when(httpClient.execute(any())).thenReturn(preCommitResponse);
         byte[] writeBuffer = "test".getBytes(StandardCharsets.UTF_8);
-        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, "", httpClient);
-        dorisStreamLoad.startLoad(1);
+        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
+        dorisStreamLoad.startLoad("1");
         dorisStreamLoad.writeRecord(writeBuffer);
         dorisStreamLoad.stopLoad();
         byte[] buff = new byte[4];
@@ -86,8 +86,8 @@
         CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
         when(httpClient.execute(any())).thenReturn(preCommitResponse);
         byte[] writeBuffer = "test".getBytes(StandardCharsets.UTF_8);
-        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, "", httpClient);
-        dorisStreamLoad.startLoad(1);
+        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
+        dorisStreamLoad.startLoad("1");
         dorisStreamLoad.writeRecord(writeBuffer);
         dorisStreamLoad.writeRecord(writeBuffer);
         dorisStreamLoad.stopLoad();
@@ -111,8 +111,8 @@
         CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
         when(httpClient.execute(any())).thenReturn(preCommitResponse);
 
-        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, "", httpClient);
-        dorisStreamLoad.startLoad(1);
+        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
+        dorisStreamLoad.startLoad("1");
         dorisStreamLoad.writeRecord("{\"id\": 1}".getBytes(StandardCharsets.UTF_8));
         dorisStreamLoad.writeRecord("{\"id\": 2}".getBytes(StandardCharsets.UTF_8));
         dorisStreamLoad.stopLoad();
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
index 094e8a4..3bf32b4 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
@@ -62,8 +62,8 @@
         CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
         when(httpClient.execute(any())).thenReturn(preCommitResponse);
 
-        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", dorisOptions, executionOptions, "", httpClient);
-        dorisStreamLoad.startLoad(1);
+        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
+        dorisStreamLoad.startLoad("");
         Sink.InitContext initContext = mock(Sink.InitContext.class);
         when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1));
         DorisWriter<String> dorisWriter = new DorisWriter<String>(initContext, Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, readOptions, executionOptions);
@@ -83,7 +83,7 @@
         CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
         when(httpClient.execute(any())).thenReturn(preCommitResponse);
 
-        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", dorisOptions, executionOptions, "", httpClient);
+        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
         Sink.InitContext initContext = mock(Sink.InitContext.class);
         when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1));
         DorisWriter<String> dorisWriter = new DorisWriter<String>(initContext, Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, readOptions, executionOptions);