[ISSUE #821] Support auto create subGroup (#828)

* [connect] Support auto create subGroup

Signed-off-by: zhangyang21 <zhangyang21@xiaomi.com>

* [connect] Use a unique instance name

Signed-off-by: zhangyang21 <zhangyang21@xiaomi.com>
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java
index 949cffd..3620e34 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java
@@ -25,6 +25,11 @@
 public class ConnectConfig {
 
     /**
+     * The unique ID of each worker instance in the cluster
+     */
+    private String workerId;
+
+    /**
      * Storage directory for file store.
      */
     private String storePathRootDir = System.getProperty("user.home") + File.separator + "connectorStore";
@@ -109,6 +114,20 @@
 
     private String secretKey;
 
+    private boolean autoCreateGroupEnable = false;
+
+    private String clusterName;
+
+    private String adminExtGroup = "connector-admin-group";
+
+    public String getWorkerId() {
+        return workerId;
+    }
+
+    public void setWorkerId(String workerId) {
+        this.workerId = workerId;
+    }
+
     public String getNamesrvAddr() {
         return namesrvAddr;
     }
@@ -301,10 +320,34 @@
         this.secretKey = secretKey;
     }
 
-    @Override
-    public String toString() {
+    public boolean isAutoCreateGroupEnable() {
+        return autoCreateGroupEnable;
+    }
+
+    public void setAutoCreateGroupEnable(boolean autoCreateGroupEnable) {
+        this.autoCreateGroupEnable = autoCreateGroupEnable;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public String getAdminExtGroup() {
+        return adminExtGroup;
+    }
+
+    public void setAdminExtGroup(String adminExtGroup) {
+        this.adminExtGroup = adminExtGroup;
+    }
+
+    @Override public String toString() {
         return "ConnectConfig{" +
-            "storePathRootDir='" + storePathRootDir + '\'' +
+            "workerId='" + workerId + '\'' +
+            ", storePathRootDir='" + storePathRootDir + '\'' +
             ", namesrvAddr='" + namesrvAddr + '\'' +
             ", rmqProducerGroup='" + rmqProducerGroup + '\'' +
             ", maxMessageSize=" + maxMessageSize +
@@ -329,6 +372,9 @@
             ", aclEnable=" + aclEnable +
             ", accessKey='" + accessKey + '\'' +
             ", secretKey='" + secretKey + '\'' +
+            ", autoCreateGroupEnable=" + autoCreateGroupEnable +
+            ", clusterName='" + clusterName + '\'' +
+            ", adminExtGroup='" + adminExtGroup + '\'' +
             '}';
     }
 }
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index b309665..8dbe13f 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -426,6 +426,9 @@
                     this.pendingTasks.put(workerSourceTask, System.currentTimeMillis());
                 } else if (task instanceof SinkTask) {
                     DefaultMQPullConsumer consumer = ConnectUtil.initDefaultMQPullConsumer(connectConfig);
+                    if (connectConfig.isAutoCreateGroupEnable()) {
+                        ConnectUtil.createSubGroup(connectConfig, consumer.getConsumerGroup());
+                    }
 
                     WorkerSinkTask workerSinkTask = new WorkerSinkTask(connectorName,
                         (SinkTask) task, keyValue, offsetManagementService, recordConverter, consumer, workerState);
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
index 80cf3b1..49ef1b9 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
@@ -28,6 +28,7 @@
 import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -53,8 +54,20 @@
     public ClusterManagementServiceImpl(ConnectConfig connectConfig) {
         this.connectConfig = connectConfig;
         this.workerStatusListeners = new HashSet<>();
-        this.defaultMQPullConsumer = new DefaultMQPullConsumer(connectConfig.getConnectClusterId());
-        this.defaultMQPullConsumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
+        this.defaultMQPullConsumer = ConnectUtil.initDefaultMQPullConsumer(connectConfig);
+        this.defaultMQPullConsumer.setConsumerGroup(connectConfig.getConnectClusterId());
+        this.prepare(connectConfig);
+    }
+
+    /**
+     * Preparation before startup
+     *
+     * @param connectConfig
+     */
+    private void prepare(ConnectConfig connectConfig) {
+        if (connectConfig.isAutoCreateGroupEnable()) {
+            ConnectUtil.createSubGroup(connectConfig, this.defaultMQPullConsumer.getConsumerGroup());
+        }
     }
 
     @Override
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
index 68af95e..f1c93aa 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -77,7 +77,7 @@
         this.connectorConfigUpdateListener = new HashSet<>();
         this.dataSynchronizer = new BrokerBasedLog<>(connectConfig,
             connectConfig.getConfigStoreTopic(),
-            ConnectUtil.createGroupName(configManagePrefix),
+            ConnectUtil.createGroupName(configManagePrefix, connectConfig.getWorkerId()),
             new ConfigChangeCallback(),
             new JsonConverter(),
             new ConnAndTaskConfigConverter());
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
index 87e818c..6a9e386 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
@@ -60,7 +60,7 @@
             new ByteBufferConverter());
         this.dataSynchronizer = new BrokerBasedLog(connectConfig,
             connectConfig.getOffsetStoreTopic(),
-            ConnectUtil.createGroupName(offsetManagePrefix),
+            ConnectUtil.createGroupName(offsetManagePrefix, connectConfig.getWorkerId()),
             new OffsetChangeCallback(),
             new JsonConverter(),
             new ByteMapConverter());
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
index 986c98b..d1d7ffb 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
@@ -60,7 +60,7 @@
             new ByteBufferConverter());
         this.dataSynchronizer = new BrokerBasedLog(connectConfig,
             connectConfig.getPositionStoreTopic(),
-            ConnectUtil.createGroupName(positionManagePrefix),
+            ConnectUtil.createGroupName(positionManagePrefix, connectConfig.getWorkerId()),
             new PositionChangeCallback(),
             new JsonConverter(),
             new ByteMapConverter());
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
index 45dccd5..434574a 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
@@ -20,25 +20,28 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Set;
+import java.util.UUID;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
 import org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
 
 public class ConnectUtil {
 
-    private final static AtomicLong GROUP_POSTFIX_ID = new AtomicLong(0);
-
     public static String createGroupName(String prefix) {
         StringBuilder sb = new StringBuilder();
         sb.append(prefix).append("-");
@@ -48,10 +51,8 @@
         return sb.toString().replace(".", "-");
     }
 
-    public static String createGroupNameV2(String prefix) {
-        StringBuilder sb = new StringBuilder();
-        sb.append(prefix).append("-").append(GROUP_POSTFIX_ID.getAndIncrement());
-        return sb.toString();
+    public static String createGroupName(String prefix, String postfix) {
+        return new StringBuilder().append(prefix).append("-").append(postfix).toString();
     }
 
     public static String createInstance(String servers) {
@@ -66,6 +67,10 @@
         return String.valueOf(serversList.toString().hashCode());
     }
 
+    public static String createUniqInstance(String prefix) {
+        return new StringBuffer(prefix).append("-").append(UUID.randomUUID().toString()).toString();
+    }
+
     public static AllocateConnAndTaskStrategy initAllocateConnAndTaskStrategy(ConnectConfig connectConfig) {
         try {
             return (AllocateConnAndTaskStrategy) Thread.currentThread().getContextClassLoader().loadClass(connectConfig.getAllocTaskStrategy()).newInstance();
@@ -81,8 +86,8 @@
         }
         DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
         producer.setNamesrvAddr(connectConfig.getNamesrvAddr());
-        producer.setInstanceName(createInstance(connectConfig.getNamesrvAddr()));
-        producer.setProducerGroup(createGroupNameV2(connectConfig.getRmqProducerGroup()));
+        producer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
+        producer.setProducerGroup(connectConfig.getRmqProducerGroup());
         producer.setSendMsgTimeout(connectConfig.getOperationTimeout());
         producer.setMaxMessageSize(RuntimeConfigDefine.MAX_MESSAGE_SIZE);
         producer.setLanguage(LanguageCode.JAVA);
@@ -96,8 +101,8 @@
         }
         DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(rpcHook);
         consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
-        consumer.setInstanceName(createInstance(connectConfig.getNamesrvAddr()));
-        consumer.setConsumerGroup(createGroupNameV2(connectConfig.getRmqConsumerGroup()));
+        consumer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
+        consumer.setConsumerGroup(connectConfig.getRmqConsumerGroup());
         consumer.setMaxReconsumeTimes(connectConfig.getRmqMaxRedeliveryTimes());
         consumer.setBrokerSuspendMaxTimeMillis(connectConfig.getBrokerSuspendMaxTimeMillis());
         consumer.setConsumerPullTimeoutMillis((long) connectConfig.getRmqMessageConsumeTimeout());
@@ -112,8 +117,8 @@
         }
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(rpcHook);
         consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
-        consumer.setInstanceName(createInstance(connectConfig.getNamesrvAddr()));
-        consumer.setConsumerGroup(createGroupNameV2(connectConfig.getRmqConsumerGroup()));
+        consumer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
+        consumer.setConsumerGroup(createGroupName(connectConfig.getRmqConsumerGroup()));
         consumer.setMaxReconsumeTimes(connectConfig.getRmqMaxRedeliveryTimes());
         consumer.setConsumeTimeout((long) connectConfig.getRmqMessageConsumeTimeout());
         consumer.setConsumeThreadMin(connectConfig.getRmqMinConsumeThreadNums());
@@ -121,4 +126,38 @@
         consumer.setLanguage(LanguageCode.JAVA);
         return consumer;
     }
+
+    public static DefaultMQAdminExt startMQAdminTool(ConnectConfig connectConfig) throws MQClientException {
+        RPCHook rpcHook = null;
+        if (connectConfig.getAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
+        }
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setNamesrvAddr(connectConfig.getNamesrvAddr());
+        defaultMQAdminExt.setAdminExtGroup(connectConfig.getAdminExtGroup());
+        defaultMQAdminExt.setInstanceName(ConnectUtil.createUniqInstance(connectConfig.getNamesrvAddr()));
+        defaultMQAdminExt.start();
+        return defaultMQAdminExt;
+    }
+
+    public static String createSubGroup(ConnectConfig connectConfig, String subGroup) {
+        DefaultMQAdminExt defaultMQAdminExt = null;
+        try {
+            defaultMQAdminExt = startMQAdminTool(connectConfig);
+            SubscriptionGroupConfig initConfig = new SubscriptionGroupConfig();
+            initConfig.setGroupName(subGroup);
+
+            Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, connectConfig.getClusterName());
+            for (String addr : masterSet) {
+                defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, initConfig);
+            }
+        } catch (Exception e) {
+            throw new IllegalArgumentException("create subGroup: " + subGroup + " failed", e);
+        } finally {
+            if (defaultMQAdminExt != null) {
+                defaultMQAdminExt.shutdown();
+            }
+        }
+        return subGroup;
+    }
 }
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
index e0c9a27..7f79e01 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
@@ -95,6 +95,18 @@
         this.consumer.setConsumerGroup(workId);

         this.keyConverter = keyConverter;

         this.valueConverter = valueConverter;

+        this.prepare(connectConfig);

+    }

+

+    /**

+     * Preparation before startup

+     *

+     * @param connectConfig

+     */

+    private void prepare(ConnectConfig connectConfig) {

+        if (connectConfig.isAutoCreateGroupEnable()) {

+            ConnectUtil.createSubGroup(connectConfig, consumer.getConsumerGroup());

+        }

     }

 

     @Override

diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/resources/connect.conf b/rocketmq-connect/rocketmq-connect-runtime/src/main/resources/connect.conf
index e951e95..449728d 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/resources/connect.conf
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/resources/connect.conf
@@ -27,6 +27,9 @@
 accessKey=rocketmq
 secretKey=12345678
 
+autoCreateGroupEnable=false
+clusterName="DefaultCluster"
+
 # Source or sink connector jar file dir,The default value is rocketmq-connect-sample
 pluginPaths=/xxx/connector-plugins