[ISSUE #76] Fix bug when the job restore from ck (#77)

Fix bug when the job restore from ck

Co-authored-by: 高思伟 <siwei.gao@amh-group.com>
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index 97037a1..fa57839 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -176,10 +176,6 @@
         if (restoredOffsets == null) {
             restoredOffsets = new ConcurrentHashMap<>();
         }
-
-        // use restoredOffsets to init offset table.
-        initOffsetTableFromRestoredOffsets();
-
         if (pendingOffsetsToCommit == null) {
             pendingOffsetsToCommit = new LinkedMap();
         }
@@ -250,7 +246,9 @@
                 RocketMQUtils.allocate(totalQueues, taskNumber, ctx.getIndexOfThisSubtask());
         // If the job recovers from the state, the state has already contained the offsets of last
         // commit.
-        if (!restored) {
+        if (restored) {
+            initOffsetTableFromRestoredOffsets(messageQueues);
+        } else {
             initOffsets(messageQueues);
         }
     }
@@ -539,11 +537,11 @@
         }
     }
 
-    public void initOffsetTableFromRestoredOffsets() {
+    public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues) {
         Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null");
         restoredOffsets.forEach(
                 (mq, offset) -> {
-                    if (!offsetTable.containsKey(mq) || offsetTable.get(mq) < offset) {
+                    if (messageQueues.contains(mq)) {
                         offsetTable.put(mq, offset);
                     }
                 });
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java
index 8b7b44f..6ef73e6 100644
--- a/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java
@@ -27,6 +27,7 @@
 
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -78,7 +79,7 @@
         map.put(new MessageQueue("tpc", "broker-1", 1), 31L);
         setFieldValue(source, "restoredOffsets", map);
         setFieldValue(source, "offsetTable", new ConcurrentHashMap<>());
-        source.initOffsetTableFromRestoredOffsets();
+        source.initOffsetTableFromRestoredOffsets(new ArrayList<>(map.keySet()));
         Map<MessageQueue, Long> offsetTable = (Map) getFieldValue(source, "offsetTable");
         for (Map.Entry<MessageQueue, Long> entry : map.entrySet()) {
             assertEquals(offsetTable.containsKey(entry.getKey()), true);