Merge pull request #466 from zhangjidi2016/fix_broadcast
[ISSUE #458]Fixed the problem of cannot consume previous messages in broadcast consumption mode
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index 182f08c..94d2671 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -619,7 +619,6 @@
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
}
consumer.setNamespace(namespace);
- consumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
if (customizedNameServer != null) {
@@ -639,9 +638,11 @@
switch (messageModel) {
case BROADCASTING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
+ consumer.setInstanceName(Long.toString(nameServer.hashCode()));
break;
case CLUSTERING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
+ consumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
break;
default:
throw new IllegalArgumentException("Property 'messageModel' was wrong.");
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
index e598363..aad63b4 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
@@ -293,7 +293,7 @@
instanceName.append(identify, 0, maxLength)
.append(identify.hashCode());
} else {
- instanceName.append(identify);
+ instanceName.append(identify.hashCode());
}
instanceName.append(separator).append(UtilAll.getPid())
.append(separator).append(System.nanoTime());
@@ -312,7 +312,6 @@
litePullConsumer = new DefaultLitePullConsumer(groupName);
}
litePullConsumer.setNamesrvAddr(nameServer);
- litePullConsumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
litePullConsumer.setPullBatchSize(pullBatchSize);
if (accessChannel != null) {
litePullConsumer.setAccessChannel(AccessChannel.valueOf(accessChannel));
@@ -322,9 +321,11 @@
switch (messageModel) {
case BROADCASTING:
litePullConsumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
+ litePullConsumer.setInstanceName(Long.toString(nameServer.hashCode()));
break;
case CLUSTERING:
litePullConsumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
+ litePullConsumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
break;
default:
throw new IllegalArgumentException("Property 'messageModel' was wrong.");
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java
index 78a91e0..d112c0e 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java
@@ -148,8 +148,8 @@
@Test
public void testGetInstanceName() {
String nameServer = "127.0.0.1:9876";
- String expected = "127.0.0.1:9876@";
- assertEquals(expected + UtilAll.getPid(), removeNanoTime(RocketMQUtil.getInstanceName(nameServer)));
+ String expected = "127.0.0.1:9876";
+ assertEquals(expected.hashCode() + "@" + UtilAll.getPid(), removeNanoTime(RocketMQUtil.getInstanceName(nameServer)));
nameServer = "I-am-a-very-very-long-domain-name-1:9876;I-am-a-very-very-long-domain-name-2:9876;I-am-a-very-very-long-domain-name-3:9876";
expected = "I-am-a-very-very-long-domain-name-1:9876;I-am-a-very-very-long-domain-name-2:9876;I-am-a-very-very-l-335144505@";