[ISSUE #76] Fix bug when the job restore from ck (#77)
Fix bug when the job restore from ck
Co-authored-by: 高思伟 <siwei.gao@amh-group.com>
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index 97037a1..fa57839 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -176,10 +176,6 @@
if (restoredOffsets == null) {
restoredOffsets = new ConcurrentHashMap<>();
}
-
- // use restoredOffsets to init offset table.
- initOffsetTableFromRestoredOffsets();
-
if (pendingOffsetsToCommit == null) {
pendingOffsetsToCommit = new LinkedMap();
}
@@ -250,7 +246,9 @@
RocketMQUtils.allocate(totalQueues, taskNumber, ctx.getIndexOfThisSubtask());
// If the job recovers from the state, the state has already contained the offsets of last
// commit.
- if (!restored) {
+ if (restored) {
+ initOffsetTableFromRestoredOffsets(messageQueues);
+ } else {
initOffsets(messageQueues);
}
}
@@ -539,11 +537,11 @@
}
}
- public void initOffsetTableFromRestoredOffsets() {
+ public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues) {
Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null");
restoredOffsets.forEach(
(mq, offset) -> {
- if (!offsetTable.containsKey(mq) || offsetTable.get(mq) < offset) {
+ if (messageQueues.contains(mq)) {
offsetTable.put(mq, offset);
}
});
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java
index 8b7b44f..6ef73e6 100644
--- a/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java
@@ -27,6 +27,7 @@
import org.junit.Test;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -78,7 +79,7 @@
map.put(new MessageQueue("tpc", "broker-1", 1), 31L);
setFieldValue(source, "restoredOffsets", map);
setFieldValue(source, "offsetTable", new ConcurrentHashMap<>());
- source.initOffsetTableFromRestoredOffsets();
+ source.initOffsetTableFromRestoredOffsets(new ArrayList<>(map.keySet()));
Map<MessageQueue, Long> offsetTable = (Map) getFieldValue(source, "offsetTable");
for (Map.Entry<MessageQueue, Long> entry : map.entrySet()) {
assertEquals(offsetTable.containsKey(entry.getKey()), true);