Separated out configuration into source- and sink-specific classes where appropriate.
Added a few more tests.
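
Rough sketch of how the split configs are meant to be constructed (illustrative
only; the binding strings below are hypothetical, the actual format is whatever
parseRegionToTopics/parseTopicToRegions accept, and missing values are defaulted
by the connector, see computeMissingConfigurations in GeodeKafkaSink):

    Map<String, String> props = new HashMap<>();
    props.put(GeodeConnectorConfig.TASK_ID, "0");
    props.put(GeodeConnectorConfig.LOCATORS, "localhost[10334]");

    // source-only settings now live in GeodeSourceConnectorConfig
    props.put(GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS, "[someRegion:someTopic]");
    GeodeSourceConnectorConfig sourceConfig = new GeodeSourceConnectorConfig(props);

    // sink-only settings now live in GeodeSinkConnectorConfig
    props.put(GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS, "[someTopic:someRegion]");
    props.put(GeodeSinkConnectorConfig.NULL_VALUES_MEAN_REMOVE, "true");
    GeodeSinkConnectorConfig sinkConfig = new GeodeSinkConnectorConfig(props);
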
diff --git a/build.gradle b/build.gradle
index a131d22..114b3cd 100644
--- a/build.gradle
+++ b/build.gradle
@@ -19,7 +19,6 @@
     compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.13.0'
     compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.13.0'
 
-
     testCompile(group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.3.1')
     testCompile(group: 'org.apache.kafka', name: 'kafka-streams-test-utils', version: '1.1.0')
     testCompile(group: 'org.apache.curator', name: 'curator-framework', version: '4.2.0')
@@ -31,3 +30,4 @@
 
     testImplementation 'org.awaitility:awaitility:4.0.2'
 }
+
diff --git a/settings.gradle b/settings.gradle
index b5a2326..48eed3b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,2 +1 @@
-rootProject.name = 'geode-kafka-connector'
-
+rootProject.name = 'geode-kafka-connector'
\ No newline at end of file
diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java
index dcc479e..396d428 100644
--- a/src/main/java/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java
@@ -8,79 +8,26 @@
 
 public class GeodeConnectorConfig {
 
-    //Geode Configuration
-    public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientId";
-    public static final String DEFAULT_DURABLE_CLIENT_ID = "";
-    public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout";
-    public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
-
     //GeodeKafka Specific Configuration
+    /**
+     * Identifier for each task
+     */
     public static final String TASK_ID = "GEODE_TASK_ID"; //One config per task
-
-    public static final String CQ_PREFIX = "cqPrefix";
-    public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
     /**
      * Specifies which Locators to connect to Apache Geode
      */
     public static final String LOCATORS = "locators";
     public static final String DEFAULT_LOCATOR = "localhost[10334]";
 
+    protected final int taskId;
+    protected List<LocatorHostPort> locatorHostPorts;
 
-    /**
-     * Specifies which Topics to connect in Kafka, uses the variable name with Kafka Sink Configuration
-     * Only used in sink configuration
-     */
-    public static final String TOPICS = "topics";
-
-    //Used by sink
-    public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegion";
-
-    //Used by source
-    public static final String REGION_TO_TOPIC_BINDINGS = "regionToTopic";
-
-    /**
-     * Property to describe the Source Partition in a record
-     */
-    public static final String REGION_NAME = "regionName";  //used for Source Partition Events
-
-    public static final String BATCH_SIZE = "geodeConnectorBatchSize";
-    public static final String DEFAULT_BATCH_SIZE = "100";
-
-    public static final String QUEUE_SIZE = "geodeConnectorQueueSize";
-    public static final String DEFAULT_QUEUE_SIZE = "100000";
-
-    public static final String LOAD_ENTIRE_REGION = "loadEntireRegion";
-    public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
-
-
-    private final int taskId;
-    private final String durableClientId;
-    private final String durableClientIdPrefix;
-    private final String durableClientTimeout;
-
-    private Map<String, List<String>> regionToTopics;
-    private Map<String, List<String>> topicToRegions;
-    private List<LocatorHostPort> locatorHostPorts;
-
-    //just for tests
-    GeodeConnectorConfig() {
+    protected GeodeConnectorConfig() {
         taskId = 0;
-        durableClientId = "";
-        durableClientIdPrefix = "";
-        durableClientTimeout = "0";
     }
 
     public GeodeConnectorConfig(Map<String, String> connectorProperties) {
         taskId = Integer.parseInt(connectorProperties.get(TASK_ID));
-        durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX);
-        if (isDurable(durableClientIdPrefix)) {
-            durableClientId = durableClientIdPrefix + taskId;
-        } else {
-            durableClientId = "";
-        }
-        durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT);
-        regionToTopics = parseRegionToTopics(connectorProperties.get(GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS));
-        topicToRegions = parseTopicToRegions(connectorProperties.get(GeodeConnectorConfig.TOPIC_TO_REGION_BINDINGS));
         locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
     }
 
@@ -105,7 +52,7 @@
         return bindings.stream().map(binding -> {
             String[] regionToTopicsArray = parseBinding(binding);
             return regionToTopicsArray;
-        }).collect(Collectors.toMap(regionToTopicsArray -> regionToTopicsArray[0], regionToTopicsArray -> parseNames(regionToTopicsArray[1])));
+        }).collect(Collectors.toMap(regionToTopicsArray -> regionToTopicsArray[0], regionToTopicsArray -> parseStringByComma(regionToTopicsArray[1])));
     }
 
     public static List<String> parseBindings(String bindings) {
@@ -122,8 +69,12 @@
     }
 
     //Used to parse a string of topics or regions
-    public static List<String> parseNames(String names) {
-        return Arrays.stream(names.split(",")).map((s) -> s.trim()).collect(Collectors.toList());
+    public static List<String> parseStringByComma(String string) {
+        return parseStringBy(string, ",");
+    }
+
+    public static List<String> parseStringBy(String string, String regex) {
+        return Arrays.stream(string.split(regex)).map((s) -> s.trim()).collect(Collectors.toList());
     }
 
     public static String reconstructString(Collection<String> strings) {
@@ -144,40 +95,11 @@
         return new LocatorHostPort(locator, port);
     }
 
-    public boolean isDurable() {
-        return isDurable(durableClientId);
-    }
-
-    /**
-     * @param durableClientId or prefix can be passed in.  Either both will be "" or both will have a value
-     * @return
-     */
-    boolean isDurable(String durableClientId) {
-        return !durableClientId.equals("");
-    }
-
     public int getTaskId() {
         return taskId;
     }
 
-    public String getDurableClientId() {
-        return durableClientId;
-    }
-
-    public String getDurableClientTimeout() {
-        return durableClientTimeout;
-    }
-
     public List<LocatorHostPort> getLocatorHostPorts() {
         return locatorHostPorts;
     }
-
-    public Map<String, List<String>> getRegionToTopics() {
-        return regionToTopics;
-    }
-
-    public Map<String, List<String>> getTopicToRegions() {
-        return topicToRegions;
-    }
-
 }
diff --git a/src/main/java/geode/kafka/GeodeContext.java b/src/main/java/geode/kafka/GeodeContext.java
index d1fd3ae..b7c3d27 100644
--- a/src/main/java/geode/kafka/GeodeContext.java
+++ b/src/main/java/geode/kafka/GeodeContext.java
@@ -2,6 +2,8 @@
 
 import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.PoolManager;
 import org.apache.geode.cache.query.CqAttributes;
 import org.apache.geode.cache.query.CqException;
 import org.apache.geode.cache.query.CqExistsException;
@@ -17,20 +19,42 @@
     private ClientCache clientCache;
 
 
-    public GeodeContext(GeodeConnectorConfig connectorConfig) {
-        clientCache = createClientCache(connectorConfig.getLocatorHostPorts(), connectorConfig.getDurableClientId(), connectorConfig.getDurableClientTimeout());
+    public GeodeContext() {
+    }
+
+    public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, String durableClientId, String durableClientTimeout) {
+        clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout);
+        return clientCache;
+    }
+
+    public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList) {
+        clientCache = createClientCache(locatorHostPortList, "", "");
+        return clientCache;
     }
 
     public ClientCache getClientCache() {
         return clientCache;
     }
 
+    /**
+     * Creates a client cache connected to the given locators.
+     * @param locators locator host/ports used to connect to the Geode cluster
+     * @param durableClientName durable client id, or "" for a non-durable client
+     * @param durableClientTimeOut durable client timeout, only applied when durable
+     * @return the newly created ClientCache
+     */
     public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut) {
-        ClientCacheFactory ccf = new ClientCacheFactory().set("durable-client-id", durableClientName)
-                .set("durable-client-timeout", durableClientTimeOut)
-                .setPoolSubscriptionEnabled(true);
+        ClientCacheFactory ccf = new ClientCacheFactory();
+        if (!durableClientName.equals("")) {
+            ccf.set("durable-client-id", durableClientName)
+                    .set("durable-client-timeout", durableClientTimeOut);
+        }
+        //currently we only allow using the default pool.
+        //If we ever want to allow adding multiple pools we'll have to configure pool factories
+        ccf.setPoolSubscriptionEnabled(true);
+
         for (LocatorHostPort locator: locators) {
-            ccf.addPoolLocator(locator.getHostName(), locator.getPort()).create();
+            ccf.addPoolLocator(locator.getHostName(), locator.getPort());
         }
         return ccf.create();
     }
diff --git a/src/main/java/geode/kafka/sink/BatchRecords.java b/src/main/java/geode/kafka/sink/BatchRecords.java
index 742dcbc..282ba1c 100644
--- a/src/main/java/geode/kafka/sink/BatchRecords.java
+++ b/src/main/java/geode/kafka/sink/BatchRecords.java
@@ -4,6 +4,7 @@
 import org.apache.kafka.connect.sink.SinkRecord;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -13,8 +14,18 @@
  */
 public class BatchRecords {
 
-    private Map updateMap = new HashMap();
-    private List removeList = new ArrayList();
+    private final Map updateMap;
+    private final Collection removeList;
+
+    public BatchRecords() {
+        this(new HashMap(), new ArrayList());
+    }
+
+    /** Visible for testing. */
+    public BatchRecords(Map updateMap, Collection removeList) {
+        this.updateMap = updateMap;
+        this.removeList = removeList;
+    }
 
     public void addRemoveOperation(SinkRecord record) {
         //if a previous operation added to the update map
diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
index 99f9b9d..43f8eab 100644
--- a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
@@ -11,19 +11,11 @@
 import java.util.List;
 import java.util.Map;
 
-import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
-import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
 import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE;
-import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
-import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
 import static geode.kafka.GeodeConnectorConfig.LOCATORS;
-import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
-import static geode.kafka.GeodeConnectorConfig.TOPIC_TO_REGION_BINDINGS;
+import static geode.kafka.GeodeSinkConnectorConfig.DEFAULT_NULL_VALUES_MEAN_REMOVE;
+import static geode.kafka.GeodeSinkConnectorConfig.NULL_VALUES_MEAN_REMOVE;
+import static geode.kafka.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
 
 public class GeodeKafkaSink extends SinkConnector  {
     private static final ConfigDef CONFIG_DEF = new ConfigDef();
@@ -44,7 +36,7 @@
         List<Map<String, String>> taskConfigs = new ArrayList<>();
         Map<String, String> taskProps = new HashMap<>();
         taskProps.putAll(sharedProps);
-        List<String> bindings = GeodeConnectorConfig.parseNames(taskProps.get(TOPIC_TO_REGION_BINDINGS));
+        List<String> bindings = GeodeConnectorConfig.parseStringByComma(taskProps.get(TOPIC_TO_REGION_BINDINGS));
         List<List<String>> bindingsPerTask = ConnectorUtils.groupPartitions(bindings, maxTasks);
 
         for (int i = 0; i < maxTasks; i++) {
@@ -75,11 +67,7 @@
 
     private Map<String, String> computeMissingConfigurations(Map<String, String> props) {
         props.computeIfAbsent(LOCATORS, (key)-> DEFAULT_LOCATOR);
-        props.computeIfAbsent(DURABLE_CLIENT_TIME_OUT, (key) -> DEFAULT_DURABLE_CLIENT_TIMEOUT);
-        props.computeIfAbsent(DURABLE_CLIENT_ID_PREFIX, (key) -> DEFAULT_DURABLE_CLIENT_ID);
-        props.computeIfAbsent(BATCH_SIZE, (key) -> DEFAULT_BATCH_SIZE);
-        props.computeIfAbsent(QUEUE_SIZE, (key) -> DEFAULT_QUEUE_SIZE);
-        props.computeIfAbsent(CQ_PREFIX, (key) -> DEFAULT_CQ_PREFIX);
+        props.computeIfAbsent(NULL_VALUES_MEAN_REMOVE, (key) -> DEFAULT_NULL_VALUES_MEAN_REMOVE);
         return props;
     }
 }
diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
index 203192c..7f21134 100644
--- a/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -1,7 +1,7 @@
 package geode.kafka.sink;
 
-import geode.kafka.GeodeConnectorConfig;
 import geode.kafka.GeodeContext;
+import geode.kafka.GeodeSinkConnectorConfig;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.client.ClientRegionShortcut;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -9,13 +9,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+
 /**
  * TODO javaDoc
  * Currently force 1 region per task
@@ -24,10 +24,10 @@
 
     private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSinkTask.class);
 
-    GeodeContext geodeContext;
-    Map<String, List<String>> topicToRegions;
-    Map<String, Region> regionNameToRegion;
-    boolean nullValuesMeansRemove = true;
+    private GeodeContext geodeContext;
+    private Map<String, List<String>> topicToRegions;
+    private Map<String, Region> regionNameToRegion;
+    private boolean nullValuesMeansRemove = true;
 
     /**
      * {@inheritDoc}
@@ -41,11 +41,13 @@
     @Override
     public void start(Map<String, String> props) {
         try {
-            GeodeConnectorConfig geodeConnectorConfig = new GeodeConnectorConfig(props);
+            GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props);
             logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
-            geodeContext = new GeodeContext(geodeConnectorConfig);
+            geodeContext = new GeodeContext();
+            geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts());
             topicToRegions = geodeConnectorConfig.getTopicToRegions();
             regionNameToRegion = createProxyRegions(topicToRegions.values());
+            nullValuesMeansRemove = geodeConnectorConfig.getNullValuesMeanRemove();
         } catch (Exception e) {
             e.printStackTrace();
             logger.error("Unable to start sink task", e);
diff --git a/src/main/java/geode/kafka/GeodeSinkConnectorConfig.java b/src/main/java/geode/kafka/GeodeSinkConnectorConfig.java
new file mode 100644
index 0000000..7c6aa3e
--- /dev/null
+++ b/src/main/java/geode/kafka/GeodeSinkConnectorConfig.java
@@ -0,0 +1,39 @@
+package geode.kafka;
+
+import java.util.List;
+import java.util.Map;
+
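+/**
+ * Configuration specific to the sink: topic-to-region bindings and whether
+ * null-valued records are treated as removes.
+ */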
+public class GeodeSinkConnectorConfig extends GeodeConnectorConfig {
+    //Used by sink
+    public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegion";
+    public static final String NULL_VALUES_MEAN_REMOVE = "nullValuesMeanRemove";
+    public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";
+
+    private Map<String, List<String>> topicToRegions;
+    private final boolean nullValuesMeanRemove;
+
+    //just for tests
+    GeodeSinkConnectorConfig() {
+        super();
+        nullValuesMeanRemove = Boolean.parseBoolean(DEFAULT_NULL_VALUES_MEAN_REMOVE);
+    }
+
+    public GeodeSinkConnectorConfig(Map<String, String> connectorProperties) {
+        super(connectorProperties);
+        topicToRegions = parseTopicToRegions(connectorProperties.get(TOPIC_TO_REGION_BINDINGS));
+        nullValuesMeanRemove = Boolean.parseBoolean(connectorProperties.get(NULL_VALUES_MEAN_REMOVE));
+    }
+
+    public Map<String, List<String>> getTopicToRegions() {
+        return topicToRegions;
+    }
+
+    public boolean getNullValuesMeanRemove() {
+        return nullValuesMeanRemove;
+    }
+
+}
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
index 91e6203..686359e 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSource.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
@@ -12,21 +12,21 @@
 import java.util.List;
 import java.util.Map;
 
-import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
-import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOAD_ENTIRE_REGION;
 import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE;
-import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
-import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
-import static geode.kafka.GeodeConnectorConfig.LOAD_ENTIRE_REGION;
 import static geode.kafka.GeodeConnectorConfig.LOCATORS;
-import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
-import static geode.kafka.GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS;
+import static geode.kafka.source.GeodeSourceConnectorConfig.BATCH_SIZE;
+import static geode.kafka.source.GeodeSourceConnectorConfig.CQ_PREFIX;
+import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_BATCH_SIZE;
+import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_CQ_PREFIX;
+import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
+import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
+import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_LOAD_ENTIRE_REGION;
+import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_QUEUE_SIZE;
+import static geode.kafka.source.GeodeSourceConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
+import static geode.kafka.source.GeodeSourceConnectorConfig.DURABLE_CLIENT_TIME_OUT;
+import static geode.kafka.source.GeodeSourceConnectorConfig.LOAD_ENTIRE_REGION;
+import static geode.kafka.source.GeodeSourceConnectorConfig.QUEUE_SIZE;
+import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
 
 
 public class GeodeKafkaSource extends SourceConnector {
@@ -47,7 +47,7 @@
     Map<String, String> taskProps = new HashMap<>();
     taskProps.putAll(sharedProps);
 
-    List<String> bindings = GeodeConnectorConfig.parseNames(taskProps.get(REGION_TO_TOPIC_BINDINGS));
+    List<String> bindings = GeodeConnectorConfig.parseStringByComma(taskProps.get(REGION_TO_TOPIC_BINDINGS));
     List<List<String>> bindingsPerTask = ConnectorUtils.groupPartitions(bindings, maxTasks);
 
     for (int i = 0; i < maxTasks; i++) {
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
index dadc8ba..c983a51 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -19,12 +19,9 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.stream.Collectors;
 
-import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
-import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE;
-import static geode.kafka.GeodeConnectorConfig.LOAD_ENTIRE_REGION;
-import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
-import static geode.kafka.GeodeConnectorConfig.REGION_NAME;
+import static geode.kafka.source.GeodeSourceConnectorConfig.BATCH_SIZE;
+import static geode.kafka.source.GeodeSourceConnectorConfig.QUEUE_SIZE;
+import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_NAME;
 
 public class GeodeKafkaSourceTask extends SourceTask {
 
@@ -37,7 +34,7 @@
     private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
 
     private GeodeContext geodeContext;
-    private GeodeConnectorConfig geodeConnectorConfig;
+    private GeodeSourceConnectorConfig geodeConnectorConfig;
     private Map<String, List<String>> regionToTopics;
     private Map<String, Map<String, String>> sourcePartitions;
     private BlockingQueue<GeodeEvent> eventBuffer;
@@ -58,9 +55,10 @@
     @Override
     public void start(Map<String, String> props) {
         try {
-            geodeConnectorConfig = new GeodeConnectorConfig(props);
+            System.out.println("JASON start task");
+            geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
             logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
-            geodeContext = new GeodeContext(geodeConnectorConfig);
+            geodeContext = new GeodeContext();
+            geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout());
 
             batchSize = Integer.parseInt(props.get(BATCH_SIZE));
             int queueSize = Integer.parseInt(props.get(QUEUE_SIZE));
@@ -69,14 +68,15 @@
             regionToTopics = geodeConnectorConfig.getRegionToTopics();
             sourcePartitions = createSourcePartitionsMap(regionToTopics.keySet());
 
-            String cqPrefix = props.get(CQ_PREFIX);
-            boolean loadEntireRegion = Boolean.parseBoolean(props.get(LOAD_ENTIRE_REGION));
+            String cqPrefix = geodeConnectorConfig.getCqPrefix();
 
+            boolean loadEntireRegion = geodeConnectorConfig.getLoadEntireRegion();
             installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix, loadEntireRegion);
         } catch (Exception e) {
+            System.out.println("JASON start task failed" + e);
+            e.printStackTrace();
             logger.error("Unable to start source task", e);
             throw e;
         }
+        System.out.println("JASON end task");
+
     }
 
     @Override
@@ -102,7 +105,7 @@
         geodeContext.getClientCache().close(true);
     }
 
-    void installOnGeode(GeodeConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix, boolean loadEntireRegion) {
+    void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix, boolean loadEntireRegion) {
         boolean isDurable = geodeConnectorConfig.isDurable();
         int taskId = geodeConnectorConfig.getTaskId();
         for (String region : geodeConnectorConfig.getRegionToTopics().keySet()) {
diff --git a/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java
new file mode 100644
index 0000000..c29048f
--- /dev/null
+++ b/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java
@@ -0,0 +1,104 @@
+package geode.kafka.source;
+
+import geode.kafka.GeodeConnectorConfig;
+import geode.kafka.LocatorHostPort;
+
+import java.util.List;
+import java.util.Map;
+
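+/**
+ * Configuration specific to the source: durable client settings, CQ prefix,
+ * region-to-topic bindings, and batch/queue sizing.
+ */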
+public class GeodeSourceConnectorConfig extends GeodeConnectorConfig {
+
+    //Geode Configuration
+    public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientId";
+    public static final String DEFAULT_DURABLE_CLIENT_ID = "";
+    public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout";
+    public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
+
+    public static final String CQ_PREFIX = "cqPrefix";
+    public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
+
+    /**
+     * Used as a key for source partitions
+     */
+    public static final String REGION_NAME = "regionName";
+
+    public static final String REGION_TO_TOPIC_BINDINGS = "regionToTopic";
+
+    public static final String BATCH_SIZE = "geodeConnectorBatchSize";
+    public static final String DEFAULT_BATCH_SIZE = "100";
+
+    public static final String QUEUE_SIZE = "geodeConnectorQueueSize";
+    public static final String DEFAULT_QUEUE_SIZE = "100000";
+
+    public static final String LOAD_ENTIRE_REGION = "loadEntireRegion";
+    public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
+
+    private final String durableClientId;
+    private final String durableClientIdPrefix;
+    private final String durableClientTimeout;
+    private final String cqPrefix;
+    private final boolean loadEntireRegion;
+
+    private Map<String, List<String>> regionToTopics;
+
+    //just for tests
+    protected GeodeSourceConnectorConfig() {
+        super();
+        durableClientId = "";
+        durableClientIdPrefix = "";
+        durableClientTimeout = "0";
+        cqPrefix = DEFAULT_CQ_PREFIX;
+        loadEntireRegion = Boolean.parseBoolean(DEFAULT_LOAD_ENTIRE_REGION);
+    }
+
+    public GeodeSourceConnectorConfig(Map<String, String> connectorProperties) {
+        super(connectorProperties);
+        regionToTopics = parseRegionToTopics(connectorProperties.get(REGION_TO_TOPIC_BINDINGS));
+        durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX);
+        if (isDurable(durableClientIdPrefix)) {
+            durableClientId = durableClientIdPrefix + taskId;
+        } else {
+            durableClientId = "";
+        }
+        durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT);
+        cqPrefix = connectorProperties.get(CQ_PREFIX);
+        loadEntireRegion = Boolean.parseBoolean(connectorProperties.get(LOAD_ENTIRE_REGION));
+    }
+
+    public boolean isDurable() {
+        return isDurable(durableClientId);
+    }
+
+    /**
+     * @param durableClientId the durable client id or its prefix; either both are "" or both are set
+     * @return true if a durable client id is set
+     */
+    boolean isDurable(String durableClientId) {
+        return !durableClientId.equals("");
+    }
+
+    public String getDurableClientId() {
+        return durableClientId;
+    }
+
+    public String getDurableClientTimeout() {
+        return durableClientTimeout;
+    }
+
+    public String getCqPrefix() {
+        return cqPrefix;
+    }
+
+    public boolean getLoadEntireRegion() {
+        return loadEntireRegion;
+    }
+
+    public Map<String, List<String>> getRegionToTopics() {
+        return regionToTopics;
+    }
+
+}
diff --git a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
index 6a39c5d..904b981 100644
--- a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
+++ b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
@@ -10,8 +10,6 @@
 import java.util.List;
 import java.util.Map;
 
-import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
-import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
 import static geode.kafka.GeodeConnectorConfig.LOCATORS;
 import static geode.kafka.GeodeConnectorConfig.TASK_ID;
 import static org.hamcrest.CoreMatchers.allOf;
@@ -26,7 +24,7 @@
     @Test
     public void parseRegionNamesShouldSplitOnComma() {
         GeodeConnectorConfig config = new GeodeConnectorConfig();
-        List<String> regionNames = config.parseNames("region1,region2,region3,region4");
+        List<String> regionNames = config.parseStringByComma("region1,region2,region3,region4");
         assertEquals(4, regionNames.size());
         assertThat(true, allOf(is(regionNames.contains("region1"))
                 , is(regionNames.contains("region2"))
@@ -37,7 +35,7 @@
     @Test
     public void parseRegionNamesShouldChomp() {
         GeodeConnectorConfig config = new GeodeConnectorConfig();
-        List<String> regionNames = config.parseNames("region1, region2, region3,region4");
+        List<String> regionNames = config.parseStringByComma("region1, region2, region3,region4");
         assertEquals(4, regionNames.size());
         assertThat(true, allOf(is(regionNames instanceof List)
                 , is(regionNames.contains("region1"))
@@ -135,14 +133,5 @@
      */
 
 
-    @Test
-    public void durableClientIdShouldNotBeSetIfPrefixIsEmpty() {
-        Map<String, String> props = new HashMap<>();
-        props.put(TASK_ID, "0");
-        props.put(DURABLE_CLIENT_ID_PREFIX, "");
-        props.put(LOCATORS, "localhost[10334]");
-        GeodeConnectorConfig config = new GeodeConnectorConfig(props);
-        assertEquals("", config.getDurableClientId());
-    }
 
 }
diff --git a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
index 5e8f074..1b46fe0 100644
--- a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
@@ -18,12 +18,11 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import static geode.kafka.GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS;
-import static geode.kafka.GeodeConnectorConfig.TOPICS;
-import static geode.kafka.GeodeConnectorConfig.TOPIC_TO_REGION_BINDINGS;
 import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_TO_TOPIC_BINDINGS;
 import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK;
 import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_TO_REGION_BINDINGS;
+import static geode.kafka.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
+import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
 
 public class WorkerAndHerderWrapper {
 
@@ -34,9 +33,6 @@
         // fast flushing for testing.
         props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
 
-
-        props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
-        props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
         props.put("internal.key.converter.schemas.enable", "false");
         props.put("internal.value.converter.schemas.enable", "false");
         props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
@@ -71,7 +67,7 @@
         sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector");
         sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
         sinkProps.put(TOPIC_TO_REGION_BINDINGS, TEST_TOPIC_TO_REGION_BINDINGS);
-        sinkProps.put(TOPICS, TEST_TOPIC_FOR_SINK);
+        sinkProps.put("topics", TEST_TOPIC_FOR_SINK);
 
         herder.putConnectorConfig(
                 sinkProps.get(ConnectorConfig.NAME_CONFIG),
diff --git a/src/test/java/geode/kafka/sink/BatchRecordsTest.java b/src/test/java/geode/kafka/sink/BatchRecordsTest.java
index 4907d74..593fea0 100644
--- a/src/test/java/geode/kafka/sink/BatchRecordsTest.java
+++ b/src/test/java/geode/kafka/sink/BatchRecordsTest.java
@@ -1,37 +1,82 @@
 package geode.kafka.sink;
 
+import org.apache.geode.cache.Region;
+import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.Test;
 
+import java.util.Collection;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 public class BatchRecordsTest {
     @Test
     public void updatingARecordShouldRemoveFromTheRemoveListIfNullValuesIsRemoveBooleanIsSet() {
-
+        Map updates = mock(Map.class);
+        Collection removes = mock(Collection.class);
+        when(removes.contains(any())).thenReturn(true);
+        BatchRecords records = new BatchRecords(updates, removes);
+        SinkRecord sinkRecord = mock(SinkRecord.class);
+        records.addUpdateOperation(sinkRecord, true);
+        verify(removes, times(1)).remove(any());
     }
 
     @Test
     public void updatingARecordShouldAddToTheUpdateMap() {
-
+        Map updates = mock(Map.class);
+        Collection removes = mock(Collection.class);
+        when(removes.contains(any())).thenReturn(false);
+        BatchRecords records = new BatchRecords(updates, removes);
+        SinkRecord sinkRecord = mock(SinkRecord.class);
+        records.addUpdateOperation(sinkRecord, true);
+        verify(updates, times(1)).put(any(), any());
     }
 
     @Test
     public void updatingARecordShouldNotRemoveFromTheRemoveListIfNullValuesIsNotSet() {
-
+        boolean nullValuesMeanRemove = false;
+        Map updates = mock(Map.class);
+        Collection removes = mock(Collection.class);
+        when(removes.contains(any())).thenReturn(true);
+        BatchRecords records = new BatchRecords(updates, removes);
+        SinkRecord sinkRecord = mock(SinkRecord.class);
+        records.addUpdateOperation(sinkRecord, nullValuesMeanRemove);
+        verify(removes, times(0)).remove(any());
     }
 
 
     @Test
-    public void removingARecordShouldRemoveFromTheUpdateMap() {
-
+    public void removingARecordShouldRemoveFromTheUpdateMapIfKeyIsPresent() {
+        Map updates = mock(Map.class);
+        Collection removes = mock(Collection.class);
+        when(updates.containsKey(any())).thenReturn(true);
+        BatchRecords records = new BatchRecords(updates, removes);
+        SinkRecord sinkRecord = mock(SinkRecord.class);
+        records.addRemoveOperation(sinkRecord);
+        verify(updates, times(1)).remove(any());
     }
 
     @Test
     public void removingARecordAddToTheRemoveCollection() {
-
+        Map updates = mock(Map.class);
+        Collection removes = mock(Collection.class);
+        BatchRecords records = new BatchRecords(updates, removes);
+        SinkRecord sinkRecord = mock(SinkRecord.class);
+        records.addRemoveOperation(sinkRecord);
+        verify(removes, times(1)).add(any());
     }
 
     @Test
     public void executeOperationsShouldInvokePutAllAndRemoveAll() {
-
+        Region region = mock(Region.class);
+        BatchRecords records = new BatchRecords();
+        records.executeOperations(region);
+        verify(region, times(1)).putAll(any());
+        verify(region, times(1)).removeAll(any());
     }
 
 
diff --git a/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
new file mode 100644
index 0000000..37131ba
--- /dev/null
+++ b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
@@ -0,0 +1,4 @@
+package geode.kafka.sink;
+
+public class GeodeKafkaSinkTaskTest {
+}
diff --git a/src/test/java/geode/kafka/sink/GeodeKafkaSinkTest.java b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTest.java
new file mode 100644
index 0000000..2720e90
--- /dev/null
+++ b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTest.java
@@ -0,0 +1,16 @@
+package geode.kafka.sink;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class GeodeKafkaSinkTest {
+
+    @Test
+    public void startShouldNotThrowWhenConfigurationsAreMissing() {
+        GeodeKafkaSink sink = new GeodeKafkaSink();
+        Map<String, String> props = new HashMap<>();
+        sink.start(props);
+    }
+}
diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index d4149db..b793d30 100644
--- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -1,6 +1,5 @@
 package geode.kafka.source;
 
-import geode.kafka.GeodeConnectorConfig;
 import geode.kafka.GeodeContext;
 import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.query.CqEvent;
@@ -15,8 +14,8 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
-import static geode.kafka.GeodeConnectorConfig.REGION_NAME;
+import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_CQ_PREFIX;
+import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_NAME;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -88,7 +87,7 @@
         GeodeContext geodeContext = mock(GeodeContext.class);
         when(geodeContext.getClientCache()).thenReturn(clientCache);
 
-        GeodeConnectorConfig config = mock(GeodeConnectorConfig.class);
+        GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
         when (config.isDurable()).thenReturn(true);
         GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
         task.installOnGeode(config, geodeContext, null, "", false);
@@ -105,7 +104,7 @@
         Map<String, List<String>> regionToTopicsMap = new HashMap<>();
         regionToTopicsMap.put("region1", new ArrayList());
 
-        GeodeConnectorConfig config = mock(GeodeConnectorConfig.class);
+        GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
         when (config.getRegionToTopics()).thenReturn(regionToTopicsMap);
 
         GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
@@ -123,7 +122,7 @@
         Map<String, List<String>> regionToTopicsMap = new HashMap<>();
         regionToTopicsMap.put("region1", new ArrayList());
 
-        GeodeConnectorConfig config = mock(GeodeConnectorConfig.class);
+        GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
         when (config.getRegionToTopics()).thenReturn(regionToTopicsMap);
 
         GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
@@ -138,7 +137,7 @@
         GeodeContext geodeContext = mock(GeodeContext.class);
         when(geodeContext.getClientCache()).thenReturn(clientCache);
 
-        GeodeConnectorConfig config = mock(GeodeConnectorConfig.class);
+        GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
         when (config.isDurable()).thenReturn(false);
         GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
         task.installOnGeode(config, geodeContext, null, "", false);
diff --git a/src/test/java/geode/kafka/source/GeodeSourceConnectorConfigTest.java b/src/test/java/geode/kafka/source/GeodeSourceConnectorConfigTest.java
new file mode 100644
index 0000000..629c07d
--- /dev/null
+++ b/src/test/java/geode/kafka/source/GeodeSourceConnectorConfigTest.java
@@ -0,0 +1,26 @@
+package geode.kafka.source;
+
+import geode.kafka.GeodeConnectorConfig;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static geode.kafka.GeodeConnectorConfig.LOCATORS;
+import static geode.kafka.GeodeConnectorConfig.TASK_ID;
+import static geode.kafka.source.GeodeSourceConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
+import static org.junit.Assert.assertEquals;
+
+public class GeodeSourceConnectorConfigTest {
+
+    @Test
+    public void durableClientIdShouldNotBeSetIfPrefixIsEmpty() {
+        Map<String, String> props = new HashMap<>();
+        props.put(TASK_ID, "0");
+        props.put(DURABLE_CLIENT_ID_PREFIX, "");
+        props.put(LOCATORS, "localhost[10334]");
+        GeodeSourceConnectorConfig config = new GeodeSourceConnectorConfig(props);
+        assertEquals("", config.getDurableClientId());
+    }
+
+}