[ISSUE #396]support namespace (#397)
Co-authored-by: zhangjidi <zhangjidi@cmss.chinamobile.com>
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/pom.xml b/rocketmq-spring-boot-samples/rocketmq-consume-demo/pom.xml
index 6f7fdf1..99fe1d6 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/pom.xml
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-samples</artifactId>
- <version>2.2.1-SNAPSHOT</version>
+ <version>2.2.2-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-consume-demo</artifactId>
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/pom.xml b/rocketmq-spring-boot-samples/rocketmq-produce-demo/pom.xml
index c9d121d..9473bcd 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/pom.xml
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-samples</artifactId>
- <version>2.2.1-SNAPSHOT</version>
+ <version>2.2.2-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-produce-demo</artifactId>
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
index 84d641a..0390fec 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
@@ -110,4 +110,9 @@
* The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
+
+ /**
+ * The namespace of consumer.
+ */
+ String namespace() default "";
}
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
index 8e80147..090100c 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
@@ -89,4 +89,8 @@
* The property of "tlsEnable" default false.
*/
String tlsEnable() default "false";
+ /**
+ * The namespace of producer.
+ */
+ String namespace() default "";
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index e80f328..ecc3e0e 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -127,4 +127,9 @@
* The property of "tlsEnable" default false.
*/
String tlsEnable() default "false";
+
+ /**
+ * The namespace of consumer.
+ */
+ String namespace() default "";
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
index f2ece8a..b1e9288 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
@@ -132,6 +132,7 @@
groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS);
litePullConsumer.setEnableMsgTrace(annotation.enableMsgTrace());
litePullConsumer.setCustomizedTraceTopic(resolvePlaceholders(annotation.customizedTraceTopic(), consumerConfig.getCustomizedTraceTopic()));
+ litePullConsumer.setNamespace(annotation.namespace());
return litePullConsumer;
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
index ca304e1..ed9fb7d 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
@@ -129,7 +129,7 @@
producer.setCompressMsgBodyOverHowmuch(annotation.compressMessageBodyThreshold() == -1 ? producerConfig.getCompressMessageBodyThreshold() : annotation.compressMessageBodyThreshold());
producer.setRetryAnotherBrokerWhenNotStoreOK(annotation.retryNextServer());
producer.setUseTLS(useTLS);
-
+ producer.setNamespace(annotation.namespace());
return producer;
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index ea00432..b908950 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -116,7 +116,7 @@
producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
producer.setUseTLS(producerConfig.isTlsEnable());
-
+ producer.setNamespace(producerConfig.getNamespace());
return producer;
}
@@ -146,6 +146,7 @@
groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS);
litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
+ litePullConsumer.setNamespace(consumerConfig.getNamespace());
return litePullConsumer;
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
index d6c8a90..974bec9 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
@@ -81,6 +81,11 @@
private String group;
/**
+ * Namespace for this MQ Producer instance.
+ */
+ private String namespace;
+
+ /**
* Millis of send message timeout.
*/
private int sendMessageTimeout = 3000;
@@ -232,6 +237,14 @@
public void setTlsEnable(boolean tlsEnable) {
this.tlsEnable = tlsEnable;
}
+
+ public String getNamespace() {
+ return namespace;
+ }
+
+ public void setNamespace(String namespace) {
+ this.namespace = namespace;
+ }
}
public Consumer getConsumer() {
@@ -249,6 +262,11 @@
private String group;
/**
+ * Namespace for this MQ Consumer instance.
+ */
+ private String namespace;
+
+ /**
* Topic name of consumer.
*/
private String topic;
@@ -403,6 +421,14 @@
public void setTlsEnable(boolean tlsEnable) {
this.tlsEnable = tlsEnable;
}
+
+ public String getNamespace() {
+ return namespace;
+ }
+
+ public void setNamespace(String namespace) {
+ this.namespace = namespace;
+ }
}
}
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index 2a42fe4..f48788e 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -124,6 +124,7 @@
private int maxReconsumeTimes;
private int replyTimeout;
private String tlsEnable;
+ private String namespace;
public long getSuspendCurrentQueueTimeMillis() {
return suspendCurrentQueueTimeMillis;
@@ -226,6 +227,7 @@
this.maxReconsumeTimes = anno.maxReconsumeTimes();
this.replyTimeout = anno.replyTimeout();
this.tlsEnable = anno.tlsEnable();
+ this.namespace = anno.namespace();
}
public ConsumeMode getConsumeMode() {
@@ -256,6 +258,14 @@
this.tlsEnable = tlsEnable;
}
+ public String getNamespace() {
+ return namespace;
+ }
+
+ public void setNamespace(String namespace) {
+ this.namespace = namespace;
+ }
+
public DefaultMQPushConsumer getConsumer() {
return consumer;
}
@@ -344,13 +354,14 @@
public String toString() {
return "DefaultRocketMQListenerContainer{" +
"consumerGroup='" + consumerGroup + '\'' +
+ ", namespace='" + namespace + '\'' +
", nameServer='" + nameServer + '\'' +
", topic='" + topic + '\'' +
", consumeMode=" + consumeMode +
", selectorType=" + selectorType +
", selectorExpression='" + selectorExpression + '\'' +
- ", messageModel=" + messageModel + '\'' +
- ", tlsEnable=" + tlsEnable +
+ ", messageModel=" + messageModel + '\'' +
+ ", tlsEnable=" + tlsEnable +
'}';
}
@@ -579,7 +590,7 @@
this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
}
-
+ consumer.setNamespace(namespace);
consumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());