Reworked region to topic and topic to region parsing
Tasks are assigned region to topic or topic to region groups
A task can handle multiple regions or multiple topics based on configuration
diff --git a/build.gradle b/build.gradle
index 515967d..a131d22 100644
--- a/build.gradle
+++ b/build.gradle
@@ -27,6 +27,7 @@
testCompile group: 'junit', name: 'junit', version: '4.12'
testCompile 'org.mockito:mockito-core:3.2.4'
+ testCompile 'pl.pragmatists:JUnitParams:1.1.1'
testImplementation 'org.awaitility:awaitility:4.0.2'
}
diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java
index 11eff62..dcc479e 100644
--- a/src/main/java/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java
@@ -25,16 +25,19 @@
public static final String LOCATORS = "locators";
public static final String DEFAULT_LOCATOR = "localhost[10334]";
- /**
- * Specifies which Regions to connect in Apache Geode
- */
- public static final String REGIONS = "regions";
/**
- * Specifies which Topics to connect in Kafka
+ * Specifies which Topics to connect in Kafka, uses the variable name with Kafka Sink Configuration
+ * Only used in sink configuration
*/
public static final String TOPICS = "topics";
+ //Used by sink
+ public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegion";
+
+ //Used by source
+ public static final String REGION_TO_TOPIC_BINDINGS = "regionToTopic";
+
/**
* Property to describe the Source Partition in a record
*/
@@ -54,8 +57,9 @@
private final String durableClientId;
private final String durableClientIdPrefix;
private final String durableClientTimeout;
- private List<String> regionNames;
- private List<String> topics;
+
+ private Map<String, List<String>> regionToTopics;
+ private Map<String, List<String>> topicToRegions;
private List<LocatorHostPort> locatorHostPorts;
//just for tests
@@ -71,22 +75,59 @@
durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX);
if (isDurable(durableClientIdPrefix)) {
durableClientId = durableClientIdPrefix + taskId;
- }
- else {
+ } else {
durableClientId = "";
}
durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT);
- regionNames = parseNames(connectorProperties.get(GeodeConnectorConfig.REGIONS));
- topics = parseNames(connectorProperties.get(GeodeConnectorConfig.TOPICS));
+ regionToTopics = parseRegionToTopics(connectorProperties.get(GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS));
+ topicToRegions = parseTopicToRegions(connectorProperties.get(GeodeConnectorConfig.TOPIC_TO_REGION_BINDINGS));
locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
}
+
+ public static Map<String, List<String>> parseTopicToRegions(String combinedBindings) {
+ //It's the same formatting, so parsing is the same going topic to region or region to topic
+ return parseRegionToTopics(combinedBindings);
+ }
+
+ /**
+ * Given a string of the form [region:topic,...] will produce a map where the key is the
+ * regionName and the value is a list of topicNames to push values to
+ *
+ * @param combinedBindings a string with similar form to "[region:topic,...], [region2:topic2,...]
+ * @return mapping of regionName to list of topics to update
+ */
+ public static Map<String, List<String>> parseRegionToTopics(String combinedBindings) {
+ if (combinedBindings == "" || combinedBindings == null){
+ return null;
+ }
+ List<String> bindings = parseBindings(combinedBindings);
+ return bindings.stream().map(binding -> {
+ String[] regionToTopicsArray = parseBinding(binding);
+ return regionToTopicsArray;
+ }).collect(Collectors.toMap(regionToTopicsArray -> regionToTopicsArray[0], regionToTopicsArray -> parseNames(regionToTopicsArray[1])));
+ }
+
+ public static List<String> parseBindings(String bindings) {
+ return Arrays.stream(bindings.split("](\\s)*,")).map((s) -> {
+ s = s.replaceAll("\\[", "");
+ s = s.replaceAll("\\]", "");
+ s = s.trim();
+ return s;
+ }).collect(Collectors.toList());
+ }
+
+ private static String[] parseBinding(String binding) {
+ return binding.split(":");
+ }
+
+ //Used to parse a string of topics or regions
public static List<String> parseNames(String names) {
return Arrays.stream(names.split(",")).map((s) -> s.trim()).collect(Collectors.toList());
}
public static String reconstructString(Collection<String> strings) {
- return strings.stream().collect(Collectors.joining(","));
+ return strings.stream().collect(Collectors.joining("],[")) + "]";
}
List<LocatorHostPort> parseLocators(String locators) {
@@ -115,7 +156,6 @@
return !durableClientId.equals("");
}
-
public int getTaskId() {
return taskId;
}
@@ -128,15 +168,16 @@
return durableClientTimeout;
}
- public List<String> getRegionNames() {
- return regionNames;
- }
-
- public List<String> getTopics() {
- return topics;
- }
-
public List<LocatorHostPort> getLocatorHostPorts() {
return locatorHostPorts;
}
+
+ public Map<String, List<String>> getRegionToTopics() {
+ return regionToTopics;
+ }
+
+ public Map<String, List<String>> getTopicToRegions() {
+ return topicToRegions;
+ }
+
}
diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
index c7afb96..99f9b9d 100644
--- a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
@@ -23,8 +23,7 @@
import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
import static geode.kafka.GeodeConnectorConfig.LOCATORS;
import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
-import static geode.kafka.GeodeConnectorConfig.REGIONS;
-import static geode.kafka.GeodeConnectorConfig.TOPICS;
+import static geode.kafka.GeodeConnectorConfig.TOPIC_TO_REGION_BINDINGS;
public class GeodeKafkaSink extends SinkConnector {
private static final ConfigDef CONFIG_DEF = new ConfigDef();
@@ -45,17 +44,12 @@
List<Map<String, String>> taskConfigs = new ArrayList<>();
Map<String, String> taskProps = new HashMap<>();
taskProps.putAll(sharedProps);
-
- List<String> topics = GeodeConnectorConfig.parseNames(taskProps.get(TOPICS));
- List<List<String>> topicsPerTask = ConnectorUtils.groupPartitions(topics, maxTasks);
-
- List<String> regions = GeodeConnectorConfig.parseNames(taskProps.get(REGIONS));
- List<List<String>> regionsPerTask = ConnectorUtils.groupPartitions(regions, maxTasks);
+ List<String> bindings = GeodeConnectorConfig.parseNames(taskProps.get(TOPIC_TO_REGION_BINDINGS));
+ List<List<String>> bindingsPerTask = ConnectorUtils.groupPartitions(bindings, maxTasks);
for (int i = 0; i < maxTasks; i++) {
taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
- taskProps.put(TOPICS, GeodeConnectorConfig.reconstructString(topicsPerTask.get(i)));
- taskProps.put(REGIONS, GeodeConnectorConfig.reconstructString(regionsPerTask.get(i)));
+ taskProps.put(TOPIC_TO_REGION_BINDINGS, GeodeConnectorConfig.reconstructString(bindingsPerTask.get(i)));
taskConfigs.add(taskProps);
}
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
index ca3f2c8..91e6203 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSource.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
@@ -26,8 +26,7 @@
import static geode.kafka.GeodeConnectorConfig.LOAD_ENTIRE_REGION;
import static geode.kafka.GeodeConnectorConfig.LOCATORS;
import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
-import static geode.kafka.GeodeConnectorConfig.REGIONS;
-import static geode.kafka.GeodeConnectorConfig.TOPICS;
+import static geode.kafka.GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS;
public class GeodeKafkaSource extends SourceConnector {
@@ -48,16 +47,12 @@
Map<String, String> taskProps = new HashMap<>();
taskProps.putAll(sharedProps);
- List<String> topics = GeodeConnectorConfig.parseNames(taskProps.get(TOPICS));
- List<List<String>> topicsPerTask = ConnectorUtils.groupPartitions(topics, maxTasks);
-
- List<String> regions = GeodeConnectorConfig.parseNames(taskProps.get(REGIONS));
- List<List<String>> regionsPerTask = ConnectorUtils.groupPartitions(regions, maxTasks);
+ List<String> bindings = GeodeConnectorConfig.parseNames(taskProps.get(REGION_TO_TOPIC_BINDINGS));
+ List<List<String>> bindingsPerTask = ConnectorUtils.groupPartitions(bindings, maxTasks);
for (int i = 0; i < maxTasks; i++) {
taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
- taskProps.put(TOPICS, GeodeConnectorConfig.reconstructString(topicsPerTask.get(i)));
- taskProps.put(REGIONS, GeodeConnectorConfig.reconstructString(regionsPerTask.get(i)));
+ taskProps.put(REGION_TO_TOPIC_BINDINGS, GeodeConnectorConfig.reconstructString(bindingsPerTask.get(i)));
taskConfigs.add(taskProps);
}
return taskConfigs;
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
index 8080179..dadc8ba 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -38,7 +38,7 @@
private GeodeContext geodeContext;
private GeodeConnectorConfig geodeConnectorConfig;
- private List<String> topics;
+ private Map<String, List<String>> regionToTopics;
private Map<String, Map<String, String>> sourcePartitions;
private BlockingQueue<GeodeEvent> eventBuffer;
private int batchSize;
@@ -55,12 +55,6 @@
return null;
}
- void startForTesting(BlockingQueue eventBuffer, List<String> topics, int batchSize) {
- this.eventBuffer = eventBuffer;
- this.topics = topics;
- this.batchSize = batchSize;
- }
-
@Override
public void start(Map<String, String> props) {
try {
@@ -72,15 +66,14 @@
int queueSize = Integer.parseInt(props.get(QUEUE_SIZE));
eventBuffer = new LinkedBlockingQueue<>(queueSize);
- sourcePartitions = createSourcePartitionsMap(geodeConnectorConfig.getRegionNames());
- topics = geodeConnectorConfig.getTopics();
+ regionToTopics = geodeConnectorConfig.getRegionToTopics();
+ sourcePartitions = createSourcePartitionsMap(regionToTopics.keySet());
String cqPrefix = props.get(CQ_PREFIX);
boolean loadEntireRegion = Boolean.parseBoolean(props.get(LOAD_ENTIRE_REGION));
installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix, loadEntireRegion);
- }
- catch (Exception e) {
+ } catch (Exception e) {
logger.error("Unable to start source task", e);
throw e;
}
@@ -92,8 +85,10 @@
ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
if (eventBuffer.drainTo(events, batchSize) > 0) {
for (GeodeEvent event : events) {
+ String regionName = event.getRegionName();
+ List<String> topics = regionToTopics.get(regionName);
for (String topic : topics) {
- records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent().getNewValue()));
+ records.add(new SourceRecord(sourcePartitions.get(regionName), OFFSET_DEFAULT, topic, null, event.getEvent().getNewValue()));
}
}
return records;
@@ -108,9 +103,9 @@
}
void installOnGeode(GeodeConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix, boolean loadEntireRegion) {
- boolean isDurable = geodeConnectorConfig.isDurable();
- int taskId = geodeConnectorConfig.getTaskId();
- for (String region : geodeConnectorConfig.getRegionNames()) {
+ boolean isDurable = geodeConnectorConfig.isDurable();
+ int taskId = geodeConnectorConfig.getTaskId();
+ for (String region : geodeConnectorConfig.getRegionToTopics().keySet()) {
installListenersToRegion(geodeContext, taskId, eventBuffer, region, cqPrefix, loadEntireRegion, isDurable);
}
if (isDurable) {
@@ -132,8 +127,7 @@
geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
isDurable);
}
- }
- finally {
+ } finally {
listener.signalInitialResultsLoaded();
}
return listener;
@@ -145,7 +139,7 @@
* @param regionNames list of regionNames
* @return Map<String, Map < String, String>> a map of source partitions, keyed by region name
*/
- Map<String, Map<String, String>> createSourcePartitionsMap(List<String> regionNames) {
+ Map<String, Map<String, String>> createSourcePartitionsMap(Collection<String> regionNames) {
return regionNames.stream().map(regionName -> {
Map<String, String> sourcePartition = new HashMap<>();
sourcePartition.put(REGION_NAME, regionName);
diff --git a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
index 8f06f61..6a39c5d 100644
--- a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
+++ b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
@@ -1,16 +1,26 @@
package geode.kafka;
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
+import static geode.kafka.GeodeConnectorConfig.LOCATORS;
+import static geode.kafka.GeodeConnectorConfig.TASK_ID;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+@RunWith(JUnitParamsRunner.class)
public class GeodeConnectorConfigTest {
@Test
@@ -45,14 +55,94 @@
}
@Test
- public void durableClientIdShouldNotBeSetIfPropertyIsNotSet() {
+ @Parameters(method="oneToOneBindings")
+ public void parseBindingsCanSplitOneToOneBindings(String value) {
+ List<String> splitBindings = GeodeConnectorConfig.parseBindings(value);
+ assertEquals(2, splitBindings.size());
+ }
+
+ @Test
+ public void parseBindingsCanSplitASingleOneToOneBindings() {
+ String binding = "[region1:topic1]";
+ List<String> splitBindings = GeodeConnectorConfig.parseBindings(binding);
+ assertEquals(1, splitBindings.size());
+ assertEquals(binding.replaceAll("\\[", "").replaceAll("\\]", ""), splitBindings.get(0));
+ }
+
+ public List<String> oneToOneBindings() {
+ return Arrays.asList(new String[]{"[region1:topic1],[region2:topic2]"
+ ,"[region1:topic1] , [region2:topic2]"
+ ,"[region1:topic1], [region2:topic2] ,"
+ ,"[region1: topic1], [region2 :topic2]"});
+ }
+
+ @Test
+ @Parameters(method="oneToManyBindings")
+ public void parseBindingsCanSplitOneToManyBindings(String value) {
+ List<String> splitBindings = GeodeConnectorConfig.parseBindings(value);
+ assertEquals(Arrays.toString(splitBindings.toArray()), 2, splitBindings.size());
+ }
+
+ public List<String> oneToManyBindings() {
+ return Arrays.asList(new String[]{"[region1:topic1,topic2],[region2:topic2,topic3]"
+ ,"[region1:topic1 , topic2] , [region2:topic2 , topic3]"
+ ,"[region1:topic1 ,], [region2:topic2 ,] ,"
+ ,"[region1: topic1 ,topic3], [region2 :topic2]"});
+ }
+
+
+ @Test
+ @Parameters(method="oneToManyBindings")
+ public void reconstructBindingsToStringShouldReformAParsableString(String value) {
+ List<String> splitBindings = GeodeConnectorConfig.parseBindings(value);
+ String reconstructString = GeodeConnectorConfig.reconstructString(splitBindings);
+ splitBindings = GeodeConnectorConfig.parseBindings(reconstructString);
+ assertEquals(Arrays.toString(splitBindings.toArray()), 2, splitBindings.size());
+ }
+
+ @Test
+ @Parameters(method="oneToOneBindings")
+ public void configurationShouldReturnRegionToTopicsMappingWhenParseRegionToTopics(String value) {
+ Map<String, List<String>> regionToTopics = GeodeConnectorConfig.parseRegionToTopics(value);
+ assertEquals(2, regionToTopics.size());
+ assertTrue(regionToTopics.get("region1") != null);
+ assertEquals(1, regionToTopics.get("region1").size());
+ assertTrue(regionToTopics.get("region1").contains("topic1"));
+ }
+
+ @Test
+ public void regionToTopicParsingShouldParseCorrectlyWithASingleBinding() {
+ Map<String, List<String>> regionToTopics = GeodeConnectorConfig.parseRegionToTopics("[region1:topic1]");
+ assertTrue(regionToTopics.get("region1") != null);
+ assertEquals(1, regionToTopics.get("region1").size());
+ assertTrue(regionToTopics.get("region1").contains("topic1"));
+ }
+
+
+ /*
+ taskId = Integer.parseInt(connectorProperties.get(TASK_ID));
+ durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX);
+ if (isDurable(durableClientIdPrefix)) {
+ durableClientId = durableClientIdPrefix + taskId;
+ } else {
+ durableClientId = "";
+ }
+ durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT);
+ regionToTopics = parseRegionToTopics(connectorProperties.get(GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS));
+ topicToRegions = parseTopicToRegions(connectorProperties.get(GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS));
+ locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
+
+ */
+
+
+ @Test
+ public void durableClientIdShouldNotBeSetIfPrefixIsEmpty() {
Map<String, String> props = new HashMap<>();
+ props.put(TASK_ID, "0");
+ props.put(DURABLE_CLIENT_ID_PREFIX, "");
+ props.put(LOCATORS, "localhost[10334]");
GeodeConnectorConfig config = new GeodeConnectorConfig(props);
assertEquals("", config.getDurableClientId());
}
- @Test
- public void cqPrefixShouldBeProperlyCalculatedFromProps() {
-
- }
}
diff --git a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
index ce26354..c6cb832 100644
--- a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
+++ b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
@@ -40,7 +40,6 @@
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
public class GeodeKafkaTestCluster {
@@ -48,11 +47,13 @@
public static TemporaryFolder temporaryFolder = new TemporaryFolder();
private static boolean debug = true;
- public static String TEST_TOPIC = "someTopic";
- public static String TEST_REGION = "someRegion";
+ public static String TEST_REGION_TO_TOPIC_BINDINGS = "[someRegionForSource:someTopicForSource]";
+ public static String TEST_TOPIC_TO_REGION_BINDINGS = "[someTopicForSink:someRegionForSink]";
+ public static String TEST_TOPIC_FOR_SOURCE = "someTopicForSource";
+ public static String TEST_REGION_FOR_SOURCE = "someRegionForSource";
public static String TEST_TOPIC_FOR_SINK = "someTopicForSink";
- public static String TEST_REGION_FOR_SINK = "someTopicForSink";
+ public static String TEST_REGION_FOR_SINK = "someRegionForSink";
private static ZooKeeperLocalCluster zooKeeperLocalCluster;
private static KafkaLocalCluster kafkaLocalCluster;
@@ -85,7 +86,7 @@
KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
- adminZkClient.deleteTopic(TEST_TOPIC);
+ adminZkClient.deleteTopic(TEST_TOPIC_FOR_SOURCE);
adminZkClient.deleteTopic(TEST_TOPIC_FOR_SINK);
kafkaLocalCluster.stop();
@@ -106,7 +107,7 @@
Properties topicProperties = new Properties();
topicProperties.put("flush.messages", "1");
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
- adminZkClient.createTopic(TEST_TOPIC,1
+ adminZkClient.createTopic(TEST_TOPIC_FOR_SOURCE,1
,1, topicProperties, RackAwareMode.Disabled$.MODULE$);
adminZkClient.createTopic(TEST_TOPIC_FOR_SINK,1
,1, topicProperties, RackAwareMode.Disabled$.MODULE$);
@@ -179,7 +180,7 @@
final Consumer<String, String> consumer =
new KafkaConsumer<>(props);
// Subscribe to the topic.
- consumer.subscribe(Collections.singletonList(TEST_TOPIC));
+ consumer.subscribe(Collections.singletonList(TEST_TOPIC_FOR_SOURCE));
return consumer;
}
@@ -201,7 +202,7 @@
@Test
public void endToEndSourceTest() {
ClientCache client = createGeodeClient();
- Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION);
+ Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SOURCE);
//right now just verify something makes it end to end
AtomicInteger valueReceived = new AtomicInteger(0);
diff --git a/src/test/java/geode/kafka/GeodeLocalCluster.java b/src/test/java/geode/kafka/GeodeLocalCluster.java
index afd3b9d..276dc05 100644
--- a/src/test/java/geode/kafka/GeodeLocalCluster.java
+++ b/src/test/java/geode/kafka/GeodeLocalCluster.java
@@ -16,7 +16,6 @@
System.out.println("starting locator");
locatorProcess.exec("10334");
Thread.sleep(15000);
- System.out.println("is alive?" + locatorProcess.process.isAlive());
serverProcess.exec("40404");
Thread.sleep(30000);
}
diff --git a/src/test/java/geode/kafka/ServerLauncherWrapper.java b/src/test/java/geode/kafka/ServerLauncherWrapper.java
index b36a3aa..7ef9db3 100644
--- a/src/test/java/geode/kafka/ServerLauncherWrapper.java
+++ b/src/test/java/geode/kafka/ServerLauncherWrapper.java
@@ -11,6 +11,9 @@
import java.io.IOException;
import java.util.Properties;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SINK;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SOURCE;
+
public class ServerLauncherWrapper {
public static void main(String... args) throws IOException {
@@ -43,11 +46,11 @@
.create();
CacheServer cacheServer = cache.addCacheServer();
cacheServer.setPort(0);
-// cacheServer.setMaxConnections(Integer.MAX_VALUE);
cacheServer.start();
//create the region
- cache.createRegionFactory(RegionShortcut.PARTITION).create("someRegion");
+ cache.createRegionFactory(RegionShortcut.PARTITION).create(TEST_REGION_FOR_SINK);
+ cache.createRegionFactory(RegionShortcut.PARTITION).create(TEST_REGION_FOR_SOURCE);
System.out.println("starting cacheserver");
while (true) {
diff --git a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
index a33c135..5e8f074 100644
--- a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
@@ -18,12 +18,12 @@
import java.util.HashMap;
import java.util.Map;
-import static geode.kafka.GeodeConnectorConfig.REGIONS;
+import static geode.kafka.GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS;
import static geode.kafka.GeodeConnectorConfig.TOPICS;
-import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION;
-import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SINK;
-import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC;
+import static geode.kafka.GeodeConnectorConfig.TOPIC_TO_REGION_BINDINGS;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_TO_TOPIC_BINDINGS;
import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_TO_REGION_BINDINGS;
public class WorkerAndHerderWrapper {
@@ -59,8 +59,7 @@
sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName());
sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector");
sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
- sourceProps.put(REGIONS, TEST_REGION);
- sourceProps.put(TOPICS, TEST_TOPIC);
+ sourceProps.put(REGION_TO_TOPIC_BINDINGS, TEST_REGION_TO_TOPIC_BINDINGS);
herder.putConnectorConfig(
sourceProps.get(ConnectorConfig.NAME_CONFIG),
@@ -71,7 +70,7 @@
sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSink.class.getName());
sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector");
sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
- sinkProps.put(REGIONS, TEST_REGION_FOR_SINK);
+ sinkProps.put(TOPIC_TO_REGION_BINDINGS, TEST_TOPIC_TO_REGION_BINDINGS);
sinkProps.put(TOPICS, TEST_TOPIC_FOR_SINK);
herder.putConnectorConfig(
diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index 041adc8..d4149db 100644
--- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -2,8 +2,8 @@
import geode.kafka.GeodeConnectorConfig;
import geode.kafka.GeodeContext;
+import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.query.CqEvent;
-import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test;
import java.util.ArrayList;
@@ -15,15 +15,8 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
-import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
-import static geode.kafka.GeodeConnectorConfig.LOCATORS;
-import static geode.kafka.GeodeConnectorConfig.REGIONS;
import static geode.kafka.GeodeConnectorConfig.REGION_NAME;
-import static geode.kafka.GeodeConnectorConfig.TASK_ID;
-import static geode.kafka.GeodeConnectorConfig.TOPICS;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@@ -31,6 +24,8 @@
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -87,6 +82,70 @@
}
@Test
+ public void readyForEventsIsCalledIfDurable() {
+ ClientCache clientCache = mock(ClientCache.class);
+
+ GeodeContext geodeContext = mock(GeodeContext.class);
+ when(geodeContext.getClientCache()).thenReturn(clientCache);
+
+ GeodeConnectorConfig config = mock(GeodeConnectorConfig.class);
+ when (config.isDurable()).thenReturn(true);
+ GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+ task.installOnGeode(config, geodeContext, null, "", false);
+ verify(clientCache, times(1)).readyForEvents();
+ }
+
+ @Test
+ public void cqIsInvokedForEveryRegionWithATopic() {
+ ClientCache clientCache = mock(ClientCache.class);
+
+ GeodeContext geodeContext = mock(GeodeContext.class);
+ when(geodeContext.getClientCache()).thenReturn(clientCache);
+
+ Map<String, List<String>> regionToTopicsMap = new HashMap<>();
+ regionToTopicsMap.put("region1", new ArrayList());
+
+ GeodeConnectorConfig config = mock(GeodeConnectorConfig.class);
+ when (config.getRegionToTopics()).thenReturn(regionToTopicsMap);
+
+ GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+ task.installOnGeode(config, geodeContext, null, "someCqPrefix", false);
+ verify(geodeContext, times(1)).newCq(anyString(), anyString(), any(), anyBoolean());
+ }
+
+ @Test
+ public void cqWithInitialResultsIsInvokedForEveryRegionWithATopicIfLoadEntireIsSet() {
+ ClientCache clientCache = mock(ClientCache.class);
+
+ GeodeContext geodeContext = mock(GeodeContext.class);
+ when(geodeContext.getClientCache()).thenReturn(clientCache);
+
+ Map<String, List<String>> regionToTopicsMap = new HashMap<>();
+ regionToTopicsMap.put("region1", new ArrayList());
+
+ GeodeConnectorConfig config = mock(GeodeConnectorConfig.class);
+ when (config.getRegionToTopics()).thenReturn(regionToTopicsMap);
+
+ GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+ task.installOnGeode(config, geodeContext, new LinkedBlockingQueue(), "someCqPrefix", true);
+ verify(geodeContext, times(1)).newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean());
+ }
+
+ @Test
+ public void readyForEventsIsNotCalledIfNotDurable() {
+ ClientCache clientCache = mock(ClientCache.class);
+
+ GeodeContext geodeContext = mock(GeodeContext.class);
+ when(geodeContext.getClientCache()).thenReturn(clientCache);
+
+ GeodeConnectorConfig config = mock(GeodeConnectorConfig.class);
+ when (config.isDurable()).thenReturn(false);
+ GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+ task.installOnGeode(config, geodeContext, null, "", false);
+ verify(clientCache, times(0)).readyForEvents();
+ }
+
+ @Test
public void pollReturnsEventsWhenEventBufferHasValues() throws Exception {
// BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
// CqEvent cqEvent = mock(CqEvent.class);
@@ -139,20 +198,11 @@
}
- //Source properties tests
+
@Test
- public void propertiesShouldBeCorrectlyTranslatedToConfiguration() {
- Map<String, String> props = new HashMap<>();
- props.put(GeodeConnectorConfig.QUEUE_SIZE, GeodeConnectorConfig.DEFAULT_QUEUE_SIZE);
- props.put(GeodeConnectorConfig.BATCH_SIZE, GeodeConnectorConfig.DEFAULT_BATCH_SIZE);
-
- GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-// task.start(props);
-
-// assertThat(task.getQueueSize(GeodeConnectorConfig.QUEUE_SIZE));
-
-
+ public void cqPrefixShouldBeProperlyCalculatedFromProps() {
+// GeodeContext geodeContext = mock(GeodeContext.class);
+// GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
}
-
}