[ISSUE #95] Fix source can not consume new queue's messages when topic queue expansion (#96)

diff --git a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
index e445643..bedf97f 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
@@ -62,6 +62,7 @@
 
 import java.lang.management.ManagementFactory;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -536,7 +537,7 @@
         }
     }
 
-    public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues) {
+    public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues) throws MQClientException {
         Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null");
         restoredOffsets.forEach(
                 (mq, offset) -> {
@@ -544,6 +545,17 @@
                         offsetTable.put(mq, offset);
                     }
                 });
+
+        List<MessageQueue> extMessageQueue = new ArrayList<>();
+        for (MessageQueue messageQueue : messageQueues) {
+            if (!offsetTable.containsKey(messageQueue)) {
+                extMessageQueue.add(messageQueue);
+            }
+        }
+        if (extMessageQueue.size() != 0) {
+            log.info("no restoredOffsets for {}, so init offset for these queues", extMessageQueue);
+            initOffsets(extMessageQueue);
+        }
         log.info("init offset table [{}] from restoredOffsets successful.", offsetTable);
     }
 
diff --git a/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java b/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
index cd514cd..08371b3 100644
--- a/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
+++ b/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
@@ -25,12 +25,16 @@
 import org.apache.flink.connector.rocketmq.legacy.common.serialization.SimpleStringDeserializationSchema;
 import org.apache.flink.connector.rocketmq.legacy.common.util.TestUtils;
 
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
@@ -69,6 +73,8 @@
 
     @Test
     public void testRestartFromCheckpoint() throws Exception {
+        DefaultLitePullConsumer consumer = Mockito.mock(DefaultLitePullConsumer.class);
+        Mockito.when(consumer.committed(Mockito.any())).thenReturn(40L);
         Properties properties = new Properties();
         properties.setProperty(RocketMQConfig.CONSUMER_GROUP, "${ConsumerGroup}");
         properties.setProperty(RocketMQConfig.CONSUMER_TOPIC, "${SourceTopic}");
@@ -82,13 +88,19 @@
         map.put(new MessageQueue("tpc", "broker-0", 1), 21L);
         map.put(new MessageQueue("tpc", "broker-1", 0), 30L);
         map.put(new MessageQueue("tpc", "broker-1", 1), 31L);
+        List<MessageQueue> allocateMessageQueues = new ArrayList<>(map.keySet());
+        MessageQueue newMessageQueue = new MessageQueue("tpc", "broker-2", 0);
+        allocateMessageQueues.add(newMessageQueue);
+        TestUtils.setFieldValue(source, "messageQueues", allocateMessageQueues);
+        TestUtils.setFieldValue(source, "consumer", consumer);
         TestUtils.setFieldValue(source, "restoredOffsets", map);
         TestUtils.setFieldValue(source, "offsetTable", new ConcurrentHashMap<>());
-        source.initOffsetTableFromRestoredOffsets(new ArrayList<>(map.keySet()));
+        source.initOffsetTableFromRestoredOffsets(allocateMessageQueues);
         Map<MessageQueue, Long> offsetTable = (Map) TestUtils.getFieldValue(source, "offsetTable");
         for (Map.Entry<MessageQueue, Long> entry : map.entrySet()) {
             assertEquals(offsetTable.containsKey(entry.getKey()), true);
             assertEquals(offsetTable.containsValue(entry.getValue()), true);
         }
+        assertEquals(offsetTable.containsKey(newMessageQueue), true);
     }
 }