Moved all string constants into separate constants classes
diff --git a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
index 05dd5f2..586817a 100644
--- a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
@@ -14,6 +14,19 @@
  */
 package org.apache.geode.kafka;
 
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.DEFAULT_LOCATOR;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.DEFAULT_SECURITY_AUTH_INIT;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.LOCATORS;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.LOCATORS_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_CLIENT_AUTH_INIT;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_CLIENT_AUTH_INIT_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_PASSWORD;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_PASSWORD_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_USER;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_USER_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.TASK_ID;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.TASK_ID_DOCUMENTATION;
+
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -23,6 +36,7 @@
 
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.connect.storage.StringConverter;
 
 import org.apache.geode.annotations.VisibleForTesting;
@@ -30,40 +44,14 @@
 public class GeodeConnectorConfig extends AbstractConfig {
 
   // GeodeKafka Specific Configuration
-  /**
-   * Identifier for each task
-   */
-  public static final String TASK_ID = "GEODE_TASK_ID"; // One config per task
-  /**
-   * Specifies which Locators to connect to Apache Geode
-   */
-  private static final String LOCATORS = "locators";
-  private static final String DEFAULT_LOCATOR = "localhost[10334]";
-  private static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
-  private static final String DEFAULT_SECURITY_AUTH_INIT =
-      "org.apache.geode.kafka.security.SystemPropertyAuthInit";
-  private static final String SECURITY_USER = "security-username";
-  private static final String SECURITY_PASSWORD = "security-password";
-  private static final String TASK_ID_DOCUMENTATION = "Internally used to identify each task";
-  private static final String
-      LOCATORS_DOCUMENTATION =
-      "A comma separated string of locators that configure which locators to connect to";
-  private static final String
-      SECURITY_USER_DOCUMENTATION =
-      "Supply a username to be used to authenticate with Geode.  Will automatically set the security-client-auth-init to use a SystemPropertyAuthInit if one isn't supplied by the user";
-  private static final String SECURITY_PASSWORD_DOCUMENTATION = "Supply a password to be used to authenticate with Geode";
-  private static final String
-      SECURITY_CLIENT_AUTH_INIT_DOCUMENTATION =
-      "Point to the Java class that implements the [AuthInitialize Interface](https://geode.apache.org/docs/guide/19/managing/security/implementing_authentication.html)";
-
   public static final String DEFAULT_KEY_CONVERTER = StringConverter.class.getCanonicalName();
   public static final String DEFAULT_VALUE_CONVERTER = StringConverter.class.getCanonicalName();
 
   protected final int taskId;
   protected List<LocatorHostPort> locatorHostPorts;
-  private String securityClientAuthInit;
+  private Password securityClientAuthInit;
   private String securityUserName;
-  private String securityPassword;
+  private Password securityPassword;
 
   @VisibleForTesting
   protected GeodeConnectorConfig() {
@@ -74,10 +62,11 @@
   public GeodeConnectorConfig(ConfigDef configDef, Map<String, String> connectorProperties) {
     super(configDef, connectorProperties);
     taskId = getInt(TASK_ID);
-    locatorHostPorts = parseLocators(getString(GeodeConnectorConfig.LOCATORS));
+    locatorHostPorts = parseLocators(getString(LOCATORS));
     securityUserName = getString(SECURITY_USER);
-    securityPassword = getString(SECURITY_PASSWORD);
-    securityClientAuthInit = getString(SECURITY_CLIENT_AUTH_INIT);
+    securityPassword = getPassword(SECURITY_PASSWORD);
+    securityClientAuthInit = getPassword(SECURITY_CLIENT_AUTH_INIT);
+    // Passwords are read via getPassword() so their values stay wrapped in Password objects
     // if we registered a username/password instead of auth init, we should use the default auth
     // init if one isn't specified
     if (usesSecurity()) {
@@ -196,7 +185,7 @@
   }
 
   public String getSecurityClientAuthInit() {
-    return securityClientAuthInit;
+    return securityClientAuthInit == null ? null : securityClientAuthInit.value();
   }
 
   public String getSecurityUserName() {
@@ -204,7 +193,7 @@
   }
 
   public String getSecurityPassword() {
-    return securityPassword;
+    return securityPassword == null ? null : securityPassword.value();
   }
 
   public boolean usesSecurity() {
diff --git a/src/main/java/org/apache/geode/kafka/GeodeContext.java b/src/main/java/org/apache/geode/kafka/GeodeContext.java
index 6be418f..418a476 100644
--- a/src/main/java/org/apache/geode/kafka/GeodeContext.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeContext.java
@@ -14,9 +14,9 @@
  */
 package org.apache.geode.kafka;
 
-import static org.apache.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
-import static org.apache.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD;
-import static org.apache.geode.kafka.GeodeConnectorConfig.SECURITY_USER;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_CLIENT_AUTH_INIT;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_PASSWORD;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_USER;
 
 import java.util.List;
 
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSink.java
index edf2f97..b09ba87 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSink.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSink.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.kafka.sink;
 
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.TASK_ID;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -23,8 +25,6 @@
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
 
-import org.apache.geode.kafka.GeodeConnectorConfig;
-
 public class GeodeKafkaSink extends SinkConnector {
   private Map<String, String> sharedProps;
 
@@ -46,7 +46,7 @@
     // have no control over partitioning in kafka and which tasks will fire
     for (int i = 0; i < maxTasks; i++) {
       Map<String, String> taskProps = new HashMap<>(sharedProps);
-      taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
+      taskProps.put(TASK_ID, "" + i);
       taskConfigs.add(taskProps);
     }
 
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
index 46cd6c0..cd12429 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
@@ -14,6 +14,13 @@
  */
 package org.apache.geode.kafka.sink;
 
+import static org.apache.geode.kafka.utils.GeodeSinkConfigurationConstants.DEFAULT_NULL_VALUES_MEAN_REMOVE;
+import static org.apache.geode.kafka.utils.GeodeSinkConfigurationConstants.DEFAULT_TOPIC_TO_REGION_BINDING;
+import static org.apache.geode.kafka.utils.GeodeSinkConfigurationConstants.NULL_VALUES_MEAN_REMOVE;
+import static org.apache.geode.kafka.utils.GeodeSinkConfigurationConstants.NULL_VALUES_MEAN_REMOVE_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeSinkConfigurationConstants.TOPIC_TO_REGION_BINDINGS;
+import static org.apache.geode.kafka.utils.GeodeSinkConfigurationConstants.TOPIC_TO_REGION_BINDINGS_DOCUMENTATION;
+
 import java.util.List;
 import java.util.Map;
 
@@ -23,19 +30,6 @@
 
 public class GeodeSinkConnectorConfig extends GeodeConnectorConfig {
   public static final ConfigDef SINK_CONFIG_DEF = configurables();
-
-  // Used by sink
-  private static final String TOPIC_TO_REGION_BINDINGS = "topic-to-regions";
-  private static final String DEFAULT_TOPIC_TO_REGION_BINDING = "[gkcTopic:gkcRegion]";
-  private static final String NULL_VALUES_MEAN_REMOVE = "null-values-mean-remove";
-  private static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";
-  private static final String
-      NULL_VALUES_MEAN_REMOVE_DOCUMENTATION =
-      "If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region";
-  private static final String
-      TOPIC_TO_REGION_BINDINGS_DOCUMENTATION =
-      "A comma separated list of \"one topic to many regions\" bindings.  Each binding is surrounded by brackets. For example \"[topicName:regionName], [anotherTopic: regionName, anotherRegion]";
-
   private final Map<String, List<String>> topicToRegions;
   private final boolean nullValuesMeanRemove;
 
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java
index ca6dd0c..cdbe60f 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java
@@ -15,6 +15,9 @@
 package org.apache.geode.kafka.source;
 
 import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.SOURCE_CONFIG_DEF;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.CQS_TO_REGISTER;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.REGION_TO_TOPIC_BINDINGS;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.TASK_ID;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -45,13 +48,13 @@
     List<String> bindings =
         GeodeConnectorConfig
             .parseStringByComma(
-                sharedProps.get(GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS));
+                sharedProps.get(REGION_TO_TOPIC_BINDINGS));
     List<List<String>> bindingsPerTask = ConnectorUtils.groupPartitions(bindings, maxTasks);
 
     for (int i = 0; i < maxTasks; i++) {
       Map<String, String> taskProps = new HashMap<>(sharedProps);
-      taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
-      taskProps.put(GeodeSourceConnectorConfig.CQS_TO_REGISTER,
+      taskProps.put(TASK_ID, "" + i);
+      taskProps.put(CQS_TO_REGISTER,
           GeodeConnectorConfig.reconstructString(bindingsPerTask.get(i)));
       taskConfigs.add(taskProps);
     }
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
index 182efff..24cd531 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.kafka.source;
 
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.REGION_PARTITION;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -66,6 +68,7 @@
   @Override
   public void start(Map<String, String> props) {
     try {
+      // NOTE(review): do not log raw connector props here; they may contain credentials
       GeodeSourceConnectorConfig geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
       logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
       geodeContext = new GeodeContext();
@@ -175,9 +178,9 @@
   Map<String, Map<String, String>> createSourcePartitionsMap(Collection<String> regionNames) {
     return regionNames.stream().map(regionName -> {
       Map<String, String> sourcePartition = new HashMap<>();
-      sourcePartition.put(GeodeSourceConnectorConfig.REGION_PARTITION, regionName);
+      sourcePartition.put(REGION_PARTITION, regionName);
       return sourcePartition;
-    }).collect(Collectors.toMap(s -> s.get(GeodeSourceConnectorConfig.REGION_PARTITION), s -> s));
+    }).collect(Collectors.toMap(s -> s.get(REGION_PARTITION), s -> s));
   }
 
   String generateCqName(int taskId, String cqPrefix, String regionName) {
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
index 05d4fd6..9b10970 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
@@ -14,6 +14,30 @@
  */
 package org.apache.geode.kafka.source;
 
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.BATCH_SIZE;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.BATCH_SIZE_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.CQS_TO_REGISTER;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.CQS_TO_REGISTER_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.CQ_PREFIX;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.CQ_PREFIX_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_BATCH_SIZE;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_CQ_PREFIX;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_DURABLE_CLIENT_ID;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_DURABLE_CLIENT_TIMEOUT;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_LOAD_ENTIRE_REGION;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_QUEUE_SIZE;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_REGION_TO_TOPIC_BINDING;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DURABLE_CLIENT_ID_PREFIX;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DURABLE_CLIENT_ID_PREFIX_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DURABLE_CLIENT_TIME_OUT;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DURABLE_CLIENT_TIME_OUT_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.LOAD_ENTIRE_REGION;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.LOAD_ENTIRE_REGION_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.QUEUE_SIZE;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.QUEUE_SIZE_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.REGION_TO_TOPIC_BINDINGS;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.REGION_TO_TOPIC_BINDINGS_DOCUMENTATION;
+
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -26,53 +50,6 @@
 
   public static final ConfigDef SOURCE_CONFIG_DEF = configurables();
 
-  // Geode Configuration
-  public static final String DURABLE_CLIENT_ID_PREFIX = "durable-client-id-prefix";
-  public static final String DEFAULT_DURABLE_CLIENT_ID = "";
-  public static final String DURABLE_CLIENT_TIME_OUT = "durable-client-timeout";
-  public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
-
-  public static final String CQ_PREFIX = "cq-prefix";
-  public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
-
-  /**
-   * Used as a key for source partitions
-   */
-  public static final String REGION_PARTITION = "regionPartition";
-  public static final String REGION_TO_TOPIC_BINDINGS = "region-to-topics";
-  public static final String DEFAULT_REGION_TO_TOPIC_BINDING = "[gkcRegion:gkcTopic]";
-  public static final String CQS_TO_REGISTER = "cqsToRegister"; // used internally so that only 1
-                                                                // task will register a cq
-
-  public static final String BATCH_SIZE = "geode-connector-batch-size";
-  public static final String DEFAULT_BATCH_SIZE = "100";
-
-  public static final String QUEUE_SIZE = "geode-connector-queue-size";
-  public static final String DEFAULT_QUEUE_SIZE = "10000";
-
-  public static final String LOAD_ENTIRE_REGION = "load-entire-region";
-  public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
-
-  private static final String
-      CQS_TO_REGISTER_DOCUMENTATION =
-      "Internally created and used parameter, for signalling a task to register CQs on Apache Geode";
-  private static final String
-      REGION_TO_TOPIC_BINDINGS_DOCUMENTATION =
-      "A comma separated list of \"one region to many topics\" mappings.  Each mapping is surrounded by brackets.  For example \"[regionName:topicName], \"[anotherRegion: topicName, anotherTopic]\"";
-  private static final String
-      DURABLE_CLIENT_ID_PREFIX_DOCUMENTATION =
-      "Prefix string for tasks to append to when registering as a durable client.  If empty string, will not register as a durable client";
-  private static final String
-      LOAD_ENTIRE_REGION_DOCUMENTATION =
-      "Determines if we should queue up all entries that currently exist in a region.  This allows us to copy existing region data.  Will be replayed whenever a task needs to re-register a CQ";
-  private static final String
-      DURABLE_CLIENT_TIME_OUT_DOCUMENTATION =
-      "How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated";
-  private static final String CQ_PREFIX_DOCUMENTATION = "Prefix string to identify Connector CQ's on a Geode server";
-  private static final String BATCH_SIZE_DOCUMENTATION = "Maximum number of records to return on each poll";
-  private static final String
-      QUEUE_SIZE_DOCUMENTATION =
-      "Maximum number of entries in the connector queue before backing up all Geode CQ listeners sharing the task queue ";
 
   private final String durableClientId;
   private final String durableClientTimeout;
diff --git a/src/main/java/org/apache/geode/kafka/utils/GeodeConfigurationConstants.java b/src/main/java/org/apache/geode/kafka/utils/GeodeConfigurationConstants.java
new file mode 100644
index 0000000..a4a6811
--- /dev/null
+++ b/src/main/java/org/apache/geode/kafka/utils/GeodeConfigurationConstants.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.kafka.utils;
+
+import org.apache.kafka.common.config.types.Password;
+
+/**
+ * Geode-specific configuration constants shared by both source and sink connectors.
+ */
+public class GeodeConfigurationConstants {
+  // Identifier for each task; one config per task
+  public static final String TASK_ID = "GEODE_TASK_ID";
+  // Specifies which locators to connect to Apache Geode
+  public static final String LOCATORS = "locators";
+  public static final String DEFAULT_LOCATOR = "localhost[10334]";
+  public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
+  public static final Password DEFAULT_SECURITY_AUTH_INIT =
+      new Password("org.apache.geode.kafka.security.SystemPropertyAuthInit");
+  public static final String SECURITY_USER = "security-username";
+  public static final String SECURITY_PASSWORD = "security-password";
+  public static final String TASK_ID_DOCUMENTATION = "Internally used to identify each task";
+  public static final String
+      LOCATORS_DOCUMENTATION =
+      "A comma separated string of locators that configure which locators to connect to";
+  public static final String
+      SECURITY_USER_DOCUMENTATION =
+      "Supply a username to be used to authenticate with Geode.  Will automatically set the security-client-auth-init to use a SystemPropertyAuthInit if one isn't supplied by the user";
+  public static final String SECURITY_PASSWORD_DOCUMENTATION = "Supply a password to be used to authenticate with Geode";
+  public static final String
+      SECURITY_CLIENT_AUTH_INIT_DOCUMENTATION =
+      "Point to the Java class that implements the [AuthInitialize Interface](https://geode.apache.org/docs/guide/19/managing/security/implementing_authentication.html)";
+
+  private GeodeConfigurationConstants() {} // utility class; not instantiable
+}
diff --git a/src/main/java/org/apache/geode/kafka/utils/GeodeSinkConfigurationConstants.java b/src/main/java/org/apache/geode/kafka/utils/GeodeSinkConfigurationConstants.java
new file mode 100644
index 0000000..e3e3a6f
--- /dev/null
+++ b/src/main/java/org/apache/geode/kafka/utils/GeodeSinkConfigurationConstants.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.kafka.utils;
+
+/**
+ * Sink-specific configuration constants.
+ */
+public class GeodeSinkConfigurationConstants {
+  public static final String TOPIC_TO_REGION_BINDINGS = "topic-to-regions";
+  public static final String DEFAULT_TOPIC_TO_REGION_BINDING = "[gkcTopic:gkcRegion]";
+  public static final String NULL_VALUES_MEAN_REMOVE = "null-values-mean-remove";
+  public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";
+  public static final String
+      NULL_VALUES_MEAN_REMOVE_DOCUMENTATION =
+      "If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region";
+  public static final String
+      TOPIC_TO_REGION_BINDINGS_DOCUMENTATION =
+      "A comma separated list of \"one topic to many regions\" bindings.  Each binding is surrounded by brackets. For example \"[topicName:regionName], [anotherTopic: regionName, anotherRegion]";
+
+  private GeodeSinkConfigurationConstants() {} // utility class; not instantiable
+}
diff --git a/src/main/java/org/apache/geode/kafka/utils/GeodeSourceConfigurationConstants.java b/src/main/java/org/apache/geode/kafka/utils/GeodeSourceConfigurationConstants.java
new file mode 100644
index 0000000..52f4a84
--- /dev/null
+++ b/src/main/java/org/apache/geode/kafka/utils/GeodeSourceConfigurationConstants.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.kafka.utils;
+
+/**
+ * Source-specific configuration constants.
+ */
+public class GeodeSourceConfigurationConstants {
+  // Geode Configuration
+  public static final String DURABLE_CLIENT_ID_PREFIX = "durable-client-id-prefix";
+  public static final String DEFAULT_DURABLE_CLIENT_ID = "";
+  public static final String DURABLE_CLIENT_TIME_OUT = "durable-client-timeout";
+  public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
+  public static final String CQ_PREFIX = "cq-prefix";
+  public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
+  //Used as a key for source partitions
+  public static final String REGION_PARTITION = "regionPartition";
+  public static final String REGION_TO_TOPIC_BINDINGS = "region-to-topics";
+  public static final String DEFAULT_REGION_TO_TOPIC_BINDING = "[gkcRegion:gkcTopic]";
+  public static final String CQS_TO_REGISTER = "cqsToRegister"; // used internally so that only 1
+  // task will register a cq
+  public static final String BATCH_SIZE = "geode-connector-batch-size";
+  public static final String DEFAULT_BATCH_SIZE = "100";
+  public static final String QUEUE_SIZE = "geode-connector-queue-size";
+  public static final String DEFAULT_QUEUE_SIZE = "10000";
+  public static final String LOAD_ENTIRE_REGION = "load-entire-region";
+  public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
+  public static final String
+      CQS_TO_REGISTER_DOCUMENTATION =
+      "Internally created and used parameter, for signalling a task to register CQs on Apache Geode";
+  public static final String
+      REGION_TO_TOPIC_BINDINGS_DOCUMENTATION =
+      "A comma separated list of \"one region to many topics\" mappings.  Each mapping is surrounded by brackets.  For example \"[regionName:topicName], \"[anotherRegion: topicName, anotherTopic]\"";
+  public static final String
+      DURABLE_CLIENT_ID_PREFIX_DOCUMENTATION =
+      "Prefix string for tasks to append to when registering as a durable client.  If empty string, will not register as a durable client";
+  public static final String
+      LOAD_ENTIRE_REGION_DOCUMENTATION =
+      "Determines if we should queue up all entries that currently exist in a region.  This allows us to copy existing region data.  Will be replayed whenever a task needs to re-register a CQ";
+  public static final String
+      DURABLE_CLIENT_TIME_OUT_DOCUMENTATION =
+      "How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated";
+  public static final String CQ_PREFIX_DOCUMENTATION = "Prefix string to identify Connector CQ's on a Geode server";
+  public static final String BATCH_SIZE_DOCUMENTATION = "Maximum number of records to return on each poll";
+  public static final String
+      QUEUE_SIZE_DOCUMENTATION =
+      "Maximum number of entries in the connector queue before backing up all Geode CQ listeners sharing the task queue ";
+
+  private GeodeSourceConfigurationConstants() {} // utility class; not instantiable
+}
diff --git a/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java
index 4461dc4..ccc946f 100644
--- a/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java
@@ -16,6 +16,8 @@
 
 
 import static org.apache.geode.kafka.GeodeConnectorConfig.parseStringByComma;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_CLIENT_AUTH_INIT;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_USER;
 import static org.hamcrest.CoreMatchers.allOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
@@ -134,7 +136,7 @@
   @Test
   public void usesSecurityShouldBeTrueIfSecurityUserSet() {
     Map<String, String> props = new HashMap<>();
-    props.put(GeodeConnectorConfig.SECURITY_USER, "some user");
+    props.put(SECURITY_USER, "some user");
     GeodeConnectorConfig config =
         new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
     assertTrue(config.usesSecurity());
@@ -143,7 +145,7 @@
   @Test
   public void usesSecurityShouldBeTrueIfSecurityClientAuthInitSet() {
     Map<String, String> props = new HashMap<>();
-    props.put(GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT, "some_class");
+    props.put(SECURITY_CLIENT_AUTH_INIT, "some_class");
     GeodeConnectorConfig config =
         new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
     assertTrue(config.usesSecurity());
@@ -160,7 +162,7 @@
   @Test
   public void securityClientAuthInitShouldBeSetIfUserIsSet() {
     Map<String, String> props = new HashMap<>();
-    props.put(GeodeConnectorConfig.SECURITY_USER, "some user");
+    props.put(SECURITY_USER, "some user");
     GeodeConnectorConfig config =
         new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
     assertNotNull(config.getSecurityClientAuthInit());
diff --git a/src/test/java/org/apache/geode/kafka/security/SystemPropertyAuthInitTest.java b/src/test/java/org/apache/geode/kafka/security/SystemPropertyAuthInitTest.java
index 1ab05cc..648d77e 100644
--- a/src/test/java/org/apache/geode/kafka/security/SystemPropertyAuthInitTest.java
+++ b/src/test/java/org/apache/geode/kafka/security/SystemPropertyAuthInitTest.java
@@ -14,8 +14,8 @@
  */
 package org.apache.geode.kafka.security;
 
-import static org.apache.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD;
-import static org.apache.geode.kafka.GeodeConnectorConfig.SECURITY_USER;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_PASSWORD;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_USER;
 import static org.junit.Assert.assertEquals;
 
 import java.util.Properties;
diff --git a/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
index 85b9954..c5e5056 100644
--- a/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
+++ b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
@@ -14,8 +14,10 @@
  */
 package org.apache.geode.kafka.sink;
 
-import static org.apache.geode.kafka.sink.GeodeSinkConnectorConfig.NULL_VALUES_MEAN_REMOVE;
-import static org.apache.geode.kafka.sink.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.LOCATORS;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.TASK_ID;
+import static org.apache.geode.kafka.utils.GeodeSinkConfigurationConstants.NULL_VALUES_MEAN_REMOVE;
+import static org.apache.geode.kafka.utils.GeodeSinkConfigurationConstants.TOPIC_TO_REGION_BINDINGS;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -31,16 +33,15 @@
 import org.junit.Test;
 
 import org.apache.geode.cache.Region;
-import org.apache.geode.kafka.GeodeConnectorConfig;
 
 public class GeodeKafkaSinkTaskTest {
 
   private HashMap<String, String> createTestSinkProps() {
     HashMap<String, String> props = new HashMap<>();
     props.put(TOPIC_TO_REGION_BINDINGS, "[topic:region]");
-    props.put(GeodeConnectorConfig.TASK_ID, "0");
+    props.put(TASK_ID, "0");
     props.put(NULL_VALUES_MEAN_REMOVE, "true");
-    props.put(GeodeConnectorConfig.LOCATORS, "localhost[10334]");
+    props.put(LOCATORS, "localhost[10334]");
     return props;
   }
 
diff --git a/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java
index d3974cb..170bbbb 100644
--- a/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java
+++ b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java
@@ -14,7 +14,7 @@
  */
 package org.apache.geode.kafka.sink;
 
-import static org.apache.geode.kafka.sink.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
+import static org.apache.geode.kafka.utils.GeodeSinkConfigurationConstants.TOPIC_TO_REGION_BINDINGS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -25,7 +25,7 @@
 
 import org.junit.Test;
 
-import org.apache.geode.kafka.GeodeConnectorConfig;
+import org.apache.geode.kafka.utils.GeodeConfigurationConstants;
 
 public class GeodeKafkaSinkTest {
 
@@ -65,7 +65,7 @@
     Collection<Map<String, String>> tasks = sink.taskConfigs(5);
     HashSet<String> seenIds = new HashSet<>();
     for (Map<String, String> taskProp : tasks) {
-      assertTrue(seenIds.add(taskProp.get(GeodeConnectorConfig.TASK_ID)));
+      assertTrue(seenIds.add(taskProp.get(GeodeConfigurationConstants.TASK_ID)));
     }
   }
 }
diff --git a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index a85786b..411316c 100644
--- a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -14,8 +14,8 @@
  */
 package org.apache.geode.kafka.source;
 
-import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_CQ_PREFIX;
-import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.REGION_PARTITION;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_CQ_PREFIX;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.REGION_PARTITION;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
diff --git a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java
index 3019101..02f2790 100644
--- a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java
+++ b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java
@@ -14,7 +14,8 @@
  */
 package org.apache.geode.kafka.source;
 
-import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.TASK_ID;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.REGION_TO_TOPIC_BINDINGS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -25,8 +26,6 @@
 
 import org.junit.Test;
 
-import org.apache.geode.kafka.GeodeConnectorConfig;
-
 public class GeodeKafkaSourceTest {
 
   @Test
@@ -60,7 +59,7 @@
     Collection<Map<String, String>> tasks = sink.taskConfigs(5);
     HashSet<String> seenIds = new HashSet<>();
     for (Map<String, String> taskProp : tasks) {
-      assertTrue(seenIds.add(taskProp.get(GeodeConnectorConfig.TASK_ID)));
+      assertTrue(seenIds.add(taskProp.get(TASK_ID)));
     }
   }
 
diff --git a/src/test/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfigTest.java b/src/test/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfigTest.java
index d543d23..ea43585 100644
--- a/src/test/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfigTest.java
+++ b/src/test/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfigTest.java
@@ -14,7 +14,9 @@
  */
 package org.apache.geode.kafka.source;
 
-import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.LOCATORS;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.TASK_ID;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DURABLE_CLIENT_ID_PREFIX;
 import static org.junit.Assert.assertEquals;
 
 import java.util.HashMap;
@@ -22,16 +24,14 @@
 
 import org.junit.Test;
 
-import org.apache.geode.kafka.GeodeConnectorConfig;
-
 public class GeodeSourceConnectorConfigTest {
 
   @Test
   public void durableClientIdShouldNotBeSetIfPrefixIsEmpty() {
     Map<String, String> props = new HashMap<>();
-    props.put(GeodeConnectorConfig.TASK_ID, "0");
+    props.put(TASK_ID, "0");
     props.put(DURABLE_CLIENT_ID_PREFIX, "");
-    props.put(GeodeConnectorConfig.LOCATORS, "localhost[10334]");
+    props.put(LOCATORS, "localhost[10334]");
     GeodeSourceConnectorConfig config = new GeodeSourceConnectorConfig(props);
     assertEquals("", config.getDurableClientId());
   }
diff --git a/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java
index 5c7a7e4..8e39e6c 100644
--- a/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java
@@ -14,7 +14,8 @@
  */
 package org.apache.geode.kafka.utilities;
 
-import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.LOCATORS;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.REGION_TO_TOPIC_BINDINGS;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -33,10 +34,10 @@
 import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
 import org.apache.kafka.connect.util.ConnectUtils;
 
-import org.apache.geode.kafka.GeodeConnectorConfig;
 import org.apache.geode.kafka.sink.GeodeKafkaSink;
-import org.apache.geode.kafka.sink.GeodeSinkConnectorConfig;
 import org.apache.geode.kafka.source.GeodeKafkaSource;
+import org.apache.geode.kafka.utils.GeodeConfigurationConstants;
+import org.apache.geode.kafka.utils.GeodeSinkConfigurationConstants;
 
 public class WorkerAndHerderWrapper {
 
@@ -80,7 +81,7 @@
     props.putAll(valueConverterProps);
     props.put("key.converter.schemas.enable", "false");
     props.put("value.converter.schemas.enable", "false");
-    props.put(GeodeConnectorConfig.LOCATORS, locatorString);
+    props.put(LOCATORS, locatorString);
     WorkerConfig workerCfg = new StandaloneConfig(props);
 
     MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
@@ -98,7 +99,7 @@
     sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName());
     sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector");
     sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks);
-    sourceProps.put(GeodeConnectorConfig.LOCATORS, locatorString);
+    sourceProps.put(LOCATORS, locatorString);
     sourceProps.put(REGION_TO_TOPIC_BINDINGS, regionToTopicBinding);
 
     herder.putConnectorConfig(
@@ -110,8 +111,8 @@
     sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSink.class.getName());
     sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector");
     sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks);
-    sinkProps.put(GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS, topicToRegionBinding);
-    sinkProps.put(GeodeConnectorConfig.LOCATORS, locatorString);
+    sinkProps.put(GeodeSinkConfigurationConstants.TOPIC_TO_REGION_BINDINGS, topicToRegionBinding);
+    sinkProps.put(GeodeConfigurationConstants.LOCATORS, locatorString);
     sinkProps.put("topics", sinkTopic);
 
     herder.putConnectorConfig(