Adding the configuration changes as per spec
diff --git a/pom.xml b/pom.xml
index b6dc832..6c2676b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -227,17 +227,10 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
                 <version>${maven-plugin.version}</version>
+                <inherited>true</inherited>
                 <configuration>
                     <source>1.8</source>
                     <target>1.8</target>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>${maven-plugin.version}</version>
-                <inherited>true</inherited>
-                <configuration>
                     <compilerArgs>
                         <arg>-Xlint:-processing</arg>
                         <arg>-Xlint:all</arg>
diff --git a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
index 717fef6..05dd5f2 100644
--- a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
@@ -25,6 +25,8 @@
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.storage.StringConverter;
 
+import org.apache.geode.annotations.VisibleForTesting;
+
 public class GeodeConnectorConfig extends AbstractConfig {
 
   // GeodeKafka Specific Configuration
@@ -35,13 +37,24 @@
   /**
    * Specifies which Locators to connect to Apache Geode
    */
-  public static final String LOCATORS = "locators";
-  public static final String DEFAULT_LOCATOR = "localhost[10334]";
-  public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
+  private static final String LOCATORS = "locators";
+  private static final String DEFAULT_LOCATOR = "localhost[10334]";
+  private static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
   private static final String DEFAULT_SECURITY_AUTH_INIT =
       "org.apache.geode.kafka.security.SystemPropertyAuthInit";
-  public static final String SECURITY_USER = "security-username";
-  public static final String SECURITY_PASSWORD = "security-password";
+  private static final String SECURITY_USER = "security-username";
+  private static final String SECURITY_PASSWORD = "security-password";
+  private static final String TASK_ID_DOCUMENTATION = "Internally used to identify each task";
+  private static final String
+      LOCATORS_DOCUMENTATION =
+      "A comma separated string of locators that configure which locators to connect to";
+  private static final String
+      SECURITY_USER_DOCUMENTATION =
+      "Supply a username to be used to authenticate with Geode.  Will automatically set the security-client-auth-init to use a SystemPropertyAuthInit if one isn't supplied by the user";
+  private static final String SECURITY_PASSWORD_DOCUMENTATION = "Supply a password to be used to authenticate with Geode";
+  private static final String
+      SECURITY_CLIENT_AUTH_INIT_DOCUMENTATION =
+      "Point to the Java class that implements the [AuthInitialize Interface](https://geode.apache.org/docs/guide/19/managing/security/implementing_authentication.html)";
 
   public static final String DEFAULT_KEY_CONVERTER = StringConverter.class.getCanonicalName();
   public static final String DEFAULT_VALUE_CONVERTER = StringConverter.class.getCanonicalName();
@@ -52,7 +65,7 @@
   private String securityUserName;
   private String securityPassword;
 
-  // Just for testing
+  @VisibleForTesting
   protected GeodeConnectorConfig() {
     super(new ConfigDef(), new HashMap<>());
     taskId = 0;
@@ -75,17 +88,38 @@
 
   protected static ConfigDef configurables() {
     ConfigDef configDef = new ConfigDef();
-    configDef.define(TASK_ID, ConfigDef.Type.INT, "0", ConfigDef.Importance.MEDIUM,
-        "Internally used to identify each task");
-    configDef.define(LOCATORS, ConfigDef.Type.STRING, DEFAULT_LOCATOR, ConfigDef.Importance.HIGH,
-        "A comma separated string of locators that configure which locators to connect to");
-    configDef.define(SECURITY_USER, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
-        "Supply a username to be used to authenticate with Geode.  Will automatically set the security-client-auth-init to use a SystemPropertyAuthInit if one isn't supplied by the user");
-    configDef.define(SECURITY_PASSWORD, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
-        "Supply a password to be used to authenticate with Geode");
-    configDef.define(SECURITY_CLIENT_AUTH_INIT, ConfigDef.Type.STRING, null,
+    configDef.define(
+        TASK_ID,
+        ConfigDef.Type.INT,
+        "0",
+        ConfigDef.Importance.MEDIUM,
+        TASK_ID_DOCUMENTATION);
+    configDef.define(
+        LOCATORS,
+        ConfigDef.Type.STRING,
+        DEFAULT_LOCATOR,
         ConfigDef.Importance.HIGH,
-        "Point to class that implements the [AuthInitialize Interface](https://gemfire.docs.pivotal.io/99/geode/managing/security/implementing_authentication.html)");
+        LOCATORS_DOCUMENTATION);
+    configDef.define(
+        SECURITY_USER,
+        ConfigDef.Type.STRING,
+        null,
+        ConfigDef.Importance.HIGH,
+        SECURITY_USER_DOCUMENTATION);
+
+    configDef.define(
+        SECURITY_PASSWORD,
+        ConfigDef.Type.PASSWORD,
+        null,
+        ConfigDef.Importance.HIGH,
+        SECURITY_PASSWORD_DOCUMENTATION);
+
+    configDef.define(
+        SECURITY_CLIENT_AUTH_INIT,
+        ConfigDef.Type.PASSWORD,
+        null,
+        ConfigDef.Importance.HIGH,
+        SECURITY_CLIENT_AUTH_INIT_DOCUMENTATION);
     return configDef;
   }
 
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
index cd49778..46cd6c0 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
@@ -25,10 +25,16 @@
   public static final ConfigDef SINK_CONFIG_DEF = configurables();
 
   // Used by sink
-  public static final String TOPIC_TO_REGION_BINDINGS = "topic-to-regions";
-  public static final String DEFAULT_TOPIC_TO_REGION_BINDING = "[gkcTopic:gkcRegion]";
-  public static final String NULL_VALUES_MEAN_REMOVE = "null-values-mean-remove";
-  public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";
+  private static final String TOPIC_TO_REGION_BINDINGS = "topic-to-regions";
+  private static final String DEFAULT_TOPIC_TO_REGION_BINDING = "[gkcTopic:gkcRegion]";
+  private static final String NULL_VALUES_MEAN_REMOVE = "null-values-mean-remove";
+  private static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";
+  private static final String
+      NULL_VALUES_MEAN_REMOVE_DOCUMENTATION =
+      "If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region";
+  private static final String
+      TOPIC_TO_REGION_BINDINGS_DOCUMENTATION =
+      "A comma separated list of \"one topic to many regions\" bindings.  Each binding is surrounded by brackets. For example \"[topicName:regionName], [anotherTopic: regionName, anotherRegion]";
 
   private final Map<String, List<String>> topicToRegions;
   private final boolean nullValuesMeanRemove;
@@ -41,12 +47,19 @@
 
   protected static ConfigDef configurables() {
     ConfigDef configDef = GeodeConnectorConfig.configurables();
-    configDef.define(TOPIC_TO_REGION_BINDINGS, ConfigDef.Type.STRING,
-        DEFAULT_TOPIC_TO_REGION_BINDING, ConfigDef.Importance.HIGH,
-        "A comma separated list of \"one topic to many regions\" bindings.  Each binding is surrounded by brackets. For example \"[topicName:regionName], [anotherTopic: regionName, anotherRegion]");
-    configDef.define(NULL_VALUES_MEAN_REMOVE, ConfigDef.Type.BOOLEAN,
-        DEFAULT_NULL_VALUES_MEAN_REMOVE, ConfigDef.Importance.MEDIUM,
-        "If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region");
+    configDef.define(
+        TOPIC_TO_REGION_BINDINGS,
+        ConfigDef.Type.STRING,
+        DEFAULT_TOPIC_TO_REGION_BINDING,
+        ConfigDef.Importance.HIGH,
+        TOPIC_TO_REGION_BINDINGS_DOCUMENTATION);
+
+    configDef.define(
+        NULL_VALUES_MEAN_REMOVE,
+        ConfigDef.Type.BOOLEAN,
+        DEFAULT_NULL_VALUES_MEAN_REMOVE,
+        ConfigDef.Importance.MEDIUM,
+        NULL_VALUES_MEAN_REMOVE_DOCUMENTATION);
     return configDef;
   }
 
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
index 01294c9..05d4fd6 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
@@ -53,6 +53,27 @@
   public static final String LOAD_ENTIRE_REGION = "load-entire-region";
   public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
 
+  private static final String
+      CQS_TO_REGISTER_DOCUMENTATION =
+      "Internally created and used parameter, for signalling a task to register CQs on Apache Geode";
+  private static final String
+      REGION_TO_TOPIC_BINDINGS_DOCUMENTATION =
+      "A comma separated list of \"one region to many topics\" mappings.  Each mapping is surrounded by brackets.  For example \"[regionName:topicName], \"[anotherRegion: topicName, anotherTopic]\"";
+  private static final String
+      DURABLE_CLIENT_ID_PREFIX_DOCUMENTATION =
+      "Prefix string for tasks to append to when registering as a durable client.  If empty string, will not register as a durable client";
+  private static final String
+      LOAD_ENTIRE_REGION_DOCUMENTATION =
+      "Determines if we should queue up all entries that currently exist in a region.  This allows us to copy existing region data.  Will be replayed whenever a task needs to re-register a CQ";
+  private static final String
+      DURABLE_CLIENT_TIME_OUT_DOCUMENTATION =
+      "How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated";
+  private static final String CQ_PREFIX_DOCUMENTATION = "Prefix string to identify Connector CQ's on a Geode server";
+  private static final String BATCH_SIZE_DOCUMENTATION = "Maximum number of records to return on each poll";
+  private static final String
+      QUEUE_SIZE_DOCUMENTATION =
+      "Maximum number of entries in the connector queue before backing up all Geode CQ listeners sharing the task queue ";
+
   private final String durableClientId;
   private final String durableClientTimeout;
   private final String cqPrefix;
@@ -82,27 +103,60 @@
 
   protected static ConfigDef configurables() {
     ConfigDef configDef = GeodeConnectorConfig.configurables();
-    configDef.define(CQS_TO_REGISTER, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH,
-        "Internally created and used parameter, for signalling a task to register cqs");
-    configDef.define(REGION_TO_TOPIC_BINDINGS, ConfigDef.Type.STRING,
-        DEFAULT_REGION_TO_TOPIC_BINDING, ConfigDef.Importance.HIGH,
-        "A comma separated list of \"one region to many topics\" mappings.  Each mapping is surrounded by brackets.  For example \"[regionName:topicName], \"[anotherRegion: topicName, anotherTopic]\"");
-    configDef.define(DURABLE_CLIENT_ID_PREFIX, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_ID,
+
+    configDef.define(
+        CQS_TO_REGISTER,
+        ConfigDef.Type.STRING,
+        "",
+        ConfigDef.Importance.HIGH,
+        CQS_TO_REGISTER_DOCUMENTATION);
+
+    configDef.define(
+        REGION_TO_TOPIC_BINDINGS,
+        ConfigDef.Type.STRING,
+        DEFAULT_REGION_TO_TOPIC_BINDING,
+        ConfigDef.Importance.HIGH,
+        REGION_TO_TOPIC_BINDINGS_DOCUMENTATION);
+
+    configDef.define(
+        DURABLE_CLIENT_ID_PREFIX,
+        ConfigDef.Type.STRING,
+        DEFAULT_DURABLE_CLIENT_ID,
         ConfigDef.Importance.LOW,
-        "Prefix string for tasks to append to when registering as a durable client.  If empty string, will not register as a durable client");
-    configDef.define(DURABLE_CLIENT_TIME_OUT, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_TIMEOUT,
+        DURABLE_CLIENT_ID_PREFIX_DOCUMENTATION);
+
+    configDef.define(DURABLE_CLIENT_TIME_OUT,
+        ConfigDef.Type.STRING,
+        DEFAULT_DURABLE_CLIENT_TIMEOUT,
         ConfigDef.Importance.LOW,
-        "How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated");
-    configDef.define(CQ_PREFIX, ConfigDef.Type.STRING, DEFAULT_CQ_PREFIX, ConfigDef.Importance.LOW,
-        "Prefix string to identify Connector cq's on a Geode server");
-    configDef.define(BATCH_SIZE, ConfigDef.Type.INT, DEFAULT_BATCH_SIZE,
-        ConfigDef.Importance.MEDIUM, "Maximum number of records to return on each poll");
-    configDef.define(QUEUE_SIZE, ConfigDef.Type.INT, DEFAULT_QUEUE_SIZE,
+        DURABLE_CLIENT_TIME_OUT_DOCUMENTATION);
+
+    configDef.define(CQ_PREFIX,
+        ConfigDef.Type.STRING,
+        DEFAULT_CQ_PREFIX,
+        ConfigDef.Importance.LOW,
+        CQ_PREFIX_DOCUMENTATION);
+
+    configDef.define(
+        BATCH_SIZE,
+        ConfigDef.Type.INT,
+        DEFAULT_BATCH_SIZE,
         ConfigDef.Importance.MEDIUM,
-        "Maximum number of entries in the connector queue before backing up all Geode cq listeners sharing the task queue ");
-    configDef.define(LOAD_ENTIRE_REGION, ConfigDef.Type.BOOLEAN, DEFAULT_LOAD_ENTIRE_REGION,
+        BATCH_SIZE_DOCUMENTATION);
+
+    configDef.define(
+        QUEUE_SIZE,
+        ConfigDef.Type.INT,
+        DEFAULT_QUEUE_SIZE,
         ConfigDef.Importance.MEDIUM,
-        "Determines if we should queue up all entries that currently exist in a region.  This allows us to copy existing region data.  Will be replayed whenever a task needs to re-register a cq");
+        QUEUE_SIZE_DOCUMENTATION);
+
+    configDef.define(LOAD_ENTIRE_REGION,
+        ConfigDef.Type.BOOLEAN,
+        DEFAULT_LOAD_ENTIRE_REGION,
+        ConfigDef.Importance.MEDIUM,
+        LOAD_ENTIRE_REGION_DOCUMENTATION);
+
     return configDef;
   }