Modified to use ConfigDef and default values
bumped down the version to 1.9 for now
Added system property security auth init implementation that retries username and password from system properties
diff --git a/build.gradle b/build.gradle
index b095ada..2b7f853 100644
--- a/build.gradle
+++ b/build.gradle
@@ -61,8 +61,8 @@
 }
 
 dependencies {
-    compile('org.apache.geode:geode-core:1.10.0')
-    compile('org.apache.geode:geode-cq:1.10.0')
+    compile('org.apache.geode:geode-core:1.9.0')
+    compile('org.apache.geode:geode-cq:1.9.0')
     compile(group: 'org.apache.kafka', name: 'connect-api', version: '2.3.1')
     compile(group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.13.0')
     compile(group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.13.0')
diff --git a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
index 62ee160..24efde5 100644
--- a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
@@ -14,6 +14,9 @@
  */
 package org.geode.kafka;
 
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -21,7 +24,7 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
-public class GeodeConnectorConfig {
+public class GeodeConnectorConfig extends AbstractConfig {
 
   // GeodeKafka Specific Configuration
   /**
@@ -34,19 +37,50 @@
   public static final String LOCATORS = "locators";
   public static final String DEFAULT_LOCATOR = "localhost[10334]";
   public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
+  private static final String DEFAULT_SECURITY_AUTH_INIT = "geode.kafka.SystemPropertyAuthInit";
+  public static final String SECURITY_USER = "securityUsername";
+  public static final String SECURITY_PASSWORD= "securityPassword";
 
   protected final int taskId;
   protected List<LocatorHostPort> locatorHostPorts;
   private String securityClientAuthInit;
+  private String securityUser;
+  private String securityPassword;
 
+  //Just for testing
   protected GeodeConnectorConfig() {
+    super(new ConfigDef(), new HashMap());
     taskId = 0;
   }
 
-  public GeodeConnectorConfig(Map<String, String> connectorProperties) {
-    taskId = Integer.parseInt(connectorProperties.get(TASK_ID));
-    locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
-    securityClientAuthInit = connectorProperties.get(SECURITY_CLIENT_AUTH_INIT);
+  //Just for testing
+  protected GeodeConnectorConfig(Map<String, String> props) {
+    super(new ConfigDef(), props);
+    taskId = 0;
+  }
+
+
+  public GeodeConnectorConfig(ConfigDef configDef, Map<String, String> connectorProperties) {
+    super(configDef, connectorProperties);
+    taskId = getInt(TASK_ID);
+    locatorHostPorts = parseLocators(getString(GeodeConnectorConfig.LOCATORS));
+    securityUser = getString(SECURITY_USER);
+    securityPassword = getString(SECURITY_PASSWORD);
+    securityClientAuthInit = getString(SECURITY_CLIENT_AUTH_INIT);
+    //if we registered a username/password instead of auth init, we should use the default auth init if one isn't specified
+    if (usesSecurity()) {
+      securityClientAuthInit = securityClientAuthInit != null ? securityClientAuthInit : DEFAULT_SECURITY_AUTH_INIT;
+    }
+  }
+
+  protected static ConfigDef configurables() {
+    ConfigDef configDef = new ConfigDef();
+    configDef.define(TASK_ID, ConfigDef.Type.INT,  "0", ConfigDef.Importance.MEDIUM,"");
+    configDef.define(LOCATORS, ConfigDef.Type.STRING, DEFAULT_LOCATOR, ConfigDef.Importance.HIGH, "");
+    configDef.define(SECURITY_USER, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "");
+    configDef.define(SECURITY_PASSWORD, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "");
+    configDef.define(SECURITY_CLIENT_AUTH_INIT, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "");
+    return configDef;
   }
 
 
@@ -125,4 +159,8 @@
   public String getSecurityClientAuthInit() {
     return securityClientAuthInit;
   }
+
+  public boolean usesSecurity() {
+    return securityClientAuthInit != null || securityUser != null;
+  }
 }
diff --git a/src/main/java/org/geode/kafka/GeodeContext.java b/src/main/java/org/geode/kafka/GeodeContext.java
index d844581..2858e4d 100644
--- a/src/main/java/org/geode/kafka/GeodeContext.java
+++ b/src/main/java/org/geode/kafka/GeodeContext.java
@@ -27,6 +27,8 @@
 import org.apache.geode.cache.query.CqQuery;
 import org.apache.geode.cache.query.RegionNotFoundException;
 
+import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
+
 public class GeodeContext {
 
   private ClientCache clientCache;
@@ -35,15 +37,15 @@
   public GeodeContext() {}
 
   public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
-      String durableClientId, String durableClientTimeout, String securityAuthInit) {
+      String durableClientId, String durableClientTimeout, String securityAuthInit, boolean usesSecurity) {
     clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout,
-        securityAuthInit);
+        securityAuthInit, usesSecurity);
     return clientCache;
   }
 
   public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
-      String securityAuthInit) {
-    clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit);
+      String securityAuthInit, boolean usesSecurity) {
+    clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit, usesSecurity);
     return clientCache;
   }
 
@@ -52,11 +54,11 @@
   }
 
   public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName,
-      String durableClientTimeOut, String securityAuthInit) {
+      String durableClientTimeOut, String securityAuthInit, boolean usesSecurity) {
     ClientCacheFactory ccf = new ClientCacheFactory();
 
-    if (securityAuthInit != null) {
-      ccf.set(GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
+    if (usesSecurity ) {
+      ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
     }
     if (!durableClientName.equals("")) {
       ccf.set("durable-client-id", durableClientName)
diff --git a/src/main/java/geode/kafka/example/ExampleAuthInit.java b/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
similarity index 71%
rename from src/main/java/geode/kafka/example/ExampleAuthInit.java
rename to src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
index da9e53d..db48699 100644
--- a/src/main/java/geode/kafka/example/ExampleAuthInit.java
+++ b/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
@@ -12,28 +12,23 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package geode.kafka.example;
+package org.geode.kafka.security;
 
 import java.util.Properties;
 
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.security.AuthInitialize;
 import org.apache.geode.security.AuthenticationFailedException;
+import org.geode.kafka.GeodeConnectorConfig;
 
-/**
- * This is purely for example purposes and used in conjunction with the SimpleSecurityManager in
- * Apache Geode
- * DO NOT USE THIS AS A REAL WORLD SOLUTION
- */
-public class ExampleAuthInit implements AuthInitialize {
+
+public class SystemPropertyAuthInit implements AuthInitialize {
   @Override
   public Properties getCredentials(Properties securityProps, DistributedMember server,
       boolean isPeer) throws AuthenticationFailedException {
     Properties extractedProperties = new Properties();
-    // Do not do this in real use case. This is hardcoded and sets the user name and password for
-    // all users
-    extractedProperties.put("security-username", "Bearer");
-    extractedProperties.put("security-password", "Bearer");
+    extractedProperties.put("security-username", System.getProperty(GeodeConnectorConfig.SECURITY_USER));
+    extractedProperties.put("security-password", System.getProperty(GeodeConnectorConfig.SECURITY_PASSWORD));
     return extractedProperties;
   }
 }
diff --git a/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java
index bc768b1..a8985c2 100644
--- a/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java
+++ b/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java
@@ -24,13 +24,14 @@
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
 
+import static org.geode.kafka.sink.GeodeSinkConnectorConfig.SINK_CONFIG_DEF;
+
 public class GeodeKafkaSink extends SinkConnector {
-  private static final ConfigDef CONFIG_DEF = new ConfigDef();
   private Map<String, String> sharedProps;
 
   @Override
   public void start(Map<String, String> props) {
-    sharedProps = computeMissingConfigurations(props);
+    sharedProps = props;
   }
 
   @Override
@@ -61,7 +62,7 @@
 
   @Override
   public ConfigDef config() {
-    return CONFIG_DEF;
+    return SINK_CONFIG_DEF;
   }
 
   @Override
@@ -70,12 +71,4 @@
     return "unknown";
   }
 
-
-  private Map<String, String> computeMissingConfigurations(Map<String, String> props) {
-    props.computeIfAbsent(
-        GeodeConnectorConfig.LOCATORS, (key) -> GeodeConnectorConfig.DEFAULT_LOCATOR);
-    props.computeIfAbsent(
-        GeodeSinkConnectorConfig.NULL_VALUES_MEAN_REMOVE, (key) -> GeodeSinkConnectorConfig.DEFAULT_NULL_VALUES_MEAN_REMOVE);
-    return props;
-  }
 }
diff --git a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
index b019d80..4552e09 100644
--- a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -61,7 +61,7 @@
       configure(geodeConnectorConfig);
       geodeContext = new GeodeContext();
       geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
-          geodeConnectorConfig.getSecurityClientAuthInit());
+          geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.usesSecurity());
       regionNameToRegion = createProxyRegions(topicToRegions.values());
     } catch (Exception e) {
       logger.error("Unable to start sink task", e);
diff --git a/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java
index 558b874..bb51b0e 100644
--- a/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java
+++ b/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java
@@ -14,14 +14,19 @@
  */
 package org.geode.kafka.sink;
 
+import org.apache.kafka.common.config.ConfigDef;
+
 import java.util.List;
 import java.util.Map;
 
 import org.geode.kafka.GeodeConnectorConfig;
 
 public class GeodeSinkConnectorConfig extends GeodeConnectorConfig {
+  public static final ConfigDef SINK_CONFIG_DEF = configurables();
+
   // Used by sink
   public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegions";
+  public static final String DEFAULT_TOPIC_TO_REGION_BINDING = "[gkcTopic:gkcRegion]";
   public static final String NULL_VALUES_MEAN_REMOVE = "nullValuesMeanRemove";
   public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";
 
@@ -29,9 +34,16 @@
   private final boolean nullValuesMeanRemove;
 
   public GeodeSinkConnectorConfig(Map<String, String> connectorProperties) {
-    super(connectorProperties);
-    topicToRegions = parseTopicToRegions(connectorProperties.get(TOPIC_TO_REGION_BINDINGS));
-    nullValuesMeanRemove = Boolean.parseBoolean(connectorProperties.get(NULL_VALUES_MEAN_REMOVE));
+    super(SINK_CONFIG_DEF, connectorProperties);
+    topicToRegions = parseTopicToRegions(getString(TOPIC_TO_REGION_BINDINGS));
+    nullValuesMeanRemove = getBoolean(NULL_VALUES_MEAN_REMOVE);
+  }
+
+  protected static ConfigDef configurables() {
+    ConfigDef configDef = GeodeConnectorConfig.configurables();
+    configDef.define(TOPIC_TO_REGION_BINDINGS, ConfigDef.Type.STRING, DEFAULT_TOPIC_TO_REGION_BINDING, ConfigDef.Importance.HIGH, "");
+    configDef.define(NULL_VALUES_MEAN_REMOVE, ConfigDef.Type.BOOLEAN, DEFAULT_NULL_VALUES_MEAN_REMOVE, ConfigDef.Importance.MEDIUM, "");
+    return configDef;
   }
 
   public Map<String, List<String>> getTopicToRegions() {
diff --git a/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java
index be4e7a5..dac94f6 100644
--- a/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java
+++ b/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java
@@ -26,13 +26,12 @@
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.util.ConnectorUtils;
 
+import static org.geode.kafka.source.GeodeSourceConnectorConfig.SOURCE_CONFIG_DEF;
+
 
 public class GeodeKafkaSource extends SourceConnector {
 
   private Map<String, String> sharedProps;
-  // TODO maybe club this into GeodeConnnectorConfig
-  private static final ConfigDef CONFIG_DEF = new ConfigDef();
-
 
   @Override
   public Class<? extends Task> taskClass() {
@@ -61,30 +60,12 @@
 
   @Override
   public ConfigDef config() {
-    return CONFIG_DEF;
+    return SOURCE_CONFIG_DEF;
   }
 
   @Override
   public void start(Map<String, String> props) {
-    sharedProps = computeMissingConfigurations(props);
-  }
-
-  private Map<String, String> computeMissingConfigurations(Map<String, String> props) {
-    props.computeIfAbsent(
-        GeodeConnectorConfig.LOCATORS, (key) -> GeodeConnectorConfig.DEFAULT_LOCATOR);
-    props.computeIfAbsent(
-        GeodeSourceConnectorConfig.DURABLE_CLIENT_TIME_OUT, (key) -> GeodeSourceConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT);
-    props.computeIfAbsent(
-        GeodeSourceConnectorConfig.DURABLE_CLIENT_ID_PREFIX, (key) -> GeodeSourceConnectorConfig.DEFAULT_DURABLE_CLIENT_ID);
-    props.computeIfAbsent(
-        GeodeSourceConnectorConfig.BATCH_SIZE, (key) -> GeodeSourceConnectorConfig.DEFAULT_BATCH_SIZE);
-    props.computeIfAbsent(
-        GeodeSourceConnectorConfig.QUEUE_SIZE, (key) -> GeodeSourceConnectorConfig.DEFAULT_QUEUE_SIZE);
-    props.computeIfAbsent(
-        GeodeSourceConnectorConfig.CQ_PREFIX, (key) -> GeodeSourceConnectorConfig.DEFAULT_CQ_PREFIX);
-    props.computeIfAbsent(
-        GeodeSourceConnectorConfig.LOAD_ENTIRE_REGION, (key) -> GeodeSourceConnectorConfig.DEFAULT_LOAD_ENTIRE_REGION);
-    return props;
+    sharedProps = props;
   }
 
   @Override
diff --git a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
index 2b08973..b829335 100644
--- a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -64,16 +64,14 @@
   public void start(Map<String, String> props) {
     try {
       geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
-      int taskId = geodeConnectorConfig.getTaskId();
       logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
       geodeContext = new GeodeContext();
       geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
           geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout(),
-          geodeConnectorConfig.getSecurityClientAuthInit());
+          geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.usesSecurity());
 
-      batchSize = Integer.parseInt(props.get(GeodeSourceConnectorConfig.BATCH_SIZE));
-      eventBufferSupplier = new SharedEventBufferSupplier(Integer.parseInt(props.get(
-          GeodeSourceConnectorConfig.QUEUE_SIZE)));
+      batchSize = geodeConnectorConfig.getBatchSize();
+      eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize());
 
       regionToTopics = geodeConnectorConfig.getRegionToTopics();
       geodeConnectorConfig.getCqsToRegister();
diff --git a/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java
index 82bb712..78673cd 100644
--- a/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java
+++ b/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java
@@ -18,10 +18,13 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.kafka.common.config.ConfigDef;
 import org.geode.kafka.GeodeConnectorConfig;
 
 public class GeodeSourceConnectorConfig extends GeodeConnectorConfig {
 
+  public static final ConfigDef SOURCE_CONFIG_DEF = configurables();
+
   // Geode Configuration
   public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientIdPrefix";
   public static final String DEFAULT_DURABLE_CLIENT_ID = "";
@@ -36,7 +39,8 @@
    */
   public static final String REGION_PARTITION = "regionPartition";
   public static final String REGION_TO_TOPIC_BINDINGS = "regionToTopics";
-  public static final String CQS_TO_REGISTER = "cqsToRegister";
+  public static final String DEFAULT_REGION_TO_TOPIC_BINDING = "[gkcRegion:gkcTopic]";
+  public static final String CQS_TO_REGISTER = "cqsToRegister"; //used internally so that only 1 task will register a cq
 
   public static final String BATCH_SIZE = "geodeConnectorBatchSize";
   public static final String DEFAULT_BATCH_SIZE = "100";
@@ -52,23 +56,40 @@
   private final String durableClientTimeout;
   private final String cqPrefix;
   private final boolean loadEntireRegion;
+  private final int batchSize;
+  private final int queueSize;
 
   private Map<String, List<String>> regionToTopics;
   private Collection<String> cqsToRegister;
 
   public GeodeSourceConnectorConfig(Map<String, String> connectorProperties) {
-    super(connectorProperties);
-    cqsToRegister = parseRegionToTopics(connectorProperties.get(CQS_TO_REGISTER)).keySet();
-    regionToTopics = parseRegionToTopics(connectorProperties.get(REGION_TO_TOPIC_BINDINGS));
-    durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX);
+    super(SOURCE_CONFIG_DEF, connectorProperties);
+    cqsToRegister = parseRegionToTopics(getString(CQS_TO_REGISTER)).keySet();
+    regionToTopics = parseRegionToTopics(getString(REGION_TO_TOPIC_BINDINGS));
+    durableClientIdPrefix = getString(DURABLE_CLIENT_ID_PREFIX);
     if (isDurable(durableClientIdPrefix)) {
       durableClientId = durableClientIdPrefix + taskId;
     } else {
       durableClientId = "";
     }
-    durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT);
-    cqPrefix = connectorProperties.get(CQ_PREFIX);
-    loadEntireRegion = Boolean.parseBoolean(connectorProperties.get(LOAD_ENTIRE_REGION));
+    durableClientTimeout = getString(DURABLE_CLIENT_TIME_OUT);
+    cqPrefix = getString(CQ_PREFIX);
+    loadEntireRegion = getBoolean(LOAD_ENTIRE_REGION);
+    batchSize = getInt(BATCH_SIZE);
+    queueSize = getInt(QUEUE_SIZE);
+  }
+
+  protected static ConfigDef configurables() {
+    ConfigDef configDef = GeodeConnectorConfig.configurables();
+    configDef.define(CQS_TO_REGISTER, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Internally created and used parameter, for signalling a task to register cqs");
+    configDef.define(REGION_TO_TOPIC_BINDINGS, ConfigDef.Type.STRING, DEFAULT_REGION_TO_TOPIC_BINDING, ConfigDef.Importance.HIGH, "");
+    configDef.define(DURABLE_CLIENT_ID_PREFIX, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_ID, ConfigDef.Importance.LOW, "");
+    configDef.define(DURABLE_CLIENT_TIME_OUT, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_TIMEOUT, ConfigDef.Importance.LOW, "");
+    configDef.define(CQ_PREFIX, ConfigDef.Type.STRING, DEFAULT_CQ_PREFIX, ConfigDef.Importance.LOW, "");
+    configDef.define(BATCH_SIZE, ConfigDef.Type.INT, DEFAULT_BATCH_SIZE, ConfigDef.Importance.MEDIUM, "");
+    configDef.define(QUEUE_SIZE, ConfigDef.Type.INT, DEFAULT_QUEUE_SIZE, ConfigDef.Importance.MEDIUM, "");
+    configDef.define(LOAD_ENTIRE_REGION, ConfigDef.Type.BOOLEAN, DEFAULT_LOAD_ENTIRE_REGION, ConfigDef.Importance.MEDIUM, "");
+    return configDef;
   }
 
   public boolean isDurable() {
@@ -111,4 +132,11 @@
     return cqsToRegister;
   }
 
+  public int getBatchSize() {
+    return batchSize;
+  }
+
+  public int getQueueSize() {
+    return queueSize;
+  }
 }
diff --git a/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java
index e04db5a..11a00f9 100644
--- a/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java
+++ b/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java
@@ -14,14 +14,20 @@
  */
 package org.geode.kafka;
 
+
+import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
+import static org.geode.kafka.GeodeConnectorConfig.SECURITY_USER;
 import static org.hamcrest.CoreMatchers.allOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -128,6 +134,43 @@
     assertTrue(regionToTopics.get("region1").contains("topic1"));
   }
 
+  @Test
+  public void usesSecurityShouldBeTrueIfSecurityUserSet() {
+    Map<String, String> props = new HashMap<>();
+    props.put(SECURITY_USER, "some user");
+    GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
+    assertTrue(config.usesSecurity());
+  }
+
+  @Test
+  public void usesSecurityShouldBeTrueIfSecurityClientAuthInitSet() {
+    Map<String, String> props = new HashMap<>();
+    props.put(SECURITY_CLIENT_AUTH_INIT, "someclass");
+    GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
+    assertTrue(config.usesSecurity());
+  }
+
+  @Test
+  public void usesSecurityShouldBeFalseIfSecurityUserAndSecurityClientAuthInitNotSet() {
+    Map<String, String> props = new HashMap<>();
+    GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
+    assertFalse(config.usesSecurity());
+  }
+
+  @Test
+  public void securityClientAuthInitShouldBeSetIfUserIsSet() {
+    Map<String, String> props = new HashMap<>();
+    props.put(SECURITY_USER, "some user");
+    GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
+    assertNotNull(config.getSecurityClientAuthInit());
+  }
+
+  @Test
+  public void securityClientAuthInitShouldNotBeSetIfUserIsNotSetAndNotSpecificallySet() {
+    Map<String, String> props = new HashMap<>();
+    GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
+    assertNull(config.getSecurityClientAuthInit());
+  }
 
   /*
    * taskId = Integer.parseInt(connectorProperties.get(TASK_ID));