Update Rocketmq console 2.0.0 for support acl (#645)
* add accessKey and secretKey config support
* fix checkstyle
* Optimized code
* update acl for send messages
* update for check ACL when sending message
Co-authored-by: Demogorgon314 <wangkai744567028@gmail.com>
Co-authored-by: liwei5 <liwei5@vipkid.com.cn>
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/aspect/admin/MQAdminAspect.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/aspect/admin/MQAdminAspect.java
index 8c7cf06..e8f2f4c 100644
--- a/rocketmq-console/src/main/java/org/apache/rocketmq/console/aspect/admin/MQAdminAspect.java
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/aspect/admin/MQAdminAspect.java
@@ -16,8 +16,8 @@
*/
package org.apache.rocketmq.console.aspect.admin;
-import java.lang.reflect.Method;
import org.apache.rocketmq.console.aspect.admin.annotation.MultiMQAdminCmdMethod;
+import org.apache.rocketmq.console.config.RMQConfigure;
import org.apache.rocketmq.console.service.client.MQAdminInstance;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
@@ -26,13 +26,19 @@
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.lang.reflect.Method;
+
@Aspect
@Service
public class MQAdminAspect {
private Logger logger = LoggerFactory.getLogger(MQAdminAspect.class);
+ @Autowired
+ private RMQConfigure rmqConfigure;
+
public MQAdminAspect() {
}
@@ -55,10 +61,10 @@
Method method = signature.getMethod();
MultiMQAdminCmdMethod multiMQAdminCmdMethod = method.getAnnotation(MultiMQAdminCmdMethod.class);
if (multiMQAdminCmdMethod != null && multiMQAdminCmdMethod.timeoutMillis() > 0) {
- MQAdminInstance.initMQAdminInstance(multiMQAdminCmdMethod.timeoutMillis());
+ MQAdminInstance.initMQAdminInstance(multiMQAdminCmdMethod.timeoutMillis(),rmqConfigure.getAccessKey(),rmqConfigure.getSecretKey());
}
else {
- MQAdminInstance.initMQAdminInstance(0);
+ MQAdminInstance.initMQAdminInstance(0,rmqConfigure.getAccessKey(),rmqConfigure.getSecretKey());
}
obj = joinPoint.proceed();
}
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/config/RMQConfigure.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/config/RMQConfigure.java
index f25ce2c..4fbffa2 100644
--- a/rocketmq-console/src/main/java/org/apache/rocketmq/console/config/RMQConfigure.java
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/config/RMQConfigure.java
@@ -16,8 +16,6 @@
*/
package org.apache.rocketmq.console.config;
-import java.io.File;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
import org.slf4j.Logger;
@@ -30,6 +28,8 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
+import java.io.File;
+
import static org.apache.rocketmq.client.ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY;
@Configuration
@@ -51,6 +51,26 @@
private boolean loginRequired = false;
+ private String accessKey;
+
+ private String secretKey;
+
+ public String getAccessKey() {
+ return accessKey;
+ }
+
+ public void setAccessKey(String accessKey) {
+ this.accessKey = accessKey;
+ }
+
+ public String getSecretKey() {
+ return secretKey;
+ }
+
+ public void setSecretKey(String secretKey) {
+ this.secretKey = secretKey;
+ }
+
public String getNamesrvAddr() {
return namesrvAddr;
}
@@ -62,7 +82,10 @@
logger.info("setNameSrvAddrByProperty nameSrvAddr={}", namesrvAddr);
}
}
-
+ public boolean isACLEnabled(){
+ return !(StringUtils.isAnyBlank(this.accessKey, this.secretKey)||
+ StringUtils.isAnyEmpty(this.accessKey, this.secretKey));
+ }
public String getRocketMqConsoleDataPath() {
return dataPath;
}
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java
index e914e6c..3fb57d0 100644
--- a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java
@@ -16,9 +16,13 @@
*/
package org.apache.rocketmq.console.service.client;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
@@ -47,16 +51,20 @@
DefaultMQAdminExtImpl defaultMQAdminExtImpl = Reflect.on(MQAdminInstance.threadLocalMQAdminExt()).get("defaultMQAdminExtImpl");
return Reflect.on(defaultMQAdminExtImpl).get("mqClientInstance");
}
-
- public static void initMQAdminInstance(long timeoutMillis) throws MQClientException {
+ public static void initMQAdminInstance(long timeoutMillis,String accessKey,String secretKey) throws MQClientException {
Integer nowCount = INIT_COUNTER.get();
if (nowCount == null) {
+ RPCHook rpcHook = null;
+ boolean isEnableAcl = !StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey);
+ if (isEnableAcl) {
+ rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
+ }
DefaultMQAdminExt defaultMQAdminExt;
if (timeoutMillis > 0) {
- defaultMQAdminExt = new DefaultMQAdminExt(timeoutMillis);
+ defaultMQAdminExt = new DefaultMQAdminExt(rpcHook,timeoutMillis);
}
else {
- defaultMQAdminExt = new DefaultMQAdminExt();
+ defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
}
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
defaultMQAdminExt.start();
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
index fb41634..34d3994 100644
--- a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.console.service.impl;
+
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
@@ -29,6 +30,8 @@
import java.util.Set;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.MixAll;
@@ -38,19 +41,25 @@
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.console.config.RMQConfigure;
import org.apache.rocketmq.console.exception.ServiceException;
import org.apache.rocketmq.console.model.MessageView;
import org.apache.rocketmq.console.service.MessageService;
+import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MessageServiceImpl implements MessageService {
private Logger logger = LoggerFactory.getLogger(MessageServiceImpl.class);
+
+ @Autowired
+ private RMQConfigure configure;
/**
* @see org.apache.rocketmq.store.config.MessageStoreConfig maxMsgsNumBatch = 64;
* @see org.apache.rocketmq.store.index.IndexService maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
@@ -59,6 +68,7 @@
@Resource
private MQAdminExt mqAdminExt;
+ @Override
public Pair<MessageView, List<MessageTrack>> viewMessage(String subject, final String msgId) {
try {
@@ -88,7 +98,12 @@
@Override
public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) {
- DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP);
+ boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
+ RPCHook rpcHook = null;
+ if (isEnableAcl) {
+ rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(),configure.getSecretKey()));
+ }
+ DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP,rpcHook);
List<MessageView> messageViewList = Lists.newArrayList();
try {
String subExpression = "*";
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java
index 7660eaa..f6211c8 100644
--- a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java
@@ -20,13 +20,9 @@
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.MixAll;
@@ -43,11 +39,17 @@
import org.apache.rocketmq.console.model.request.TopicConfigInfo;
import org.apache.rocketmq.console.service.AbstractCommonService;
import org.apache.rocketmq.console.service.TopicService;
+import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
@Service
public class TopicServiceImpl extends AbstractCommonService implements TopicService {
@@ -209,7 +211,12 @@
}
private TopicList getSystemTopicList() {
- DefaultMQProducer producer = new DefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP);
+ RPCHook rpcHook = null;
+ boolean isEnableAcl = !StringUtils.isEmpty(rMQConfigure.getAccessKey()) && !StringUtils.isEmpty(rMQConfigure.getSecretKey());
+ if (isEnableAcl) {
+ rpcHook = new AclClientRPCHook(new SessionCredentials(rMQConfigure.getAccessKey(),rMQConfigure.getSecretKey()));
+ }
+ DefaultMQProducer producer = new DefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP,rpcHook);
producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
producer.setNamesrvAddr(rMQConfigure.getNamesrvAddr());
@@ -228,7 +235,17 @@
@Override
public SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest) {
- DefaultMQProducer producer = new DefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP);
+ DefaultMQProducer producer = null;
+ if(rMQConfigure.isACLEnabled()){
+ producer = new DefaultMQProducer(new AclClientRPCHook(new SessionCredentials(
+ rMQConfigure.getAccessKey(),
+ rMQConfigure.getSecretKey()
+ )));
+ producer.setProducerGroup(MixAll.SELF_TEST_PRODUCER_GROUP);
+ }else{
+ producer = new DefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP);
+ }
+
producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
producer.setNamesrvAddr(rMQConfigure.getNamesrvAddr());
try {
diff --git a/rocketmq-console/src/main/resources/application.properties b/rocketmq-console/src/main/resources/application.properties
index a5d233c..6b1d2df 100644
--- a/rocketmq-console/src/main/resources/application.properties
+++ b/rocketmq-console/src/main/resources/application.properties
@@ -27,4 +27,8 @@
rocketmq.config.ticketKey=ticket
#Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
-rocketmq.config.loginRequired=false
\ No newline at end of file
+rocketmq.config.loginRequired=false
+
+#set the accessKey and secretKey if you used acl
+#rocketmq.config.accessKey=
+#rocketmq.config.secretKey=
\ No newline at end of file