[ISSUE #219] Support to enable tls through configuration file
[ISSUE #219] Support to enable tls through configuration file
diff --git a/.gitignore b/.gitignore
index 06bb326..2457913 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,3 +11,4 @@
!LICENSE-BIN
.DS_Store
.vscode
+.factorypath
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java
index 5499ac1..a2ea1b5 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java
@@ -19,6 +19,6 @@
import org.apache.rocketmq.spring.annotation.ExtRocketMQConsumerConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
-@ExtRocketMQConsumerConfiguration(topic = "${demo.rocketmq.topic}", group = "string_consumer")
+@ExtRocketMQConsumerConfiguration(topic = "${demo.rocketmq.topic}", group = "string_consumer", tlsEnable = "${demo.ext.consumer.tlsEnable}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumer.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumer.java
index 11ac489..e407ec7 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumer.java
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumer.java
@@ -25,7 +25,7 @@
* StringConsumer
*/
@Service
-@RocketMQMessageListener(topic = "${demo.rocketmq.topic}", consumerGroup = "string_consumer", selectorExpression = "${demo.rocketmq.tag}")
+@RocketMQMessageListener(topic = "${demo.rocketmq.topic}", consumerGroup = "string_consumer", selectorExpression = "${demo.rocketmq.tag}", tlsEnable = "${demo.rocketmq.tlsEnable}")
public class StringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
index 5953e48..3e54413 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
@@ -33,6 +33,11 @@
demo.rocketmq.transTopic=spring-transaction-topic
demo.rocketmq.topic.user=user-topic
demo.rocketmq.tag=tagA
-
# another nameserver different global
-demo.rocketmq.myNameServer=127.0.0.1:9876
\ No newline at end of file
+demo.rocketmq.myNameServer=127.0.0.1:9876
+# my Consumer TLS Listener
+demo.rocketmq.tlsEnable=false
+# default LitePullConsumer TLS
+rocketmq.consumer.tlsEnable=false
+# ext rocketmq consumer template TLS
+demo.ext.consumer.tlsEnable=false
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java
index 7a78552..e509187 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java
@@ -19,6 +19,6 @@
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
-@ExtRocketMQTemplateConfiguration(nameServer = "${demo.rocketmq.extNameServer}")
+@ExtRocketMQTemplateConfiguration(nameServer = "${demo.rocketmq.extNameServer}", tlsEnable = "${demo.rocketmq.ext.useTLS}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties
index 7b5e422..fcf58e1 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties
@@ -29,4 +29,8 @@
demo.rocketmq.objectRequestTopic=objectRequestTopic:tagA
demo.rocketmq.genericRequestTopic=genericRequestTopic:tagA
-demo.rocketmq.extNameServer=127.0.0.1:9876
\ No newline at end of file
+demo.rocketmq.extNameServer=127.0.0.1:9876
+# default producer tls config
+rocketmq.producer.tls-enable=false
+# self ext producer tls config
+demo.rocketmq.ext.useTLS=false
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
index 8529d24..84d641a 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
@@ -97,6 +97,11 @@
int pullBatchSize() default 10;
/**
+ * The property of "tlsEnable" default false.
+ */
+ String tlsEnable() default "false";
+
+ /**
* Switch flag instance for message trace.
*/
boolean enableMsgTrace() default false;
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
index f24493d..8e80147 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
@@ -85,4 +85,8 @@
* The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
String customizedTraceTopic() default "${rocketmq.producer.customized-trace-topic:}";
-}
\ No newline at end of file
+ /**
+ * The property of "tlsEnable" default false.
+ */
+ String tlsEnable() default "false";
+}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index 9662e15..e80f328 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -122,4 +122,9 @@
* The property of "access-channel".
*/
String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
+
+ /**
+ * The property of "tlsEnable" default false.
+ */
+ String tlsEnable() default "false";
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
index dc8a039..f2ece8a 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
@@ -126,8 +126,10 @@
String ak = resolvePlaceholders(annotation.accessKey(), consumerConfig.getAccessKey());
String sk = resolvePlaceholders(annotation.secretKey(), consumerConfig.getSecretKey());
int pullBatchSize = annotation.pullBatchSize();
+ //if String is not is equal "true" TLS mode will represent the as default value false
+ boolean useTLS = new Boolean(environment.resolvePlaceholders(annotation.tlsEnable()));
DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
- groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize);
+ groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS);
litePullConsumer.setEnableMsgTrace(annotation.enableMsgTrace());
litePullConsumer.setCustomizedTraceTopic(resolvePlaceholders(annotation.customizedTraceTopic(), consumerConfig.getCustomizedTraceTopic()));
return litePullConsumer;
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
index 58d5eae..ca304e1 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
@@ -116,6 +116,8 @@
boolean isEnableMsgTrace = annotation.enableMsgTrace();
String customizedTraceTopic = environment.resolvePlaceholders(annotation.customizedTraceTopic());
customizedTraceTopic = StringUtils.isEmpty(customizedTraceTopic) ? producerConfig.getCustomizedTraceTopic() : customizedTraceTopic;
+ //if String is not is equal "true" TLS mode will represent the as default value false
+ boolean useTLS = new Boolean(environment.resolvePlaceholders(annotation.tlsEnable()));
DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic);
@@ -126,6 +128,7 @@
producer.setMaxMessageSize(annotation.maxMessageSize() == -1 ? producerConfig.getMaxMessageSize() : annotation.maxMessageSize());
producer.setCompressMsgBodyOverHowmuch(annotation.compressMessageBodyThreshold() == -1 ? producerConfig.getCompressMessageBodyThreshold() : annotation.compressMessageBodyThreshold());
producer.setRetryAnotherBrokerWhenNotStoreOK(annotation.retryNextServer());
+ producer.setUseTLS(useTLS);
return producer;
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
index 1e3f1ed..5bd6413 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
@@ -146,6 +146,7 @@
container.setSelectorExpression(tags);
}
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
+ container.setTlsEnable(environment.resolvePlaceholders(annotation.tlsEnable()));
if (RocketMQListener.class.isAssignableFrom(bean.getClass())) {
container.setRocketMQListener((RocketMQListener) bean);
} else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index 41dc8a8..ea00432 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -115,6 +115,7 @@
producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
+ producer.setUseTLS(producerConfig.isTlsEnable());
return producer;
}
@@ -139,9 +140,10 @@
String ak = consumerConfig.getAccessKey();
String sk = consumerConfig.getSecretKey();
int pullBatchSize = consumerConfig.getPullBatchSize();
+ boolean useTLS = consumerConfig.isTlsEnable();
DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
- groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize);
+ groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS);
litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
return litePullConsumer;
@@ -176,4 +178,4 @@
static class DefaultLitePullConsumerExistsCondition {
}
}
-}
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
index 4337ff3..d6c8a90 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
@@ -132,6 +132,11 @@
*/
private String customizedTraceTopic = TopicValidator.RMQ_SYS_TRACE_TOPIC;
+ /**
+ * The property of "tlsEnable".
+ */
+ private boolean tlsEnable = false;
+
public String getGroup() {
return group;
}
@@ -219,6 +224,14 @@
public void setCustomizedTraceTopic(String customizedTraceTopic) {
this.customizedTraceTopic = customizedTraceTopic;
}
+
+ public boolean isTlsEnable() {
+ return tlsEnable;
+ }
+
+ public void setTlsEnable(boolean tlsEnable) {
+ this.tlsEnable = tlsEnable;
+ }
}
public Consumer getConsumer() {
@@ -290,6 +303,11 @@
*/
private Map<String, Map<String, Boolean>> listeners = new HashMap<>();
+ /**
+ * The property of "tlsEnable".
+ */
+ private boolean tlsEnable = false;
+
public String getGroup() {
return group;
}
@@ -377,6 +395,14 @@
public void setCustomizedTraceTopic(String customizedTraceTopic) {
this.customizedTraceTopic = customizedTraceTopic;
}
+
+ public boolean isTlsEnable() {
+ return tlsEnable;
+ }
+
+ public void setTlsEnable(boolean tlsEnable) {
+ this.tlsEnable = tlsEnable;
+ }
}
-}
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index 1b6ad0e..2a42fe4 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -123,6 +123,7 @@
private long consumeTimeout;
private int maxReconsumeTimes;
private int replyTimeout;
+ private String tlsEnable;
public long getSuspendCurrentQueueTimeMillis() {
return suspendCurrentQueueTimeMillis;
@@ -224,6 +225,7 @@
this.consumeTimeout = anno.consumeTimeout();
this.maxReconsumeTimes = anno.maxReconsumeTimes();
this.replyTimeout = anno.replyTimeout();
+ this.tlsEnable = anno.tlsEnable();
}
public ConsumeMode getConsumeMode() {
@@ -246,6 +248,14 @@
return messageModel;
}
+ public String getTlsEnable() {
+ return tlsEnable;
+ }
+
+ public void setTlsEnable(String tlsEnable) {
+ this.tlsEnable = tlsEnable;
+ }
+
public DefaultMQPushConsumer getConsumer() {
return consumer;
}
@@ -339,7 +349,8 @@
", consumeMode=" + consumeMode +
", selectorType=" + selectorType +
", selectorExpression='" + selectorExpression + '\'' +
- ", messageModel=" + messageModel +
+ ", messageModel=" + messageModel + '\'' +
+ ", tlsEnable=" + tlsEnable +
'}';
}
@@ -619,6 +630,9 @@
throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
}
+ //if String is not is equal "true" TLS mode will represent the as default value false
+ consumer.setUseTLS(new Boolean(tlsEnable));
+
if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
} else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
@@ -627,4 +641,4 @@
}
-}
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
index 381d936..e8fbc94 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
@@ -294,7 +294,7 @@
public static DefaultLitePullConsumer createDefaultLitePullConsumer(String nameServer, String accessChannel,
String groupName, String topicName, MessageModel messageModel, SelectorType selectorType,
- String selectorExpression, String ak, String sk, int pullBatchSize)
+ String selectorExpression, String ak, String sk, int pullBatchSize, boolean useTLS)
throws MQClientException {
DefaultLitePullConsumer litePullConsumer = null;
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
@@ -309,6 +309,7 @@
if (accessChannel != null) {
litePullConsumer.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
+ litePullConsumer.setUseTLS(useTLS);
switch (messageModel) {
case BROADCASTING: