[ISSUE #95] Fix source can not consume new queue's messages when topic queue expansion (#96)
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
index e445643..bedf97f 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
@@ -62,6 +62,7 @@
import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -536,7 +537,7 @@
}
}
- public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues) {
+ public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues) throws MQClientException {
Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null");
restoredOffsets.forEach(
(mq, offset) -> {
@@ -544,6 +545,17 @@
offsetTable.put(mq, offset);
}
});
+
+ List<MessageQueue> extMessageQueue = new ArrayList<>();
+ for (MessageQueue messageQueue : messageQueues) {
+ if (!offsetTable.containsKey(messageQueue)) {
+ extMessageQueue.add(messageQueue);
+ }
+ }
+ if (extMessageQueue.size() != 0) {
+ log.info("no restoredOffsets for {}, so init offset for these queues", extMessageQueue);
+ initOffsets(extMessageQueue);
+ }
log.info("init offset table [{}] from restoredOffsets successful.", offsetTable);
}
diff --git a/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java b/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
index cd514cd..08371b3 100644
--- a/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
+++ b/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
@@ -25,12 +25,16 @@
import org.apache.flink.connector.rocketmq.legacy.common.serialization.SimpleStringDeserializationSchema;
import org.apache.flink.connector.rocketmq.legacy.common.util.TestUtils;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.common.message.MessageQueue;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
@@ -69,6 +73,8 @@
@Test
public void testRestartFromCheckpoint() throws Exception {
+ DefaultLitePullConsumer consumer = Mockito.mock(DefaultLitePullConsumer.class);
+ Mockito.when(consumer.committed(Mockito.any())).thenReturn(40L);
Properties properties = new Properties();
properties.setProperty(RocketMQConfig.CONSUMER_GROUP, "${ConsumerGroup}");
properties.setProperty(RocketMQConfig.CONSUMER_TOPIC, "${SourceTopic}");
@@ -82,13 +88,19 @@
map.put(new MessageQueue("tpc", "broker-0", 1), 21L);
map.put(new MessageQueue("tpc", "broker-1", 0), 30L);
map.put(new MessageQueue("tpc", "broker-1", 1), 31L);
+ List<MessageQueue> allocateMessageQueues = new ArrayList<>(map.keySet());
+ MessageQueue newMessageQueue = new MessageQueue("tpc", "broker-2", 0);
+ allocateMessageQueues.add(newMessageQueue);
+ TestUtils.setFieldValue(source, "messageQueues", allocateMessageQueues);
+ TestUtils.setFieldValue(source, "consumer", consumer);
TestUtils.setFieldValue(source, "restoredOffsets", map);
TestUtils.setFieldValue(source, "offsetTable", new ConcurrentHashMap<>());
- source.initOffsetTableFromRestoredOffsets(new ArrayList<>(map.keySet()));
+ source.initOffsetTableFromRestoredOffsets(allocateMessageQueues);
Map<MessageQueue, Long> offsetTable = (Map) TestUtils.getFieldValue(source, "offsetTable");
for (Map.Entry<MessageQueue, Long> entry : map.entrySet()) {
assertEquals(offsetTable.containsKey(entry.getKey()), true);
assertEquals(offsetTable.containsValue(entry.getValue()), true);
}
+ assertEquals(offsetTable.containsKey(newMessageQueue), true);
}
}