Reworked region to topic and topic to region parsing
Tasks are assigned groups of region-to-topic (source) or topic-to-region (sink) bindings
A task can handle multiple regions or multiple topics, depending on configuration
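
For example (region and topic names are illustrative), a source configured with

    regionToTopic=[region1:topic1,topic2],[region2:topic3]

pushes events from region1 to topic1 and topic2, and from region2 to topic3.
parseRegionToTopics turns the binding string into a Map<String, List<String>>
keyed by region name; the sink property topicToRegion uses the same format in
the opposite direction.
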
diff --git a/build.gradle b/build.gradle
index 515967d..a131d22 100644
--- a/build.gradle
+++ b/build.gradle
@@ -27,6 +27,7 @@
 
     testCompile group: 'junit', name: 'junit', version: '4.12'
     testCompile 'org.mockito:mockito-core:3.2.4'
+    testCompile 'pl.pragmatists:JUnitParams:1.1.1'
 
     testImplementation 'org.awaitility:awaitility:4.0.2'
 }
diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java
index 11eff62..dcc479e 100644
--- a/src/main/java/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java
@@ -25,16 +25,19 @@
     public static final String LOCATORS = "locators";
     public static final String DEFAULT_LOCATOR = "localhost[10334]";
 
-    /**
-     * Specifies which Regions to connect in Apache Geode
-     */
-    public static final String REGIONS = "regions";
 
     /**
-     * Specifies which Topics to connect in Kafka
+     * Specifies which Kafka topics to connect to. Matches the property name used by
+     * the standard Kafka Connect sink configuration; only used by the sink.
      */
     public static final String TOPICS = "topics";
 
+    //Used by sink
+    public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegion";
+
+    //Used by source
+    public static final String REGION_TO_TOPIC_BINDINGS = "regionToTopic";
+
     /**
      * Property to describe the Source Partition in a record
      */
@@ -54,8 +57,9 @@
     private final String durableClientId;
     private final String durableClientIdPrefix;
     private final String durableClientTimeout;
-    private List<String> regionNames;
-    private List<String> topics;
+
+    private Map<String, List<String>> regionToTopics;
+    private Map<String, List<String>> topicToRegions;
     private List<LocatorHostPort> locatorHostPorts;
 
     //just for tests
@@ -71,22 +75,59 @@
         durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX);
         if (isDurable(durableClientIdPrefix)) {
             durableClientId = durableClientIdPrefix + taskId;
-        }
-        else {
+        } else {
             durableClientId = "";
         }
         durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT);
-        regionNames = parseNames(connectorProperties.get(GeodeConnectorConfig.REGIONS));
-        topics = parseNames(connectorProperties.get(GeodeConnectorConfig.TOPICS));
+        regionToTopics = parseRegionToTopics(connectorProperties.get(GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS));
+        topicToRegions = parseTopicToRegions(connectorProperties.get(GeodeConnectorConfig.TOPIC_TO_REGION_BINDINGS));
         locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
     }
 
+
+    public static Map<String, List<String>> parseTopicToRegions(String combinedBindings) {
+        //The binding format is identical in both directions, so the same parser applies
+        return parseRegionToTopics(combinedBindings);
+    }
+
+    /**
+     * Given a string of the form [region:topic,...],[region2:topic2,...], produces a map where
+     * the key is the regionName and the value is the list of topicNames to push values to.
+     *
+     * @param combinedBindings a string of the form "[region:topic,...],[region2:topic2,...]"
+     * @return mapping of regionName to list of topics to update
+     */
+    public static Map<String, List<String>> parseRegionToTopics(String combinedBindings) {
+        if (combinedBindings == null || combinedBindings.isEmpty()) {
+            return null;
+        }
+        List<String> bindings = parseBindings(combinedBindings);
+        return bindings.stream()
+                .map(GeodeConnectorConfig::parseBinding)
+                .collect(Collectors.toMap(binding -> binding[0].trim(),
+                        binding -> parseNames(binding[1])));
+    }
+
+    public static List<String> parseBindings(String bindings) {
+        return Arrays.stream(bindings.split("](\\s)*,")).map((s) -> {
+            s = s.replaceAll("\\[", "");
+            s = s.replaceAll("\\]", "");
+            s = s.trim();
+            return s;
+        }).collect(Collectors.toList());
+    }
+
+    private static String[] parseBinding(String binding) {
+        return binding.split(":");
+    }
+
+    //Parses a comma-separated string of topic or region names
     public static List<String> parseNames(String names) {
         return Arrays.stream(names.split(",")).map((s) -> s.trim()).collect(Collectors.toList());
     }
 
     public static String reconstructString(Collection<String> strings) {
-        return strings.stream().collect(Collectors.joining(","));
+        return "[" + strings.stream().collect(Collectors.joining("],[")) + "]";
     }
 
     List<LocatorHostPort> parseLocators(String locators) {
@@ -115,7 +156,6 @@
         return !durableClientId.equals("");
     }
 
-
     public int getTaskId() {
         return taskId;
     }
@@ -128,15 +168,16 @@
         return durableClientTimeout;
     }
 
-    public List<String> getRegionNames() {
-        return regionNames;
-    }
-
-    public List<String> getTopics() {
-        return topics;
-    }
-
     public List<LocatorHostPort> getLocatorHostPorts() {
         return locatorHostPorts;
     }
+
+    public Map<String, List<String>> getRegionToTopics() {
+        return regionToTopics;
+    }
+
+    public Map<String, List<String>> getTopicToRegions() {
+        return topicToRegions;
+    }
+
 }
diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
index c7afb96..99f9b9d 100644
--- a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
@@ -23,8 +23,7 @@
 import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
 import static geode.kafka.GeodeConnectorConfig.LOCATORS;
 import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
-import static geode.kafka.GeodeConnectorConfig.REGIONS;
-import static geode.kafka.GeodeConnectorConfig.TOPICS;
+import static geode.kafka.GeodeConnectorConfig.TOPIC_TO_REGION_BINDINGS;
 
 public class GeodeKafkaSink extends SinkConnector  {
     private static final ConfigDef CONFIG_DEF = new ConfigDef();
@@ -45,17 +44,12 @@
         List<Map<String, String>> taskConfigs = new ArrayList<>();
         Map<String, String> taskProps = new HashMap<>();
         taskProps.putAll(sharedProps);
-
-        List<String> topics = GeodeConnectorConfig.parseNames(taskProps.get(TOPICS));
-        List<List<String>> topicsPerTask = ConnectorUtils.groupPartitions(topics, maxTasks);
-
-        List<String> regions = GeodeConnectorConfig.parseNames(taskProps.get(REGIONS));
-        List<List<String>> regionsPerTask = ConnectorUtils.groupPartitions(regions, maxTasks);
+        List<String> bindings = GeodeConnectorConfig.parseBindings(taskProps.get(TOPIC_TO_REGION_BINDINGS));
+        List<List<String>> bindingsPerTask = ConnectorUtils.groupPartitions(bindings, maxTasks);
 
         for (int i = 0; i < maxTasks; i++) {
             taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
-            taskProps.put(TOPICS, GeodeConnectorConfig.reconstructString(topicsPerTask.get(i)));
-            taskProps.put(REGIONS, GeodeConnectorConfig.reconstructString(regionsPerTask.get(i)));
+            taskProps.put(TOPIC_TO_REGION_BINDINGS, GeodeConnectorConfig.reconstructString(bindingsPerTask.get(i)));
             taskConfigs.add(taskProps);
         }
 
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
index ca3f2c8..91e6203 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSource.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
@@ -26,8 +26,7 @@
 import static geode.kafka.GeodeConnectorConfig.LOAD_ENTIRE_REGION;
 import static geode.kafka.GeodeConnectorConfig.LOCATORS;
 import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
-import static geode.kafka.GeodeConnectorConfig.REGIONS;
-import static geode.kafka.GeodeConnectorConfig.TOPICS;
+import static geode.kafka.GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS;
 
 
 public class GeodeKafkaSource extends SourceConnector {
@@ -48,16 +47,12 @@
     Map<String, String> taskProps = new HashMap<>();
     taskProps.putAll(sharedProps);
 
-    List<String> topics = GeodeConnectorConfig.parseNames(taskProps.get(TOPICS));
-    List<List<String>> topicsPerTask = ConnectorUtils.groupPartitions(topics, maxTasks);
-
-    List<String> regions = GeodeConnectorConfig.parseNames(taskProps.get(REGIONS));
-    List<List<String>> regionsPerTask = ConnectorUtils.groupPartitions(regions, maxTasks);
+    List<String> bindings = GeodeConnectorConfig.parseBindings(taskProps.get(REGION_TO_TOPIC_BINDINGS));
+    List<List<String>> bindingsPerTask = ConnectorUtils.groupPartitions(bindings, maxTasks);
 
     for (int i = 0; i < maxTasks; i++) {
       taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
-      taskProps.put(TOPICS, GeodeConnectorConfig.reconstructString(topicsPerTask.get(i)));
-      taskProps.put(REGIONS, GeodeConnectorConfig.reconstructString(regionsPerTask.get(i)));
+      taskProps.put(REGION_TO_TOPIC_BINDINGS, GeodeConnectorConfig.reconstructString(bindingsPerTask.get(i)));
       taskConfigs.add(taskProps);
     }
     return taskConfigs;
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
index 8080179..dadc8ba 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -38,7 +38,7 @@
 
     private GeodeContext geodeContext;
     private GeodeConnectorConfig geodeConnectorConfig;
-    private List<String> topics;
+    private Map<String, List<String>> regionToTopics;
     private Map<String, Map<String, String>> sourcePartitions;
     private BlockingQueue<GeodeEvent> eventBuffer;
     private int batchSize;
@@ -55,12 +55,6 @@
         return null;
     }
 
-    void startForTesting(BlockingQueue eventBuffer, List<String> topics, int batchSize) {
-        this.eventBuffer = eventBuffer;
-        this.topics = topics;
-        this.batchSize = batchSize;
-    }
-
     @Override
     public void start(Map<String, String> props) {
         try {
@@ -72,15 +66,14 @@
             int queueSize = Integer.parseInt(props.get(QUEUE_SIZE));
             eventBuffer = new LinkedBlockingQueue<>(queueSize);
 
-            sourcePartitions = createSourcePartitionsMap(geodeConnectorConfig.getRegionNames());
-            topics = geodeConnectorConfig.getTopics();
+            regionToTopics = geodeConnectorConfig.getRegionToTopics();
+            sourcePartitions = createSourcePartitionsMap(regionToTopics.keySet());
 
             String cqPrefix = props.get(CQ_PREFIX);
             boolean loadEntireRegion = Boolean.parseBoolean(props.get(LOAD_ENTIRE_REGION));
 
             installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix, loadEntireRegion);
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             logger.error("Unable to start source task", e);
             throw e;
         }
@@ -92,8 +85,10 @@
         ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
         if (eventBuffer.drainTo(events, batchSize) > 0) {
             for (GeodeEvent event : events) {
+                String regionName = event.getRegionName();
+                List<String> topics = regionToTopics.get(regionName);
                 for (String topic : topics) {
-                    records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent().getNewValue()));
+                    records.add(new SourceRecord(sourcePartitions.get(regionName), OFFSET_DEFAULT, topic, null, event.getEvent().getNewValue()));
                 }
             }
             return records;
@@ -108,9 +103,9 @@
     }
 
     void installOnGeode(GeodeConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix, boolean loadEntireRegion) {
-      boolean isDurable = geodeConnectorConfig.isDurable();
-      int taskId = geodeConnectorConfig.getTaskId();
-        for (String region : geodeConnectorConfig.getRegionNames()) {
+        boolean isDurable = geodeConnectorConfig.isDurable();
+        int taskId = geodeConnectorConfig.getTaskId();
+        for (String region : geodeConnectorConfig.getRegionToTopics().keySet()) {
             installListenersToRegion(geodeContext, taskId, eventBuffer, region, cqPrefix, loadEntireRegion, isDurable);
         }
         if (isDurable) {
@@ -132,8 +127,7 @@
                 geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
                         isDurable);
             }
-        }
-        finally {
+        } finally {
             listener.signalInitialResultsLoaded();
         }
         return listener;
@@ -145,7 +139,7 @@
      * @param regionNames list of regionNames
      * @return Map<String, Map < String, String>> a map of source partitions, keyed by region name
      */
-    Map<String, Map<String, String>> createSourcePartitionsMap(List<String> regionNames) {
+    Map<String, Map<String, String>> createSourcePartitionsMap(Collection<String> regionNames) {
         return regionNames.stream().map(regionName -> {
             Map<String, String> sourcePartition = new HashMap<>();
             sourcePartition.put(REGION_NAME, regionName);
diff --git a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
index 8f06f61..6a39c5d 100644
--- a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
+++ b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
@@ -1,16 +1,26 @@
 package geode.kafka;
 
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
+import static geode.kafka.GeodeConnectorConfig.LOCATORS;
+import static geode.kafka.GeodeConnectorConfig.TASK_ID;
 import static org.hamcrest.CoreMatchers.allOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
+@RunWith(JUnitParamsRunner.class)
 public class GeodeConnectorConfigTest {
 
     @Test
@@ -45,14 +55,77 @@
     }
 
     @Test
-    public void durableClientIdShouldNotBeSetIfPropertyIsNotSet() {
+    @Parameters(method="oneToOneBindings")
+    public void parseBindingsCanSplitOneToOneBindings(String value) {
+        List<String> splitBindings = GeodeConnectorConfig.parseBindings(value);
+        assertEquals(2, splitBindings.size());
+    }
+
+    @Test
+    public void parseBindingsCanSplitASingleOneToOneBinding() {
+        String binding = "[region1:topic1]";
+        List<String> splitBindings = GeodeConnectorConfig.parseBindings(binding);
+        assertEquals(1, splitBindings.size());
+        assertEquals(binding.replaceAll("\\[", "").replaceAll("\\]", ""), splitBindings.get(0));
+    }
+
+    public List<String> oneToOneBindings() {
+        return Arrays.asList(new String[]{"[region1:topic1],[region2:topic2]"
+        ,"[region1:topic1] , [region2:topic2]"
+        ,"[region1:topic1], [region2:topic2] ,"
+        ,"[region1: topic1], [region2 :topic2]"});
+    }
+
+    @Test
+    @Parameters(method="oneToManyBindings")
+    public void parseBindingsCanSplitOneToManyBindings(String value) {
+        List<String> splitBindings = GeodeConnectorConfig.parseBindings(value);
+        assertEquals(Arrays.toString(splitBindings.toArray()), 2, splitBindings.size());
+    }
+
+    public List<String> oneToManyBindings() {
+        return Arrays.asList(new String[]{"[region1:topic1,topic2],[region2:topic2,topic3]"
+                ,"[region1:topic1 , topic2] , [region2:topic2 , topic3]"
+                ,"[region1:topic1 ,], [region2:topic2 ,] ,"
+                ,"[region1: topic1 ,topic3], [region2 :topic2]"});
+    }
+
+
+    @Test
+    @Parameters(method="oneToManyBindings")
+    public void reconstructBindingsToStringShouldReformAParsableString(String value) {
+        List<String> splitBindings = GeodeConnectorConfig.parseBindings(value);
+        String reconstructString = GeodeConnectorConfig.reconstructString(splitBindings);
+        splitBindings = GeodeConnectorConfig.parseBindings(reconstructString);
+        assertEquals(Arrays.toString(splitBindings.toArray()), 2, splitBindings.size());
+    }
+
+    @Test
+    @Parameters(method="oneToOneBindings")
+    public void parseRegionToTopicsShouldReturnARegionToTopicsMapping(String value) {
+        Map<String, List<String>> regionToTopics = GeodeConnectorConfig.parseRegionToTopics(value);
+        assertEquals(2, regionToTopics.size());
+        assertTrue(regionToTopics.get("region1") != null);
+        assertEquals(1, regionToTopics.get("region1").size());
+        assertTrue(regionToTopics.get("region1").contains("topic1"));
+    }
+
+    @Test
+    public void regionToTopicParsingShouldParseCorrectlyWithASingleBinding() {
+        Map<String, List<String>> regionToTopics = GeodeConnectorConfig.parseRegionToTopics("[region1:topic1]");
+        assertTrue(regionToTopics.get("region1") != null);
+        assertEquals(1, regionToTopics.get("region1").size());
+        assertTrue(regionToTopics.get("region1").contains("topic1"));
+    }
+
+    @Test
+    public void durableClientIdShouldNotBeSetIfPrefixIsEmpty() {
         Map<String, String> props = new HashMap<>();
+        props.put(TASK_ID, "0");
+        props.put(DURABLE_CLIENT_ID_PREFIX, "");
+        props.put(LOCATORS, "localhost[10334]");
         GeodeConnectorConfig config = new GeodeConnectorConfig(props);
         assertEquals("", config.getDurableClientId());
     }
 
-    @Test
-    public void cqPrefixShouldBeProperlyCalculatedFromProps() {
-
-    }
 }
diff --git a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
index ce26354..c6cb832 100644
--- a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
+++ b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
@@ -40,7 +40,6 @@
 
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 
 public class GeodeKafkaTestCluster {
 
@@ -48,11 +47,13 @@
   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
   private static boolean debug = true;
 
-  public static String TEST_TOPIC = "someTopic";
-  public static String TEST_REGION = "someRegion";
+  public static String TEST_REGION_TO_TOPIC_BINDINGS = "[someRegionForSource:someTopicForSource]";
+  public static String TEST_TOPIC_TO_REGION_BINDINGS = "[someTopicForSink:someRegionForSink]";
 
+  public static String TEST_TOPIC_FOR_SOURCE = "someTopicForSource";
+  public static String TEST_REGION_FOR_SOURCE = "someRegionForSource";
   public static String TEST_TOPIC_FOR_SINK = "someTopicForSink";
-  public static String TEST_REGION_FOR_SINK = "someTopicForSink";
+  public static String TEST_REGION_FOR_SINK = "someRegionForSink";
 
   private static ZooKeeperLocalCluster zooKeeperLocalCluster;
   private static KafkaLocalCluster kafkaLocalCluster;
@@ -85,7 +86,7 @@
     KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
             15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
     AdminZkClient adminZkClient = new AdminZkClient(zkClient);
-    adminZkClient.deleteTopic(TEST_TOPIC);
+    adminZkClient.deleteTopic(TEST_TOPIC_FOR_SOURCE);
     adminZkClient.deleteTopic(TEST_TOPIC_FOR_SINK);
 
     kafkaLocalCluster.stop();
@@ -106,7 +107,7 @@
     Properties topicProperties = new Properties();
     topicProperties.put("flush.messages", "1");
     AdminZkClient adminZkClient = new AdminZkClient(zkClient);
-    adminZkClient.createTopic(TEST_TOPIC,1
+    adminZkClient.createTopic(TEST_TOPIC_FOR_SOURCE,1
             ,1, topicProperties, RackAwareMode.Disabled$.MODULE$);
     adminZkClient.createTopic(TEST_TOPIC_FOR_SINK,1
             ,1, topicProperties, RackAwareMode.Disabled$.MODULE$);
@@ -179,7 +180,7 @@
       final Consumer<String, String> consumer =
               new KafkaConsumer<>(props);
       // Subscribe to the topic.
-      consumer.subscribe(Collections.singletonList(TEST_TOPIC));
+      consumer.subscribe(Collections.singletonList(TEST_TOPIC_FOR_SOURCE));
       return consumer;
   }
 
@@ -201,7 +202,7 @@
   @Test
   public void endToEndSourceTest() {
     ClientCache client = createGeodeClient();
-    Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION);
+    Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SOURCE);
 
     //right now just verify something makes it end to end
     AtomicInteger valueReceived = new AtomicInteger(0);
diff --git a/src/test/java/geode/kafka/GeodeLocalCluster.java b/src/test/java/geode/kafka/GeodeLocalCluster.java
index afd3b9d..276dc05 100644
--- a/src/test/java/geode/kafka/GeodeLocalCluster.java
+++ b/src/test/java/geode/kafka/GeodeLocalCluster.java
@@ -16,7 +16,6 @@
         System.out.println("starting locator");
         locatorProcess.exec("10334");
         Thread.sleep(15000);
-        System.out.println("is alive?" + locatorProcess.process.isAlive());
         serverProcess.exec("40404");
         Thread.sleep(30000);
     }
diff --git a/src/test/java/geode/kafka/ServerLauncherWrapper.java b/src/test/java/geode/kafka/ServerLauncherWrapper.java
index b36a3aa..7ef9db3 100644
--- a/src/test/java/geode/kafka/ServerLauncherWrapper.java
+++ b/src/test/java/geode/kafka/ServerLauncherWrapper.java
@@ -11,6 +11,9 @@
 import java.io.IOException;
 import java.util.Properties;
 
+import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SINK;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SOURCE;
+
 public class ServerLauncherWrapper {
 
     public static void main(String... args) throws IOException {
@@ -43,11 +46,11 @@
                 .create();
         CacheServer cacheServer = cache.addCacheServer();
         cacheServer.setPort(0);
-//        cacheServer.setMaxConnections(Integer.MAX_VALUE);
         cacheServer.start();
 
         //create the region
-        cache.createRegionFactory(RegionShortcut.PARTITION).create("someRegion");
+        cache.createRegionFactory(RegionShortcut.PARTITION).create(TEST_REGION_FOR_SINK);
+        cache.createRegionFactory(RegionShortcut.PARTITION).create(TEST_REGION_FOR_SOURCE);
         System.out.println("starting cacheserver");
         while (true) {
 
diff --git a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
index a33c135..5e8f074 100644
--- a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
@@ -18,12 +18,12 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import static geode.kafka.GeodeConnectorConfig.REGIONS;
+import static geode.kafka.GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS;
 import static geode.kafka.GeodeConnectorConfig.TOPICS;
-import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION;
-import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SINK;
-import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC;
+import static geode.kafka.GeodeConnectorConfig.TOPIC_TO_REGION_BINDINGS;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_TO_TOPIC_BINDINGS;
 import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_TO_REGION_BINDINGS;
 
 public class WorkerAndHerderWrapper {
 
@@ -59,8 +59,7 @@
         sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName());
         sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector");
         sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
-        sourceProps.put(REGIONS, TEST_REGION);
-        sourceProps.put(TOPICS, TEST_TOPIC);
+        sourceProps.put(REGION_TO_TOPIC_BINDINGS, TEST_REGION_TO_TOPIC_BINDINGS);
 
         herder.putConnectorConfig(
                 sourceProps.get(ConnectorConfig.NAME_CONFIG),
@@ -71,7 +70,7 @@
         sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSink.class.getName());
         sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector");
         sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
-        sinkProps.put(REGIONS, TEST_REGION_FOR_SINK);
+        sinkProps.put(TOPIC_TO_REGION_BINDINGS, TEST_TOPIC_TO_REGION_BINDINGS);
         sinkProps.put(TOPICS, TEST_TOPIC_FOR_SINK);
 
         herder.putConnectorConfig(
diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index 041adc8..d4149db 100644
--- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -2,8 +2,8 @@
 
 import geode.kafka.GeodeConnectorConfig;
 import geode.kafka.GeodeContext;
+import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.query.CqEvent;
-import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -15,15 +15,8 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
 import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
-import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
-import static geode.kafka.GeodeConnectorConfig.LOCATORS;
-import static geode.kafka.GeodeConnectorConfig.REGIONS;
 import static geode.kafka.GeodeConnectorConfig.REGION_NAME;
-import static geode.kafka.GeodeConnectorConfig.TASK_ID;
-import static geode.kafka.GeodeConnectorConfig.TOPICS;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -31,6 +24,8 @@
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 
@@ -87,6 +82,70 @@
     }
 
     @Test
+    public void readyForEventsIsCalledIfDurable() {
+        ClientCache clientCache = mock(ClientCache.class);
+
+        GeodeContext geodeContext = mock(GeodeContext.class);
+        when(geodeContext.getClientCache()).thenReturn(clientCache);
+
+        GeodeConnectorConfig config = mock(GeodeConnectorConfig.class);
+        when(config.isDurable()).thenReturn(true);
+        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+        task.installOnGeode(config, geodeContext, null, "", false);
+        verify(clientCache, times(1)).readyForEvents();
+    }
+
+    @Test
+    public void cqIsInvokedForEveryRegionWithATopic() {
+        ClientCache clientCache = mock(ClientCache.class);
+
+        GeodeContext geodeContext = mock(GeodeContext.class);
+        when(geodeContext.getClientCache()).thenReturn(clientCache);
+
+        Map<String, List<String>> regionToTopicsMap = new HashMap<>();
+        regionToTopicsMap.put("region1", new ArrayList<>());
+
+        GeodeConnectorConfig config = mock(GeodeConnectorConfig.class);
+        when(config.getRegionToTopics()).thenReturn(regionToTopicsMap);
+
+        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+        task.installOnGeode(config, geodeContext, null, "someCqPrefix", false);
+        verify(geodeContext, times(1)).newCq(anyString(), anyString(), any(), anyBoolean());
+    }
+
+    @Test
+    public void cqWithInitialResultsIsInvokedForEveryRegionWithATopicIfLoadEntireIsSet() {
+        ClientCache clientCache = mock(ClientCache.class);
+
+        GeodeContext geodeContext = mock(GeodeContext.class);
+        when(geodeContext.getClientCache()).thenReturn(clientCache);
+
+        Map<String, List<String>> regionToTopicsMap = new HashMap<>();
+        regionToTopicsMap.put("region1", new ArrayList<>());
+
+        GeodeConnectorConfig config = mock(GeodeConnectorConfig.class);
+        when(config.getRegionToTopics()).thenReturn(regionToTopicsMap);
+
+        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+        task.installOnGeode(config, geodeContext, new LinkedBlockingQueue(), "someCqPrefix", true);
+        verify(geodeContext, times(1)).newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean());
+    }
+
+    @Test
+    public void readyForEventsIsNotCalledIfNotDurable() {
+        ClientCache clientCache = mock(ClientCache.class);
+
+        GeodeContext geodeContext = mock(GeodeContext.class);
+        when(geodeContext.getClientCache()).thenReturn(clientCache);
+
+        GeodeConnectorConfig config = mock(GeodeConnectorConfig.class);
+        when(config.isDurable()).thenReturn(false);
+        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+        task.installOnGeode(config, geodeContext, null, "", false);
+        verify(clientCache, times(0)).readyForEvents();
+    }
+
+    @Test
     public void pollReturnsEventsWhenEventBufferHasValues() throws Exception {
 //        BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
 //        CqEvent cqEvent = mock(CqEvent.class);
@@ -139,20 +198,11 @@
 
     }
 
-    //Source properties tests
+
     @Test
-    public void propertiesShouldBeCorrectlyTranslatedToConfiguration() {
-        Map<String, String> props = new HashMap<>();
-        props.put(GeodeConnectorConfig.QUEUE_SIZE, GeodeConnectorConfig.DEFAULT_QUEUE_SIZE);
-        props.put(GeodeConnectorConfig.BATCH_SIZE, GeodeConnectorConfig.DEFAULT_BATCH_SIZE);
-
-        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-//        task.start(props);
-
-//        assertThat(task.getQueueSize(GeodeConnectorConfig.QUEUE_SIZE));
-
-
+    public void cqPrefixShouldBeProperlyCalculatedFromProps() {
+//        GeodeContext geodeContext = mock(GeodeContext.class);
+//        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
     }
 
-
 }