blob: 68395d346c5f6d0957e4f2af2b39cc7393aa19c0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.connect.cassandra.strategy;
import com.alibaba.fastjson.JSONObject;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.Task;
import io.openmessaging.internal.DefaultKeyValue;
import org.apache.rocketmq.connect.cassandra.config.*;
import java.util.*;
public class DivideTaskByTopic extends TaskDivideStrategy {
@Override
public List<KeyValue> divide(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) {
if (dbConnectorConfig instanceof SourceDbConnectorConfig){
return divideSourceTaskByTopic(dbConnectorConfig, tdc);
}else {
return divideSinkTaskByTopic(dbConnectorConfig, tdc);
}
}
private List<KeyValue> divideSinkTaskByTopic(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) {
List<KeyValue> config = new ArrayList<KeyValue>();
int parallelism = tdc.getTaskParallelism();
int id = -1;
Set<String> topicRouteSet = ((SinkDbConnectorConfig)dbConnectorConfig).getWhiteTopics();
Map<Integer, StringBuilder> taskTopicList = new HashMap<>();
for (String topicName : topicRouteSet) {
int ind = ++id % parallelism;
if (!taskTopicList.containsKey(ind)) {
taskTopicList.put(ind, new StringBuilder(topicName));
} else {
taskTopicList.get(ind).append(",").append(topicName);
}
}
for (int i = 0; i < parallelism; i++) {
KeyValue keyValue = new DefaultKeyValue();
keyValue.put(Config.CONN_DB_IP, tdc.getDbUrl());
keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort());
keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName());
keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword());
keyValue.put(Config.CONN_DB_DATACENTER, tdc.getLocalDataCenter());
keyValue.put(Config.CONN_TOPIC_NAMES, taskTopicList.get(i).toString());
keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType());
keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter());
keyValue.put(Config.CONN_DB_MODE, tdc.getMode());
config.add(keyValue);
}
return config;
}
private List<KeyValue> divideSourceTaskByTopic(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) {
List<KeyValue> config = new ArrayList<KeyValue>();
int parallelism = tdc.getTaskParallelism();
int id = -1;
Map<String, String> topicRouteMap = ((SourceDbConnectorConfig)dbConnectorConfig).getWhiteTopics();
Map<Integer, Map<String, Map<String, String>>> taskTopicList = new HashMap<>();
for (Map.Entry<String, String> entry : topicRouteMap.entrySet()) {
int ind = ++id % parallelism;
if (!taskTopicList.containsKey(ind)) {
taskTopicList.put(ind, new HashMap<>());
}
String dbKey = entry.getKey().split("-")[0];
String tableKey = entry.getKey().split("-")[1];
String filter = entry.getValue();
Map<String, String> tableMap = new HashMap<>();
tableMap.put(tableKey, filter);
if(!taskTopicList.get(ind).containsKey(dbKey)){
taskTopicList.get(ind).put(dbKey, tableMap);
}else {
taskTopicList.get(ind).get(dbKey).putAll(tableMap);
}
}
for (int i = 0; i < parallelism; i++) {
KeyValue keyValue = new DefaultKeyValue();
keyValue.put(Config.CONN_DB_IP, tdc.getDbUrl());
keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort());
keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName());
keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword());
keyValue.put(Config.CONN_DB_DATACENTER, tdc.getLocalDataCenter());
keyValue.put(Config.CONN_WHITE_LIST, JSONObject.toJSONString(taskTopicList.get(i)));
keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType());
keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter());
keyValue.put(Config.CONN_DB_MODE, tdc.getMode());
config.add(keyValue);
}
return config;
}
}