All configurable parameters are now using '-' as delimiter instead of camelCase
Added documentation to the configDef definitions
Updated readme with latest configuration changes
diff --git a/README.md b/README.md
index 39dd423..73953a1 100644
--- a/README.md
+++ b/README.md
@@ -26,7 +26,7 @@
 name=geode-kafka-sink
 connector.class=GeodeKafkaSink
 tasks.max=1
-topicToRegions=[someTopicToSinkFrom:someRegionToConsume]
+topic-to-regions=[someTopicToSinkFrom:someRegionToConsume]
 topics=someTopicToSinkFrom
 locators=localHost[10334]
 ```
@@ -35,7 +35,7 @@
 name=geode-kafka-source
 connector.class=GeodeKafkaSource
 tasks.max=1
-regionToTopics=[someRegionToSourceFrom:someTopicToConsume]
+region-to-topics=[someRegionToSourceFrom:someTopicToConsume]
 locators=localHost[10334]
 ```
 
@@ -47,27 +47,29 @@
 #### GeodeKafkaSink Properties
 | Property | Required | Description| Default |
 |---|---|---|---|
-| locators | no, but...| A comma separated string of locators that configure which locators to connect to | localhost[10334] |
-|topicToRegions| yes| A comma separated list of "one topic to many regions" bindings.  Each binding is surrounded by brackets. For example "[topicName:regionName], [anotherTopic: regionName, anotherRegion]" | None.  This is required to be set in the source connector properties
+|locators | no, but...| A comma separated string of locators that configure which locators to connect to | localhost[10334] |
+|topic-to-regions| yes| A comma separated list of "one topic to many regions" bindings.  Each binding is surrounded by brackets. For example "[topicName:regionName], [anotherTopic: regionName, anotherRegion]" | "[gkctopic:gkcregion]"
 |security-client-auth-init| no | Point to class that implements the [AuthInitialize Interface](https://gemfire.docs.pivotal.io/99/geode/managing/security/implementing_authentication.html)
-|nullValuesMeanRemove | no | If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region | true |
+|null-values-mean-remove | no | If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region | true |
 
-* The topicToRegions property allows us to create mappings between topics  and regions.  A single one-to-one mapping would look similar to "[topic:region]" A one-to-many mapping can be made by comma separating the regions, for example "[topic:region1,region2]"  This is equivalent to both regions being consumers of the topic.
+* The topic-to-regions property allows us to create mappings between topics  and regions.  A single one-to-one mapping would look similar to "[topic:region]" A one-to-many mapping can be made by comma separating the regions, for example "[topic:region1,region2]"  This is equivalent to both regions being consumers of the topic.
 
 #### GeodeKafkaSource Properties
 | Property | Required| Description| Default |
 |---|---|---|---|
 | locators | no, but...| A comma separated string of locators that configure which locators to connect to | localhost[10334] |
-|regionToTopics| yes | A comma separated list of "one region to many topics" mappings.  Each mapping is surrounded by brackets.  For example "[regionName:topicName], "[anotherRegion: topicName, anotherTopic]" | None.  This is required to be set in the source connector properties|
+|region-to-topics| yes | A comma separated list of "one region to many topics" mappings.  Each mapping is surrounded by brackets.  For example "[regionName:topicName], "[anotherRegion: topicName, anotherTopic]" | "[gkcregion:gkctopic]"|
 |security-client-auth-init| no | Point to class that implements the [AuthInitialize Interface](https://gemfire.docs.pivotal.io/99/geode/managing/security/implementing_authentication.html)
-|geodeConnectorBatchSize| no | Maximum number of records to return on each poll| 100 |
-|geodeConnectorQueueSize| no | Maximum number of entries in the connector queue before backing up all Geode cq listeners sharing the task queue | 10000 |
-| loadEntireRegion| no| Determines if we should queue up all entries that currently exist in a region.  This allows us to copy existing region data.  Will be replayed whenever a task needs to re-register a cq| true |
-|durableClientIdPrefix| no | Prefix string for tasks to append to when registering as a durable client.  If empty string, will not register as a durable client | "" |
-| durableClientTimeout| no | How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated| 60000 |
-| cqPrefix| no| Prefix string to identify Connector cq's on a Geode server |cqForGeodeKafka |
+|security-username| no | Supply a username to be used to authenticate with Geode.  Will autoset the security-client-auth-init to use a SystemPropertyAuthInit if one isn't supplied by the user| null|
+|security-password| no | Supply a password to be used to authenticate with Geode| null|
+|geode-connector-batch-size| no | Maximum number of records to return on each poll| 100 |
+|geode-connector-queue-size| no | Maximum number of entries in the connector queue before backing up all Geode cq listeners sharing the task queue | 10000 |
+| load-entire-region| no| Determines if we should queue up all entries that currently exist in a region.  This allows us to copy existing region data.  Will be replayed whenever a task needs to re-register a cq| true |
+|durable-client-id-prefix| no | Prefix string for tasks to append to when registering as a durable client.  If empty string, will not register as a durable client | "" |
+| durable-client-timeout| no | How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated| 60000 |
+| cq-prefix| no| Prefix string to identify Connector cq's on a Geode server |cqForGeodeKafka |
 
-* The regionToTopics property allows us to create mappings between regions and topics.  A single one-to-one mapping would look similar to "[region:topic]" A one-to-many mapping can be made by comma separating the topics, for example "[region:topic1,topic2]"  This is equivalent to the region be a producer for both topics 
+* The region-to-topics property allows us to create mappings between regions and topics.  A single one-to-one mapping would look similar to "[region:topic]" A one-to-many mapping can be made by comma separating the topics, for example "[region:topic1,topic2]"  This is equivalent to the region be a producer for both topics 
 
 ---
 
diff --git a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
index 2860a8f..cc151a4 100644
--- a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
@@ -14,9 +14,6 @@
  */
 package org.geode.kafka;
 
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -24,6 +21,9 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
 public class GeodeConnectorConfig extends AbstractConfig {
 
   // GeodeKafka Specific Configuration
@@ -37,9 +37,10 @@
   public static final String LOCATORS = "locators";
   public static final String DEFAULT_LOCATOR = "localhost[10334]";
   public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
-  private static final String DEFAULT_SECURITY_AUTH_INIT = "org.geode.kafka.security.SystemPropertyAuthInit";
+  private static final String DEFAULT_SECURITY_AUTH_INIT =
+      "org.geode.kafka.security.SystemPropertyAuthInit";
   public static final String SECURITY_USER = "security-username";
-  public static final String SECURITY_PASSWORD= "security-password";
+  public static final String SECURITY_PASSWORD = "security-password";
 
   protected final int taskId;
   protected List<LocatorHostPort> locatorHostPorts;
@@ -47,13 +48,13 @@
   private String securityUserName;
   private String securityPassword;
 
-  //Just for testing
+  // Just for testing
   protected GeodeConnectorConfig() {
     super(new ConfigDef(), new HashMap());
     taskId = 0;
   }
 
-  //Just for testing
+  // Just for testing
   protected GeodeConnectorConfig(Map<String, String> props) {
     super(new ConfigDef(), props);
     taskId = 0;
@@ -67,19 +68,27 @@
     securityUserName = getString(SECURITY_USER);
     securityPassword = getString(SECURITY_PASSWORD);
     securityClientAuthInit = getString(SECURITY_CLIENT_AUTH_INIT);
-    //if we registered a username/password instead of auth init, we should use the default auth init if one isn't specified
+    // if we registered a username/password instead of auth init, we should use the default auth
+    // init if one isn't specified
     if (usesSecurity()) {
-      securityClientAuthInit = securityClientAuthInit != null ? securityClientAuthInit : DEFAULT_SECURITY_AUTH_INIT;
+      securityClientAuthInit =
+          securityClientAuthInit != null ? securityClientAuthInit : DEFAULT_SECURITY_AUTH_INIT;
     }
   }
 
   protected static ConfigDef configurables() {
     ConfigDef configDef = new ConfigDef();
-    configDef.define(TASK_ID, ConfigDef.Type.INT,  "0", ConfigDef.Importance.MEDIUM,"");
-    configDef.define(LOCATORS, ConfigDef.Type.STRING, DEFAULT_LOCATOR, ConfigDef.Importance.HIGH, "");
-    configDef.define(SECURITY_USER, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "");
-    configDef.define(SECURITY_PASSWORD, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "");
-    configDef.define(SECURITY_CLIENT_AUTH_INIT, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "");
+    configDef.define(TASK_ID, ConfigDef.Type.INT, "0", ConfigDef.Importance.MEDIUM,
+        "Internally used to identify each task");
+    configDef.define(LOCATORS, ConfigDef.Type.STRING, DEFAULT_LOCATOR, ConfigDef.Importance.HIGH,
+        "A comma separated string of locators that configure which locators to connect to");
+    configDef.define(SECURITY_USER, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
+        "Supply a username to be used to authenticate with Geode.  Will autoset the security-client-auth-init to use a SystemPropertyAuthInit if one isn't supplied by the user");
+    configDef.define(SECURITY_PASSWORD, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
+        "Supply a password to be used to authenticate with Geode");
+    configDef.define(SECURITY_CLIENT_AUTH_INIT, ConfigDef.Type.STRING, null,
+        ConfigDef.Importance.HIGH,
+        "Point to class that implements the [AuthInitialize Interface](https://gemfire.docs.pivotal.io/99/geode/managing/security/implementing_authentication.html)");
     return configDef;
   }
 
diff --git a/src/main/java/org/geode/kafka/GeodeContext.java b/src/main/java/org/geode/kafka/GeodeContext.java
index 6190ef2..9f30242 100644
--- a/src/main/java/org/geode/kafka/GeodeContext.java
+++ b/src/main/java/org/geode/kafka/GeodeContext.java
@@ -14,10 +14,12 @@
  */
 package org.geode.kafka;
 
-import java.util.Collection;
+import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
+import static org.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD;
+import static org.geode.kafka.GeodeConnectorConfig.SECURITY_USER;
+
 import java.util.List;
 
-import org.apache.geode.cache.query.CqResults;
 import org.apache.kafka.connect.errors.ConnectException;
 
 import org.apache.geode.cache.client.ClientCache;
@@ -26,12 +28,9 @@
 import org.apache.geode.cache.query.CqException;
 import org.apache.geode.cache.query.CqExistsException;
 import org.apache.geode.cache.query.CqQuery;
+import org.apache.geode.cache.query.CqResults;
 import org.apache.geode.cache.query.RegionNotFoundException;
 
-import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
-import static org.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD;
-import static org.geode.kafka.GeodeConnectorConfig.SECURITY_USER;
-
 public class GeodeContext {
 
   private ClientCache clientCache;
@@ -40,15 +39,18 @@
   public GeodeContext() {}
 
   public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
-      String durableClientId, String durableClientTimeout, String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) {
+      String durableClientId, String durableClientTimeout, String securityAuthInit,
+      String securityUserName, String securityPassword, boolean usesSecurity) {
     clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout,
         securityAuthInit, securityUserName, securityPassword, usesSecurity);
     return clientCache;
   }
 
   public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
-      String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) {
-    clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit, securityUserName, securityPassword, usesSecurity);
+      String securityAuthInit, String securityUserName, String securityPassword,
+      boolean usesSecurity) {
+    clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit, securityUserName,
+        securityPassword, usesSecurity);
     return clientCache;
   }
 
@@ -57,7 +59,8 @@
   }
 
   public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName,
-      String durableClientTimeOut, String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) {
+      String durableClientTimeOut, String securityAuthInit, String securityUserName,
+      String securityPassword, boolean usesSecurity) {
     ClientCacheFactory ccf = new ClientCacheFactory();
 
     if (usesSecurity) {
@@ -93,7 +96,7 @@
   }
 
   public CqResults newCqWithInitialResults(String name, String query, CqAttributes cqAttributes,
-                                                   boolean isDurable) throws ConnectException {
+      boolean isDurable) throws ConnectException {
     try {
       CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable);
       return cq.executeWithInitialResults();
diff --git a/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java b/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
index cc525a2..6b646ee 100644
--- a/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
+++ b/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
@@ -19,7 +19,6 @@
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.security.AuthInitialize;
 import org.apache.geode.security.AuthenticationFailedException;
-import org.geode.kafka.GeodeConnectorConfig;
 
 
 public class SystemPropertyAuthInit implements AuthInitialize {
diff --git a/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java
index a8985c2..9ee5189 100644
--- a/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java
+++ b/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java
@@ -14,17 +14,17 @@
  */
 package org.geode.kafka.sink;
 
+import static org.geode.kafka.sink.GeodeSinkConnectorConfig.SINK_CONFIG_DEF;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.geode.kafka.GeodeConnectorConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
-
-import static org.geode.kafka.sink.GeodeSinkConnectorConfig.SINK_CONFIG_DEF;
+import org.geode.kafka.GeodeConnectorConfig;
 
 public class GeodeKafkaSink extends SinkConnector {
   private Map<String, String> sharedProps;
diff --git a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
index 7db384f..be44356 100644
--- a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -20,9 +20,9 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import org.geode.kafka.GeodeContext;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
+import org.geode.kafka.GeodeContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,7 +61,9 @@
       configure(geodeConnectorConfig);
       geodeContext = new GeodeContext();
       geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
-          geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(), geodeConnectorConfig.usesSecurity());
+          geodeConnectorConfig.getSecurityClientAuthInit(),
+          geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(),
+          geodeConnectorConfig.usesSecurity());
       regionNameToRegion = createProxyRegions(topicToRegions.values());
     } catch (Exception e) {
       logger.error("Unable to start sink task", e);
diff --git a/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java
index bb51b0e..a074220 100644
--- a/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java
+++ b/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java
@@ -14,20 +14,19 @@
  */
 package org.geode.kafka.sink;
 
-import org.apache.kafka.common.config.ConfigDef;
-
 import java.util.List;
 import java.util.Map;
 
+import org.apache.kafka.common.config.ConfigDef;
 import org.geode.kafka.GeodeConnectorConfig;
 
 public class GeodeSinkConnectorConfig extends GeodeConnectorConfig {
   public static final ConfigDef SINK_CONFIG_DEF = configurables();
 
   // Used by sink
-  public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegions";
+  public static final String TOPIC_TO_REGION_BINDINGS = "topic-to-regions";
   public static final String DEFAULT_TOPIC_TO_REGION_BINDING = "[gkcTopic:gkcRegion]";
-  public static final String NULL_VALUES_MEAN_REMOVE = "nullValuesMeanRemove";
+  public static final String NULL_VALUES_MEAN_REMOVE = "null-values-mean-remove";
   public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";
 
   private Map<String, List<String>> topicToRegions;
@@ -41,8 +40,12 @@
 
   protected static ConfigDef configurables() {
     ConfigDef configDef = GeodeConnectorConfig.configurables();
-    configDef.define(TOPIC_TO_REGION_BINDINGS, ConfigDef.Type.STRING, DEFAULT_TOPIC_TO_REGION_BINDING, ConfigDef.Importance.HIGH, "");
-    configDef.define(NULL_VALUES_MEAN_REMOVE, ConfigDef.Type.BOOLEAN, DEFAULT_NULL_VALUES_MEAN_REMOVE, ConfigDef.Importance.MEDIUM, "");
+    configDef.define(TOPIC_TO_REGION_BINDINGS, ConfigDef.Type.STRING,
+        DEFAULT_TOPIC_TO_REGION_BINDING, ConfigDef.Importance.HIGH,
+        "A comma separated list of \"one topic to many regions\" bindings.  Each binding is surrounded by brackets. For example \"[topicName:regionName], [anotherTopic: regionName, anotherRegion]");
+    configDef.define(NULL_VALUES_MEAN_REMOVE, ConfigDef.Type.BOOLEAN,
+        DEFAULT_NULL_VALUES_MEAN_REMOVE, ConfigDef.Importance.MEDIUM,
+        "If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region");
     return configDef;
   }
 
diff --git a/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java
index dac94f6..7b4445e 100644
--- a/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java
+++ b/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java
@@ -14,19 +14,19 @@
  */
 package org.geode.kafka.source;
 
+import static org.geode.kafka.source.GeodeSourceConnectorConfig.SOURCE_CONFIG_DEF;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.geode.kafka.GeodeConnectorConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.util.ConnectorUtils;
-
-import static org.geode.kafka.source.GeodeSourceConnectorConfig.SOURCE_CONFIG_DEF;
+import org.geode.kafka.GeodeConnectorConfig;
 
 
 public class GeodeKafkaSource extends SourceConnector {
@@ -43,7 +43,8 @@
     List<Map<String, String>> taskConfigs = new ArrayList<>();
     List<String> bindings =
         GeodeConnectorConfig
-            .parseStringByComma(sharedProps.get(GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS));
+            .parseStringByComma(
+                sharedProps.get(GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS));
     List<List<String>> bindingsPerTask = ConnectorUtils.groupPartitions(bindings, maxTasks);
 
     for (int i = 0; i < maxTasks; i++) {
diff --git a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
index b1c289f..4acc081 100644
--- a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -21,17 +21,16 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import org.apache.geode.cache.query.CqResults;
-import org.apache.geode.cache.query.Struct;
-import org.geode.kafka.GeodeContext;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
+import org.geode.kafka.GeodeContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.geode.cache.query.CqAttributes;
 import org.apache.geode.cache.query.CqAttributesFactory;
-import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqResults;
+import org.apache.geode.cache.query.Struct;
 
 public class GeodeKafkaSourceTask extends SourceTask {
 
@@ -70,7 +69,9 @@
       geodeContext = new GeodeContext();
       geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
           geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout(),
-          geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(), geodeConnectorConfig.usesSecurity());
+          geodeConnectorConfig.getSecurityClientAuthInit(),
+          geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(),
+          geodeConnectorConfig.usesSecurity());
 
       batchSize = geodeConnectorConfig.getBatchSize();
       eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize());
@@ -140,7 +141,10 @@
             geodeContext.newCqWithInitialResults(generateCqName(taskId, cqPrefix, regionName),
                 "select * from /" + regionName, cqAttributes,
                 isDurable);
-        eventBuffer.get().addAll((Collection<GeodeEvent>)events.stream().map(e -> new GeodeEvent(regionName, ((Struct)e).get("key"), ((Struct)e).get("value"))).collect(Collectors.toList()));
+        eventBuffer.get()
+            .addAll((Collection<GeodeEvent>) events.stream().map(
+                e -> new GeodeEvent(regionName, ((Struct) e).get("key"), ((Struct) e).get("value")))
+                .collect(Collectors.toList()));
       } else {
         geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName),
             "select * from /" + regionName, cqAttributes,
diff --git a/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java
index 78673cd..e96796b 100644
--- a/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java
+++ b/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java
@@ -26,29 +26,30 @@
   public static final ConfigDef SOURCE_CONFIG_DEF = configurables();
 
   // Geode Configuration
-  public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientIdPrefix";
+  public static final String DURABLE_CLIENT_ID_PREFIX = "durable-client-id-prefix";
   public static final String DEFAULT_DURABLE_CLIENT_ID = "";
-  public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout";
+  public static final String DURABLE_CLIENT_TIME_OUT = "durable-client-timeout";
   public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
 
-  public static final String CQ_PREFIX = "cqPrefix";
+  public static final String CQ_PREFIX = "cq-prefix";
   public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
 
   /**
    * Used as a key for source partitions
    */
   public static final String REGION_PARTITION = "regionPartition";
-  public static final String REGION_TO_TOPIC_BINDINGS = "regionToTopics";
+  public static final String REGION_TO_TOPIC_BINDINGS = "region-to-topics";
   public static final String DEFAULT_REGION_TO_TOPIC_BINDING = "[gkcRegion:gkcTopic]";
-  public static final String CQS_TO_REGISTER = "cqsToRegister"; //used internally so that only 1 task will register a cq
+  public static final String CQS_TO_REGISTER = "cqsToRegister"; // used internally so that only 1
+                                                                // task will register a cq
 
-  public static final String BATCH_SIZE = "geodeConnectorBatchSize";
+  public static final String BATCH_SIZE = "geode-connector-batch-size";
   public static final String DEFAULT_BATCH_SIZE = "100";
 
-  public static final String QUEUE_SIZE = "geodeConnectorQueueSize";
+  public static final String QUEUE_SIZE = "geode-connector-queue-size";
   public static final String DEFAULT_QUEUE_SIZE = "10000";
 
-  public static final String LOAD_ENTIRE_REGION = "loadEntireRegion";
+  public static final String LOAD_ENTIRE_REGION = "load-entire-region";
   public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
 
   private final String durableClientId;
@@ -81,14 +82,27 @@
 
   protected static ConfigDef configurables() {
     ConfigDef configDef = GeodeConnectorConfig.configurables();
-    configDef.define(CQS_TO_REGISTER, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Internally created and used parameter, for signalling a task to register cqs");
-    configDef.define(REGION_TO_TOPIC_BINDINGS, ConfigDef.Type.STRING, DEFAULT_REGION_TO_TOPIC_BINDING, ConfigDef.Importance.HIGH, "");
-    configDef.define(DURABLE_CLIENT_ID_PREFIX, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_ID, ConfigDef.Importance.LOW, "");
-    configDef.define(DURABLE_CLIENT_TIME_OUT, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_TIMEOUT, ConfigDef.Importance.LOW, "");
-    configDef.define(CQ_PREFIX, ConfigDef.Type.STRING, DEFAULT_CQ_PREFIX, ConfigDef.Importance.LOW, "");
-    configDef.define(BATCH_SIZE, ConfigDef.Type.INT, DEFAULT_BATCH_SIZE, ConfigDef.Importance.MEDIUM, "");
-    configDef.define(QUEUE_SIZE, ConfigDef.Type.INT, DEFAULT_QUEUE_SIZE, ConfigDef.Importance.MEDIUM, "");
-    configDef.define(LOAD_ENTIRE_REGION, ConfigDef.Type.BOOLEAN, DEFAULT_LOAD_ENTIRE_REGION, ConfigDef.Importance.MEDIUM, "");
+    configDef.define(CQS_TO_REGISTER, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH,
+        "Internally created and used parameter, for signalling a task to register cqs");
+    configDef.define(REGION_TO_TOPIC_BINDINGS, ConfigDef.Type.STRING,
+        DEFAULT_REGION_TO_TOPIC_BINDING, ConfigDef.Importance.HIGH,
+        "A comma separated list of \"one region to many topics\" mappings.  Each mapping is surrounded by brackets.  For example \"[regionName:topicName], \"[anotherRegion: topicName, anotherTopic]\"");
+    configDef.define(DURABLE_CLIENT_ID_PREFIX, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_ID,
+        ConfigDef.Importance.LOW,
+        "Prefix string for tasks to append to when registering as a durable client.  If empty string, will not register as a durable client");
+    configDef.define(DURABLE_CLIENT_TIME_OUT, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_TIMEOUT,
+        ConfigDef.Importance.LOW,
+        "How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated");
+    configDef.define(CQ_PREFIX, ConfigDef.Type.STRING, DEFAULT_CQ_PREFIX, ConfigDef.Importance.LOW,
+        "Prefix string to identify Connector cq's on a Geode server");
+    configDef.define(BATCH_SIZE, ConfigDef.Type.INT, DEFAULT_BATCH_SIZE,
+        ConfigDef.Importance.MEDIUM, "Maximum number of records to return on each poll");
+    configDef.define(QUEUE_SIZE, ConfigDef.Type.INT, DEFAULT_QUEUE_SIZE,
+        ConfigDef.Importance.MEDIUM,
+        "Maximum number of entries in the connector queue before backing up all Geode cq listeners sharing the task queue ");
+    configDef.define(LOAD_ENTIRE_REGION, ConfigDef.Type.BOOLEAN, DEFAULT_LOAD_ENTIRE_REGION,
+        ConfigDef.Importance.MEDIUM,
+        "Determines if we should queue up all entries that currently exist in a region.  This allows us to copy existing region data.  Will be replayed whenever a task needs to re-register a cq");
     return configDef;
   }
 
diff --git a/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java
index 11a00f9..5c63d98 100644
--- a/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java
+++ b/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java
@@ -138,7 +138,8 @@
   public void usesSecurityShouldBeTrueIfSecurityUserSet() {
     Map<String, String> props = new HashMap<>();
     props.put(SECURITY_USER, "some user");
-    GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
+    GeodeConnectorConfig config =
+        new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
     assertTrue(config.usesSecurity());
   }
 
@@ -146,14 +147,16 @@
   public void usesSecurityShouldBeTrueIfSecurityClientAuthInitSet() {
     Map<String, String> props = new HashMap<>();
     props.put(SECURITY_CLIENT_AUTH_INIT, "someclass");
-    GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
+    GeodeConnectorConfig config =
+        new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
     assertTrue(config.usesSecurity());
   }
 
   @Test
   public void usesSecurityShouldBeFalseIfSecurityUserAndSecurityClientAuthInitNotSet() {
     Map<String, String> props = new HashMap<>();
-    GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
+    GeodeConnectorConfig config =
+        new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
     assertFalse(config.usesSecurity());
   }
 
@@ -161,14 +164,16 @@
   public void securityClientAuthInitShouldBeSetIfUserIsSet() {
     Map<String, String> props = new HashMap<>();
     props.put(SECURITY_USER, "some user");
-    GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
+    GeodeConnectorConfig config =
+        new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
     assertNotNull(config.getSecurityClientAuthInit());
   }
 
   @Test
   public void securityClientAuthInitShouldNotBeSetIfUserIsNotSetAndNotSpecificallySet() {
     Map<String, String> props = new HashMap<>();
-    GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
+    GeodeConnectorConfig config =
+        new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
     assertNull(config.getSecurityClientAuthInit());
   }
 
diff --git a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
index b4a7bbe..3afcde7 100644
--- a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
+++ b/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
@@ -21,8 +21,6 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import org.geode.kafka.sink.GeodeKafkaSink;
-import org.geode.kafka.source.GeodeKafkaSource;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
@@ -34,6 +32,8 @@
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
 import org.apache.kafka.connect.util.ConnectUtils;
+import org.geode.kafka.sink.GeodeKafkaSink;
+import org.geode.kafka.source.GeodeKafkaSource;
 
 public class WorkerAndHerderWrapper {
 
diff --git a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index 7901426..4fa7d81 100644
--- a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -30,26 +30,20 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.geode.cache.query.CqAttributes;
-import org.apache.geode.cache.query.CqResults;
-import org.apache.geode.cache.query.SelectResults;
-import org.apache.geode.cache.query.Struct;
-import org.apache.geode.cache.query.internal.LinkedStructSet;
-import org.apache.geode.cache.query.internal.ResultsBag;
-import org.apache.geode.cache.query.internal.ResultsBag;
-import org.apache.geode.cache.query.internal.StructImpl;
-import org.apache.geode.cache.query.internal.types.StructTypeImpl;
 import org.geode.kafka.GeodeContext;
 import org.junit.Test;
 
 import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.query.CqAttributes;
 import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqResults;
+import org.apache.geode.cache.query.Struct;
+import org.apache.geode.cache.query.internal.ResultsBag;
 
 
 public class GeodeKafkaSourceTaskTest {
@@ -149,7 +143,8 @@
 
     GeodeContext geodeContext = mock(GeodeContext.class);
     when(geodeContext.getClientCache()).thenReturn(clientCache);
-    when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(CqAttributes.class), anyBoolean())).thenReturn(new ResultsBag());
+    when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(CqAttributes.class),
+        anyBoolean())).thenReturn(new ResultsBag());
     Map<String, List<String>> regionToTopicsMap = new HashMap<>();
     regionToTopicsMap.put("region1", new ArrayList());