Update Rocketmq console 2.0.0 for support acl (#645)

* add accessKey and secretKey config support

* fix checkstyle

* Optimized code

* update acl for send messages

* update for check ACL when sending message

Co-authored-by: Demogorgon314 <wangkai744567028@gmail.com>
Co-authored-by: liwei5 <liwei5@vipkid.com.cn>
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/aspect/admin/MQAdminAspect.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/aspect/admin/MQAdminAspect.java
index 8c7cf06..e8f2f4c 100644
--- a/rocketmq-console/src/main/java/org/apache/rocketmq/console/aspect/admin/MQAdminAspect.java
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/aspect/admin/MQAdminAspect.java
@@ -16,8 +16,8 @@
  */
 package org.apache.rocketmq.console.aspect.admin;
 
-import java.lang.reflect.Method;
 import org.apache.rocketmq.console.aspect.admin.annotation.MultiMQAdminCmdMethod;
+import org.apache.rocketmq.console.config.RMQConfigure;
 import org.apache.rocketmq.console.service.client.MQAdminInstance;
 import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.annotation.Around;
@@ -26,13 +26,19 @@
 import org.aspectj.lang.reflect.MethodSignature;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.lang.reflect.Method;
+
 @Aspect
 @Service
 public class MQAdminAspect {
     private Logger logger = LoggerFactory.getLogger(MQAdminAspect.class);
 
+    @Autowired
+    private RMQConfigure rmqConfigure;
+
     public MQAdminAspect() {
     }
 
@@ -55,10 +61,10 @@
             Method method = signature.getMethod();
             MultiMQAdminCmdMethod multiMQAdminCmdMethod = method.getAnnotation(MultiMQAdminCmdMethod.class);
             if (multiMQAdminCmdMethod != null && multiMQAdminCmdMethod.timeoutMillis() > 0) {
-                MQAdminInstance.initMQAdminInstance(multiMQAdminCmdMethod.timeoutMillis());
+                MQAdminInstance.initMQAdminInstance(multiMQAdminCmdMethod.timeoutMillis(),rmqConfigure.getAccessKey(),rmqConfigure.getSecretKey());
             }
             else {
-                MQAdminInstance.initMQAdminInstance(0);
+                MQAdminInstance.initMQAdminInstance(0,rmqConfigure.getAccessKey(),rmqConfigure.getSecretKey());
             }
             obj = joinPoint.proceed();
         }
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/config/RMQConfigure.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/config/RMQConfigure.java
index f25ce2c..4fbffa2 100644
--- a/rocketmq-console/src/main/java/org/apache/rocketmq/console/config/RMQConfigure.java
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/config/RMQConfigure.java
@@ -16,8 +16,6 @@
  */
 package org.apache.rocketmq.console.config;
 
-import java.io.File;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.MixAll;
 import org.slf4j.Logger;
@@ -30,6 +28,8 @@
 import org.springframework.context.annotation.Configuration;
 import org.springframework.http.HttpStatus;
 
+import java.io.File;
+
 import static org.apache.rocketmq.client.ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY;
 
 @Configuration
@@ -51,6 +51,26 @@
 
     private boolean loginRequired = false;
 
+    private String accessKey;
+
+    private String secretKey;
+
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
     public String getNamesrvAddr() {
         return namesrvAddr;
     }
@@ -62,7 +82,10 @@
             logger.info("setNameSrvAddrByProperty nameSrvAddr={}", namesrvAddr);
         }
     }
-
+    public boolean isACLEnabled(){
+        return !(StringUtils.isAnyBlank(this.accessKey, this.secretKey)||
+                StringUtils.isAnyEmpty(this.accessKey, this.secretKey));
+    }
     public String getRocketMqConsoleDataPath() {
         return dataPath;
     }
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java
index e914e6c..3fb57d0 100644
--- a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java
@@ -16,9 +16,13 @@
  */
 package org.apache.rocketmq.console.service.client;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.MQClientAPIImpl;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.RemotingClient;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
@@ -47,16 +51,20 @@
         DefaultMQAdminExtImpl defaultMQAdminExtImpl = Reflect.on(MQAdminInstance.threadLocalMQAdminExt()).get("defaultMQAdminExtImpl");
         return Reflect.on(defaultMQAdminExtImpl).get("mqClientInstance");
     }
-
-    public static void initMQAdminInstance(long timeoutMillis) throws MQClientException {
+    public static void initMQAdminInstance(long timeoutMillis,String accessKey,String secretKey) throws MQClientException {
         Integer nowCount = INIT_COUNTER.get();
         if (nowCount == null) {
+            RPCHook rpcHook = null;
+            boolean isEnableAcl = !StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey);
+            if (isEnableAcl) {
+                rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
+            }
             DefaultMQAdminExt defaultMQAdminExt;
             if (timeoutMillis > 0) {
-                defaultMQAdminExt = new DefaultMQAdminExt(timeoutMillis);
+                defaultMQAdminExt = new DefaultMQAdminExt(rpcHook,timeoutMillis);
             }
             else {
-                defaultMQAdminExt = new DefaultMQAdminExt();
+                defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
             }
             defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
             defaultMQAdminExt.start();
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
index fb41634..34d3994 100644
--- a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.console.service.impl;
 
+
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.base.Throwables;
@@ -29,6 +30,8 @@
 import java.util.Set;
 import javax.annotation.Resource;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.common.MixAll;
@@ -38,19 +41,25 @@
 import org.apache.rocketmq.common.protocol.body.Connection;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.console.config.RMQConfigure;
 import org.apache.rocketmq.console.exception.ServiceException;
 import org.apache.rocketmq.console.model.MessageView;
 import org.apache.rocketmq.console.service.MessageService;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.tools.admin.MQAdminExt;
 import org.apache.rocketmq.tools.admin.api.MessageTrack;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 @Service
 public class MessageServiceImpl implements MessageService {
 
     private Logger logger = LoggerFactory.getLogger(MessageServiceImpl.class);
+
+    @Autowired
+    private RMQConfigure configure;
     /**
      * @see org.apache.rocketmq.store.config.MessageStoreConfig maxMsgsNumBatch = 64;
      * @see org.apache.rocketmq.store.index.IndexService maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
@@ -59,6 +68,7 @@
     @Resource
     private MQAdminExt mqAdminExt;
 
+    @Override
     public Pair<MessageView, List<MessageTrack>> viewMessage(String subject, final String msgId) {
         try {
 
@@ -88,7 +98,12 @@
 
     @Override
     public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) {
-        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP);
+        boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
+        RPCHook rpcHook = null;
+        if (isEnableAcl) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(),configure.getSecretKey()));
+        }
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP,rpcHook);
         List<MessageView> messageViewList = Lists.newArrayList();
         try {
             String subExpression = "*";
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java
index 7660eaa..f6211c8 100644
--- a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java
@@ -20,13 +20,9 @@
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.MixAll;
@@ -43,11 +39,17 @@
 import org.apache.rocketmq.console.model.request.TopicConfigInfo;
 import org.apache.rocketmq.console.service.AbstractCommonService;
 import org.apache.rocketmq.console.service.TopicService;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.tools.command.CommandUtil;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 @Service
 public class TopicServiceImpl extends AbstractCommonService implements TopicService {
 
@@ -209,7 +211,12 @@
     }
 
     private TopicList  getSystemTopicList() {
-        DefaultMQProducer producer = new DefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP);
+        RPCHook rpcHook = null;
+        boolean isEnableAcl = !StringUtils.isEmpty(rMQConfigure.getAccessKey()) && !StringUtils.isEmpty(rMQConfigure.getSecretKey());
+        if (isEnableAcl) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials(rMQConfigure.getAccessKey(),rMQConfigure.getSecretKey()));
+        }
+        DefaultMQProducer producer = new DefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP,rpcHook);
         producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
         producer.setNamesrvAddr(rMQConfigure.getNamesrvAddr());
 
@@ -228,7 +235,17 @@
 
     @Override
     public SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest) {
-        DefaultMQProducer producer = new DefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP);
+        DefaultMQProducer producer = null;
+        if(rMQConfigure.isACLEnabled()){
+            producer = new DefaultMQProducer(new AclClientRPCHook(new SessionCredentials(
+                    rMQConfigure.getAccessKey(),
+                    rMQConfigure.getSecretKey()
+            )));
+            producer.setProducerGroup(MixAll.SELF_TEST_PRODUCER_GROUP);
+        }else{
+            producer = new DefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP);
+        }
+
         producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
         producer.setNamesrvAddr(rMQConfigure.getNamesrvAddr());
         try {
diff --git a/rocketmq-console/src/main/resources/application.properties b/rocketmq-console/src/main/resources/application.properties
index a5d233c..6b1d2df 100644
--- a/rocketmq-console/src/main/resources/application.properties
+++ b/rocketmq-console/src/main/resources/application.properties
@@ -27,4 +27,8 @@
 rocketmq.config.ticketKey=ticket
 
 #Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
-rocketmq.config.loginRequired=false
\ No newline at end of file
+rocketmq.config.loginRequired=false
+
+#set the accessKey and secretKey if you used acl
+#rocketmq.config.accessKey=
+#rocketmq.config.secretKey=
\ No newline at end of file