[Feature] add stream load batch write (#168)

At present, the writing of Flink Connector can only rely on the writing of checkpoint.
Not very friendly to two scenarios:
1. Jobs that do not need to enable checkpoint
2. The ETL of Flink jobs is complicated, which makes the checkpoint very slow, but it does not want to affect the writing performance.
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 03adf19..a3f6c50 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -32,21 +32,30 @@
     public static final int DEFAULT_MAX_RETRY_TIMES = 1;
     private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
     private static final int DEFAULT_BUFFER_COUNT = 3;
+    //batch flush
+    private static final int DEFAULT_FLUSH_QUEUE_SIZE = 2;
+    private static final int DEFAULT_BUFFER_FLUSH_MAX_ROWS = 50000;
+    private static final int DEFAULT_BUFFER_FLUSH_MAX_BYTES = 10 * 1024 * 1024;
+    private static final long DEFAULT_BUFFER_FLUSH_INTERVAL_MS = 10 * 1000;
     private final int checkInterval;
     private final int maxRetries;
     private final int bufferSize;
     private final int bufferCount;
     private final String labelPrefix;
-
     /**
      * Properties for the StreamLoad.
      */
     private final Properties streamLoadProp;
-
     private final Boolean enableDelete;
-
     private final Boolean enable2PC;
 
+    //batch mode param
+    private int flushQueueSize;
+    private int bufferFlushMaxRows;
+    private int bufferFlushMaxBytes;
+    private long bufferFlushIntervalMs;
+    private boolean enableBatchMode;
+
     public DorisExecutionOptions(int checkInterval,
                                  int maxRetries,
                                  int bufferSize,
@@ -54,7 +63,12 @@
                                  String labelPrefix,
                                  Properties streamLoadProp,
                                  Boolean enableDelete,
-                                 Boolean enable2PC) {
+                                 Boolean enable2PC,
+                                 boolean enableBatchMode,
+                                 int flushQueueSize,
+                                 int bufferFlushMaxRows,
+                                 int bufferFlushMaxBytes,
+                                 long bufferFlushIntervalMs) {
         Preconditions.checkArgument(maxRetries >= 0);
         this.checkInterval = checkInterval;
         this.maxRetries = maxRetries;
@@ -64,6 +78,12 @@
         this.streamLoadProp = streamLoadProp;
         this.enableDelete = enableDelete;
         this.enable2PC = enable2PC;
+
+        this.enableBatchMode = enableBatchMode;
+        this.flushQueueSize = flushQueueSize;
+        this.bufferFlushMaxRows = bufferFlushMaxRows;
+        this.bufferFlushMaxBytes = bufferFlushMaxBytes;
+        this.bufferFlushIntervalMs = bufferFlushIntervalMs;
     }
 
     public static Builder builder() {
@@ -119,6 +139,27 @@
     public Boolean enabled2PC() {
         return enable2PC;
     }
+
+    public int getFlushQueueSize() {
+        return flushQueueSize;
+    }
+
+    public int getBufferFlushMaxRows() {
+        return bufferFlushMaxRows;
+    }
+
+    public int getBufferFlushMaxBytes() {
+        return bufferFlushMaxBytes;
+    }
+
+    public long getBufferFlushIntervalMs() {
+        return bufferFlushIntervalMs;
+    }
+
+    public boolean enableBatchMode() {
+        return enableBatchMode;
+    }
+
     /**
      * Builder of {@link DorisExecutionOptions}.
      */
@@ -130,9 +171,15 @@
         private String labelPrefix = "";
         private Properties streamLoadProp = new Properties();
         private boolean enableDelete = true;
-
         private boolean enable2PC = true;
 
+        private int flushQueueSize = DEFAULT_FLUSH_QUEUE_SIZE;
+        private int bufferFlushMaxRows = DEFAULT_BUFFER_FLUSH_MAX_ROWS;
+        private int bufferFlushMaxBytes = DEFAULT_BUFFER_FLUSH_MAX_BYTES;
+        private long bufferFlushIntervalMs = DEFAULT_BUFFER_FLUSH_INTERVAL_MS;
+        private boolean enableBatchMode = false;
+
+
         public Builder setCheckInterval(Integer checkInterval) {
             this.checkInterval = checkInterval;
             return this;
@@ -173,10 +220,35 @@
             return this;
         }
 
+        public Builder enableBatchMode() {
+            this.enableBatchMode = true;
+            return this;
+        }
+
+        public Builder setFlushQueueSize(int flushQueueSize) {
+            this.flushQueueSize = flushQueueSize;
+            return this;
+        }
+
+        public Builder setBufferFlushIntervalMs(long bufferFlushIntervalMs) {
+            Preconditions.checkState(bufferFlushIntervalMs >= 1000, "bufferFlushIntervalMs must be greater than or equal to 1 second");
+            this.bufferFlushIntervalMs = bufferFlushIntervalMs;
+            return this;
+        }
+
+        public Builder setBufferFlushMaxRows(int bufferFlushMaxRows) {
+            this.bufferFlushMaxRows = bufferFlushMaxRows;
+            return this;
+        }
+
+        public Builder setBufferFlushMaxBytes(int bufferFlushMaxBytes) {
+            this.bufferFlushMaxBytes = bufferFlushMaxBytes;
+            return this;
+        }
+
         public DorisExecutionOptions build() {
-            return new DorisExecutionOptions(checkInterval, maxRetries, bufferSize, bufferCount, labelPrefix, streamLoadProp, enableDelete, enable2PC);
+            return new DorisExecutionOptions(checkInterval, maxRetries, bufferSize, bufferCount, labelPrefix,
+                    streamLoadProp, enableDelete, enable2PC, enableBatchMode, flushQueueSize, bufferFlushMaxRows, bufferFlushMaxBytes, bufferFlushIntervalMs);
         }
     }
-
-
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisBatchLoadException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisBatchLoadException.java
new file mode 100644
index 0000000..265b3d2
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisBatchLoadException.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.exception;
+
+/**
+ * Doris batch load run exception.
+ */
+public class DorisBatchLoadException extends RuntimeException {
+    public DorisBatchLoadException() {
+        super();
+    }
+
+    public DorisBatchLoadException(String message) {
+        super(message);
+    }
+
+    public DorisBatchLoadException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public DorisBatchLoadException(Throwable cause) {
+        super(cause);
+    }
+
+    protected DorisBatchLoadException(String message, Throwable cause,
+                                      boolean enableSuppression,
+                                      boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
new file mode 100644
index 0000000..701cad6
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink;
+
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.rest.models.BackendV2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.List;
+
+public class BackendUtil {
+    private static final Logger LOG = LoggerFactory.getLogger(BackendUtil.class);
+    private final List<BackendV2.BackendRowV2> backends;
+    private long pos;
+
+    public BackendUtil(List<BackendV2.BackendRowV2> backends) {
+        this.backends = backends;
+        this.pos = 0;
+    }
+
+    public String getAvailableBackend() {
+        long tmp = pos + backends.size();
+        while (pos < tmp) {
+            BackendV2.BackendRowV2 backend = backends.get((int) (pos % backends.size()));
+            String res = backend.toBackendString();
+            if(tryHttpConnection(res)){
+                pos++;
+                return res;
+            }
+        }
+        throw new DorisRuntimeException("no available backend.");
+    }
+
+    public boolean tryHttpConnection(String backend) {
+        try {
+            backend = "http://" + backend;
+            URL url = new URL(backend);
+            HttpURLConnection co =  (HttpURLConnection) url.openConnection();
+            co.setConnectTimeout(60000);
+            co.connect();
+            co.disconnect();
+            return true;
+        } catch (Exception ex) {
+            LOG.warn("Failed to connect to backend:{}", backend, ex);
+            pos++;
+            return false;
+        }
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
new file mode 100644
index 0000000..99876bb
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+import org.apache.doris.flink.sink.writer.RecordBuffer;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * buffer to queue
+ */
+public class BatchRecordBuffer {
+    private static final Logger LOG = LoggerFactory.getLogger(BatchRecordBuffer.class);
+    public static final String LINE_SEPARATOR = "\n";
+    private String labelName;
+    private ByteBuffer buffer;
+    private byte[] lineDelimiter;
+    private int numOfRecords = 0;
+    private int bufferSizeBytes = 0;
+    private boolean loadBatchFirstRecord = true;
+
+    public BatchRecordBuffer(){}
+
+    public BatchRecordBuffer(byte[] lineDelimiter, int bufferSize) {
+        super();
+        this.lineDelimiter = lineDelimiter;
+        this.buffer = ByteBuffer.allocate(bufferSize);
+    }
+
+    public void insert(byte[] record) {
+        ensureCapacity(record.length);
+        if(loadBatchFirstRecord){
+            loadBatchFirstRecord = false;
+        } else {
+            this.buffer.put(this.lineDelimiter);
+        }
+        this.buffer.put(record);
+        setNumOfRecords(getNumOfRecords() + 1);
+        setBufferSizeBytes(getBufferSizeBytes() + record.length);
+    }
+
+    @VisibleForTesting
+    public void ensureCapacity(int length) {
+        if(buffer.remaining() >= length){
+            return;
+        }
+        int currentRemain = buffer.remaining();
+        int currentCapacity = buffer.capacity();
+
+        int target = buffer.remaining() + length;
+        int capacity = buffer.capacity();
+        //grow 512kb each time
+        target = Math.max(target, Math.min(capacity + 512 * 1024, capacity * 2));
+        ByteBuffer tmp = ByteBuffer.allocate(target);
+        buffer.flip();
+        tmp.put(buffer);
+        buffer.clear();
+        buffer = tmp;
+        LOG.info("record length {},buffer remain {} ,grow capacity {} to {}", length, currentRemain, currentCapacity, target);
+    }
+
+    public String getLabelName() {
+        return labelName;
+    }
+
+    public void setLabelName(String labelName) {
+        this.labelName = labelName;
+    }
+
+    /**
+     * @return true if buffer is empty
+     */
+    public boolean isEmpty() {
+        return numOfRecords == 0;
+    }
+
+    public ByteBuffer getData() {
+        //change mode
+        buffer.flip();
+        LOG.debug("flush buffer: {} records, {} bytes",getNumOfRecords(),getBufferSizeBytes());
+        return buffer;
+    }
+
+    public void clear(){
+        this.buffer.clear();
+        this.numOfRecords = 0;
+        this.bufferSizeBytes = 0;
+        this.labelName = null;
+        this.loadBatchFirstRecord = true;
+    }
+
+    public ByteBuffer getBuffer(){
+        return buffer;
+    }
+    /**
+     * @return Number of records in this buffer
+     */
+    public int getNumOfRecords() {
+        return numOfRecords;
+    }
+
+    /**
+     * @return Buffer size in bytes
+     */
+    public int getBufferSizeBytes() {
+        return bufferSizeBytes;
+    }
+
+    /**
+     * @param numOfRecords Updates number of records (Usually by 1)
+     */
+    public void setNumOfRecords(int numOfRecords) {
+        this.numOfRecords = numOfRecords;
+    }
+
+    /**
+     * @param bufferSizeBytes Updates sum of size of records present in this buffer (Bytes)
+     */
+    public void setBufferSizeBytes(int bufferSizeBytes) {
+        this.bufferSizeBytes = bufferSizeBytes;
+    }
+
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
new file mode 100644
index 0000000..2c578d4
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.writer.DorisRecordSerializer;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+public class DorisBatchSink<IN> implements Sink<IN> {
+    private final DorisOptions dorisOptions;
+    private final DorisReadOptions dorisReadOptions;
+    private final DorisExecutionOptions dorisExecutionOptions;
+    private final DorisRecordSerializer<IN> serializer;
+
+    public DorisBatchSink(DorisOptions dorisOptions,
+                          DorisReadOptions dorisReadOptions,
+                          DorisExecutionOptions dorisExecutionOptions,
+                          DorisRecordSerializer<IN> serializer) {
+        this.dorisOptions = dorisOptions;
+        this.dorisReadOptions = dorisReadOptions;
+        this.dorisExecutionOptions = dorisExecutionOptions;
+        this.serializer = serializer;
+    }
+
+    @Override
+    public SinkWriter<IN> createWriter(InitContext initContext) throws IOException {
+        DorisBatchWriter<IN> dorisBatchWriter = new DorisBatchWriter<IN>(initContext, serializer, dorisOptions, dorisReadOptions, dorisExecutionOptions);
+        dorisBatchWriter.initializeLoad();
+        return dorisBatchWriter;
+    }
+
+    public static <IN> DorisBatchSink.Builder<IN> builder() {
+        return new DorisBatchSink.Builder<>();
+    }
+
+    /**
+     * build for DorisBatchSink.
+     * @param <IN> record type.
+     */
+    public static class Builder<IN> {
+        private DorisOptions dorisOptions;
+        private DorisReadOptions dorisReadOptions;
+        private DorisExecutionOptions dorisExecutionOptions;
+        private DorisRecordSerializer<IN> serializer;
+
+        public DorisBatchSink.Builder<IN> setDorisOptions(DorisOptions dorisOptions) {
+            this.dorisOptions = dorisOptions;
+            return this;
+        }
+
+        public DorisBatchSink.Builder<IN> setDorisReadOptions(DorisReadOptions dorisReadOptions) {
+            this.dorisReadOptions = dorisReadOptions;
+            return this;
+        }
+
+        public DorisBatchSink.Builder<IN> setDorisExecutionOptions(DorisExecutionOptions dorisExecutionOptions) {
+            this.dorisExecutionOptions = dorisExecutionOptions;
+            return this;
+        }
+
+        public DorisBatchSink.Builder<IN> setSerializer(DorisRecordSerializer<IN> serializer) {
+            this.serializer = serializer;
+            return this;
+        }
+
+        public DorisBatchSink<IN> build() {
+            Preconditions.checkNotNull(dorisOptions);
+            Preconditions.checkNotNull(dorisExecutionOptions);
+            Preconditions.checkNotNull(serializer);
+            if(dorisReadOptions == null) {
+                dorisReadOptions = DorisReadOptions.builder().build();
+            }
+            return new DorisBatchSink<>(dorisOptions, dorisReadOptions, dorisExecutionOptions, serializer);
+        }
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
new file mode 100644
index 0000000..8f2dcc1
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -0,0 +1,292 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.exception.DorisBatchLoadException;
+import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.rest.models.RespContent;
+import org.apache.doris.flink.sink.BackendUtil;
+import org.apache.doris.flink.sink.EscapeHandler;
+import org.apache.doris.flink.sink.HttpPutBuilder;
+import org.apache.doris.flink.sink.HttpUtil;
+import org.apache.doris.flink.sink.writer.LabelGenerator;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
+import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
+import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
+import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
+
+/**
+ * async stream load
+ **/
+public class DorisBatchStreamLoad implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(DorisBatchStreamLoad.class);
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
+    private final LabelGenerator labelGenerator;
+    private final byte[] lineDelimiter;
+    private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
+    private String loadUrl;
+    private String hostPort;
+    private final String username;
+    private final String password;
+    private final String db;
+    private final String table;
+    private final Properties loadProps;
+    private BatchRecordBuffer buffer;
+    private DorisExecutionOptions executionOptions;
+    private ExecutorService loadExecutorService;
+    private LoadAsyncExecutor loadAsyncExecutor;
+    private BlockingQueue<BatchRecordBuffer> writeQueue;
+    private BlockingQueue<BatchRecordBuffer> readQueue;
+    private final AtomicBoolean started;
+    private AtomicReference<Throwable> exception = new AtomicReference<>(null);
+    private CloseableHttpClient httpClient = new HttpUtil().getHttpClient();
+    private BackendUtil backendUtil;
+
+    public DorisBatchStreamLoad(DorisOptions dorisOptions,
+                                DorisReadOptions dorisReadOptions,
+                                DorisExecutionOptions executionOptions,
+                                LabelGenerator labelGenerator) {
+        this.backendUtil = new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
+        this.hostPort = backendUtil.getAvailableBackend();
+        String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
+        this.db = tableInfo[0];
+        this.table = tableInfo[1];
+        this.username = dorisOptions.getUsername();
+        this.password = dorisOptions.getPassword();
+        this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, db, table);
+        this.loadProps = executionOptions.getStreamLoadProp();
+        this.labelGenerator = labelGenerator;
+        this.lineDelimiter = EscapeHandler.escapeString(loadProps.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)).getBytes();
+        this.executionOptions = executionOptions;
+        //init queue
+        this.writeQueue = new ArrayBlockingQueue<>(executionOptions.getFlushQueueSize());
+        LOG.info("init RecordBuffer capacity {}, count {}", executionOptions.getBufferFlushMaxBytes(), executionOptions.getFlushQueueSize());
+        for (int index = 0; index < executionOptions.getFlushQueueSize(); index++) {
+            this.writeQueue.add(new BatchRecordBuffer(this.lineDelimiter, executionOptions.getBufferFlushMaxBytes()));
+        }
+        readQueue = new LinkedBlockingDeque<>();
+
+        this.loadAsyncExecutor= new LoadAsyncExecutor();
+        this.loadExecutorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), new DefaultThreadFactory("streamload-executor"), new ThreadPoolExecutor.AbortPolicy());
+        this.started = new AtomicBoolean(true);
+        this.loadExecutorService.execute(loadAsyncExecutor);
+    }
+
+    /**
+     * write record into cache.
+     * @param record
+     * @throws IOException
+     */
+    public void writeRecord(byte[] record) throws InterruptedException {
+        checkFlushException();
+        if(buffer == null){
+            buffer = takeRecordFromWriteQueue();
+        }
+        buffer.insert(record);
+        //When it exceeds 80% of the byteSize,to flush, to avoid triggering bytebuffer expansion
+        if (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes() * 0.8
+                || (executionOptions.getBufferFlushMaxRows() != 0 && buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
+            flush(false);
+        }
+    }
+
+    public void flush(boolean waitUtilDone) throws InterruptedException {
+        checkFlushException();
+        if (buffer == null) {
+            LOG.debug("buffer is empty, skip flush.");
+            return;
+        }
+        buffer.setLabelName(labelGenerator.generateBatchLabel());
+        BatchRecordBuffer tmpBuff = buffer;
+        readQueue.put(tmpBuff);
+        if(waitUtilDone){
+            waitAsyncLoadFinish();
+        }
+        this.buffer = null;
+    }
+
+    private void putRecordToWriteQueue(BatchRecordBuffer buffer){
+        try {
+            writeQueue.put(buffer);
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Failed to recycle a buffer to queue");
+        }
+    }
+
+    private BatchRecordBuffer takeRecordFromWriteQueue(){
+        checkFlushException();
+        try {
+            return writeQueue.take();
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Failed to take a buffer from queue");
+        }
+    }
+
+    private void checkFlushException() {
+        if (exception.get() != null) {
+            throw new DorisBatchLoadException(exception.get());
+        }
+    }
+
+    private void waitAsyncLoadFinish() throws InterruptedException {
+        for(int i = 0; i < executionOptions.getFlushQueueSize() + 1 ; i++){
+            BatchRecordBuffer empty = takeRecordFromWriteQueue();
+            readQueue.put(empty);
+        }
+    }
+
+    public void close(){
+        //close async executor
+        this.loadExecutorService.shutdown();
+        this.started.set(false);
+
+        //clear buffer
+        this.writeQueue.clear();
+        this.readQueue.clear();
+    }
+
+    class LoadAsyncExecutor implements Runnable {
+        @Override
+        public void run() {
+            LOG.info("LoadAsyncExecutor start");
+            while (started.get()) {
+                BatchRecordBuffer buffer = null;
+                try {
+                    buffer = readQueue.poll(2000L, TimeUnit.MILLISECONDS);
+                    if(buffer == null){
+                        continue;
+                    }
+                    if (buffer.getLabelName() != null) {
+                        load(buffer.getLabelName(), buffer);
+                    }
+                } catch (Exception e) {
+                    LOG.error("worker running error", e);
+                    exception.set(e);
+                    break;
+                } finally {
+                    //Recycle buffer to avoid writer thread blocking
+                    if(buffer != null){
+                        buffer.clear();
+                        putRecordToWriteQueue(buffer);
+                    }
+                }
+            }
+            LOG.info("LoadAsyncExecutor stop");
+        }
+
+        /**
+         * execute stream load
+         */
+        public void load(String label, BatchRecordBuffer buffer) throws IOException{
+            refreshLoadUrl();
+            ByteBuffer data = buffer.getData();
+            ByteArrayEntity entity = new ByteArrayEntity(data.array(), data.arrayOffset(), data.limit());
+            HttpPutBuilder putBuilder = new HttpPutBuilder();
+            putBuilder.setUrl(loadUrl)
+                    .baseAuth(username, password)
+                    .setLabel(label)
+                    .addCommonHeader()
+                    .setEntity(entity)
+                    .addHiddenColumns(executionOptions.getDeletable())
+                    .addProperties(executionOptions.getStreamLoadProp());
+
+            int retry = 0;
+            while (retry <= executionOptions.getMaxRetries()) {
+                LOG.info("stream load started for {} on host {}", label, hostPort);
+                try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) {
+                    int statusCode = response.getStatusLine().getStatusCode();
+                    if (statusCode == 200 && response.getEntity() != null) {
+                        String loadResult = EntityUtils.toString(response.getEntity());
+                        LOG.info("load Result {}", loadResult);
+                        RespContent respContent = OBJECT_MAPPER.readValue(loadResult, RespContent.class);
+                        if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
+                            String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL());
+                            throw new DorisBatchLoadException(errMsg);
+                        }else{
+                            return;
+                        }
+                    }
+                    LOG.error("stream load failed with {}, reason {}, to retry", hostPort, response.getStatusLine().toString());
+                }catch (Exception ex){
+                    if (retry == executionOptions.getMaxRetries()) {
+                        throw new DorisBatchLoadException("stream load error: ", ex);
+                    }
+                    LOG.error("stream load error with {}, to retry, cause by", hostPort, ex);
+
+                }
+                retry++;
+                // get available backend retry
+                refreshLoadUrl();
+                putBuilder.setUrl(loadUrl);
+            }
+        }
+
+        private void refreshLoadUrl(){
+            hostPort = backendUtil.getAvailableBackend();
+            loadUrl = String.format(LOAD_URL_PATTERN, hostPort, db, table);
+        }
+    }
+
+    static class DefaultThreadFactory implements ThreadFactory {
+        private static final AtomicInteger poolNumber = new AtomicInteger(1);
+        private final AtomicInteger threadNumber = new AtomicInteger(1);
+        private final String namePrefix;
+
+        DefaultThreadFactory(String name) {
+            namePrefix = "pool-" + poolNumber.getAndIncrement() + "-" + name + "-";
+        }
+
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
+            t.setDaemon(false);
+            return t;
+        }
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
new file mode 100644
index 0000000..d4621c7
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.writer.DorisRecordSerializer;
+import org.apache.doris.flink.sink.writer.LabelGenerator;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class DorisBatchWriter<IN> implements SinkWriter<IN> {
+    private static final Logger LOG = LoggerFactory.getLogger(DorisBatchWriter.class);
+    private DorisBatchStreamLoad batchStreamLoad;
+    private final DorisOptions dorisOptions;
+    private final DorisReadOptions dorisReadOptions;
+    private final DorisExecutionOptions executionOptions;
+    private final String labelPrefix;
+    private final LabelGenerator labelGenerator;
+    private final long flushIntervalMs;
+    private final DorisRecordSerializer<IN> serializer;
+    private final transient ScheduledExecutorService scheduledExecutorService;
+    private transient volatile Exception flushException = null;
+
+    public DorisBatchWriter(Sink.InitContext initContext,
+                            DorisRecordSerializer<IN> serializer,
+                            DorisOptions dorisOptions,
+                            DorisReadOptions dorisReadOptions,
+                            DorisExecutionOptions executionOptions) {
+        LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
+        this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId();
+        this.labelGenerator = new LabelGenerator(labelPrefix, false);
+        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("stream-load-flush-interval"));
+        this.serializer = serializer;
+        this.dorisOptions = dorisOptions;
+        this.dorisReadOptions = dorisReadOptions;
+        this.executionOptions = executionOptions;
+        this.flushIntervalMs = executionOptions.getBufferFlushIntervalMs();
+    }
+
+    public void initializeLoad() throws IOException {
+        this.batchStreamLoad = new DorisBatchStreamLoad(dorisOptions, dorisReadOptions, executionOptions, labelGenerator);
+        // when uploading data in streaming mode, we need to regularly detect whether there are exceptions.
+        scheduledExecutorService.scheduleWithFixedDelay(this::intervalFlush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
+    }
+
+    private void intervalFlush() {
+        try {
+            LOG.info("interval flush triggered.");
+            batchStreamLoad.flush(false);
+        } catch (InterruptedException e) {
+            flushException = e;
+        }
+    }
+
+    @Override
+    public void write(IN in, Context context) throws IOException, InterruptedException {
+        checkFlushException();
+        byte[] serialize = serializer.serialize(in);
+        if(Objects.isNull(serialize)){
+            //ddl record
+            return;
+        }
+        batchStreamLoad.writeRecord(serialize);
+    }
+    @Override
+    public void flush(boolean flush) throws IOException, InterruptedException {
+        checkFlushException();
+        LOG.info("checkpoint flush triggered.");
+        batchStreamLoad.flush(true);
+    }
+
+    @Override
+    public void close() throws Exception {
+        LOG.info("DorisBatchWriter Close");
+        if (scheduledExecutorService != null) {
+            scheduledExecutorService.shutdownNow();
+        }
+        batchStreamLoad.close();
+    }
+
+    private void checkFlushException() {
+        if (flushException != null) {
+            throw new RuntimeException("Writing records to streamload failed.", flushException);
+        }
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 642e1d3..7890670 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -23,8 +23,8 @@
 import org.apache.doris.flink.exception.DorisRuntimeException;
 import org.apache.doris.flink.exception.StreamLoadException;
 import org.apache.doris.flink.rest.RestService;
-import org.apache.doris.flink.rest.models.BackendV2;
 import org.apache.doris.flink.rest.models.RespContent;
+import org.apache.doris.flink.sink.BackendUtil;
 import org.apache.doris.flink.sink.DorisCommittable;
 import org.apache.doris.flink.sink.HttpUtil;
 import org.apache.flink.annotation.VisibleForTesting;
@@ -38,8 +38,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -73,8 +71,7 @@
     private final transient ScheduledExecutorService scheduledExecutorService;
     private transient Thread executorThread;
     private transient volatile Exception loadException = null;
-    private List<BackendV2.BackendRowV2> backends;
-    private long pos;
+    private BackendUtil backendUtil;
     private String currentLabel;
 
     public DorisWriter(Sink.InitContext initContext,
@@ -99,16 +96,14 @@
         this.executionOptions = executionOptions;
         this.intervalTime = executionOptions.checkInterval();
         this.loading = false;
-        this.pos = 0;
     }
 
     public void initializeLoad(List<DorisWriterState> state) throws IOException {
         //cache backend
-        this.backends = RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG);
-        String backend = getAvailableBackend();
+        backendUtil = new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
         try {
             this.dorisStreamLoad = new DorisStreamLoad(
-                    backend,
+                    backendUtil.getAvailableBackend(),
                     dorisOptions,
                     executionOptions,
                     labelGenerator, new HttpUtil().getHttpClient());
@@ -168,7 +163,7 @@
     public List<DorisWriterState> snapshotState(long checkpointId) throws IOException {
         Preconditions.checkState(dorisStreamLoad != null);
         // dynamic refresh BE node
-        this.dorisStreamLoad.setHostPort(getAvailableBackend());
+        this.dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend());
         this.currentLabel = labelGenerator.generateLabel(checkpointId + 1);
         return Collections.singletonList(dorisWriterState);
     }
@@ -221,11 +216,6 @@
         this.dorisStreamLoad = streamLoad;
     }
 
-    @VisibleForTesting
-    public void setBackends(List<BackendV2.BackendRowV2> backends) {
-        this.backends = backends;
-    }
-
     @Override
     public void close() throws Exception {
         if (scheduledExecutorService != null) {
@@ -235,34 +225,4 @@
             dorisStreamLoad.close();
         }
     }
-
-    @VisibleForTesting
-    public String getAvailableBackend() {
-        long tmp = pos + backends.size();
-        while (pos < tmp) {
-            BackendV2.BackendRowV2 backend = backends.get((int) (pos % backends.size()));
-            String res = backend.toBackendString();
-            if(tryHttpConnection(res)){
-                pos++;
-                return res;
-            }
-        }
-        throw new DorisRuntimeException("no available backend.");
-    }
-
-    public boolean tryHttpConnection(String backend) {
-        try {
-            backend = "http://" + backend;
-            URL url = new URL(backend);
-            HttpURLConnection co =  (HttpURLConnection) url.openConnection();
-            co.setConnectTimeout(60000);
-            co.connect();
-            co.disconnect();
-            return true;
-        } catch (Exception ex) {
-            LOG.warn("Failed to connect to backend:{}", backend, ex);
-            pos++;
-            return false;
-        }
-    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
index d31e777..35edae9 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
@@ -33,4 +33,8 @@
     public String generateLabel(long chkId) {
         return enable2PC ? labelPrefix + "_" + chkId : labelPrefix + "_" + UUID.randomUUID();
     }
+
+    public String generateBatchLabel() {
+        return labelPrefix + "_" + UUID.randomUUID();
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 3e129aa..91cc148 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -195,6 +195,38 @@
 
     public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;
 
+
+    public static final ConfigOption<Boolean> SINK_ENABLE_BATCH_MODE = ConfigOptions
+            .key("sink.enable.batch-mode")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("Whether to enable batch write mode");
+
+    public static final ConfigOption<Integer> SINK_FLUSH_QUEUE_SIZE = ConfigOptions
+            .key("sink.flush.queue-size")
+            .intType()
+            .defaultValue(2)
+            .withDescription("Queue length for async stream load, default is 2");
+
+    public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions
+            .key("sink.buffer-flush.max-rows")
+            .intType()
+            .defaultValue(50000)
+            .withDescription("The maximum number of flush items in each batch, the default is 5w");
+
+    public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_BYTES = ConfigOptions
+            .key("sink.buffer-flush.max-bytes")
+            .intType()
+            .defaultValue(10 * 1024 * 1024)
+            .withDescription("The maximum number of bytes flushed in each batch, the default is 10MB");
+
+    public static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
+            .key("sink.buffer-flush.interval")
+            .durationType()
+            .defaultValue(Duration.ofSeconds(10))
+            .withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " +
+                    "default value is 10s.");
+
     // Prefix for Doris StreamLoad specific properties.
     public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 81c756c..e6cbb1d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -59,10 +59,15 @@
 import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_MAX_RETRIES;
 import static org.apache.doris.flink.table.DorisConfigOptions.PASSWORD;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_COUNT;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_FLUSH_INTERVAL;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_SIZE;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_CHECK_INTERVAL;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_2PC;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_BATCH_MODE;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM;
@@ -130,6 +135,12 @@
         options.add(SINK_BUFFER_COUNT);
         options.add(SINK_PARALLELISM);
 
+        options.add(SINK_ENABLE_BATCH_MODE);
+        options.add(SINK_BUFFER_FLUSH_MAX_ROWS);
+        options.add(SINK_BUFFER_FLUSH_MAX_BYTES);
+        options.add(SINK_FLUSH_QUEUE_SIZE);
+        options.add(SINK_BUFFER_FLUSH_INTERVAL);
+
         options.add(SOURCE_USE_OLD_API);
         return options;
     }
@@ -195,6 +206,15 @@
         if (!readableConfig.get(SINK_ENABLE_2PC)) {
             builder.disable2PC();
         }
+
+        if(readableConfig.get(SINK_ENABLE_BATCH_MODE)) {
+            builder.enableBatchMode();
+        }
+
+        builder.setFlushQueueSize(readableConfig.get(SINK_FLUSH_QUEUE_SIZE));
+        builder.setBufferFlushMaxRows(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
+        builder.setBufferFlushMaxBytes(readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES));
+        builder.setBufferFlushIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
         return builder.build();
     }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index ae4e137..be13fea 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -22,12 +22,14 @@
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.sink.DorisSink;
+import org.apache.doris.flink.sink.batch.DorisBatchSink;
 import org.apache.doris.flink.sink.writer.RowDataSerializer;
 
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkProvider;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
@@ -97,12 +99,22 @@
                 .setType(loadProperties.getProperty(FORMAT_KEY, CSV))
                 .enableDelete(deletable)
                 .setFieldDelimiter(loadProperties.getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT));
-        DorisSink.Builder<RowData> dorisSinkBuilder = DorisSink.builder();
-        dorisSinkBuilder.setDorisOptions(options)
-                .setDorisReadOptions(readOptions)
-                .setDorisExecutionOptions(executionOptions)
-                .setSerializer(serializerBuilder.build());
-        return SinkProvider.of(dorisSinkBuilder.build(), sinkParallelism);
+
+        if(!executionOptions.enableBatchMode()){
+            DorisSink.Builder<RowData> dorisSinkBuilder = DorisSink.builder();
+            dorisSinkBuilder.setDorisOptions(options)
+                    .setDorisReadOptions(readOptions)
+                    .setDorisExecutionOptions(executionOptions)
+                    .setSerializer(serializerBuilder.build());
+            return SinkProvider.of(dorisSinkBuilder.build(), sinkParallelism);
+        }else{
+            DorisBatchSink.Builder<RowData> dorisBatchSinkBuilder = DorisBatchSink.builder();
+            dorisBatchSinkBuilder.setDorisOptions(options)
+                    .setDorisReadOptions(readOptions)
+                    .setDorisExecutionOptions(executionOptions)
+                    .setSerializer(serializerBuilder.build());
+            return SinkV2Provider.of(dorisBatchSinkBuilder.build(), sinkParallelism);
+        }
     }
 
     @Override
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
new file mode 100644
index 0000000..a6835c6
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.batch.DorisBatchSink;
+import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.Properties;
+import java.util.UUID;
+
+
+public class DorisSinkBatchExample {
+    public static void main(String[] args) throws Exception{
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+//        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+        env.enableCheckpointing(5000);
+//        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));
+        DorisBatchSink.Builder<String> builder = DorisBatchSink.builder();
+        final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
+        readOptionBuilder.setDeserializeArrowAsync(false)
+                .setDeserializeQueueSize(64)
+                .setExecMemLimit(2147483648L)
+                .setRequestQueryTimeoutS(3600)
+                .setRequestBatchSize(1000)
+                .setRequestConnectTimeoutMs(10000)
+                .setRequestReadTimeoutMs(10000)
+                .setRequestRetries(3)
+                .setRequestTabletSize(1024 * 1024);
+        Properties properties = new Properties();
+        properties.setProperty("column_separator", ",");
+        properties.setProperty("line_delimiter", "\n");
+        properties.setProperty("format", "csv");
+        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+        dorisBuilder.setFenodes("127.0.0.1:8030")
+                .setTableIdentifier("test.test_flink")
+                .setUsername("root")
+                .setPassword("");
+        DorisExecutionOptions.Builder  executionBuilder = DorisExecutionOptions.builder();
+        executionBuilder.setLabelPrefix("label")
+                .setStreamLoadProp(properties)
+                .setDeletable(false)
+                .setBufferFlushMaxBytes(8*1024)
+                .setBufferFlushMaxRows(900)
+                .setBufferFlushIntervalMs(1000 * 10);
+
+        builder.setDorisReadOptions(readOptionBuilder.build())
+                .setDorisExecutionOptions(executionBuilder.build())
+                .setSerializer(new SimpleStringSerializer())
+                .setDorisOptions(dorisBuilder.build());
+
+        env.addSource(new SourceFunction<String>() {
+            private Long id = 0L;
+            @Override
+            public void run(SourceContext<String> out) throws Exception {
+                while(true){
+                    id=id+1;
+                    String record = id + "," + UUID.randomUUID() + "," + id + "";
+                    out.collect(record);
+                    Thread.sleep(500);
+                }
+            }
+            @Override
+            public void cancel() {
+
+            }
+        }).sinkTo(builder.build());
+
+        env.execute("doris batch test");
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java
new file mode 100644
index 0000000..2704af6
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink;
+
+import org.apache.doris.flink.rest.models.BackendV2;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+@Ignore
+public class TestBackendUtil {
+
+    @Test
+    public void testGetAvailableBackend() throws Exception{
+        List<BackendV2.BackendRowV2> backends = Arrays.asList(
+                newBackend("127.0.0.1", 8040),
+                newBackend("127.0.0.2", 8040),
+                newBackend("127.0.0.3", 8040));
+        BackendUtil backendUtil =  new BackendUtil(backends);
+        Assert.assertEquals(backends.get(0).toBackendString(), backendUtil.getAvailableBackend());
+        Assert.assertEquals(backends.get(1).toBackendString(), backendUtil.getAvailableBackend());
+        Assert.assertEquals(backends.get(2).toBackendString(), backendUtil.getAvailableBackend());
+        Assert.assertEquals(backends.get(0).toBackendString(), backendUtil.getAvailableBackend());
+    }
+
+    @Test
+    public void testTryHttpConnection(){
+        BackendUtil backendUtil = new BackendUtil(new ArrayList<>());
+        boolean flag = backendUtil.tryHttpConnection("127.0.0.1:8040");
+        Assert.assertFalse(flag);
+    }
+
+    private BackendV2.BackendRowV2 newBackend(String host, int port){
+        BackendV2.BackendRowV2 backend = new BackendV2.BackendRowV2();
+        backend.setIp(host);
+        backend.setHttpPort(port);
+        return backend;
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java
new file mode 100644
index 0000000..18cb79a
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * test for RecordBuffer.
+ */
+public class TestBatchRecordBuffer {
+
+    @Test
+    public void testInsert(){
+        BatchRecordBuffer recordBuffer = new BatchRecordBuffer("\n".getBytes(StandardCharsets.UTF_8),1);
+        recordBuffer.insert("doris,1".getBytes(StandardCharsets.UTF_8));
+
+        Assert.assertEquals(1, recordBuffer.getNumOfRecords());
+        Assert.assertEquals("doris,1".getBytes(StandardCharsets.UTF_8).length, recordBuffer.getBufferSizeBytes());
+
+        recordBuffer.insert("doris,2".getBytes(StandardCharsets.UTF_8));
+        Assert.assertEquals(2, recordBuffer.getNumOfRecords());
+        Assert.assertEquals("doris,1\ndoris,2".getBytes(StandardCharsets.UTF_8).length - "\n".getBytes(StandardCharsets.UTF_8).length, recordBuffer.getBufferSizeBytes());
+
+        ByteBuffer data = recordBuffer.getData();
+        Assert.assertEquals("doris,1\ndoris,2", new String(data.array(), data.arrayOffset(), data.limit()));
+
+        //mock flush
+        recordBuffer.clear();
+        recordBuffer.insert("doris,3".getBytes(StandardCharsets.UTF_8));
+        Assert.assertEquals(1, recordBuffer.getNumOfRecords());
+        Assert.assertEquals("doris,3".getBytes(StandardCharsets.UTF_8).length, recordBuffer.getBufferSizeBytes());
+    }
+
+    @Test
+    public void testGrowCapacity(){
+        BatchRecordBuffer recordBuffer = new BatchRecordBuffer("\n".getBytes(StandardCharsets.UTF_8),1);
+        recordBuffer.ensureCapacity(10);
+
+        Assert.assertEquals(recordBuffer.getBuffer().capacity(), 10 + 1);
+
+        recordBuffer.ensureCapacity(100);
+        Assert.assertEquals(recordBuffer.getBuffer().capacity(), 100 + 11);
+
+        recordBuffer.ensureCapacity(1024);
+        Assert.assertEquals(recordBuffer.getBuffer().capacity(), 1024 + 111);
+
+        recordBuffer = new BatchRecordBuffer("\n".getBytes(StandardCharsets.UTF_8),16);
+        recordBuffer.ensureCapacity(16);
+        Assert.assertEquals(recordBuffer.getBuffer().capacity(), 16);
+
+        recordBuffer.insert("1234567890".getBytes(StandardCharsets.UTF_8));
+        recordBuffer.ensureCapacity(8);
+        Assert.assertEquals(recordBuffer.getBuffer().capacity(), 32);
+    }
+
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
index a45261b..9e44336 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
@@ -20,7 +20,6 @@
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.rest.models.BackendV2;
 import org.apache.doris.flink.sink.DorisCommittable;
 import org.apache.doris.flink.sink.HttpTestUtil;
 import org.apache.doris.flink.sink.OptionUtils;
@@ -32,7 +31,6 @@
 import org.junit.Ignore;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.OptionalLong;
@@ -95,36 +93,4 @@
         Assert.assertEquals("doris", writerStates.get(0).getLabelPrefix());
         Assert.assertTrue(dorisWriter.isLoading());
     }
-
-    @Test
-    public void testGetAvailableBackend() throws Exception{
-        Sink.InitContext initContext = mock(Sink.InitContext.class);
-        DorisWriter<String> dorisWriter = new DorisWriter<String>(initContext, Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, readOptions, executionOptions);
-        List<BackendV2.BackendRowV2> backends = Arrays.asList(
-                newBackend("127.0.0.1", 8040),
-                newBackend("127.0.0.2", 8040),
-                newBackend("127.0.0.3", 8040));
-        dorisWriter.setBackends(backends);
-        Assert.assertEquals(backends.get(0).toBackendString(), dorisWriter.getAvailableBackend());
-        Assert.assertEquals(backends.get(1).toBackendString(), dorisWriter.getAvailableBackend());
-        Assert.assertEquals(backends.get(2).toBackendString(), dorisWriter.getAvailableBackend());
-        Assert.assertEquals(backends.get(0).toBackendString(), dorisWriter.getAvailableBackend());
-    }
-
-    @Test
-    public void testTryHttpConnection(){
-        Sink.InitContext initContext = mock(Sink.InitContext.class);
-        DorisWriter<String> dorisWriter = new DorisWriter<String>(initContext, Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, readOptions, executionOptions);
-        boolean flag = dorisWriter.tryHttpConnection("127.0.0.1:8040");
-        Assert.assertFalse(flag);
-    }
-
-    private BackendV2.BackendRowV2 newBackend(String host, int port){
-        BackendV2.BackendRowV2 backend = new BackendV2.BackendRowV2();
-        backend.setIp(host);
-        backend.setHttpPort(port);
-        return backend;
-    }
-
-
 }