blob: 3ff4f71e2e981a5542e72b73b8675a33a40b239a [file] [log] [blame]
package org.apache.rocketmq.connect.jdbc.config;
import io.openmessaging.KeyValue;
import org.apache.rocketmq.connect.jdbc.strategy.DivideStrategyEnum;
import org.apache.rocketmq.connect.jdbc.strategy.DivideTaskByQueue;
import org.apache.rocketmq.connect.jdbc.strategy.DivideTaskByTopic;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Connector configuration for the sink side of the JDBC connector.
 *
 * <p>On top of the fields inherited from {@link DbConnectorConfig}, this class
 * holds the topic white list, the values read from
 * {@code Config.CONN_SOURCE_RMQ} and {@code Config.CONN_SOURCE_CLUSTER}, the
 * refresh interval and a topic route map that is supplied from outside through
 * {@link #setTopicRouteMap(Map)}.
 */
public class SinkDbConnectorConfig extends DbConnectorConfig {

    /** Topic names parsed from {@code Config.CONN_TOPIC_NAMES}; never contains blank entries after {@link #validate}. */
    private Set<String> whiteList;

    /** Value of {@code Config.CONN_SOURCE_RMQ}. */
    private String srcNamesrvs;

    /** Value of {@code Config.CONN_SOURCE_CLUSTER}. */
    private String srcCluster;

    /** Value of {@code Config.REFRESH_INTERVAL} (defaults to 3; the time unit is defined by the consumer of this value). */
    private long refreshInterval;

    /** Topic route information keyed by string; only stored here, never computed by this class. */
    private Map<String, List<TaskTopicInfo>> topicRouteMap;

    public SinkDbConnectorConfig() {
    }

    /**
     * Populates this configuration from the given key/value settings.
     *
     * <p>The divide strategy is selected by comparing the configured integer
     * with {@code DivideStrategyEnum.BY_QUEUE.ordinal()}; any other value
     * selects {@link DivideTaskByTopic}.
     *
     * @param config the raw connector settings
     * @throws IllegalArgumentException if {@code Config.CONN_TOPIC_NAMES}
     *         does not contain at least one non-blank topic name
     */
    @Override
    public void validate(KeyValue config) {
        this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 0);

        int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_QUEUE.ordinal());
        if (strategy == DivideStrategyEnum.BY_QUEUE.ordinal()) {
            this.taskDivideStrategy = new DivideTaskByQueue();
        } else {
            this.taskDivideStrategy = new DivideTaskByTopic();
        }

        buildWhiteList(config);

        this.converter = config.getString(Config.CONN_SOURCE_RECORD_CONVERTER);
        this.dbUrl = config.getString(Config.CONN_DB_IP);
        this.dbPort = config.getString(Config.CONN_DB_PORT);
        this.dbUserName = config.getString(Config.CONN_DB_USERNAME);
        this.dbPassword = config.getString(Config.CONN_DB_PASSWORD);
        this.srcNamesrvs = config.getString(Config.CONN_SOURCE_RMQ);
        this.srcCluster = config.getString(Config.CONN_SOURCE_CLUSTER);
        this.refreshInterval = config.getLong(Config.REFRESH_INTERVAL, 3);
    }

    /**
     * Parses the comma-separated topic list stored under
     * {@code Config.CONN_TOPIC_NAMES} into {@link #whiteList}.
     *
     * <p>Each entry is trimmed and blank entries are dropped.
     *
     * @param config the raw connector settings
     * @throws IllegalArgumentException if no non-blank topic name is configured
     */
    private void buildWhiteList(KeyValue config) {
        this.whiteList = new HashSet<>();
        String whiteListStr = config.getString(Config.CONN_TOPIC_NAMES, "");
        if (whiteListStr != null) {
            for (String rawTopic : whiteListStr.split(",")) {
                String topic = rawTopic.trim();
                // "".split(",") yields [""], so emptiness has to be checked per
                // entry; checking the array length alone never detects a missing list.
                if (!topic.isEmpty()) {
                    this.whiteList.add(topic);
                }
            }
        }
        if (this.whiteList.isEmpty()) {
            throw new IllegalArgumentException("White list must be not empty.");
        }
    }

    /** @return the topic white list (the internal set, not a copy) */
    public Set<String> getWhiteList() {
        return whiteList;
    }

    /** @param whiteList the topic white list to use; stored as-is without validation */
    public void setWhiteList(Set<String> whiteList) {
        this.whiteList = whiteList;
    }

    /** @return the value read from {@code Config.CONN_SOURCE_RMQ} */
    public String getSrcNamesrvs() {
        return this.srcNamesrvs;
    }

    /** @return the value read from {@code Config.CONN_SOURCE_CLUSTER} */
    public String getSrcCluster() {
        return this.srcCluster;
    }

    /** @return the value read from {@code Config.REFRESH_INTERVAL}, or 3 if it was not configured */
    public long getRefreshInterval() {
        return this.refreshInterval;
    }

    /** @return the topic route map previously set through {@link #setTopicRouteMap(Map)}, possibly {@code null} */
    public Map<String, List<TaskTopicInfo>> getTopicRouteMap() {
        return topicRouteMap;
    }

    /** @param topicRouteMap the topic route map to store */
    public void setTopicRouteMap(Map<String, List<TaskTopicInfo>> topicRouteMap) {
        this.topicRouteMap = topicRouteMap;
    }

    /** @return the same set as {@link #getWhiteList()} */
    @Override
    public Set<String> getWhiteTopics() {
        return getWhiteList();
    }
}