[ISSUE #495] jdbc-sink-connector support divide task by queue (#496)
* rocketmq-connect-jdbc add JdbcSinkTask & optimize JdbcSourceTask
* jdbc-connector bug-fix duplicate data pushed in table
* [ISSUE #489] JDBC Connector support divide task by topic strategy
* [ISSUE #489] JDBC Connector support divide task by topic strategy
* [ISSUE #495] jdbc-sink-connector support divide task by queue
Co-authored-by: Xiongmengfei <Xiong_mengfei@163.com>
diff --git a/rocketmq-connect-jdbc/pom.xml b/rocketmq-connect-jdbc/pom.xml
index 1d708f3..59442c5 100644
--- a/rocketmq-connect-jdbc/pom.xml
+++ b/rocketmq-connect-jdbc/pom.xml
@@ -16,7 +16,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-connect-jdbc</artifactId>
- <version>1.0.0</version>
+ <version>0.0.1-SNAPSHOT</version>
<name>connect-jdbc</name>
<url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-jdbc</url>
@@ -40,6 +40,7 @@
<!-- Compiler settings properties -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
+ <rocketmq.version>4.5.2</rocketmq.version>
</properties>
<build>
@@ -188,7 +189,7 @@
<dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-connector</artifactId>
- <version>0.1.0-beta</version>
+ <version>0.1.1-beta-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.openmessaging</groupId>
@@ -198,7 +199,7 @@
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
- <version>1.2.51</version>
+ <version>1.2.60</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
@@ -217,14 +218,25 @@
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-client</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-tools</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-remoting</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-openmessaging</artifactId>
<version>4.3.2</version>
</dependency>
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>druid</artifactId>
- <version>1.0.18</version>
- </dependency>
+
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
@@ -239,7 +251,19 @@
<groupId>io.javalin</groupId>
<artifactId>javalin</artifactId>
<version>1.3.0</version>
- </dependency>
+ </dependency>
+
+ <dependency>
+ <groupId>com.github.shyiko</groupId>
+ <artifactId>mysql-binlog-connector-java</artifactId>
+ <version>0.20.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>druid</artifactId>
+ <version>1.0.31</version>
+ </dependency>
</dependencies>
diff --git a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/ConstDefine.java b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/ConstDefine.java
new file mode 100644
index 0000000..f49d367
--- /dev/null
+++ b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/ConstDefine.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.jdbc.common;
+
+public class ConstDefine {
+
+ public static String JDBC_CONNECTOR_ADMIN_PREFIX = "JDBC-CONNECTOR-ADMIN";
+
+}
diff --git a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java
index ccee96b..ab58153 100644
--- a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java
+++ b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java
@@ -192,9 +192,9 @@
Map<String, String> map = new HashMap<>();
map.put("driverClassName", "com.mysql.cj.jdbc.Driver");
map.put("url",
- "jdbc:mysql://" + config.getJdbcUrl() + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8&characterEncoding=utf8");
- map.put("username", config.getJdbcUsername());
- map.put("password", config.getJdbcPassword());
+ "jdbc:mysql://" + config.getDbUrl() + ":" + config.getDbPort() + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8&characterEncoding=utf8");
+ map.put("username", config.getDbUsername());
+ map.put("password", config.getDbPassword());
map.put("initialSize", "1");
map.put("maxActive", "1");
map.put("maxWait", "60000");
diff --git a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/Utils.java b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/Utils.java
new file mode 100644
index 0000000..5708e34
--- /dev/null
+++ b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/Utils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.jdbc.common;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class Utils {
+ private static final Logger log = LoggerFactory.getLogger(Utils.class);
+
+ public static String createGroupName(String prefix) {
+ return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
+ }
+
+ public static String createGroupName(String prefix, String postfix) {
+ return new StringBuilder().append(prefix).append("-").append(postfix).toString();
+ }
+
+ public static String createTaskId(String prefix) {
+ return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
+ }
+
+ public static String createInstanceName(String namesrvAddr) {
+ String[] namesrvArray = namesrvAddr.split(";");
+ List<String> namesrvList = new ArrayList<>();
+ for (String ns : namesrvArray) {
+ if (!namesrvList.contains(ns)) {
+ namesrvList.add(ns);
+ }
+ }
+ Collections.sort(namesrvList);
+ return String.valueOf(namesrvList.toString().hashCode());
+ }
+
+ public static List<BrokerData> examineBrokerData(DefaultMQAdminExt defaultMQAdminExt, String topic,
+ String cluster) throws RemotingException, MQClientException, InterruptedException {
+ List<BrokerData> brokerList = new ArrayList<>();
+
+ TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+ if (topicRouteData.getBrokerDatas() != null) {
+ for (BrokerData broker : topicRouteData.getBrokerDatas()) {
+ if (StringUtils.equals(broker.getCluster(), cluster)) {
+ brokerList.add(broker);
+ }
+ }
+ }
+ return brokerList;
+ }
+
+}
diff --git a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
index 91a3e51..ae86d3f 100644
--- a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
+++ b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
@@ -59,6 +59,10 @@
public static final String CONN_TOPIC_NAMES = "topicNames";
public static final String CONN_DB_MODE = "mode";
+ public static final String CONN_SOURCE_RMQ = "source-rocketmq";
+ public static final String CONN_SOURCE_CLUSTER = "source-cluster";
+ public static final String REFRESH_INTERVAL = "refresh.interval";
+
/* Mode Config */
private String mode = "";
private String incrementingColumnName = "";
diff --git a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java
index 851b253..3ff4f71 100644
--- a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java
+++ b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java
@@ -6,11 +6,17 @@
import org.apache.rocketmq.connect.jdbc.strategy.DivideTaskByTopic;
import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
public class SinkDbConnectorConfig extends DbConnectorConfig{
private Set<String> whiteList;
+ private String srcNamesrvs;
+ private String srcCluster;
+ private long refreshInterval;
+ private Map<String, List<TaskTopicInfo>> topicRouteMap;
public SinkDbConnectorConfig(){
}
@@ -34,11 +40,15 @@
this.dbUserName = config.getString(Config.CONN_DB_USERNAME);
this.dbPassword = config.getString(Config.CONN_DB_PASSWORD);
+ this.srcNamesrvs = config.getString(Config.CONN_SOURCE_RMQ);
+ this.srcCluster = config.getString(Config.CONN_SOURCE_CLUSTER);
+ this.refreshInterval = config.getLong(Config.REFRESH_INTERVAL, 3);
+
}
private void buildWhiteList(KeyValue config) {
this.whiteList = new HashSet<>();
- String whiteListStr = config.getString(Config.CONN_WHITE_LIST, "");
+ String whiteListStr = config.getString(Config.CONN_TOPIC_NAMES, "");
String[] wl = whiteListStr.trim().split(",");
if (wl.length <= 0)
throw new IllegalArgumentException("White list must be not empty.");
@@ -59,6 +69,26 @@
this.whiteList = whiteList;
}
+ public String getSrcNamesrvs() {
+ return this.srcNamesrvs;
+ }
+
+ public String getSrcCluster() {
+ return this.srcCluster;
+ }
+
+ public long getRefreshInterval() {
+ return this.refreshInterval;
+ }
+
+ public Map<String, List<TaskTopicInfo>> getTopicRouteMap() {
+ return topicRouteMap;
+ }
+
+ public void setTopicRouteMap(Map<String, List<TaskTopicInfo>> topicRouteMap) {
+ this.topicRouteMap = topicRouteMap;
+ }
+
@Override
public Set<String> getWhiteTopics() {
return getWhiteList();
diff --git a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
index 935ad52..0f818ee 100644
--- a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
+++ b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
@@ -3,22 +3,69 @@
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.Task;
import io.openmessaging.connector.api.sink.SinkConnector;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.commons.lang3.text.StrSubstitutor;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.connect.jdbc.common.ConstDefine;
+import org.apache.rocketmq.connect.jdbc.common.Utils;
import org.apache.rocketmq.connect.jdbc.config.*;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
public class JdbcSinkConnector extends SinkConnector {
private static final Logger log = LoggerFactory.getLogger(JdbcSinkConnector.class);
- private KeyValue config;
private DbConnectorConfig dbConnectorConfig;
private volatile boolean configValid = false;
+ private ScheduledExecutorService executor;
+ private Map<String, List<TaskTopicInfo>> topicRouteMap;
- public JdbcSinkConnector(){
+ private DefaultMQAdminExt srcMQAdminExt;
+
+ private volatile boolean adminStarted;
+
+ public JdbcSinkConnector() {
+ topicRouteMap = new HashMap<String, List<TaskTopicInfo>>();
dbConnectorConfig = new SinkDbConnectorConfig();
+ executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("JdbcSinkConnector-SinkWatcher-%d").daemon(true).build());
+ }
+
+ private synchronized void startMQAdminTools() {
+ if (!configValid || adminStarted) {
+ return;
+ }
+ RPCHook rpcHook = null;
+ this.srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ this.srcMQAdminExt.setNamesrvAddr(((SinkDbConnectorConfig) this.dbConnectorConfig).getSrcNamesrvs());
+ this.srcMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.JDBC_CONNECTOR_ADMIN_PREFIX));
+ this.srcMQAdminExt.setInstanceName(Utils.createInstanceName(((SinkDbConnectorConfig) this.dbConnectorConfig).getSrcNamesrvs()));
+
+ try {
+ this.srcMQAdminExt.start();
+ log.info("RocketMQ srcMQAdminExt started");
+
+ } catch (MQClientException e) {
+ log.error("Replicator start failed for `srcMQAdminExt` exception.", e);
+ }
+
+ adminStarted = true;
}
@Override
@@ -40,7 +87,80 @@
@Override
public void start() {
+ startMQAdminTools();
+ startListener();
+ }
+ public void startListener() {
+ executor.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ Map<String, List<TaskTopicInfo>> origin = topicRouteMap;
+ topicRouteMap = new HashMap<String, List<TaskTopicInfo>>();
+
+ buildRoute();
+
+ if (!compare(origin, topicRouteMap)) {
+ context.requestTaskReconfiguration();
+ }
+ }
+ }, ((SinkDbConnectorConfig) dbConnectorConfig).getRefreshInterval(), ((SinkDbConnectorConfig) dbConnectorConfig).getRefreshInterval(), TimeUnit.SECONDS);
+ }
+
+ public boolean compare(Map<String, List<TaskTopicInfo>> origin, Map<String, List<TaskTopicInfo>> updated) {
+ if (origin.size() != updated.size()) {
+ return false;
+ }
+ for (Map.Entry<String, List<TaskTopicInfo>> entry : origin.entrySet()) {
+ if (!updated.containsKey(entry.getKey())) {
+ return false;
+ }
+ List<TaskTopicInfo> originTasks = entry.getValue();
+ List<TaskTopicInfo> updateTasks = updated.get(entry.getKey());
+ if (originTasks.size() != updateTasks.size()) {
+ return false;
+ }
+
+ if (!originTasks.containsAll(updateTasks)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ public void buildRoute() {
+ String srcCluster = ((SinkDbConnectorConfig) this.dbConnectorConfig).getSrcCluster();
+ try {
+ for (String topic : ((SinkDbConnectorConfig) this.dbConnectorConfig).getWhiteList()) {
+
+ // different from BrokerData with cluster field, which can ensure the brokerData is from expected cluster.
+ // QueueData use brokerName as unique info on cluster of rocketmq. so when we want to get QueueData of
+ // expected cluster, we should get brokerNames of expected cluster, and then filter queueDatas.
+ List<BrokerData> brokerList = Utils.examineBrokerData(this.srcMQAdminExt, topic, srcCluster);
+ Set<String> brokerNameSet = new HashSet<String>();
+ for (BrokerData b : brokerList) {
+ brokerNameSet.add(b.getBrokerName());
+ }
+
+ TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic);
+ if (!topicRouteMap.containsKey(topic)) {
+ topicRouteMap.put(topic, new ArrayList<TaskTopicInfo>());
+ }
+ for (QueueData qd : topicRouteData.getQueueDatas()) {
+ if (brokerNameSet.contains(qd.getBrokerName())) {
+ for (int i = 0; i < qd.getReadQueueNums(); i++) {
+ TaskTopicInfo taskTopicInfo = new TaskTopicInfo(topic, qd.getBrokerName(), i, null);
+ topicRouteMap.get(topic).add(taskTopicInfo);
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("Fetch topic list error.", e);
+ } finally {
+ srcMQAdminExt.shutdown();
+ }
}
@Override
@@ -70,6 +190,10 @@
return new ArrayList<KeyValue>();
}
+ startMQAdminTools();
+
+ buildRoute();
+
TaskDivideConfig tdc = new TaskDivideConfig(
this.dbConnectorConfig.getDbUrl(),
this.dbConnectorConfig.getDbPort(),
@@ -80,6 +204,9 @@
this.dbConnectorConfig.getTaskParallelism(),
this.dbConnectorConfig.getMode()
);
+
+ ((SinkDbConnectorConfig) this.dbConnectorConfig).setTopicRouteMap(topicRouteMap);
+
return this.dbConnectorConfig.getTaskDivideStrategy().divide(this.dbConnectorConfig, tdc);
}
}
diff --git a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java
index 7ef5c31..797710a 100644
--- a/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java
+++ b/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java
@@ -19,10 +19,7 @@
import com.alibaba.fastjson.JSONObject;
import io.openmessaging.KeyValue;
import io.openmessaging.internal.DefaultKeyValue;
-import org.apache.rocketmq.replicator.config.DataType;
-import org.apache.rocketmq.replicator.config.TaskConfigEnum;
-import org.apache.rocketmq.replicator.config.TaskDivideConfig;
-import org.apache.rocketmq.replicator.config.TaskTopicInfo;
+import org.apache.rocketmq.connect.jdbc.config.*;
import java.util.ArrayList;
import java.util.HashMap;
@@ -30,12 +27,21 @@
import java.util.Map;
public class DivideTaskByQueue extends TaskDivideStrategy {
+
@Override
- public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
+ public List<KeyValue> divide(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) {
+ if (dbConnectorConfig instanceof SinkDbConnectorConfig){
+ return divideSinkTaskByQueue(dbConnectorConfig, tdc);
+ }
+ return null;
+ }
+
+ public List<KeyValue> divideSinkTaskByQueue(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) {
List<KeyValue> config = new ArrayList<KeyValue>();
int parallelism = tdc.getTaskParallelism();
Map<Integer, List<TaskTopicInfo>> queueTopicList = new HashMap<Integer, List<TaskTopicInfo>>();
+ Map<String, List<TaskTopicInfo>> topicRouteMap = ((SinkDbConnectorConfig)dbConnectorConfig).getTopicRouteMap();
int id = -1;
for (String t : topicRouteMap.keySet()) {
for (TaskTopicInfo taskTopicInfo : topicRouteMap.get(t)) {
@@ -49,11 +55,14 @@
for (int i = 0; i < parallelism; i++) {
KeyValue keyValue = new DefaultKeyValue();
- keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic());
- keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSourceNamesrvAddr());
- keyValue.put(TaskConfigEnum.TASK_DATA_TYPE.getKey(), DataType.COMMON_MESSAGE.ordinal());
- keyValue.put(TaskConfigEnum.TASK_TOPIC_INFO.getKey(), JSONObject.toJSONString(queueTopicList.get(i)));
- keyValue.put(TaskConfigEnum.TASK_SOURCE_RECORD_CONVERTER.getKey(), tdc.getSrcRecordConverter());
+ keyValue.put(Config.CONN_DB_IP, tdc.getDbUrl());
+ keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort());
+ keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName());
+ keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword());
+ keyValue.put(Config.CONN_TOPIC_NAMES, JSONObject.toJSONString(queueTopicList.get(i)));
+ keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType());
+ keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter());
+ keyValue.put(Config.CONN_DB_MODE, tdc.getMode());
config.add(keyValue);
}