[fix] Fixed the issue that batchwriter may be blocked when writing to multiple tables (#511)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 3cfda60..3747257 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -178,7 +178,7 @@
* @param record
* @throws IOException
*/
- public synchronized void writeRecord(String database, String table, byte[] record) {
+ public void writeRecord(String database, String table, byte[] record) {
checkFlushException();
String bufferKey = getTableIdentifier(database, table);
@@ -228,15 +228,15 @@
}
}
- public synchronized boolean bufferFullFlush(String bufferKey) {
+ public boolean bufferFullFlush(String bufferKey) {
return doFlush(bufferKey, false, true);
}
- public synchronized boolean intervalFlush() {
+ public boolean intervalFlush() {
return doFlush(null, false, false);
}
- public synchronized boolean checkpointFlush() {
+ public boolean checkpointFlush() {
return doFlush(null, true, false);
}
@@ -254,6 +254,10 @@
}
private synchronized boolean flush(String bufferKey, boolean waitUtilDone) {
+ if (bufferMap.isEmpty()) {
+ // bufferMap may have been flushed by other threads
+ return false;
+ }
if (null == bufferKey) {
boolean flush = false;
for (String key : bufferMap.keySet()) {
@@ -270,7 +274,7 @@
} else if (bufferMap.containsKey(bufferKey)) {
flushBuffer(bufferKey);
} else {
- throw new DorisBatchLoadException("buffer not found for key: " + bufferKey);
+ LOG.warn("buffer not found for key: {}, may be already flushed.", bufferKey);
}
if (waitUtilDone) {
waitAsyncLoadFinish();
@@ -281,6 +285,7 @@
private synchronized void flushBuffer(String bufferKey) {
BatchRecordBuffer buffer = bufferMap.get(bufferKey);
buffer.setLabelName(labelGenerator.generateBatchLabel(buffer.getTable()));
+ LOG.debug("flush buffer for key {} with label {}", bufferKey, buffer.getLabelName());
putRecordToFlushQueue(buffer);
bufferMap.remove(bufferKey);
}
@@ -408,11 +413,6 @@
load(bf.getLabelName(), bf);
}
}
-
- if (flushQueue.size() < flushQueueSize) {
- // Avoid waiting for 2 rounds of intervalMs
- doFlush(null, false, false);
- }
} catch (Exception e) {
LOG.error("worker running error", e);
exception.set(e);