blob: dcc479ee8bf3849f0073d3f5276d1266789bf585 [file] [log] [blame]
package geode.kafka;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class GeodeConnectorConfig {
//Geode Configuration
public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientId";
public static final String DEFAULT_DURABLE_CLIENT_ID = "";
public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout";
public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
//GeodeKafka Specific Configuration
public static final String TASK_ID = "GEODE_TASK_ID"; //One config per task
public static final String CQ_PREFIX = "cqPrefix";
public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
/**
* Specifies which Locators to connect to Apache Geode
*/
public static final String LOCATORS = "locators";
public static final String DEFAULT_LOCATOR = "localhost[10334]";
/**
* Specifies which Topics to connect in Kafka, uses the variable name with Kafka Sink Configuration
* Only used in sink configuration
*/
public static final String TOPICS = "topics";
//Used by sink
public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegion";
//Used by source
public static final String REGION_TO_TOPIC_BINDINGS = "regionToTopic";
/**
* Property to describe the Source Partition in a record
*/
public static final String REGION_NAME = "regionName"; //used for Source Partition Events
public static final String BATCH_SIZE = "geodeConnectorBatchSize";
public static final String DEFAULT_BATCH_SIZE = "100";
public static final String QUEUE_SIZE = "geodeConnectorQueueSize";
public static final String DEFAULT_QUEUE_SIZE = "100000";
public static final String LOAD_ENTIRE_REGION = "loadEntireRegion";
public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
private final int taskId;
private final String durableClientId;
private final String durableClientIdPrefix;
private final String durableClientTimeout;
private Map<String, List<String>> regionToTopics;
private Map<String, List<String>> topicToRegions;
private List<LocatorHostPort> locatorHostPorts;
//just for tests
GeodeConnectorConfig() {
taskId = 0;
durableClientId = "";
durableClientIdPrefix = "";
durableClientTimeout = "0";
}
public GeodeConnectorConfig(Map<String, String> connectorProperties) {
taskId = Integer.parseInt(connectorProperties.get(TASK_ID));
durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX);
if (isDurable(durableClientIdPrefix)) {
durableClientId = durableClientIdPrefix + taskId;
} else {
durableClientId = "";
}
durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT);
regionToTopics = parseRegionToTopics(connectorProperties.get(GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS));
topicToRegions = parseTopicToRegions(connectorProperties.get(GeodeConnectorConfig.TOPIC_TO_REGION_BINDINGS));
locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
}
public static Map<String, List<String>> parseTopicToRegions(String combinedBindings) {
//It's the same formatting, so parsing is the same going topic to region or region to topic
return parseRegionToTopics(combinedBindings);
}
/**
* Given a string of the form [region:topic,...] will produce a map where the key is the
* regionName and the value is a list of topicNames to push values to
*
* @param combinedBindings a string with similar form to "[region:topic,...], [region2:topic2,...]
* @return mapping of regionName to list of topics to update
*/
public static Map<String, List<String>> parseRegionToTopics(String combinedBindings) {
if (combinedBindings == "" || combinedBindings == null){
return null;
}
List<String> bindings = parseBindings(combinedBindings);
return bindings.stream().map(binding -> {
String[] regionToTopicsArray = parseBinding(binding);
return regionToTopicsArray;
}).collect(Collectors.toMap(regionToTopicsArray -> regionToTopicsArray[0], regionToTopicsArray -> parseNames(regionToTopicsArray[1])));
}
public static List<String> parseBindings(String bindings) {
return Arrays.stream(bindings.split("](\\s)*,")).map((s) -> {
s = s.replaceAll("\\[", "");
s = s.replaceAll("\\]", "");
s = s.trim();
return s;
}).collect(Collectors.toList());
}
private static String[] parseBinding(String binding) {
return binding.split(":");
}
//Used to parse a string of topics or regions
public static List<String> parseNames(String names) {
return Arrays.stream(names.split(",")).map((s) -> s.trim()).collect(Collectors.toList());
}
public static String reconstructString(Collection<String> strings) {
return strings.stream().collect(Collectors.joining("],[")) + "]";
}
List<LocatorHostPort> parseLocators(String locators) {
return Arrays.stream(locators.split(",")).map((s) -> {
String locatorString = s.trim();
return parseLocator(locatorString);
}).collect(Collectors.toList());
}
private LocatorHostPort parseLocator(String locatorString) {
String[] splits = locatorString.split("\\[");
String locator = splits[0];
int port = Integer.parseInt(splits[1].replace("]", ""));
return new LocatorHostPort(locator, port);
}
public boolean isDurable() {
return isDurable(durableClientId);
}
/**
* @param durableClientId or prefix can be passed in. Either both will be "" or both will have a value
* @return
*/
boolean isDurable(String durableClientId) {
return !durableClientId.equals("");
}
public int getTaskId() {
return taskId;
}
public String getDurableClientId() {
return durableClientId;
}
public String getDurableClientTimeout() {
return durableClientTimeout;
}
public List<LocatorHostPort> getLocatorHostPorts() {
return locatorHostPorts;
}
public Map<String, List<String>> getRegionToTopics() {
return regionToTopics;
}
public Map<String, List<String>> getTopicToRegions() {
return topicToRegions;
}
}