[ISSUE #821] Support auto create subGroup (#828)
* [connect] Support auto create subGroup
Signed-off-by: zhangyang21 <zhangyang21@xiaomi.com>
* [connect] Use a unique instance name
Signed-off-by: zhangyang21 <zhangyang21@xiaomi.com>
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java
index 949cffd..3620e34 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java
@@ -25,6 +25,11 @@
public class ConnectConfig {
/**
+ * The unique ID of each worker instance in the cluster
+ */
+ private String workerId;
+
+ /**
* Storage directory for file store.
*/
private String storePathRootDir = System.getProperty("user.home") + File.separator + "connectorStore";
@@ -109,6 +114,20 @@
private String secretKey;
+ private boolean autoCreateGroupEnable = false;
+
+ private String clusterName;
+
+ private String adminExtGroup = "connector-admin-group";
+
+ public String getWorkerId() {
+ return workerId;
+ }
+
+ public void setWorkerId(String workerId) {
+ this.workerId = workerId;
+ }
+
public String getNamesrvAddr() {
return namesrvAddr;
}
@@ -301,10 +320,34 @@
this.secretKey = secretKey;
}
- @Override
- public String toString() {
+ public boolean isAutoCreateGroupEnable() {
+ return autoCreateGroupEnable;
+ }
+
+ public void setAutoCreateGroupEnable(boolean autoCreateGroupEnable) {
+ this.autoCreateGroupEnable = autoCreateGroupEnable;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ public String getAdminExtGroup() {
+ return adminExtGroup;
+ }
+
+ public void setAdminExtGroup(String adminExtGroup) {
+ this.adminExtGroup = adminExtGroup;
+ }
+
+ @Override public String toString() {
return "ConnectConfig{" +
- "storePathRootDir='" + storePathRootDir + '\'' +
+ "workerId='" + workerId + '\'' +
+ ", storePathRootDir='" + storePathRootDir + '\'' +
", namesrvAddr='" + namesrvAddr + '\'' +
", rmqProducerGroup='" + rmqProducerGroup + '\'' +
", maxMessageSize=" + maxMessageSize +
@@ -329,6 +372,9 @@
", aclEnable=" + aclEnable +
", accessKey='" + accessKey + '\'' +
", secretKey='" + secretKey + '\'' +
+ ", autoCreateGroupEnable=" + autoCreateGroupEnable +
+ ", clusterName='" + clusterName + '\'' +
+ ", adminExtGroup='" + adminExtGroup + '\'' +
'}';
}
}
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index b309665..8dbe13f 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -426,6 +426,9 @@
this.pendingTasks.put(workerSourceTask, System.currentTimeMillis());
} else if (task instanceof SinkTask) {
DefaultMQPullConsumer consumer = ConnectUtil.initDefaultMQPullConsumer(connectConfig);
+ if (connectConfig.isAutoCreateGroupEnable()) {
+ ConnectUtil.createSubGroup(connectConfig, consumer.getConsumerGroup());
+ }
WorkerSinkTask workerSinkTask = new WorkerSinkTask(connectorName,
(SinkTask) task, keyValue, offsetManagementService, recordConverter, consumer, workerState);
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
index 80cf3b1..49ef1b9 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
@@ -28,6 +28,7 @@
import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -53,8 +54,20 @@
public ClusterManagementServiceImpl(ConnectConfig connectConfig) {
this.connectConfig = connectConfig;
this.workerStatusListeners = new HashSet<>();
- this.defaultMQPullConsumer = new DefaultMQPullConsumer(connectConfig.getConnectClusterId());
- this.defaultMQPullConsumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
+ this.defaultMQPullConsumer = ConnectUtil.initDefaultMQPullConsumer(connectConfig);
+ this.defaultMQPullConsumer.setConsumerGroup(connectConfig.getConnectClusterId());
+ this.prepare(connectConfig);
+ }
+
+ /**
+ * Preparation before startup
+ *
+ * @param connectConfig
+ */
+ private void prepare(ConnectConfig connectConfig) {
+ if (connectConfig.isAutoCreateGroupEnable()) {
+ ConnectUtil.createSubGroup(connectConfig, this.defaultMQPullConsumer.getConsumerGroup());
+ }
}
@Override
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
index 68af95e..f1c93aa 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -77,7 +77,7 @@
this.connectorConfigUpdateListener = new HashSet<>();
this.dataSynchronizer = new BrokerBasedLog<>(connectConfig,
connectConfig.getConfigStoreTopic(),
- ConnectUtil.createGroupName(configManagePrefix),
+ ConnectUtil.createGroupName(configManagePrefix, connectConfig.getWorkerId()),
new ConfigChangeCallback(),
new JsonConverter(),
new ConnAndTaskConfigConverter());
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
index 87e818c..6a9e386 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
@@ -60,7 +60,7 @@
new ByteBufferConverter());
this.dataSynchronizer = new BrokerBasedLog(connectConfig,
connectConfig.getOffsetStoreTopic(),
- ConnectUtil.createGroupName(offsetManagePrefix),
+ ConnectUtil.createGroupName(offsetManagePrefix, connectConfig.getWorkerId()),
new OffsetChangeCallback(),
new JsonConverter(),
new ByteMapConverter());
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
index 986c98b..d1d7ffb 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
@@ -60,7 +60,7 @@
new ByteBufferConverter());
this.dataSynchronizer = new BrokerBasedLog(connectConfig,
connectConfig.getPositionStoreTopic(),
- ConnectUtil.createGroupName(positionManagePrefix),
+ ConnectUtil.createGroupName(positionManagePrefix, connectConfig.getWorkerId()),
new PositionChangeCallback(),
new JsonConverter(),
new ByteMapConverter());
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
index 45dccd5..434574a 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
@@ -20,25 +20,28 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Set;
+import java.util.UUID;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
import org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
public class ConnectUtil {
- private final static AtomicLong GROUP_POSTFIX_ID = new AtomicLong(0);
-
public static String createGroupName(String prefix) {
StringBuilder sb = new StringBuilder();
sb.append(prefix).append("-");
@@ -48,10 +51,8 @@
return sb.toString().replace(".", "-");
}
- public static String createGroupNameV2(String prefix) {
- StringBuilder sb = new StringBuilder();
- sb.append(prefix).append("-").append(GROUP_POSTFIX_ID.getAndIncrement());
- return sb.toString();
+ public static String createGroupName(String prefix, String postfix) {
+ return new StringBuilder().append(prefix).append("-").append(postfix).toString();
}
public static String createInstance(String servers) {
@@ -66,6 +67,10 @@
return String.valueOf(serversList.toString().hashCode());
}
+ public static String createUniqInstance(String prefix) {
+ return new StringBuffer(prefix).append("-").append(UUID.randomUUID().toString()).toString();
+ }
+
public static AllocateConnAndTaskStrategy initAllocateConnAndTaskStrategy(ConnectConfig connectConfig) {
try {
return (AllocateConnAndTaskStrategy) Thread.currentThread().getContextClassLoader().loadClass(connectConfig.getAllocTaskStrategy()).newInstance();
@@ -81,8 +86,8 @@
}
DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
producer.setNamesrvAddr(connectConfig.getNamesrvAddr());
- producer.setInstanceName(createInstance(connectConfig.getNamesrvAddr()));
- producer.setProducerGroup(createGroupNameV2(connectConfig.getRmqProducerGroup()));
+ producer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
+ producer.setProducerGroup(connectConfig.getRmqProducerGroup());
producer.setSendMsgTimeout(connectConfig.getOperationTimeout());
producer.setMaxMessageSize(RuntimeConfigDefine.MAX_MESSAGE_SIZE);
producer.setLanguage(LanguageCode.JAVA);
@@ -96,8 +101,8 @@
}
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(rpcHook);
consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
- consumer.setInstanceName(createInstance(connectConfig.getNamesrvAddr()));
- consumer.setConsumerGroup(createGroupNameV2(connectConfig.getRmqConsumerGroup()));
+ consumer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
+ consumer.setConsumerGroup(connectConfig.getRmqConsumerGroup());
consumer.setMaxReconsumeTimes(connectConfig.getRmqMaxRedeliveryTimes());
consumer.setBrokerSuspendMaxTimeMillis(connectConfig.getBrokerSuspendMaxTimeMillis());
consumer.setConsumerPullTimeoutMillis((long) connectConfig.getRmqMessageConsumeTimeout());
@@ -112,8 +117,8 @@
}
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(rpcHook);
consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
- consumer.setInstanceName(createInstance(connectConfig.getNamesrvAddr()));
- consumer.setConsumerGroup(createGroupNameV2(connectConfig.getRmqConsumerGroup()));
+ consumer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
+ consumer.setConsumerGroup(createGroupName(connectConfig.getRmqConsumerGroup()));
consumer.setMaxReconsumeTimes(connectConfig.getRmqMaxRedeliveryTimes());
consumer.setConsumeTimeout((long) connectConfig.getRmqMessageConsumeTimeout());
consumer.setConsumeThreadMin(connectConfig.getRmqMinConsumeThreadNums());
@@ -121,4 +126,38 @@
consumer.setLanguage(LanguageCode.JAVA);
return consumer;
}
+
+ public static DefaultMQAdminExt startMQAdminTool(ConnectConfig connectConfig) throws MQClientException {
+ RPCHook rpcHook = null;
+ if (connectConfig.getAclEnable()) {
+ rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
+ }
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ defaultMQAdminExt.setNamesrvAddr(connectConfig.getNamesrvAddr());
+ defaultMQAdminExt.setAdminExtGroup(connectConfig.getAdminExtGroup());
+ defaultMQAdminExt.setInstanceName(ConnectUtil.createUniqInstance(connectConfig.getNamesrvAddr()));
+ defaultMQAdminExt.start();
+ return defaultMQAdminExt;
+ }
+
+ public static String createSubGroup(ConnectConfig connectConfig, String subGroup) {
+ DefaultMQAdminExt defaultMQAdminExt = null;
+ try {
+ defaultMQAdminExt = startMQAdminTool(connectConfig);
+ SubscriptionGroupConfig initConfig = new SubscriptionGroupConfig();
+ initConfig.setGroupName(subGroup);
+
+ Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, connectConfig.getClusterName());
+ for (String addr : masterSet) {
+ defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, initConfig);
+ }
+ } catch (Exception e) {
+ throw new IllegalArgumentException("create subGroup: " + subGroup + " failed", e);
+ } finally {
+ if (defaultMQAdminExt != null) {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+ return subGroup;
+ }
}
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
index e0c9a27..7f79e01 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
@@ -95,6 +95,18 @@
this.consumer.setConsumerGroup(workId);
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
+ this.prepare(connectConfig);
+ }
+
+ /**
+ * Preparation before startup
+ *
+ * @param connectConfig
+ */
+ private void prepare(ConnectConfig connectConfig) {
+ if (connectConfig.isAutoCreateGroupEnable()) {
+ ConnectUtil.createSubGroup(connectConfig, consumer.getConsumerGroup());
+ }
}
@Override
diff --git a/rocketmq-connect/rocketmq-connect-runtime/src/main/resources/connect.conf b/rocketmq-connect/rocketmq-connect-runtime/src/main/resources/connect.conf
index e951e95..449728d 100644
--- a/rocketmq-connect/rocketmq-connect-runtime/src/main/resources/connect.conf
+++ b/rocketmq-connect/rocketmq-connect-runtime/src/main/resources/connect.conf
@@ -27,6 +27,9 @@
accessKey=rocketmq
secretKey=12345678
+autoCreateGroupEnable=false
+clusterName="DefaultCluster"
+
# Source or sink connector jar file dir,The default value is rocketmq-connect-sample
pluginPaths=/xxx/connector-plugins