[Feature](CacheWriter) doris sink support cache record buffer (#193)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
index 958ce3d..c00e5f1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
@@ -50,6 +50,7 @@
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_USE_CACHE;
import static org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API;
import static org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX;
import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
@@ -105,6 +106,7 @@
options.add(SINK_BUFFER_SIZE);
options.add(SINK_BUFFER_COUNT);
options.add(SINK_PARALLELISM);
+ options.add(SINK_USE_CACHE);
options.add(SOURCE_USE_OLD_API);
return options;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index ccd23f9..2422df8 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -42,6 +42,8 @@
private final int bufferSize;
private final int bufferCount;
private final String labelPrefix;
+ private final boolean useCache;
+
/**
* Properties for the StreamLoad.
*/
@@ -62,6 +64,7 @@
int bufferSize,
int bufferCount,
String labelPrefix,
+ boolean useCache,
Properties streamLoadProp,
Boolean enableDelete,
Boolean enable2PC,
@@ -77,6 +80,7 @@
this.bufferSize = bufferSize;
this.bufferCount = bufferCount;
this.labelPrefix = labelPrefix;
+ this.useCache = useCache;
this.streamLoadProp = streamLoadProp;
this.enableDelete = enableDelete;
this.enable2PC = enable2PC;
@@ -132,6 +136,10 @@
return labelPrefix;
}
+ public boolean isUseCache () {
+ return useCache;
+ }
+
public Properties getStreamLoadProp() {
return streamLoadProp;
}
@@ -177,6 +185,7 @@
private int bufferSize = DEFAULT_BUFFER_SIZE;
private int bufferCount = DEFAULT_BUFFER_COUNT;
private String labelPrefix = "";
+ private boolean useCache = false;
private Properties streamLoadProp = new Properties();
private boolean enableDelete = true;
private boolean enable2PC = true;
@@ -215,6 +224,11 @@
return this;
}
+ public Builder setUseCache(boolean useCache) {
+ this.useCache = useCache;
+ return this;
+ }
+
public Builder setStreamLoadProp(Properties streamLoadProp) {
this.streamLoadProp = streamLoadProp;
return this;
@@ -262,7 +276,7 @@
}
public DorisExecutionOptions build() {
- return new DorisExecutionOptions(checkInterval, maxRetries, bufferSize, bufferCount, labelPrefix,
+ return new DorisExecutionOptions(checkInterval, maxRetries, bufferSize, bufferCount, labelPrefix, useCache,
streamLoadProp, enableDelete, enable2PC, enableBatchMode, flushQueueSize, bufferFlushMaxRows,
bufferFlushMaxBytes, bufferFlushIntervalMs, ignoreUpdateBefore);
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/CacheRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/CacheRecordBuffer.java
new file mode 100644
index 0000000..c25928e
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/CacheRecordBuffer.java
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Channel of record stream and HTTP data stream.
+ */
+public class CacheRecordBuffer extends RecordBuffer{
+ private static final Logger LOG = LoggerFactory.getLogger(CacheRecordBuffer.class);
+ BlockingDeque<ByteBuffer> bufferCache;
+ LinkedBlockingQueue<ByteBuffer> bufferPool;
+
+ public CacheRecordBuffer(int capacity, int queueSize) {
+ super(capacity, queueSize);
+ bufferCache = new LinkedBlockingDeque<>();
+ bufferPool = new LinkedBlockingQueue<>();
+ }
+
+ @Override
+ public void startBufferData() throws IOException{
+ LOG.info("start buffer data, read queue size {}, write queue size {}, buffer cache size {}, buffer pool size {}",
+ readQueue.size(), writeQueue.size(), bufferCache.size(), bufferPool.size());
+ try {
+ // if the cache have data, that should be restarted from previous error
+ if (currentReadBuffer != null && currentReadBuffer.limit() != 0) {
+ currentReadBuffer.rewind();
+ readQueue.putFirst(currentReadBuffer);
+ currentReadBuffer = null;
+ }
+ // re-read the data in bufferCache
+ ByteBuffer buffer = bufferCache.pollFirst();
+ while (buffer != null) {
+ buffer.rewind();
+ readQueue.putFirst(buffer);
+ buffer = bufferCache.pollFirst();
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public int read(byte[] buf) throws InterruptedException {
+ if (currentReadBuffer == null) {
+ currentReadBuffer = readQueue.take();
+ }
+ // add empty buffer as end flag
+ if (currentReadBuffer.limit() == 0) {
+ Preconditions.checkState(readQueue.size() == 0);
+ bufferCache.putFirst(currentReadBuffer);
+ writeQueue.offer(allocate());
+ currentReadBuffer = null;
+ return -1;
+ }
+
+ int available = currentReadBuffer.remaining();
+ int nRead = Math.min(available, buf.length);
+ currentReadBuffer.get(buf, 0, nRead);
+ if (currentReadBuffer.remaining() == 0) {
+ bufferCache.putFirst(currentReadBuffer);
+ writeQueue.offer(allocate());
+ currentReadBuffer = null;
+ }
+ return nRead;
+ }
+
+ public void recycleCache() {
+ // recycle cache buffer
+ Preconditions.checkState(readQueue.size() == 0);
+ ByteBuffer buff = bufferCache.poll();
+ while (buff != null) {
+ buff.clear();
+ bufferPool.add(buff);
+ buff = bufferCache.poll();
+ }
+ }
+
+ private ByteBuffer allocate(){
+ ByteBuffer buff = bufferPool.poll();
+ return buff != null ? buff : ByteBuffer.allocate(bufferCapacity);
+ }
+
+ @VisibleForTesting
+ public int getBufferCacheSize() {
+ return bufferCache.size();
+ }
+
+ @VisibleForTesting
+ public int getBufferPoolSize() {
+ return bufferPool.size();
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index e7ade2f..cda3c05 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -70,7 +70,7 @@
private String loadUrlStr;
private String hostPort;
- private final String abortUrlStr;
+ private String abortUrlStr;
private final String user;
private final String passwd;
private final String db;
@@ -105,7 +105,7 @@
this.executorService = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new ExecutorThreadFactory("stream-load-upload"));
- this.recordStream = new RecordStream(executionOptions.getBufferSize(), executionOptions.getBufferCount());
+ this.recordStream = new RecordStream(executionOptions.getBufferSize(), executionOptions.getBufferCount(), executionOptions.isUseCache());
lineDelimiter = EscapeHandler.escapeString(streamLoadProp.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)).getBytes();
loadBatchFirstRecord = true;
}
@@ -121,6 +121,7 @@
public void setHostPort(String hostPort) {
this.hostPort = hostPort;
this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, this.db, this.table);
+ this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db);
}
public Future<CloseableHttpResponse> getPendingLoadFuture() {
@@ -226,10 +227,10 @@
* @param label
* @throws IOException
*/
- public void startLoad(String label) throws IOException{
- loadBatchFirstRecord = true;
+ public void startLoad(String label, boolean isResume) throws IOException {
+ loadBatchFirstRecord = !isResume;
HttpPutBuilder putBuilder = new HttpPutBuilder();
- recordStream.startInput();
+ recordStream.startInput(isResume);
LOG.info("stream load started for {} on host {}", label, hostPort);
try {
InputStreamEntity entity = new InputStreamEntity(recordStream);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 1f98206..230bad5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -60,6 +60,7 @@
private static final Logger LOG = LoggerFactory.getLogger(DorisWriter.class);
private static final List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
private final long lastCheckpointId;
+ private long curCheckpointId;
private DorisStreamLoad dorisStreamLoad;
volatile boolean loading;
private final DorisOptions dorisOptions;
@@ -86,6 +87,7 @@
initContext
.getRestoredCheckpointId()
.orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
+ this.curCheckpointId = lastCheckpointId + 1;
LOG.info("restore checkpointId {}", lastCheckpointId);
LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
this.dorisWriterState = new DorisWriterState(executionOptions.getLabelPrefix());
@@ -113,14 +115,14 @@
// TODO: we need check and abort all pending transaction.
// Discard transactions that may cause the job to fail.
if(executionOptions.enabled2PC()) {
- dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1);
+ dorisStreamLoad.abortPreCommit(labelPrefix, curCheckpointId);
}
} catch (Exception e) {
throw new DorisRuntimeException(e);
}
// get main work thread.
executorThread = Thread.currentThread();
- this.currentLabel = labelGenerator.generateLabel(lastCheckpointId + 1);
+ this.currentLabel = labelGenerator.generateLabel(curCheckpointId);
// when uploading data in streaming mode, we need to regularly detect whether there are exceptions.
scheduledExecutorService.scheduleWithFixedDelay(this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS);
}
@@ -134,8 +136,8 @@
return;
}
if(!loading) {
- //Start streamload only when there has data
- dorisStreamLoad.startLoad(currentLabel);
+ // start stream load only when there has data
+ dorisStreamLoad.startLoad(currentLabel, false);
loading = true;
}
dorisStreamLoad.writeRecord(serialize);
@@ -167,7 +169,8 @@
Preconditions.checkState(dorisStreamLoad != null);
// dynamic refresh BE node
this.dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend());
- this.currentLabel = labelGenerator.generateLabel(checkpointId + 1);
+ this.curCheckpointId = checkpointId + 1;
+ this.currentLabel = labelGenerator.generateLabel(curCheckpointId);
return Collections.singletonList(dorisWriterState);
}
@@ -182,23 +185,38 @@
return;
}
- // double check to interrupt when loading is true and dorisStreamLoad.getPendingLoadFuture().isDone
- // fix issue #139
+ // double-check the future, to avoid getting the old future
if (dorisStreamLoad.getPendingLoadFuture() != null
&& dorisStreamLoad.getPendingLoadFuture().isDone()) {
- // TODO: introduce cache for reload instead of throwing exceptions.
- String errorMsg;
- try {
- RespContent content = dorisStreamLoad.handlePreCommitResponse(dorisStreamLoad.getPendingLoadFuture().get());
- errorMsg = content.getMessage();
- } catch (Exception e) {
- errorMsg = e.getMessage();
- }
+ // error happened when loading, now we should stop receive data
+ // and abort previous txn(stream load) and start a new txn(stream load)
+ // use send cached data to new txn, then notify to restart the stream
+ if (executionOptions.isUseCache()) {
+ try {
+ this.dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend());
+ if (executionOptions.enabled2PC()) {
+ dorisStreamLoad.abortPreCommit(labelPrefix, curCheckpointId);
+ }
+ // start a new txn(stream load)
+ LOG.info("getting exception, breakpoint resume for checkpoint ID: {}", curCheckpointId);
+ dorisStreamLoad.startLoad(labelGenerator.generateLabel(curCheckpointId), true);
+ } catch (Exception e) {
+ throw new DorisRuntimeException(e);
+ }
+ } else {
+ String errorMsg;
+ try {
+ RespContent content = dorisStreamLoad.handlePreCommitResponse(dorisStreamLoad.getPendingLoadFuture().get());
+ errorMsg = content.getMessage();
+ } catch (Exception e) {
+ errorMsg = e.getMessage();
+ }
- loadException = new StreamLoadException(errorMsg);
- LOG.error("stream load finished unexpectedly, interrupt worker thread! {}", errorMsg);
- // set the executor thread interrupted in case blocking in write data.
- executorThread.interrupt();
+ loadException = new StreamLoadException(errorMsg);
+ LOG.error("stream load finished unexpectedly, interrupt worker thread! {}", errorMsg);
+ // set the executor thread interrupted in case blocking in write data.
+ executorThread.interrupt();
+ }
}
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java
index abe93e9..e5259cb 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java
@@ -33,7 +33,7 @@
public class RecordBuffer {
private static final Logger LOG = LoggerFactory.getLogger(RecordBuffer.class);
BlockingQueue<ByteBuffer> writeQueue;
- BlockingQueue<ByteBuffer> readQueue;
+ LinkedBlockingDeque<ByteBuffer> readQueue;
int bufferCapacity;
int queueSize;
ByteBuffer currentWriteBuffer;
@@ -52,7 +52,7 @@
this.queueSize = queueSize;
}
- public void startBufferData() {
+ public void startBufferData() throws IOException{
LOG.info("start buffer data, read queue size {}, write queue size {}", readQueue.size(), writeQueue.size());
Preconditions.checkState(readQueue.size() == 0);
Preconditions.checkState(writeQueue.size() == queueSize);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java
index fd1c6ca..baf68bf 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java
@@ -31,11 +31,19 @@
return 0;
}
- public RecordStream(int bufferSize, int bufferCount) {
- this.recordBuffer = new RecordBuffer(bufferSize, bufferCount);
+ public RecordStream(int bufferSize, int bufferCount, boolean useCache) {
+ if (useCache) {
+ this.recordBuffer = new CacheRecordBuffer(bufferSize, bufferCount);
+ }else {
+ this.recordBuffer = new RecordBuffer(bufferSize, bufferCount);
+ }
}
- public void startInput() {
+ public void startInput(boolean isResume) throws IOException {
+ // if resume from breakpoint, do not recycle cache buffer
+ if (!isResume && recordBuffer instanceof CacheRecordBuffer) {
+ ((CacheRecordBuffer)recordBuffer).recycleCache();
+ }
recordBuffer.startBufferData();
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 98b9a78..50b205c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -235,6 +235,12 @@
.withDescription("In the CDC scenario, when the primary key of the upstream is inconsistent with that of the downstream, the update-before data needs to be passed to the downstream as deleted data, otherwise the data cannot be deleted.\n" +
"The default is to ignore, that is, perform upsert semantics.");
+ public static final ConfigOption<Boolean> SINK_USE_CACHE = ConfigOptions
+ .key("sink.use-cache")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to use buffer cache for breakpoint resume");
+
// Prefix for Doris StreamLoad specific properties.
public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 57e4fc0..9583236 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -74,6 +74,7 @@
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_USE_CACHE;
import static org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API;
import static org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX;
import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
@@ -146,6 +147,8 @@
options.add(SINK_FLUSH_QUEUE_SIZE);
options.add(SINK_BUFFER_FLUSH_INTERVAL);
+ options.add(SINK_USE_CACHE);
+
options.add(SOURCE_USE_OLD_API);
return options;
}
@@ -223,6 +226,8 @@
builder.setBufferFlushMaxRows(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
builder.setBufferFlushMaxBytes(readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES));
builder.setBufferFlushIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
+
+ builder.setUseCache(readableConfig.get(SINK_USE_CACHE));
return builder.build();
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index e36590c..a28403e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -201,6 +201,8 @@
sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES).ifPresent(executionBuilder::setBufferFlushMaxBytes);
sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_INTERVAL).ifPresent(v-> executionBuilder.setBufferFlushIntervalMs(v.toMillis()));
+ sinkConfig.getOptional(DorisConfigOptions.SINK_USE_CACHE).ifPresent(executionBuilder::setUseCache);
+
DorisExecutionOptions executionOptions = executionBuilder.build();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionOptions)
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestCacheRecordBuffer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestCacheRecordBuffer.java
new file mode 100644
index 0000000..4af39ab
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestCacheRecordBuffer.java
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * test for CacheRecordBuffer.
+ */
+public class TestCacheRecordBuffer {
+
+ @Test
+ public void testRead() throws Exception {
+ CacheRecordBuffer recordBuffer = new CacheRecordBuffer(16, 4);
+ recordBuffer.startBufferData();
+ recordBuffer.write("This is Test for CacheRecordBuffer!".getBytes(StandardCharsets.UTF_8));
+ Assert.assertEquals(2, recordBuffer.getReadQueueSize());
+ Assert.assertEquals(1, recordBuffer.getWriteQueueSize());
+ Assert.assertEquals(0,recordBuffer.getBufferCacheSize());
+ byte[] buffer = new byte[16];
+ int nRead = recordBuffer.read(buffer);
+ Assert.assertEquals(1, recordBuffer.getReadQueueSize());
+ Assert.assertEquals(2, recordBuffer.getWriteQueueSize());
+ Assert.assertEquals(1,recordBuffer.getBufferCacheSize());
+ Assert.assertEquals(16, nRead);
+ Assert.assertArrayEquals("This is Test for".getBytes(StandardCharsets.UTF_8), buffer);
+
+ recordBuffer.write("Continue to write the last one.".getBytes(StandardCharsets.UTF_8));
+ buffer = new byte[12];
+ nRead = recordBuffer.read(buffer);
+ Assert.assertEquals(12, nRead);
+ Assert.assertArrayEquals(" CacheRecord".getBytes(StandardCharsets.UTF_8), buffer);
+ Assert.assertEquals(2, recordBuffer.getReadQueueSize());
+ Assert.assertEquals(0, recordBuffer.getWriteQueueSize());
+ Assert.assertEquals(1,recordBuffer.getBufferCacheSize());
+ }
+
+ @Test
+ public void testRecycleCache() throws Exception {
+ CacheRecordBuffer recordBuffer = new CacheRecordBuffer(16, 4);
+ recordBuffer.startBufferData();
+ recordBuffer.write("This is Test for CacheRecordBuffer.recycleCache!".getBytes(StandardCharsets.UTF_8));
+ Assert.assertEquals(3, recordBuffer.getReadQueueSize());
+ Assert.assertEquals(1, recordBuffer.getWriteQueueSize());
+ Assert.assertEquals(0,recordBuffer.getBufferCacheSize());
+
+ byte[] buffer = new byte[50];
+ recordBuffer.read(buffer);
+ recordBuffer.read(buffer);
+ recordBuffer.read(buffer);
+ Assert.assertEquals(0, recordBuffer.getReadQueueSize());
+ Assert.assertEquals(4, recordBuffer.getWriteQueueSize());
+ Assert.assertEquals(3,recordBuffer.getBufferCacheSize());
+
+ recordBuffer.recycleCache();
+ Assert.assertEquals(3,recordBuffer.getBufferPoolSize());
+ Assert.assertEquals(0,recordBuffer.getBufferCacheSize());
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
index d6f0967..e295de1 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
@@ -90,7 +90,7 @@
when(httpClient.execute(any())).thenReturn(preCommitResponse);
byte[] writeBuffer = "test".getBytes(StandardCharsets.UTF_8);
DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
- dorisStreamLoad.startLoad("1");
+ dorisStreamLoad.startLoad("1",false);
dorisStreamLoad.writeRecord(writeBuffer);
dorisStreamLoad.stopLoad("label");
byte[] buff = new byte[4];
@@ -109,7 +109,7 @@
when(httpClient.execute(any())).thenReturn(preCommitResponse);
byte[] writeBuffer = "test".getBytes(StandardCharsets.UTF_8);
DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
- dorisStreamLoad.startLoad("1");
+ dorisStreamLoad.startLoad("1", false);
dorisStreamLoad.writeRecord(writeBuffer);
dorisStreamLoad.writeRecord(writeBuffer);
dorisStreamLoad.stopLoad("label");
@@ -134,7 +134,7 @@
when(httpClient.execute(any())).thenReturn(preCommitResponse);
DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
- dorisStreamLoad.startLoad("1");
+ dorisStreamLoad.startLoad("1", false);
dorisStreamLoad.writeRecord("{\"id\": 1}".getBytes(StandardCharsets.UTF_8));
dorisStreamLoad.writeRecord("{\"id\": 2}".getBytes(StandardCharsets.UTF_8));
dorisStreamLoad.stopLoad("label");
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
index 9e44336..e988d6b 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
@@ -62,7 +62,7 @@
when(httpClient.execute(any())).thenReturn(preCommitResponse);
DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
- dorisStreamLoad.startLoad("");
+ dorisStreamLoad.startLoad("", false);
Sink.InitContext initContext = mock(Sink.InitContext.class);
when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1));
DorisWriter<String> dorisWriter = new DorisWriter<String>(initContext, Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, readOptions, executionOptions);