Fix concurrency issues during flush (#177)
Co-authored-by: geyun <suchengxiang.scx@cainiao.com>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 8f2dcc1..1fe217d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -124,7 +124,7 @@
* @param record
* @throws IOException
*/
- public void writeRecord(byte[] record) throws InterruptedException {
+ public synchronized void writeRecord(byte[] record) throws InterruptedException {
checkFlushException();
if(buffer == null){
buffer = takeRecordFromWriteQueue();
@@ -137,7 +137,7 @@
}
}
- public void flush(boolean waitUtilDone) throws InterruptedException {
+ public synchronized void flush(boolean waitUtilDone) throws InterruptedException {
checkFlushException();
if (buffer == null) {
LOG.debug("buffer is empty, skip flush.");