commit | 5e13440321e1844adba3a75e6415d3ec08d60e9a | [log] [tgz] |
---|---|---|
author | Humkum <1109939087@qq.com> | Fri May 17 11:02:21 2024 +0800 |
committer | GitHub <noreply@github.com> | Fri May 17 11:02:21 2024 +0800 |
tree | f824d5ab3b52abb4c649edf0f4922d2a849e5e1a | |
parent | 0a1a03367ca422f3f4d5011b368e8e0ab0f91db8 [diff] |
[ISSUE #118] fix: concurrency problem caused by batchFlushOnCheckpoint (#119)
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSink.java b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSink.java index 9f87486..9ff2812 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSink.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSink.java
@@ -111,7 +111,9 @@ sinkInTps.markEvent(); if (batchFlushOnCheckpoint) { - batchList.add(input); + synchronized (batchList) { + batchList.add(input); + } if (batchList.size() >= batchSize) { flushSync(); }