[ISSUE #495] jdbc-sink-connector support divide task by queue (#496)

* rocketmq-connect-jdbc add JdbcSinkTask & optimize JdbcSourceTask

* jdbc-connector bug-fix duplicate data pushed in table

* [ISSUE #489] JDBC Connector support divide task by topic strategy

* [ISSUE #489] JDBC Connector support divide task by topic strategy

* [ISSUE #495] jdbc-sink-connector support divide task by queue

Co-authored-by: Xiongmengfei <Xiong_mengfei@163.com>
diff --git a/rocketmq-connect-jdbc/pom.xml b/rocketmq-connect-jdbc/pom.xml
index 1d708f3..59442c5 100644
--- a/rocketmq-connect-jdbc/pom.xml
+++ b/rocketmq-connect-jdbc/pom.xml
@@ -16,7 +16,7 @@
     <modelVersion>4.0.0</modelVersion>
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-connect-jdbc</artifactId>
-    <version>1.0.0</version>
+    <version>0.0.1-SNAPSHOT</version>
 
     <name>connect-jdbc</name>
     <url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-jdbc</url>
@@ -40,6 +40,7 @@
         <!-- Compiler settings properties -->
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
+        <rocketmq.version>4.5.2</rocketmq.version>
     </properties>
 
     <build>
@@ -188,7 +189,7 @@
         <dependency>
             <groupId>io.openmessaging</groupId>
             <artifactId>openmessaging-connector</artifactId>
-            <version>0.1.0-beta</version>
+            <version>0.1.1-beta-SNAPSHOT</version>
         </dependency>
 		<dependency>
 			<groupId>io.openmessaging</groupId>
@@ -198,7 +199,7 @@
         <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
-            <version>1.2.51</version>
+            <version>1.2.60</version>
         </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
@@ -217,14 +218,25 @@
         </dependency>
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-tools</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-remoting</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-openmessaging</artifactId>
             <version>4.3.2</version>
         </dependency>
-       <dependency>
-            <groupId>com.alibaba</groupId>
-            <artifactId>druid</artifactId>
-            <version>1.0.18</version>
-        </dependency>
+
         <dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>
@@ -239,7 +251,19 @@
 			<groupId>io.javalin</groupId>
 			<artifactId>javalin</artifactId>
 			<version>1.3.0</version>
-		</dependency>		
+		</dependency>
+
+        <dependency>
+            <groupId>com.github.shyiko</groupId>
+            <artifactId>mysql-binlog-connector-java</artifactId>
+            <version>0.20.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+            <version>1.0.31</version>
+        </dependency>
 
     </dependencies>
 
diff --git a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/ConstDefine.java b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/ConstDefine.java
new file mode 100644
index 0000000..f49d367
--- /dev/null
+++ b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/ConstDefine.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.jdbc.common;
+
+public class ConstDefine {
+
+    public static String JDBC_CONNECTOR_ADMIN_PREFIX = "JDBC-CONNECTOR-ADMIN";
+
+}
diff --git a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java
index ccee96b..ab58153 100644
--- a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java
+++ b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java
@@ -192,9 +192,9 @@
         Map<String, String> map = new HashMap<>();
         map.put("driverClassName", "com.mysql.cj.jdbc.Driver");
         map.put("url",
-                "jdbc:mysql://" + config.getJdbcUrl() + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8&characterEncoding=utf8");
-        map.put("username", config.getJdbcUsername());
-        map.put("password", config.getJdbcPassword());
+                "jdbc:mysql://" + config.getDbUrl() + ":" + config.getDbPort()  + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8&characterEncoding=utf8");
+        map.put("username", config.getDbUsername());
+        map.put("password", config.getDbPassword());
         map.put("initialSize", "1");
         map.put("maxActive", "1");
         map.put("maxWait", "60000");
diff --git a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/Utils.java b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/Utils.java
new file mode 100644
index 0000000..5708e34
--- /dev/null
+++ b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/Utils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.jdbc.common;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class Utils {
+    private static final Logger log = LoggerFactory.getLogger(Utils.class);
+
+    public static String createGroupName(String prefix) {
+        return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
+    }
+
+    public static String createGroupName(String prefix, String postfix) {
+        return new StringBuilder().append(prefix).append("-").append(postfix).toString();
+    }
+
+    public static String createTaskId(String prefix) {
+        return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
+    }
+
+    public static String createInstanceName(String namesrvAddr) {
+        String[] namesrvArray = namesrvAddr.split(";");
+        List<String> namesrvList = new ArrayList<>();
+        for (String ns : namesrvArray) {
+            if (!namesrvList.contains(ns)) {
+                namesrvList.add(ns);
+            }
+        }
+        Collections.sort(namesrvList);
+        return String.valueOf(namesrvList.toString().hashCode());
+    }
+
+    public static List<BrokerData> examineBrokerData(DefaultMQAdminExt defaultMQAdminExt, String topic,
+        String cluster) throws RemotingException, MQClientException, InterruptedException {
+        List<BrokerData> brokerList = new ArrayList<>();
+
+        TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+        if (topicRouteData.getBrokerDatas() != null) {
+            for (BrokerData broker : topicRouteData.getBrokerDatas()) {
+                if (StringUtils.equals(broker.getCluster(), cluster)) {
+                    brokerList.add(broker);
+                }
+            }
+        }
+        return brokerList;
+    }
+
+}
diff --git a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
index 91a3e51..ae86d3f 100644
--- a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
+++ b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
@@ -59,6 +59,10 @@
     public static final String CONN_TOPIC_NAMES = "topicNames";
     public static final String CONN_DB_MODE = "mode";
 
+    public static final String CONN_SOURCE_RMQ = "source-rocketmq";
+    public static final String CONN_SOURCE_CLUSTER = "source-cluster";
+    public static final String REFRESH_INTERVAL = "refresh.interval";
+
     /* Mode Config */
     private String mode = "";
     private String incrementingColumnName = "";
diff --git a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java
index 851b253..3ff4f71 100644
--- a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java
+++ b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java
@@ -6,11 +6,17 @@
 import org.apache.rocketmq.connect.jdbc.strategy.DivideTaskByTopic;
 
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 public class SinkDbConnectorConfig extends DbConnectorConfig{
 
     private Set<String> whiteList;
+    private String srcNamesrvs;
+    private String srcCluster;
+    private long refreshInterval;
+    private Map<String, List<TaskTopicInfo>> topicRouteMap;
 
     public SinkDbConnectorConfig(){
     }
@@ -34,11 +40,15 @@
         this.dbUserName = config.getString(Config.CONN_DB_USERNAME);
         this.dbPassword = config.getString(Config.CONN_DB_PASSWORD);
 
+        this.srcNamesrvs = config.getString(Config.CONN_SOURCE_RMQ);
+        this.srcCluster = config.getString(Config.CONN_SOURCE_CLUSTER);
+        this.refreshInterval = config.getLong(Config.REFRESH_INTERVAL, 3);
+
     }
 
     private void buildWhiteList(KeyValue config) {
         this.whiteList = new HashSet<>();
-        String whiteListStr = config.getString(Config.CONN_WHITE_LIST, "");
+        String whiteListStr = config.getString(Config.CONN_TOPIC_NAMES, "");
         String[] wl = whiteListStr.trim().split(",");
         if (wl.length <= 0)
             throw new IllegalArgumentException("White list must be not empty.");
@@ -59,6 +69,26 @@
         this.whiteList = whiteList;
     }
 
+    public String getSrcNamesrvs() {
+        return this.srcNamesrvs;
+    }
+
+    public String getSrcCluster() {
+        return this.srcCluster;
+    }
+
+    public long getRefreshInterval() {
+        return this.refreshInterval;
+    }
+
+    public Map<String, List<TaskTopicInfo>> getTopicRouteMap() {
+        return topicRouteMap;
+    }
+
+    public void setTopicRouteMap(Map<String, List<TaskTopicInfo>> topicRouteMap) {
+        this.topicRouteMap = topicRouteMap;
+    }
+
     @Override
     public Set<String> getWhiteTopics() {
         return getWhiteList();
diff --git a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
index 935ad52..0f818ee 100644
--- a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
+++ b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
@@ -3,22 +3,69 @@
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.Task;
 import io.openmessaging.connector.api.sink.SinkConnector;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.commons.lang3.text.StrSubstitutor;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.connect.jdbc.common.ConstDefine;
+import org.apache.rocketmq.connect.jdbc.common.Utils;
 import org.apache.rocketmq.connect.jdbc.config.*;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 
 public class JdbcSinkConnector extends SinkConnector {
     private static final Logger log = LoggerFactory.getLogger(JdbcSinkConnector.class);
-    private KeyValue config;
     private DbConnectorConfig dbConnectorConfig;
     private volatile boolean configValid = false;
+    private ScheduledExecutorService executor;
+    private Map<String, List<TaskTopicInfo>> topicRouteMap;
 
-    public JdbcSinkConnector(){
+    private DefaultMQAdminExt srcMQAdminExt;
+
+    private volatile boolean adminStarted;
+
+    public JdbcSinkConnector() {
+        topicRouteMap = new HashMap<String, List<TaskTopicInfo>>();
         dbConnectorConfig = new SinkDbConnectorConfig();
+        executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("JdbcSinkConnector-SinkWatcher-%d").daemon(true).build());
+    }
+
+    private synchronized void startMQAdminTools() {
+        if (!configValid || adminStarted) {
+            return;
+        }
+        RPCHook rpcHook = null;
+        this.srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        this.srcMQAdminExt.setNamesrvAddr(((SinkDbConnectorConfig) this.dbConnectorConfig).getSrcNamesrvs());
+        this.srcMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.JDBC_CONNECTOR_ADMIN_PREFIX));
+        this.srcMQAdminExt.setInstanceName(Utils.createInstanceName(((SinkDbConnectorConfig) this.dbConnectorConfig).getSrcNamesrvs()));
+
+        try {
+            this.srcMQAdminExt.start();
+            log.info("RocketMQ srcMQAdminExt started");
+
+        } catch (MQClientException e) {
+            log.error("Replicator start failed for `srcMQAdminExt` exception.", e);
+        }
+
+        adminStarted = true;
     }
 
     @Override
@@ -40,7 +87,80 @@
 
     @Override
     public void start() {
+        startMQAdminTools();
+        startListener();
+    }
 
+    public void startListener() {
+        executor.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                Map<String, List<TaskTopicInfo>> origin = topicRouteMap;
+                topicRouteMap = new HashMap<String, List<TaskTopicInfo>>();
+
+                buildRoute();
+
+                if (!compare(origin, topicRouteMap)) {
+                    context.requestTaskReconfiguration();
+                }
+            }
+        }, ((SinkDbConnectorConfig) dbConnectorConfig).getRefreshInterval(), ((SinkDbConnectorConfig) dbConnectorConfig).getRefreshInterval(), TimeUnit.SECONDS);
+    }
+
+    public boolean compare(Map<String, List<TaskTopicInfo>> origin, Map<String, List<TaskTopicInfo>> updated) {
+        if (origin.size() != updated.size()) {
+            return false;
+        }
+        for (Map.Entry<String, List<TaskTopicInfo>> entry : origin.entrySet()) {
+            if (!updated.containsKey(entry.getKey())) {
+                return false;
+            }
+            List<TaskTopicInfo> originTasks = entry.getValue();
+            List<TaskTopicInfo> updateTasks = updated.get(entry.getKey());
+            if (originTasks.size() != updateTasks.size()) {
+                return false;
+            }
+
+            if (!originTasks.containsAll(updateTasks)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    public void buildRoute() {
+        String srcCluster = ((SinkDbConnectorConfig) this.dbConnectorConfig).getSrcCluster();
+        try {
+            for (String topic : ((SinkDbConnectorConfig) this.dbConnectorConfig).getWhiteList()) {
+
+                // different from BrokerData with cluster field, which can ensure the brokerData is from expected cluster.
+                // QueueData use brokerName as unique info on cluster of rocketmq. so when we want to get QueueData of
+                // expected cluster, we should get brokerNames of expected cluster, and then filter queueDatas.
+                List<BrokerData> brokerList = Utils.examineBrokerData(this.srcMQAdminExt, topic, srcCluster);
+                Set<String> brokerNameSet = new HashSet<String>();
+                for (BrokerData b : brokerList) {
+                    brokerNameSet.add(b.getBrokerName());
+                }
+
+                TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic);
+                if (!topicRouteMap.containsKey(topic)) {
+                    topicRouteMap.put(topic, new ArrayList<TaskTopicInfo>());
+                }
+                for (QueueData qd : topicRouteData.getQueueDatas()) {
+                    if (brokerNameSet.contains(qd.getBrokerName())) {
+                        for (int i = 0; i < qd.getReadQueueNums(); i++) {
+                            TaskTopicInfo taskTopicInfo = new TaskTopicInfo(topic, qd.getBrokerName(), i, null);
+                            topicRouteMap.get(topic).add(taskTopicInfo);
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("Fetch topic list error.", e);
+        } finally {
+            srcMQAdminExt.shutdown();
+        }
     }
 
     @Override
@@ -70,6 +190,10 @@
             return new ArrayList<KeyValue>();
         }
 
+        startMQAdminTools();
+
+        buildRoute();
+
         TaskDivideConfig tdc = new TaskDivideConfig(
                 this.dbConnectorConfig.getDbUrl(),
                 this.dbConnectorConfig.getDbPort(),
@@ -80,6 +204,9 @@
                 this.dbConnectorConfig.getTaskParallelism(),
                 this.dbConnectorConfig.getMode()
         );
+
+        ((SinkDbConnectorConfig) this.dbConnectorConfig).setTopicRouteMap(topicRouteMap);
+
         return this.dbConnectorConfig.getTaskDivideStrategy().divide(this.dbConnectorConfig, tdc);
     }
 }
diff --git a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java
index 7ef5c31..797710a 100644
--- a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java
+++ b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java
@@ -19,10 +19,7 @@
 import com.alibaba.fastjson.JSONObject;
 import io.openmessaging.KeyValue;
 import io.openmessaging.internal.DefaultKeyValue;
-import org.apache.rocketmq.replicator.config.DataType;
-import org.apache.rocketmq.replicator.config.TaskConfigEnum;
-import org.apache.rocketmq.replicator.config.TaskDivideConfig;
-import org.apache.rocketmq.replicator.config.TaskTopicInfo;
+import org.apache.rocketmq.connect.jdbc.config.*;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -30,12 +27,21 @@
 import java.util.Map;
 
 public class DivideTaskByQueue extends TaskDivideStrategy {
+
     @Override
-    public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
+    public List<KeyValue> divide(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) {
+        if (dbConnectorConfig instanceof SinkDbConnectorConfig){
+            return divideSinkTaskByQueue(dbConnectorConfig, tdc);
+        }
+        return null;
+    }
+
+    public List<KeyValue> divideSinkTaskByQueue(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) {
 
         List<KeyValue> config = new ArrayList<KeyValue>();
         int parallelism = tdc.getTaskParallelism();
         Map<Integer, List<TaskTopicInfo>> queueTopicList = new HashMap<Integer, List<TaskTopicInfo>>();
+        Map<String, List<TaskTopicInfo>> topicRouteMap = ((SinkDbConnectorConfig)dbConnectorConfig).getTopicRouteMap();
         int id = -1;
         for (String t : topicRouteMap.keySet()) {
             for (TaskTopicInfo taskTopicInfo : topicRouteMap.get(t)) {
@@ -49,11 +55,14 @@
 
         for (int i = 0; i < parallelism; i++) {
             KeyValue keyValue = new DefaultKeyValue();
-            keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic());
-            keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSourceNamesrvAddr());
-            keyValue.put(TaskConfigEnum.TASK_DATA_TYPE.getKey(), DataType.COMMON_MESSAGE.ordinal());
-            keyValue.put(TaskConfigEnum.TASK_TOPIC_INFO.getKey(), JSONObject.toJSONString(queueTopicList.get(i)));
-            keyValue.put(TaskConfigEnum.TASK_SOURCE_RECORD_CONVERTER.getKey(), tdc.getSrcRecordConverter());
+            keyValue.put(Config.CONN_DB_IP, tdc.getDbUrl());
+            keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort());
+            keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName());
+            keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword());
+            keyValue.put(Config.CONN_TOPIC_NAMES, JSONObject.toJSONString(queueTopicList.get(i)));
+            keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType());
+            keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter());
+            keyValue.put(Config.CONN_DB_MODE, tdc.getMode());
             config.add(keyValue);
         }