Added GeodeContext to handle cache related operations
Added GeodeConnectorConfig for shared configs
Moved packages
diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java
new file mode 100644
index 0000000..9ac561f
--- /dev/null
+++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java
@@ -0,0 +1,132 @@
+package geode.kafka;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class GeodeConnectorConfig {
+
+    //Geode Configuration
+    public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientId";
+    public static final String DEFAULT_DURABLE_CLIENT_ID = "";
+    public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout";
+    public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
+
+
+    //GeodeKafka Specific Configuration
+    public static final String TASK_ID = "GEODE_TASK_ID"; //One config per task
+
+    public static final String CQ_PREFIX = "cqPrefix";
+    public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
+    /**
+     * Specifies which Locators to connect to Apache Geode
+     */
+    public static final String LOCATORS = "locators";
+    public static final String DEFAULT_LOCATOR = "localhost[10334]";
+
+    /**
+     * Specifies which Regions to connect in Apache Geode
+     */
+    public static final String REGIONS = "regions";
+
+    /**
+     * Specifies which Topics to connect in Kafka
+     */
+    public static final String TOPICS = "topics";
+
+    /**
+     * Property to describe the Source Partition in a record
+     */
+    public static final String REGION_NAME = "regionName";  //used for Source Partition Events
+
+    public static final String BATCH_SIZE = "geodeConnectorBatchSize";
+    public static final String DEFAULT_BATCH_SIZE = "100";
+
+    public static final String QUEUE_SIZE = "geodeConnectorQueueSize";
+    public static final String DEFAULT_QUEUE_SIZE = "100000";
+
+
+
+    private final int taskId;
+    private final String durableClientId;
+    private final String durableClientIdPrefix;
+    private final String durableClientTimeout;
+    private List<String> regionNames;
+    private List<String> topics;
+    private List<LocatorHostPort> locatorHostPorts;
+
+    //just for tests
+    GeodeConnectorConfig() {
+        taskId = 0;
+        durableClientId = "";
+        durableClientIdPrefix = "";
+        durableClientTimeout = "0";
+    }
+
+    public GeodeConnectorConfig(Map<String, String> connectorProperties) {
+        taskId = Integer.parseInt(connectorProperties.get(TASK_ID));
+        durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX);
+        if (isDurable(durableClientIdPrefix)) {
+            durableClientId = durableClientIdPrefix + taskId;
+        }
+        else {
+            durableClientId = "";
+        }
+        durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT);
+        regionNames = parseNames(connectorProperties.get(GeodeConnectorConfig.REGIONS));
+        topics = parseNames(connectorProperties.get(GeodeConnectorConfig.TOPICS));
+        locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
+    }
+
+    List<String> parseNames(String names) {
+        return Arrays.stream(names.split(",")).map((s) -> s.trim()).collect(Collectors.toList());
+    }
+
+    List<LocatorHostPort> parseLocators(String locators) {
+        return Arrays.stream(locators.split(",")).map((s) -> {
+            String locatorString = s.trim();
+            return parseLocator(locatorString);
+        }).collect(Collectors.toList());
+    }
+
+    private LocatorHostPort parseLocator(String locatorString) {
+        String[] splits = locatorString.split("\\[");
+        String locator = splits[0];
+        int port = Integer.parseInt(splits[1].replace("]", ""));
+        return new LocatorHostPort(locator, port);
+    }
+
+    public boolean isDurable() {
+        return isDurable(durableClientId);
+    }
+
+    /**
+     * @param durableClientId or prefix can be passed in.  Either both will be "" or both will have a value
+     * @return
+     */
+    boolean isDurable(String durableClientId) {
+        return !durableClientId.equals("");
+    }
+
+
+    public String getDurableClientId() {
+        return durableClientId;
+    }
+
+    public String getDurableClientTimeout() {
+        return durableClientTimeout;
+    }
+
+    public List<String> getRegionNames() {
+        return regionNames;
+    }
+
+    public List<String> getTopics() {
+        return topics;
+    }
+
+    public List<LocatorHostPort> getLocatorHostPorts() {
+        return locatorHostPorts;
+    }
+}
diff --git a/src/main/java/kafka/LocatorHostPort.java b/src/main/java/geode/kafka/LocatorHostPort.java
similarity index 94%
rename from src/main/java/kafka/LocatorHostPort.java
rename to src/main/java/geode/kafka/LocatorHostPort.java
index 517bad7..50d7440 100644
--- a/src/main/java/kafka/LocatorHostPort.java
+++ b/src/main/java/geode/kafka/LocatorHostPort.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
 
 public class LocatorHostPort {
 
diff --git a/src/main/java/kafka/GeodeKafkaSink.java b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
similarity index 98%
rename from src/main/java/kafka/GeodeKafkaSink.java
rename to src/main/java/geode/kafka/sink/GeodeKafkaSink.java
index 67a244e..68460e4 100644
--- a/src/main/java/kafka/GeodeKafkaSink.java
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka.sink;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
diff --git a/src/main/java/kafka/GeodeEvent.java b/src/main/java/geode/kafka/source/GeodeEvent.java
similarity index 94%
rename from src/main/java/kafka/GeodeEvent.java
rename to src/main/java/geode/kafka/source/GeodeEvent.java
index 805f4a0..41e37c6 100644
--- a/src/main/java/kafka/GeodeEvent.java
+++ b/src/main/java/geode/kafka/source/GeodeEvent.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka.source;
 
 import org.apache.geode.cache.query.CqEvent;
 
diff --git a/src/main/java/kafka/GeodeKafkaSource.java b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
similarity index 64%
rename from src/main/java/kafka/GeodeKafkaSource.java
rename to src/main/java/geode/kafka/source/GeodeKafkaSource.java
index 95afc50..d5da62a 100644
--- a/src/main/java/kafka/GeodeKafkaSource.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
@@ -1,5 +1,6 @@
-package kafka;
+package geode.kafka.source;
 
+import geode.kafka.GeodeConnectorConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
@@ -10,21 +11,19 @@
 import java.util.List;
 import java.util.Map;
 
-import static kafka.GeodeConnectorConfig.BATCH_SIZE;
-import static kafka.GeodeConnectorConfig.CQ_PREFIX;
-import static kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE;
-import static kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
-import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
-import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
-import static kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
-import static kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE;
-import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
-import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
-import static kafka.GeodeConnectorConfig.LOCATORS;
-import static kafka.GeodeConnectorConfig.QUEUE_SIZE;
-import static kafka.GeodeConnectorConfig.REGIONS;
-import static kafka.GeodeConnectorConfig.TOPICS;
-import static kafka.GeodeKafkaSourceTask.TASK_ID;
+import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
+import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE;
+import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
+import static geode.kafka.GeodeConnectorConfig.LOCATORS;
+import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
+
 
 public class GeodeKafkaSource extends SourceConnector {
 
@@ -39,16 +38,14 @@
 
   @Override
   public List<Map<String, String>> taskConfigs(int maxTasks) {
-    System.out.println("GKSource: taskConfigs");
     List<Map<String, String>> taskConfigs = new ArrayList<>();
     Map<String, String> taskProps = new HashMap<>();
 
     taskProps.putAll(sharedProps);
 
-    // use the same props for all tasks at the moment
     for (int i = 0; i < maxTasks; i++) {
     //TODO partition regions and topics
-      taskProps.put(TASK_ID, "" + i);
+      taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
       taskConfigs.add(taskProps);
     }
     return taskConfigs;
@@ -82,6 +79,7 @@
 
   @Override
   public String version() {
+    //TODO
     return AppInfoParser.getVersion();
   }
 
diff --git a/src/main/java/kafka/GeodeKafkaSourceListener.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
similarity index 95%
rename from src/main/java/kafka/GeodeKafkaSourceListener.java
rename to src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
index 4c0e729..c4d6b22 100644
--- a/src/main/java/kafka/GeodeKafkaSourceListener.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
@@ -1,7 +1,6 @@
-package kafka;
+package geode.kafka.source;
 
 import org.apache.geode.cache.query.CqEvent;
-import org.apache.geode.cache.query.CqListener;
 import org.apache.geode.cache.query.CqStatusListener;
 
 import java.util.concurrent.BlockingQueue;
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
new file mode 100644
index 0000000..23fa141
--- /dev/null
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -0,0 +1,133 @@
+package geode.kafka.source;
+
+import geode.kafka.GeodeContext;
+import geode.kafka.GeodeConnectorConfig;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+
+import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
+import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
+import static geode.kafka.GeodeConnectorConfig.REGION_NAME;
+
+public class GeodeKafkaSourceTask extends SourceTask {
+
+    private static final String TASK_PREFIX = "TASK";
+    private static final String DOT = ".";
+
+    //property string to pass in to identify task
+    private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
+
+    private int taskId;
+    private GeodeContext geodeContext;
+    private List<String> topics;
+
+    private Map<String, Map<String, String>> sourcePartitions;
+    private static BlockingQueue<GeodeEvent> eventBuffer;
+    private int batchSize;
+
+
+    private static Map<String, Long> createOffset() {
+        Map<String, Long> offset = new HashMap<>();
+        offset.put("OFFSET", 0L);
+        return offset;
+    }
+
+    @Override
+    public String version() {
+        return null;
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        try {
+            GeodeConnectorConfig geodeConnectorConfig = new GeodeConnectorConfig(props);
+            geodeContext = new GeodeContext(geodeConnectorConfig);
+
+            batchSize = Integer.parseInt(props.get(BATCH_SIZE));
+            int queueSize = Integer.parseInt(props.get(QUEUE_SIZE));
+            eventBuffer = new LinkedBlockingQueue<>(queueSize);
+
+            sourcePartitions = createSourcePartitionsMap(geodeConnectorConfig.getRegionNames());
+            topics = geodeConnectorConfig.getTopics();
+
+            String cqPrefix = props.get(CQ_PREFIX);
+
+            installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix);
+        }
+        catch (Exception e) {
+            System.out.println("Exception:" + e);
+            e.printStackTrace();
+            throw e;
+        }
+    }
+
+    @Override
+    public List<SourceRecord> poll() throws InterruptedException {
+        ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
+        ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
+        if (eventBuffer.drainTo(events, batchSize) > 0) {
+            for (GeodeEvent event : events) {
+                for (String topic : topics) {
+                    records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent()));
+                }
+            }
+
+            return records;
+        }
+
+        return null;
+    }
+
+    @Override
+    public void stop() {
+        geodeContext.getClientCache().close(true);
+    }
+
+    void installOnGeode(GeodeConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix) {
+      boolean isDurable = geodeConnectorConfig.isDurable();
+        for (String region : geodeConnectorConfig.getRegionNames()) {
+            installListenersToRegion(geodeContext, taskId, eventBuffer, region, cqPrefix, isDurable);
+        }
+        if (isDurable) {
+            geodeContext.getClientCache().readyForEvents();
+        }
+    }
+
+    void installListenersToRegion(GeodeContext geodeContext, int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean isDurable) {
+        CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
+        cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener(eventBuffer, regionName));
+        CqAttributes cqAttributes = cqAttributesFactory.create();
+        geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
+                isDurable);
+    }
+
+    /**
+     * converts a list of regions names into a map of source partitions
+     *
+     * @param regionNames list of regionNames
+     * @return Map<String, Map < String, String>> a map of source partitions, keyed by region name
+     */
+    Map<String, Map<String, String>> createSourcePartitionsMap(List<String> regionNames) {
+        return regionNames.stream().map(regionName -> {
+            Map<String, String> sourcePartition = new HashMap<>();
+            sourcePartition.put(REGION_NAME, regionName);
+            return sourcePartition;
+        }).collect(Collectors.toMap(s -> s.get(REGION_NAME), s -> s));
+    }
+
+    String generateCqName(int taskId, String cqPrefix, String regionName) {
+        return cqPrefix + DOT + TASK_PREFIX + taskId + DOT + regionName;
+    }
+
+}
diff --git a/src/main/java/kafka/GeodeConnectorConfig.java b/src/main/java/kafka/GeodeConnectorConfig.java
deleted file mode 100644
index 003367d..0000000
--- a/src/main/java/kafka/GeodeConnectorConfig.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package kafka;
-
-public class GeodeConnectorConfig {
-
-    //Geode Configuration
-    public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientId";
-    public static final String DEFAULT_DURABLE_CLIENT_ID = "";
-    public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout";
-    public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
-
-
-    //GeodeKafka Specific Configuration
-    public static final String CQ_PREFIX = "cqPrefix";
-    public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
-    /**
-     * Specifies which Locators to connect to Apache Geode
-     */
-    public static final String LOCATORS = "locators";
-    public static final String DEFAULT_LOCATOR = "localhost[10334]";
-
-    /**
-     * Specifies which Regions to connect in Apache Geode
-     */
-    public static final String REGIONS = "regions";
-
-    /**
-     * Specifies which Topics to connect in Kafka
-     */
-    public static final String TOPICS = "topics";
-
-    /**
-     * Property to describe the Source Partition in a record
-     */
-    public static final String REGION_NAME = "regionName";  //used for Source Partition Events
-
-    public static final String BATCH_SIZE = "geodeConnectorBatchSize";
-    public static final String DEFAULT_BATCH_SIZE = "100";
-
-    public static final String QUEUE_SIZE = "geodeConnectorQueueSize";
-    public static final String DEFAULT_QUEUE_SIZE = "100000";
-}
diff --git a/src/main/java/kafka/GeodeKafkaSourceTask.java b/src/main/java/kafka/GeodeKafkaSourceTask.java
deleted file mode 100644
index 896b602..0000000
--- a/src/main/java/kafka/GeodeKafkaSourceTask.java
+++ /dev/null
@@ -1,196 +0,0 @@
-package kafka;
-
-import org.apache.geode.cache.client.ClientCache;
-import org.apache.geode.cache.client.ClientCacheFactory;
-import org.apache.geode.cache.query.CqAttributes;
-import org.apache.geode.cache.query.CqAttributesFactory;
-import org.apache.geode.cache.query.CqException;
-import org.apache.geode.cache.query.CqExistsException;
-import org.apache.geode.cache.query.RegionNotFoundException;
-import org.apache.kafka.connect.source.SourceRecord;
-import org.apache.kafka.connect.source.SourceTask;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
-
-import static kafka.GeodeConnectorConfig.BATCH_SIZE;
-import static kafka.GeodeConnectorConfig.CQ_PREFIX;
-import static kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
-import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
-import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
-import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
-import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
-import static kafka.GeodeConnectorConfig.QUEUE_SIZE;
-import static kafka.GeodeConnectorConfig.REGION_NAME;
-
-public class GeodeKafkaSourceTask extends SourceTask {
-
-    //property string to pass in to identify task
-    public static final String TASK_ID = "GEODE_TASK_ID";
-    private static final String TASK_PREFIX = "TASK";
-    private static final String DOT = ".";
-    private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
-
-    private int taskId;
-    private ClientCache clientCache;
-    private List<String> regionNames;
-    private List<String> topics;
-    private Map<String, Map<String, String>> sourcePartitions;
-    private static BlockingQueue<GeodeEvent> eventBuffer;
-    private int batchSize;
-
-
-    private static Map<String, Long> createOffset() {
-        Map<String, Long> offset = new HashMap<>();
-        offset.put("OFFSET", 0L);
-        return offset;
-    }
-
-    @Override
-    public String version() {
-        return null;
-    }
-
-    @Override
-    public void start(Map<String, String> props) {
-        try {
-            System.out.println("JASON task start");
-            taskId = Integer.parseInt(props.get(TASK_ID));
-            batchSize = Integer.parseInt(props.get(BATCH_SIZE));
-            int queueSize = Integer.parseInt(props.get(QUEUE_SIZE));
-            eventBuffer = new LinkedBlockingQueue<>(queueSize);
-
-            //grouping will be done in the source and not the task
-            regionNames = parseNames(props.get(GeodeConnectorConfig.REGIONS));
-            topics = parseNames(props.get(GeodeConnectorConfig.TOPICS));
-            sourcePartitions = createSourcePartitionsMap(regionNames);
-
-            String durableClientId = props.get(DURABLE_CLIENT_ID_PREFIX);
-            if (!durableClientId.equals("")) {
-                durableClientId += taskId;
-            }
-            System.out.println("JASON durable client id is:" + durableClientId);
-            String durableClientTimeout = props.get(DURABLE_CLIENT_TIME_OUT);
-            String cqPrefix = props.get(CQ_PREFIX);
-
-            List<LocatorHostPort> locators = parseLocators(props.get(GeodeConnectorConfig.LOCATORS));
-            installOnGeode(taskId, eventBuffer, locators, regionNames, durableClientId, durableClientTimeout, cqPrefix);
-            System.out.println("JASON task start finished");
-        }
-        catch (Exception e) {
-            System.out.println("Exception:" + e);
-            e.printStackTrace();
-            throw e;
-        }
-    }
-
-    @Override
-    public List<SourceRecord> poll() throws InterruptedException {
-        ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
-        ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
-        if (eventBuffer.drainTo(events, batchSize) > 0) {
-            for (GeodeEvent event : events) {
-                for (String topic : topics) {
-                    records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent()));
-//                    records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, "STRING"));
-                }
-            }
-
-            return records;
-        }
-
-        return null;
-    }
-
-    @Override
-    public void stop() {
-        clientCache.close(true);
-    }
-
-    ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut) {
-        ClientCacheFactory ccf = new ClientCacheFactory().set("durable-client-id", durableClientName)
-                .set("durable-client-timeout", durableClientTimeOut)
-                .setPoolSubscriptionEnabled(true);
-        for (LocatorHostPort locator: locators) {
-          ccf.addPoolLocator(locator.getHostName(), locator.getPort()).create();
-        }
-        return ccf.create();
-    }
-
-    void installOnGeode(int taskId, BlockingQueue<GeodeEvent> eventBuffer, List<LocatorHostPort> locators, List<String> regionNames, String durableClientId, String durableClientTimeout, String cqPrefix) {
-      boolean isDurable = isDurable(durableClientId);
-
-      clientCache = createClientCache(locators, durableClientId, durableClientTimeout);
-        for (String region : regionNames) {
-            installListenersToRegion(taskId, eventBuffer, region, cqPrefix, isDurable);
-        }
-        if (isDurable) {
-          clientCache.readyForEvents();
-        }
-    }
-
-    void installListenersToRegion(int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean isDurable) {
-        CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
-        cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener(eventBuffer, regionName));
-        CqAttributes cqAttributes = cqAttributesFactory.create();
-        try {
-            clientCache.getQueryService().newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
-                    isDurable).execute();
-        } catch (CqExistsException e) {
-            System.out.println("UHH");
-            e.printStackTrace();
-        } catch (CqException | RegionNotFoundException e) {
-            System.out.println("UHH e");
-            e.printStackTrace();
-        } catch (Exception e) {
-            System.out.println("UHHHHHH " + e);
-        }
-    }
-
-
-  List<String> parseNames(String names) {
-        return Arrays.stream(names.split(",")).map((s) -> s.trim()).collect(Collectors.toList());
-    }
-
-    List<LocatorHostPort> parseLocators(String locators) {
-        return Arrays.stream(locators.split(",")).map((s) -> {
-            String locatorString = s.trim();
-            return parseLocator(locatorString);
-        }).collect(Collectors.toList());
-    }
-
-    private LocatorHostPort parseLocator(String locatorString) {
-        String[] splits = locatorString.split("\\[");
-        String locator = splits[0];
-        int port = Integer.parseInt(splits[1].replace("]", ""));
-        return new LocatorHostPort(locator, port);
-    }
-
-    boolean isDurable(String durableClientId) {
-      return !durableClientId.equals("");
-    }
-
-    String generateCqName(int taskId, String cqPrefix, String regionName) {
-      return cqPrefix + DOT + TASK_PREFIX + taskId + DOT + regionName;
-    }
-
-    /**
-     * converts a list of regions names into a map of source partitions
-     *
-     * @param regionNames list of regionNames
-     * @return Map<String, Map < String, String>> a map of source partitions, keyed by region name
-     */
-    Map<String, Map<String, String>> createSourcePartitionsMap(List<String> regionNames) {
-        return regionNames.stream().map(regionName -> {
-            Map<String, String> sourcePartition = new HashMap<>();
-            sourcePartition.put(REGION_NAME, regionName);
-            return sourcePartition;
-        }).collect(Collectors.toMap(s -> s.get(REGION_NAME), s -> s));
-    }
-}
diff --git a/src/test/java/kafka/GeodeKafkaTestCluster.java b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
similarity index 98%
rename from src/test/java/kafka/GeodeKafkaTestCluster.java
rename to src/test/java/geode/kafka/GeodeKafkaTestCluster.java
index 85c0fc4..f1a6dff 100644
--- a/src/test/java/kafka/GeodeKafkaTestCluster.java
+++ b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
@@ -1,6 +1,7 @@
-package kafka;
+package geode.kafka;
 
 import kafka.admin.RackAwareMode;
+import geode.kafka.source.GeodeKafkaSource;
 import kafka.zk.AdminZkClient;
 import kafka.zk.KafkaZkClient;
 import org.apache.geode.cache.Region;
diff --git a/src/test/java/kafka/GeodeLocalCluster.java b/src/test/java/geode/kafka/GeodeLocalCluster.java
similarity index 97%
rename from src/test/java/kafka/GeodeLocalCluster.java
rename to src/test/java/geode/kafka/GeodeLocalCluster.java
index 43ac8f5..afd3b9d 100644
--- a/src/test/java/kafka/GeodeLocalCluster.java
+++ b/src/test/java/geode/kafka/GeodeLocalCluster.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
 
 import java.io.IOException;
 
diff --git a/src/test/java/kafka/JavaProcess.java b/src/test/java/geode/kafka/JavaProcess.java
similarity index 97%
rename from src/test/java/kafka/JavaProcess.java
rename to src/test/java/geode/kafka/JavaProcess.java
index 30edfef..fe00094 100644
--- a/src/test/java/kafka/JavaProcess.java
+++ b/src/test/java/geode/kafka/JavaProcess.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
 
 import java.io.File;
 import java.io.IOException;
diff --git a/src/test/java/kafka/KafkaLocalCluster.java b/src/test/java/geode/kafka/KafkaLocalCluster.java
similarity index 96%
rename from src/test/java/kafka/KafkaLocalCluster.java
rename to src/test/java/geode/kafka/KafkaLocalCluster.java
index cd2a3df..57f16f4 100644
--- a/src/test/java/kafka/KafkaLocalCluster.java
+++ b/src/test/java/geode/kafka/KafkaLocalCluster.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
 
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServerStartable;
diff --git a/src/test/java/kafka/LocatorLauncherWrapper.java b/src/test/java/geode/kafka/LocatorLauncherWrapper.java
similarity index 98%
rename from src/test/java/kafka/LocatorLauncherWrapper.java
rename to src/test/java/geode/kafka/LocatorLauncherWrapper.java
index c1a7075..57ff405 100644
--- a/src/test/java/kafka/LocatorLauncherWrapper.java
+++ b/src/test/java/geode/kafka/LocatorLauncherWrapper.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
 
 import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.distributed.Locator;
diff --git a/src/test/java/kafka/ServerLauncherWrapper.java b/src/test/java/geode/kafka/ServerLauncherWrapper.java
similarity index 98%
rename from src/test/java/kafka/ServerLauncherWrapper.java
rename to src/test/java/geode/kafka/ServerLauncherWrapper.java
index 933824e..b36a3aa 100644
--- a/src/test/java/kafka/ServerLauncherWrapper.java
+++ b/src/test/java/geode/kafka/ServerLauncherWrapper.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
 
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
diff --git a/src/test/java/kafka/WorkerAndHerderCluster.java b/src/test/java/geode/kafka/WorkerAndHerderCluster.java
similarity index 95%
rename from src/test/java/kafka/WorkerAndHerderCluster.java
rename to src/test/java/geode/kafka/WorkerAndHerderCluster.java
index 7357232..c347946 100644
--- a/src/test/java/kafka/WorkerAndHerderCluster.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderCluster.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
 
 import java.io.IOException;
 
diff --git a/src/test/java/kafka/WorkerAndHerderWrapper.java b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
similarity index 88%
rename from src/test/java/kafka/WorkerAndHerderWrapper.java
rename to src/test/java/geode/kafka/WorkerAndHerderWrapper.java
index 5f8ccd2..cc8e27b 100644
--- a/src/test/java/kafka/WorkerAndHerderWrapper.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
@@ -1,7 +1,6 @@
-package kafka;
+package geode.kafka;
 
-import org.apache.geode.distributed.ConfigurationProperties;
-import org.apache.geode.distributed.Locator;
+import geode.kafka.source.GeodeKafkaSource;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
@@ -14,16 +13,14 @@
 import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
 import org.apache.kafka.connect.util.ConnectUtils;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Properties;
 
-import static kafka.GeodeConnectorConfig.REGIONS;
-import static kafka.GeodeConnectorConfig.TOPICS;
-import static kafka.GeodeKafkaTestCluster.TEST_REGIONS;
-import static kafka.GeodeKafkaTestCluster.TEST_TOPICS;
+import static geode.kafka.GeodeConnectorConfig.REGIONS;
+import static geode.kafka.GeodeConnectorConfig.TOPICS;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_REGIONS;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPICS;
 
 public class WorkerAndHerderWrapper {
 
diff --git a/src/test/java/kafka/ZooKeeperLocalCluster.java b/src/test/java/geode/kafka/ZooKeeperLocalCluster.java
similarity index 98%
rename from src/test/java/kafka/ZooKeeperLocalCluster.java
rename to src/test/java/geode/kafka/ZooKeeperLocalCluster.java
index 8b23f53..a3d3433 100644
--- a/src/test/java/kafka/ZooKeeperLocalCluster.java
+++ b/src/test/java/geode/kafka/ZooKeeperLocalCluster.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
 
 import org.apache.zookeeper.server.ServerConfig;
 import org.apache.zookeeper.server.ZooKeeperServerMain;
diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
new file mode 100644
index 0000000..33c260d
--- /dev/null
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -0,0 +1,85 @@
+package geode.kafka.source;
+
+import geode.kafka.GeodeConnectorConfig;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static geode.kafka.GeodeConnectorConfig.REGION_NAME;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public class GeodeKafkaSourceTaskTest {
+
+    @Test
+    public void cqListenerOnEventPopulatesEventsBuffer() {
+
+    }
+
+    @Test
+    public void pollReturnsEventsWhenEventBufferHasValues() {
+
+    }
+
+    @Test
+    public void regionsArePassedCorrectlyToTask() {
+
+    }
+
+    @Test
+    public void installOnGeodeShouldCallCq() {
+        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+    }
+
+
+
+
+
+    @Test
+    public void createSourcePartitionsShouldReturnAMapOfSourcePartitions() {
+        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+        List<String> regionNames = Arrays.asList(new String[]{"region1", "region2", "region3"});
+        Map<String, Map<String,String>> sourcePartitions = task.createSourcePartitionsMap(regionNames);
+        assertThat(3, is(sourcePartitions.size()));
+        assertThat(true, is(sourcePartitions.get("region1").get(REGION_NAME).equals("region1")));
+        assertThat(true, is(sourcePartitions.get("region2").get(REGION_NAME).equals("region2")));
+        assertThat(true, is(sourcePartitions.get("region3").get(REGION_NAME).equals("region3")));
+    }
+
+
+
+    @Test
+    public void listOfLocatorsShouldBeConfiguredIntoClientCache() {
+
+    }
+
+    @Test
+    public void shouldNotBeDurableIfDurableClientIdIsNull() {
+
+    }
+
+    @Test
+    public void shouldNotCallReadyForEventsIfDurableClientPrefixIsEmpty() {
+
+    }
+
+    //Source properties tests
+    @Test
+    public void propertiesShouldBeCorrectlyTranslatedToConfiguration() {
+        Map<String, String> props = new HashMap<>();
+        props.put(GeodeConnectorConfig.QUEUE_SIZE, GeodeConnectorConfig.DEFAULT_QUEUE_SIZE);
+        props.put(GeodeConnectorConfig.BATCH_SIZE, GeodeConnectorConfig.DEFAULT_BATCH_SIZE);
+
+        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+        task.start(props);
+
+//        assertThat(task.getQueueSize(GeodeConnectorConfig.QUEUE_SIZE));
+
+
+    }
+
+
+}
diff --git a/src/test/java/kafka/GeodeKafkaSourceTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java
similarity index 93%
rename from src/test/java/kafka/GeodeKafkaSourceTest.java
rename to src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java
index ec6dff8..717d495 100644
--- a/src/test/java/kafka/GeodeKafkaSourceTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka.source;
 
 import org.junit.Test;
 
diff --git a/src/test/java/kafka/GeodeKafkaSourceTaskTest.java b/src/test/java/kafka/GeodeKafkaSourceTaskTest.java
deleted file mode 100644
index 2c15664..0000000
--- a/src/test/java/kafka/GeodeKafkaSourceTaskTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package kafka;
-
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static kafka.GeodeConnectorConfig.REGION_NAME;
-import static org.hamcrest.CoreMatchers.allOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-public class GeodeKafkaSourceTaskTest {
-
-    @Test
-    public void cqListenerOnEventPopulatesEventsBuffer() {
-
-    }
-
-    @Test
-    public void pollReturnsEventsWhenEventBufferHasValues() {
-
-    }
-
-    @Test
-    public void regionsArePassedCorrectlyToTask() {
-
-    }
-
-    @Test
-    public void installOnGeodeShouldCallCq() {
-        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-    }
-
-
-
-    @Test
-    public void parseRegionNamesShouldSplitOnComma() {
-        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-        List<String> regionNames = task.parseNames("region1,region2,region3,region4");
-        assertEquals(4, regionNames.size());
-        assertThat(true, allOf(is(regionNames.contains("region1"))
-                , is(regionNames.contains("region2"))
-                , is(regionNames.contains("region3"))
-                , is(regionNames.contains("region4"))));
-    }
-
-    @Test
-    public void parseRegionNamesShouldChomp() {
-        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-        List<String> regionNames = task.parseNames("region1, region2, region3,region4");
-        assertEquals(4, regionNames.size());
-        assertThat(true, allOf(is(regionNames instanceof List)
-                , is(regionNames.contains("region1"))
-                , is(regionNames.contains("region2"))
-                , is(regionNames.contains("region3"))
-                , is(regionNames.contains("region4"))));
-    }
-
-    @Test
-    public void createSourcePartitionsShouldReturnAMapOfSourcePartitions() {
-        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-        List<String> regionNames = Arrays.asList(new String[]{"region1", "region2", "region3"});
-        Map<String, Map<String,String>> sourcePartitions = task.createSourcePartitionsMap(regionNames);
-        assertThat(3, is(sourcePartitions.size()));
-        assertThat(true, is(sourcePartitions.get("region1").get(REGION_NAME).equals("region1")));
-        assertThat(true, is(sourcePartitions.get("region2").get(REGION_NAME).equals("region2")));
-        assertThat(true, is(sourcePartitions.get("region3").get(REGION_NAME).equals("region3")));
-    }
-
-    @Test
-    public void shouldBeAbleToParseGeodeLocatorStrings() {
-        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-        String locatorString="localhost[8888], localhost[8881]";
-        List<LocatorHostPort> locators = task.parseLocators(locatorString);
-        assertThat(2, is(locators.size()));
-    }
-
-    @Test
-    public void listOfLocatorsShouldBeConfiguredIntoClientCache() {
-
-    }
-
-    @Test
-    public void shouldNotBeDurableIfDurableClientIdIsNull() {
-
-    }
-
-    @Test
-    public void shouldNotCallReadyForEventsIfDurableClientPrefixIsEmpty() {
-
-    }
-
-    //Source properties tests
-    @Test
-    public void propertiesShouldBeCorrectlyTranslatedToConfiguration() {
-        Map<String, String> props = new HashMap<>();
-        props.put(GeodeConnectorConfig.QUEUE_SIZE, GeodeConnectorConfig.DEFAULT_QUEUE_SIZE);
-        props.put(GeodeConnectorConfig.BATCH_SIZE, GeodeConnectorConfig.DEFAULT_BATCH_SIZE);
-
-
-        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-        task.start(props);
-
-//        assertThat(task.getQueueSize(GeodeConnectorConfig.QUEUE_SIZE));
-
-
-    }
-
-
-}