Seperated out Configuration into source and sink where appropriate
Added a few more tests
diff --git a/build.gradle b/build.gradle
index a131d22..114b3cd 100644
--- a/build.gradle
+++ b/build.gradle
@@ -19,7 +19,6 @@
compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.13.0'
compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.13.0'
-
testCompile(group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.3.1')
testCompile(group: 'org.apache.kafka', name: 'kafka-streams-test-utils', version: '1.1.0')
testCompile(group: 'org.apache.curator', name: 'curator-framework', version: '4.2.0')
@@ -31,3 +30,4 @@
testImplementation 'org.awaitility:awaitility:4.0.2'
}
+
diff --git a/settings.gradle b/settings.gradle
index b5a2326..48eed3b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,2 +1 @@
-rootProject.name = 'geode-kafka-connector'
-
+rootProject.name = 'geode-kafka-connector'
\ No newline at end of file
diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java
index dcc479e..396d428 100644
--- a/src/main/java/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java
@@ -8,79 +8,26 @@
public class GeodeConnectorConfig {
- //Geode Configuration
- public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientId";
- public static final String DEFAULT_DURABLE_CLIENT_ID = "";
- public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout";
- public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
-
//GeodeKafka Specific Configuration
+ /**
+ * Identifier for each task
+ */
public static final String TASK_ID = "GEODE_TASK_ID"; //One config per task
-
- public static final String CQ_PREFIX = "cqPrefix";
- public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
/**
* Specifies which Locators to connect to Apache Geode
*/
public static final String LOCATORS = "locators";
public static final String DEFAULT_LOCATOR = "localhost[10334]";
+ protected final int taskId;
+ protected List<LocatorHostPort> locatorHostPorts;
- /**
- * Specifies which Topics to connect in Kafka, uses the variable name with Kafka Sink Configuration
- * Only used in sink configuration
- */
- public static final String TOPICS = "topics";
-
- //Used by sink
- public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegion";
-
- //Used by source
- public static final String REGION_TO_TOPIC_BINDINGS = "regionToTopic";
-
- /**
- * Property to describe the Source Partition in a record
- */
- public static final String REGION_NAME = "regionName"; //used for Source Partition Events
-
- public static final String BATCH_SIZE = "geodeConnectorBatchSize";
- public static final String DEFAULT_BATCH_SIZE = "100";
-
- public static final String QUEUE_SIZE = "geodeConnectorQueueSize";
- public static final String DEFAULT_QUEUE_SIZE = "100000";
-
- public static final String LOAD_ENTIRE_REGION = "loadEntireRegion";
- public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
-
-
- private final int taskId;
- private final String durableClientId;
- private final String durableClientIdPrefix;
- private final String durableClientTimeout;
-
- private Map<String, List<String>> regionToTopics;
- private Map<String, List<String>> topicToRegions;
- private List<LocatorHostPort> locatorHostPorts;
-
- //just for tests
- GeodeConnectorConfig() {
+ protected GeodeConnectorConfig() {
taskId = 0;
- durableClientId = "";
- durableClientIdPrefix = "";
- durableClientTimeout = "0";
}
public GeodeConnectorConfig(Map<String, String> connectorProperties) {
taskId = Integer.parseInt(connectorProperties.get(TASK_ID));
- durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX);
- if (isDurable(durableClientIdPrefix)) {
- durableClientId = durableClientIdPrefix + taskId;
- } else {
- durableClientId = "";
- }
- durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT);
- regionToTopics = parseRegionToTopics(connectorProperties.get(GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS));
- topicToRegions = parseTopicToRegions(connectorProperties.get(GeodeConnectorConfig.TOPIC_TO_REGION_BINDINGS));
locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
}
@@ -105,7 +52,7 @@
return bindings.stream().map(binding -> {
String[] regionToTopicsArray = parseBinding(binding);
return regionToTopicsArray;
- }).collect(Collectors.toMap(regionToTopicsArray -> regionToTopicsArray[0], regionToTopicsArray -> parseNames(regionToTopicsArray[1])));
+ }).collect(Collectors.toMap(regionToTopicsArray -> regionToTopicsArray[0], regionToTopicsArray -> parseStringByComma(regionToTopicsArray[1])));
}
public static List<String> parseBindings(String bindings) {
@@ -122,8 +69,12 @@
}
//Used to parse a string of topics or regions
- public static List<String> parseNames(String names) {
- return Arrays.stream(names.split(",")).map((s) -> s.trim()).collect(Collectors.toList());
+ public static List<String> parseStringByComma(String string) {
+ return parseStringBy(string, ",");
+ }
+
+ public static List<String> parseStringBy(String string, String regex) {
+ return Arrays.stream(string.split(regex)).map((s) -> s.trim()).collect(Collectors.toList());
}
public static String reconstructString(Collection<String> strings) {
@@ -144,40 +95,11 @@
return new LocatorHostPort(locator, port);
}
- public boolean isDurable() {
- return isDurable(durableClientId);
- }
-
- /**
- * @param durableClientId or prefix can be passed in. Either both will be "" or both will have a value
- * @return
- */
- boolean isDurable(String durableClientId) {
- return !durableClientId.equals("");
- }
-
public int getTaskId() {
return taskId;
}
- public String getDurableClientId() {
- return durableClientId;
- }
-
- public String getDurableClientTimeout() {
- return durableClientTimeout;
- }
-
public List<LocatorHostPort> getLocatorHostPorts() {
return locatorHostPorts;
}
-
- public Map<String, List<String>> getRegionToTopics() {
- return regionToTopics;
- }
-
- public Map<String, List<String>> getTopicToRegions() {
- return topicToRegions;
- }
-
}
diff --git a/src/main/java/geode/kafka/GeodeContext.java b/src/main/java/geode/kafka/GeodeContext.java
index d1fd3ae..b7c3d27 100644
--- a/src/main/java/geode/kafka/GeodeContext.java
+++ b/src/main/java/geode/kafka/GeodeContext.java
@@ -2,6 +2,8 @@
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqExistsException;
@@ -17,20 +19,42 @@
private ClientCache clientCache;
- public GeodeContext(GeodeConnectorConfig connectorConfig) {
- clientCache = createClientCache(connectorConfig.getLocatorHostPorts(), connectorConfig.getDurableClientId(), connectorConfig.getDurableClientTimeout());
+ public GeodeContext() {
+ }
+
+ public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, String durableClientId, String durableClientTimeout) {
+ clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout);
+ return clientCache;
+ }
+
+ public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList) {
+ clientCache = createClientCache(locatorHostPortList, "", "");
+ return clientCache;
}
public ClientCache getClientCache() {
return clientCache;
}
+ /**
+ *
+ * @param locators
+ * @param durableClientName
+ * @param durableClientTimeOut
+ * @return
+ */
public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut) {
- ClientCacheFactory ccf = new ClientCacheFactory().set("durable-client-id", durableClientName)
- .set("durable-client-timeout", durableClientTimeOut)
- .setPoolSubscriptionEnabled(true);
+ ClientCacheFactory ccf = new ClientCacheFactory();
+ if (!durableClientName.equals("")) {
+ ccf.set("durable-client-id", durableClientName)
+ .set("durable-client-timeout", durableClientTimeOut);
+ }
+ //currently we only allow using the default pool.
+ //If we ever want to allow adding multiple pools we'll have to configure pool factories
+ ccf.setPoolSubscriptionEnabled(true);
+
for (LocatorHostPort locator: locators) {
- ccf.addPoolLocator(locator.getHostName(), locator.getPort()).create();
+ ccf.addPoolLocator(locator.getHostName(), locator.getPort());
}
return ccf.create();
}
diff --git a/src/main/java/geode/kafka/sink/BatchRecords.java b/src/main/java/geode/kafka/sink/BatchRecords.java
index 742dcbc..282ba1c 100644
--- a/src/main/java/geode/kafka/sink/BatchRecords.java
+++ b/src/main/java/geode/kafka/sink/BatchRecords.java
@@ -4,6 +4,7 @@
import org.apache.kafka.connect.sink.SinkRecord;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -13,8 +14,18 @@
*/
public class BatchRecords {
- private Map updateMap = new HashMap();
- private List removeList = new ArrayList();
+ private Map updateMap;
+ private Collection removeList;
+
+ public BatchRecords() {
+ this(new HashMap(), new ArrayList());
+ }
+
+ /** Used for tests**/
+ public BatchRecords(Map updateMap, Collection removeList) {
+ this.updateMap = updateMap;
+ this.removeList = removeList;
+ }
public void addRemoveOperation(SinkRecord record) {
//if a previous operation added to the update map
diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
index 99f9b9d..43f8eab 100644
--- a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
@@ -11,19 +11,11 @@
import java.util.List;
import java.util.Map;
-import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
-import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE;
-import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
-import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
import static geode.kafka.GeodeConnectorConfig.LOCATORS;
-import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
-import static geode.kafka.GeodeConnectorConfig.TOPIC_TO_REGION_BINDINGS;
+import static geode.kafka.GeodeSinkConnectorConfig.DEFAULT_NULL_VALUES_MEAN_REMOVE;
+import static geode.kafka.GeodeSinkConnectorConfig.NULL_VALUES_MEAN_REMOVE;
+import static geode.kafka.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
public class GeodeKafkaSink extends SinkConnector {
private static final ConfigDef CONFIG_DEF = new ConfigDef();
@@ -44,7 +36,7 @@
List<Map<String, String>> taskConfigs = new ArrayList<>();
Map<String, String> taskProps = new HashMap<>();
taskProps.putAll(sharedProps);
- List<String> bindings = GeodeConnectorConfig.parseNames(taskProps.get(TOPIC_TO_REGION_BINDINGS));
+ List<String> bindings = GeodeConnectorConfig.parseStringByComma(taskProps.get(TOPIC_TO_REGION_BINDINGS));
List<List<String>> bindingsPerTask = ConnectorUtils.groupPartitions(bindings, maxTasks);
for (int i = 0; i < maxTasks; i++) {
@@ -75,11 +67,7 @@
private Map<String, String> computeMissingConfigurations(Map<String, String> props) {
props.computeIfAbsent(LOCATORS, (key)-> DEFAULT_LOCATOR);
- props.computeIfAbsent(DURABLE_CLIENT_TIME_OUT, (key) -> DEFAULT_DURABLE_CLIENT_TIMEOUT);
- props.computeIfAbsent(DURABLE_CLIENT_ID_PREFIX, (key) -> DEFAULT_DURABLE_CLIENT_ID);
- props.computeIfAbsent(BATCH_SIZE, (key) -> DEFAULT_BATCH_SIZE);
- props.computeIfAbsent(QUEUE_SIZE, (key) -> DEFAULT_QUEUE_SIZE);
- props.computeIfAbsent(CQ_PREFIX, (key) -> DEFAULT_CQ_PREFIX);
+ props.computeIfAbsent(NULL_VALUES_MEAN_REMOVE, (key) -> DEFAULT_NULL_VALUES_MEAN_REMOVE);
return props;
}
}
diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
index 203192c..7f21134 100644
--- a/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -1,7 +1,7 @@
package geode.kafka.sink;
-import geode.kafka.GeodeConnectorConfig;
import geode.kafka.GeodeContext;
+import geode.kafka.GeodeSinkConnectorConfig;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -9,13 +9,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+
/**
* TODO javaDoc
* Currently force 1 region per task
@@ -24,10 +24,10 @@
private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSinkTask.class);
- GeodeContext geodeContext;
- Map<String, List<String>> topicToRegions;
- Map<String, Region> regionNameToRegion;
- boolean nullValuesMeansRemove = true;
+ private GeodeContext geodeContext;
+ private Map<String, List<String>> topicToRegions;
+ private Map<String, Region> regionNameToRegion;
+ private boolean nullValuesMeansRemove = true;
/**
* {@inheritDoc}
@@ -41,11 +41,13 @@
@Override
public void start(Map<String, String> props) {
try {
- GeodeConnectorConfig geodeConnectorConfig = new GeodeConnectorConfig(props);
+ GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props);
logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
- geodeContext = new GeodeContext(geodeConnectorConfig);
+ geodeContext = new GeodeContext();
+ geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts());
topicToRegions = geodeConnectorConfig.getTopicToRegions();
regionNameToRegion = createProxyRegions(topicToRegions.values());
+ nullValuesMeansRemove = geodeConnectorConfig.getNullValuesMeanRemove();
} catch (Exception e) {
e.printStackTrace();
logger.error("Unable to start sink task", e);
diff --git a/src/main/java/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/geode/kafka/sink/GeodeSinkConnectorConfig.java
new file mode 100644
index 0000000..7c6aa3e
--- /dev/null
+++ b/src/main/java/geode/kafka/sink/GeodeSinkConnectorConfig.java
@@ -0,0 +1,35 @@
+package geode.kafka;
+
+import java.util.List;
+import java.util.Map;
+
+public class GeodeSinkConnectorConfig extends GeodeConnectorConfig {
+ //Used by sink
+ public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegion";
+ public static final String NULL_VALUES_MEAN_REMOVE = "nullValuesMeanRemove";
+ public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";
+
+ private Map<String, List<String>> topicToRegions;
+ private final boolean nullValuesMeanRemove;
+
+ //just for tests
+ GeodeSinkConnectorConfig() {
+ super();
+ nullValuesMeanRemove = Boolean.parseBoolean(DEFAULT_NULL_VALUES_MEAN_REMOVE);
+ }
+
+ public GeodeSinkConnectorConfig(Map<String, String> connectorProperties) {
+ super(connectorProperties);
+ topicToRegions = parseTopicToRegions(connectorProperties.get(TOPIC_TO_REGION_BINDINGS));
+ nullValuesMeanRemove = Boolean.parseBoolean(connectorProperties.get(NULL_VALUES_MEAN_REMOVE));
+ }
+
+ public Map<String, List<String>> getTopicToRegions() {
+ return topicToRegions;
+ }
+
+ public boolean getNullValuesMeanRemove() {
+ return nullValuesMeanRemove;
+ }
+
+}
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
index 91e6203..686359e 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSource.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
@@ -12,21 +12,21 @@
import java.util.List;
import java.util.Map;
-import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
-import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOAD_ENTIRE_REGION;
import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE;
-import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
-import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
-import static geode.kafka.GeodeConnectorConfig.LOAD_ENTIRE_REGION;
import static geode.kafka.GeodeConnectorConfig.LOCATORS;
-import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
-import static geode.kafka.GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS;
+import static geode.kafka.source.GeodeSourceConnectorConfig.BATCH_SIZE;
+import static geode.kafka.source.GeodeSourceConnectorConfig.CQ_PREFIX;
+import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_BATCH_SIZE;
+import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_CQ_PREFIX;
+import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
+import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
+import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_LOAD_ENTIRE_REGION;
+import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_QUEUE_SIZE;
+import static geode.kafka.source.GeodeSourceConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
+import static geode.kafka.source.GeodeSourceConnectorConfig.DURABLE_CLIENT_TIME_OUT;
+import static geode.kafka.source.GeodeSourceConnectorConfig.LOAD_ENTIRE_REGION;
+import static geode.kafka.source.GeodeSourceConnectorConfig.QUEUE_SIZE;
+import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
public class GeodeKafkaSource extends SourceConnector {
@@ -47,7 +47,7 @@
Map<String, String> taskProps = new HashMap<>();
taskProps.putAll(sharedProps);
- List<String> bindings = GeodeConnectorConfig.parseNames(taskProps.get(REGION_TO_TOPIC_BINDINGS));
+ List<String> bindings = GeodeConnectorConfig.parseStringByComma(taskProps.get(REGION_TO_TOPIC_BINDINGS));
List<List<String>> bindingsPerTask = ConnectorUtils.groupPartitions(bindings, maxTasks);
for (int i = 0; i < maxTasks; i++) {
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
index dadc8ba..c983a51 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -19,12 +19,9 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
-import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
-import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE;
-import static geode.kafka.GeodeConnectorConfig.LOAD_ENTIRE_REGION;
-import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
-import static geode.kafka.GeodeConnectorConfig.REGION_NAME;
+import static geode.kafka.source.GeodeSourceConnectorConfig.BATCH_SIZE;
+import static geode.kafka.source.GeodeSourceConnectorConfig.QUEUE_SIZE;
+import static org.apache.geode.pdx.internal.PeerTypeRegistration.REGION_NAME;
public class GeodeKafkaSourceTask extends SourceTask {
@@ -37,7 +34,7 @@
private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
private GeodeContext geodeContext;
- private GeodeConnectorConfig geodeConnectorConfig;
+ private GeodeSourceConnectorConfig geodeConnectorConfig;
private Map<String, List<String>> regionToTopics;
private Map<String, Map<String, String>> sourcePartitions;
private BlockingQueue<GeodeEvent> eventBuffer;
@@ -58,9 +55,11 @@
@Override
public void start(Map<String, String> props) {
try {
- geodeConnectorConfig = new GeodeConnectorConfig(props);
+ System.out.println("JASON start task");
+ geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
- geodeContext = new GeodeContext(geodeConnectorConfig);
+ geodeContext = new GeodeContext();
+ geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout());
batchSize = Integer.parseInt(props.get(BATCH_SIZE));
int queueSize = Integer.parseInt(props.get(QUEUE_SIZE));
@@ -69,14 +68,18 @@
regionToTopics = geodeConnectorConfig.getRegionToTopics();
sourcePartitions = createSourcePartitionsMap(regionToTopics.keySet());
- String cqPrefix = props.get(CQ_PREFIX);
- boolean loadEntireRegion = Boolean.parseBoolean(props.get(LOAD_ENTIRE_REGION));
+ String cqPrefix = geodeConnectorConfig.getCqPrefix();
+ boolean loadEntireRegion = geodeConnectorConfig.getLoadEntireRegion();
installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix, loadEntireRegion);
} catch (Exception e) {
+ System.out.println("JASON start task failed" + e);
+ e.printStackTrace();
logger.error("Unable to start source task", e);
throw e;
}
+ System.out.println("JASON end task");
+
}
@Override
@@ -102,7 +105,7 @@
geodeContext.getClientCache().close(true);
}
- void installOnGeode(GeodeConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix, boolean loadEntireRegion) {
+ void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix, boolean loadEntireRegion) {
boolean isDurable = geodeConnectorConfig.isDurable();
int taskId = geodeConnectorConfig.getTaskId();
for (String region : geodeConnectorConfig.getRegionToTopics().keySet()) {
diff --git a/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java
new file mode 100644
index 0000000..c29048f
--- /dev/null
+++ b/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java
@@ -0,0 +1,104 @@
+package geode.kafka.source;
+
+import geode.kafka.GeodeConnectorConfig;
+import geode.kafka.LocatorHostPort;
+
+import java.util.List;
+import java.util.Map;
+
+public class GeodeSourceConnectorConfig extends GeodeConnectorConfig {
+
+ //Geode Configuration
+ public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientId";
+ public static final String DEFAULT_DURABLE_CLIENT_ID = "";
+ public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout";
+ public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
+
+ public static final String CQ_PREFIX = "cqPrefix";
+ public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
+
+ /**
+ * Used as a key for source partitions
+ */
+ public static final String REGION = "region";
+
+ public static final String REGION_TO_TOPIC_BINDINGS = "regionToTopic";
+
+ public static final String BATCH_SIZE = "geodeConnectorBatchSize";
+ public static final String DEFAULT_BATCH_SIZE = "100";
+
+ public static final String QUEUE_SIZE = "geodeConnectorQueueSize";
+ public static final String DEFAULT_QUEUE_SIZE = "100000";
+
+ public static final String LOAD_ENTIRE_REGION = "loadEntireRegion";
+ public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
+
+ private final String durableClientId;
+ private final String durableClientIdPrefix;
+ private final String durableClientTimeout;
+ private final String cqPrefix;
+ private final boolean loadEntireRegion;
+
+ private Map<String, List<String>> regionToTopics;
+
+ //just for tests
+ protected GeodeSourceConnectorConfig() {
+ super();
+ durableClientId = "";
+ durableClientIdPrefix = "";
+ durableClientTimeout = "0";
+ cqPrefix = DEFAULT_CQ_PREFIX;
+ loadEntireRegion = Boolean.parseBoolean(DEFAULT_LOAD_ENTIRE_REGION);
+ }
+
+ public GeodeSourceConnectorConfig(Map<String, String> connectorProperties) {
+ super(connectorProperties);
+ regionToTopics = parseRegionToTopics(connectorProperties.get(REGION_TO_TOPIC_BINDINGS));
+ durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX);
+ if (isDurable(durableClientIdPrefix)) {
+ durableClientId = durableClientIdPrefix + taskId;
+ } else {
+ durableClientId = "";
+ }
+ durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT);
+ cqPrefix = connectorProperties.get(CQ_PREFIX);
+ loadEntireRegion = Boolean.parseBoolean(connectorProperties.get(LOAD_ENTIRE_REGION));
+ }
+
+ public boolean isDurable() {
+ return isDurable(durableClientId);
+ }
+
+ /**
+ * @param durableClientId or prefix can be passed in. Either both will be "" or both will have a value
+ * @return
+ */
+ boolean isDurable(String durableClientId) {
+ return !durableClientId.equals("");
+ }
+
+ public int getTaskId() {
+ return taskId;
+ }
+
+ public String getDurableClientId() {
+ return durableClientId;
+ }
+
+ public String getDurableClientTimeout() {
+ return durableClientTimeout;
+ }
+
+ public String getCqPrefix() {
+ return cqPrefix;
+ }
+
+ public boolean getLoadEntireRegion() {
+ return loadEntireRegion;
+ }
+
+ public Map<String, List<String>> getRegionToTopics() {
+ return regionToTopics;
+ }
+
+}
diff --git a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
index 6a39c5d..904b981 100644
--- a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
+++ b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
@@ -10,8 +10,6 @@
import java.util.List;
import java.util.Map;
-import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
-import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
import static geode.kafka.GeodeConnectorConfig.LOCATORS;
import static geode.kafka.GeodeConnectorConfig.TASK_ID;
import static org.hamcrest.CoreMatchers.allOf;
@@ -26,7 +24,7 @@
@Test
public void parseRegionNamesShouldSplitOnComma() {
GeodeConnectorConfig config = new GeodeConnectorConfig();
- List<String> regionNames = config.parseNames("region1,region2,region3,region4");
+ List<String> regionNames = config.parseStringByComma("region1,region2,region3,region4");
assertEquals(4, regionNames.size());
assertThat(true, allOf(is(regionNames.contains("region1"))
, is(regionNames.contains("region2"))
@@ -37,7 +35,7 @@
@Test
public void parseRegionNamesShouldChomp() {
GeodeConnectorConfig config = new GeodeConnectorConfig();
- List<String> regionNames = config.parseNames("region1, region2, region3,region4");
+ List<String> regionNames = config.parseStringByComma("region1, region2, region3,region4");
assertEquals(4, regionNames.size());
assertThat(true, allOf(is(regionNames instanceof List)
, is(regionNames.contains("region1"))
@@ -135,14 +133,5 @@
*/
- @Test
- public void durableClientIdShouldNotBeSetIfPrefixIsEmpty() {
- Map<String, String> props = new HashMap<>();
- props.put(TASK_ID, "0");
- props.put(DURABLE_CLIENT_ID_PREFIX, "");
- props.put(LOCATORS, "localhost[10334]");
- GeodeConnectorConfig config = new GeodeConnectorConfig(props);
- assertEquals("", config.getDurableClientId());
- }
}
diff --git a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
index 5e8f074..1b46fe0 100644
--- a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
@@ -18,12 +18,11 @@
import java.util.HashMap;
import java.util.Map;
-import static geode.kafka.GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS;
-import static geode.kafka.GeodeConnectorConfig.TOPICS;
-import static geode.kafka.GeodeConnectorConfig.TOPIC_TO_REGION_BINDINGS;
import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_TO_TOPIC_BINDINGS;
import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK;
import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_TO_REGION_BINDINGS;
+import static geode.kafka.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
+import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
public class WorkerAndHerderWrapper {
@@ -34,9 +33,6 @@
// fast flushing for testing.
props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
-
- props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
- props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
props.put("internal.key.converter.schemas.enable", "false");
props.put("internal.value.converter.schemas.enable", "false");
props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
@@ -71,7 +67,7 @@
sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector");
sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
sinkProps.put(TOPIC_TO_REGION_BINDINGS, TEST_TOPIC_TO_REGION_BINDINGS);
- sinkProps.put(TOPICS, TEST_TOPIC_FOR_SINK);
+ sinkProps.put("topics", TEST_TOPIC_FOR_SINK);
herder.putConnectorConfig(
sinkProps.get(ConnectorConfig.NAME_CONFIG),
diff --git a/src/test/java/geode/kafka/sink/BatchRecordsTest.java b/src/test/java/geode/kafka/sink/BatchRecordsTest.java
index 4907d74..593fea0 100644
--- a/src/test/java/geode/kafka/sink/BatchRecordsTest.java
+++ b/src/test/java/geode/kafka/sink/BatchRecordsTest.java
@@ -1,37 +1,82 @@
package geode.kafka.sink;
+import org.apache.geode.cache.Region;
+import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Test;
+import java.util.Collection;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
public class BatchRecordsTest {
@Test
public void updatingARecordShouldRemoveFromTheRemoveListIfNullValuesIsRemoveBooleanIsSet() {
-
+ Map updates = mock(Map.class);
+ Collection removes = mock(Collection.class);
+ when(removes.contains(any())).thenReturn(true);
+ BatchRecords records = new BatchRecords(updates, removes);
+ SinkRecord sinkRecord = mock(SinkRecord.class);
+ records.addUpdateOperation(sinkRecord, true);
+ verify(removes, times(1)).remove(any());
}
@Test
public void updatingARecordShouldAddToTheUpdateMap() {
-
+ Map updates = mock(Map.class);
+ Collection removes = mock(Collection.class);
+ when(removes.contains(any())).thenReturn(false);
+ BatchRecords records = new BatchRecords(updates, removes);
+ SinkRecord sinkRecord = mock(SinkRecord.class);
+ records.addUpdateOperation(sinkRecord, true);
+ verify(updates, times(1)).put(any(), any());
}
@Test
public void updatingARecordShouldNotRemoveFromTheRemoveListIfNullValuesIsNotSet() {
-
+ boolean nullValuesMeanRemove = false;
+ Map updates = mock(Map.class);
+ Collection removes = mock(Collection.class);
+ when(removes.contains(any())).thenReturn(true);
+ BatchRecords records = new BatchRecords(updates, removes);
+ SinkRecord sinkRecord = mock(SinkRecord.class);
+ records.addUpdateOperation(sinkRecord, nullValuesMeanRemove);
+ verify(removes, times(0)).remove(any());
}
@Test
- public void removingARecordShouldRemoveFromTheUpdateMap() {
-
+ public void removingARecordShouldRemoveFromTheUpdateMapIfKeyIsPresent() {
+ Map updates = mock(Map.class);
+ Collection removes = mock(Collection.class);
+ when(updates.containsKey(any())).thenReturn(true);
+ BatchRecords records = new BatchRecords(updates, removes);
+ SinkRecord sinkRecord = mock(SinkRecord.class);
+ records.addRemoveOperation(sinkRecord);
+ verify(updates, times(1)).remove(any());
}
@Test
public void removingARecordAddToTheRemoveCollection() {
-
+ Map updates = mock(Map.class);
+ Collection removes = mock(Collection.class);
+ BatchRecords records = new BatchRecords(updates, removes);
+ SinkRecord sinkRecord = mock(SinkRecord.class);
+ records.addRemoveOperation(sinkRecord);
+ verify(removes, times(1)).add(any());
}
@Test
public void executeOperationsShouldInvokePutAllAndRemoveAll() {
-
+ Region region = mock(Region.class);
+ BatchRecords records = new BatchRecords();
+ records.executeOperations(region);
+ verify(region, times(1)).putAll(any());
+ verify(region, times(1)).removeAll(any());
}
diff --git a/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
new file mode 100644
index 0000000..37131ba
--- /dev/null
+++ b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
@@ -0,0 +1,4 @@
+package geode.kafka.sink;
+
+public class GeodeKafkaSinkTaskTest {
+}
diff --git a/src/test/java/geode/kafka/sink/GeodeKafkaSinkTest.java b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTest.java
new file mode 100644
index 0000000..2720e90
--- /dev/null
+++ b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTest.java
@@ -0,0 +1,16 @@
+package geode.kafka.sink;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class GeodeKafkaSinkTest {
+
+ @Test
+ public void test() {
+ GeodeKafkaSink sink = new GeodeKafkaSink();
+ Map<String, String> props = new HashMap();
+ sink.start(props);
+ }
+}
diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index d4149db..b793d30 100644
--- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -1,6 +1,5 @@
package geode.kafka.source;
-import geode.kafka.GeodeConnectorConfig;
import geode.kafka.GeodeContext;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.query.CqEvent;
@@ -15,8 +14,8 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
-import static geode.kafka.GeodeConnectorConfig.REGION_NAME;
+import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_CQ_PREFIX;
+import static org.apache.geode.pdx.internal.PeerTypeRegistration.REGION_NAME;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@@ -88,7 +87,7 @@
GeodeContext geodeContext = mock(GeodeContext.class);
when(geodeContext.getClientCache()).thenReturn(clientCache);
- GeodeConnectorConfig config = mock(GeodeConnectorConfig.class);
+ GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
when (config.isDurable()).thenReturn(true);
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
task.installOnGeode(config, geodeContext, null, "", false);
@@ -105,7 +104,7 @@
Map<String, List<String>> regionToTopicsMap = new HashMap<>();
regionToTopicsMap.put("region1", new ArrayList());
- GeodeConnectorConfig config = mock(GeodeConnectorConfig.class);
+ GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
when (config.getRegionToTopics()).thenReturn(regionToTopicsMap);
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
@@ -123,7 +122,7 @@
Map<String, List<String>> regionToTopicsMap = new HashMap<>();
regionToTopicsMap.put("region1", new ArrayList());
- GeodeConnectorConfig config = mock(GeodeConnectorConfig.class);
+ GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
when (config.getRegionToTopics()).thenReturn(regionToTopicsMap);
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
@@ -138,7 +137,7 @@
GeodeContext geodeContext = mock(GeodeContext.class);
when(geodeContext.getClientCache()).thenReturn(clientCache);
- GeodeConnectorConfig config = mock(GeodeConnectorConfig.class);
+ GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
when (config.isDurable()).thenReturn(false);
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
task.installOnGeode(config, geodeContext, null, "", false);
diff --git a/src/test/java/geode/kafka/source/GeodeSourceConnectorConfigTest.java b/src/test/java/geode/kafka/source/GeodeSourceConnectorConfigTest.java
new file mode 100644
index 0000000..629c07d
--- /dev/null
+++ b/src/test/java/geode/kafka/source/GeodeSourceConnectorConfigTest.java
@@ -0,0 +1,26 @@
+package geode.kafka.source;
+
+import geode.kafka.GeodeConnectorConfig;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static geode.kafka.GeodeConnectorConfig.LOCATORS;
+import static geode.kafka.GeodeConnectorConfig.TASK_ID;
+import static geode.kafka.source.GeodeSourceConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
+import static org.junit.Assert.assertEquals;
+
+public class GeodeSourceConnectorConfigTest {
+
+ @Test
+ public void durableClientIdShouldNotBeSetIfPrefixIsEmpty() {
+ Map<String, String> props = new HashMap<>();
+ props.put(TASK_ID, "0");
+ props.put(DURABLE_CLIENT_ID_PREFIX, "");
+ props.put(LOCATORS, "localhost[10334]");
+ GeodeSourceConnectorConfig config = new GeodeSourceConnectorConfig(props);
+ assertEquals("", config.getDurableClientId());
+ }
+
+}