Merge pull request #77 from walking98/master

[Issue #78] Add new accessChannel property for both producer/consumer @since RMQ 4.5.1
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index 1d48817..f1c7748 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -32,6 +32,7 @@
     String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
     String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
     String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
+    String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
 
     /**
      * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
@@ -103,4 +104,9 @@
      * The property of "name-server".
      */
     String nameServer() default NAME_SERVER_PLACEHOLDER;
+
+    /**
+     * The property of "access-channel".
+     */
+    String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
 }
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
index f409198..2d6cb3f 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.spring.autoconfigure;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.spring.annotation.ConsumeMode;
 import org.apache.rocketmq.spring.annotation.MessageModel;
 import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
@@ -113,7 +114,11 @@
 
         String nameServer = environment.resolvePlaceholders(annotation.nameServer());
         nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
+        String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
         container.setNameServer(nameServer);
+        if (!StringUtils.isEmpty(accessChannel)) {
+            container.setAccessChannel(AccessChannel.valueOf(accessChannel));
+        }
         container.setTopic(environment.resolvePlaceholders(annotation.topic()));
         container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
         container.setRocketMQMessageListener(annotation);
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index dbbd8f0..5196369 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -20,6 +20,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.MQAdmin;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
@@ -80,6 +81,8 @@
         Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
         Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");
 
+        String accessChannel = rocketMQProperties.getAccessChannel();
+
         DefaultMQProducer producer;
         String ak = rocketMQProperties.getProducer().getAccessKey();
         String sk = rocketMQProperties.getProducer().getSecretKey();
@@ -94,6 +97,9 @@
         }
 
         producer.setNamesrvAddr(nameServer);
+        if (!StringUtils.isEmpty(accessChannel)) {
+            producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
+        }
         producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
         producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
         producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
index c539b8f..62ca2be 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
@@ -29,6 +29,11 @@
      */
     private String nameServer;
 
+    /**
+     * Enum type for accessChannel, values: LOCAL, CLOUD
+     */
+    private String accessChannel;
+
     private Producer producer;
 
     public String getNameServer() {
@@ -39,6 +44,14 @@
         this.nameServer = nameServer;
     }
 
+    public String getAccessChannel() {
+        return accessChannel;
+    }
+
+    public void setAccessChannel(String accessChannel) {
+        this.accessChannel = accessChannel;
+    }
+
     public RocketMQProperties.Producer getProducer() {
         return producer;
     }
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index b56e3c3..6a73010 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.spring.support;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
@@ -74,6 +75,8 @@
 
     private String nameServer;
 
+    private AccessChannel accessChannel = AccessChannel.LOCAL;
+
     private String consumerGroup;
 
     private String topic;
@@ -125,6 +128,14 @@
         this.nameServer = nameServer;
     }
 
+    public AccessChannel getAccessChannel() {
+        return accessChannel;
+    }
+
+    public void setAccessChannel(AccessChannel accessChannel) {
+        this.accessChannel = accessChannel;
+    }
+
     public String getConsumerGroup() {
         return consumerGroup;
     }
@@ -431,6 +442,9 @@
         } else {
             consumer.setNamesrvAddr(nameServer);
         }
+        if (accessChannel != null) {
+            consumer.setAccessChannel(accessChannel);
+        }
         consumer.setConsumeThreadMax(consumeThreadMax);
         if (consumeThreadMax < consumer.getConsumeThreadMin()) {
             consumer.setConsumeThreadMin(consumeThreadMax);
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
index 5fbe752..528e916 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
@@ -52,7 +52,8 @@
     @Test
     public void testDefaultMQProducerWithRelaxPropertyName() {
         runner.withPropertyValues("rocketmq.nameServer=127.0.0.1:9876",
-                "rocketmq.producer.group=spring_rocketmq").
+                "rocketmq.producer.group=spring_rocketmq",
+                "rocketmq.accessChannel=LOCAL").
                 run((context) -> {
                     assertThat(context).hasSingleBean(DefaultMQProducer.class);
                     assertThat(context).hasSingleBean(RocketMQProperties.class);
@@ -61,6 +62,17 @@
     }
 
     @Test
+    public void testBadAccessChannelProperty() {
+        runner.withPropertyValues("rocketmq.nameServer=127.0.0.1:9876",
+                "rocketmq.producer.group=spring_rocketmq",
+                "rocketmq.accessChannel=LOCAL123").
+                run((context) -> {
+                    // Context startup should fail: LOCAL123 is not a valid AccessChannel value
+                    assertThat(context).getFailure();
+                });
+    }
+
+    @Test
     public void testDefaultMQProducer() {
         runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
                 "rocketmq.producer.group=spring_rocketmq").