Merge pull request #77 from walking98/master
[Issue #78] Add new accessChannel property for both producer/consumer @since RMQ 4.5.1
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index 1d48817..f1c7748 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -32,6 +32,7 @@
String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
+ String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
/**
* Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
@@ -103,4 +104,9 @@
* The property of "name-server".
*/
String nameServer() default NAME_SERVER_PLACEHOLDER;
+
+ /**
+ * The property of "access-channel".
+ */
+ String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
index f409198..2d6cb3f 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.spring.autoconfigure;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
@@ -113,7 +114,11 @@
String nameServer = environment.resolvePlaceholders(annotation.nameServer());
nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
+ String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
container.setNameServer(nameServer);
+ if (!StringUtils.isEmpty(accessChannel)) {
+ container.setAccessChannel(AccessChannel.valueOf(accessChannel));
+ }
container.setTopic(environment.resolvePlaceholders(annotation.topic()));
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
container.setRocketMQMessageListener(annotation);
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index dbbd8f0..5196369 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -20,6 +20,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
@@ -80,6 +81,8 @@
Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");
+ String accessChannel = rocketMQProperties.getAccessChannel();
+
DefaultMQProducer producer;
String ak = rocketMQProperties.getProducer().getAccessKey();
String sk = rocketMQProperties.getProducer().getSecretKey();
@@ -94,6 +97,9 @@
}
producer.setNamesrvAddr(nameServer);
+ if (!StringUtils.isEmpty(accessChannel)) {
+ producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
+ }
producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
index c539b8f..62ca2be 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
@@ -29,6 +29,11 @@
*/
private String nameServer;
+ /**
+ * Enum type for accesChannel, values: LOCAL, CLOUD
+ */
+ private String accessChannel;
+
private Producer producer;
public String getNameServer() {
@@ -39,6 +44,14 @@
this.nameServer = nameServer;
}
+ public String getAccessChannel() {
+ return accessChannel;
+ }
+
+ public void setAccessChannel(String accessChannel) {
+ this.accessChannel = accessChannel;
+ }
+
public RocketMQProperties.Producer getProducer() {
return producer;
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index b56e3c3..6a73010 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.spring.support;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
@@ -74,6 +75,8 @@
private String nameServer;
+ private AccessChannel accessChannel = AccessChannel.LOCAL;
+
private String consumerGroup;
private String topic;
@@ -125,6 +128,14 @@
this.nameServer = nameServer;
}
+ public AccessChannel getAccessChannel() {
+ return accessChannel;
+ }
+
+ public void setAccessChannel(AccessChannel accessChannel) {
+ this.accessChannel = accessChannel;
+ }
+
public String getConsumerGroup() {
return consumerGroup;
}
@@ -431,6 +442,9 @@
} else {
consumer.setNamesrvAddr(nameServer);
}
+ if (accessChannel != null) {
+ consumer.setAccessChannel(accessChannel);
+ }
consumer.setConsumeThreadMax(consumeThreadMax);
if (consumeThreadMax < consumer.getConsumeThreadMin()) {
consumer.setConsumeThreadMin(consumeThreadMax);
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
index 5fbe752..528e916 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
@@ -52,7 +52,8 @@
@Test
public void testDefaultMQProducerWithRelaxPropertyName() {
runner.withPropertyValues("rocketmq.nameServer=127.0.0.1:9876",
- "rocketmq.producer.group=spring_rocketmq").
+ "rocketmq.producer.group=spring_rocketmq",
+ "rocketmq.accessChannel=LOCAL").
run((context) -> {
assertThat(context).hasSingleBean(DefaultMQProducer.class);
assertThat(context).hasSingleBean(RocketMQProperties.class);
@@ -61,6 +62,17 @@
}
@Test
+ public void testBadAccessChannelProperty() {
+ runner.withPropertyValues("rocketmq.nameServer=127.0.0.1:9876",
+ "rocketmq.producer.group=spring_rocketmq",
+ "rocketmq.accessChannel=LOCAL123").
+ run((context) -> {
+ //Should throw exception for bad accessChannel property
+ assertThat(context).getFailure();
+ });
+ }
+
+ @Test
public void testDefaultMQProducer() {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
"rocketmq.producer.group=spring_rocketmq").