Removed system outs
Passing through the security auth init property
diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java
index 88f5a02..ac9a31f 100644
--- a/src/main/java/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java
@@ -32,9 +32,11 @@
      */
     public static final String LOCATORS = "locators";
     public static final String DEFAULT_LOCATOR = "localhost[10334]";
+    public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
 
     protected final int taskId;
     protected List<LocatorHostPort> locatorHostPorts;
+    private String securityClientAuthInit;
 
     protected GeodeConnectorConfig() {
         taskId = 0;
@@ -43,6 +45,7 @@
     public GeodeConnectorConfig(Map<String, String> connectorProperties) {
         taskId = Integer.parseInt(connectorProperties.get(TASK_ID));
         locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
+        securityClientAuthInit = connectorProperties.get(SECURITY_CLIENT_AUTH_INIT);
     }
 
 
@@ -116,4 +119,8 @@
     public List<LocatorHostPort> getLocatorHostPorts() {
         return locatorHostPorts;
     }
+
+    public String getSecurityClientAuthInit() {
+        return securityClientAuthInit;
+    }
 }
diff --git a/src/main/java/geode/kafka/GeodeContext.java b/src/main/java/geode/kafka/GeodeContext.java
index 2078782..ff8a8c3 100644
--- a/src/main/java/geode/kafka/GeodeContext.java
+++ b/src/main/java/geode/kafka/GeodeContext.java
@@ -26,6 +26,8 @@
 import java.util.Collection;
 import java.util.List;
 
+import static geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
+
 public class GeodeContext {
 
     private ClientCache clientCache;
@@ -34,13 +36,13 @@
     public GeodeContext() {
     }
 
-    public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, String durableClientId, String durableClientTimeout) {
-        clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout);
+    public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, String durableClientId, String durableClientTimeout, String securityAuthInit) {
+        clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout, securityAuthInit);
         return clientCache;
     }
 
-    public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList) {
-        clientCache = createClientCache(locatorHostPortList, "", "");
+    public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, String securityAuthInit) {
+        clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit);
         return clientCache;
     }
 
@@ -55,8 +57,12 @@
      * @param durableClientTimeOut
      * @return
      */
-    public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut) {
+    public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut, String securityAuthInit) {
         ClientCacheFactory ccf = new ClientCacheFactory();
+
+        if (securityAuthInit != null) {
+            ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
+        }
         if (!durableClientName.equals("")) {
             ccf.set("durable-client-id", durableClientName)
                     .set("durable-client-timeout", durableClientTimeOut);
diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
index a1d7ecf..66e528f 100644
--- a/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -58,12 +58,11 @@
             GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props);
             logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
             geodeContext = new GeodeContext();
-            geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts());
+            geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getSecurityClientAuthInit());
             topicToRegions = geodeConnectorConfig.getTopicToRegions();
             regionNameToRegion = createProxyRegions(topicToRegions.values());
             nullValuesMeansRemove = geodeConnectorConfig.getNullValuesMeanRemove();
         } catch (Exception e) {
-            e.printStackTrace();
             logger.error("Unable to start sink task", e);
             throw e;
         }
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
index 0f6b50c..8d827a1 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -68,11 +68,10 @@
     @Override
     public void start(Map<String, String> props) {
         try {
-            System.out.println("JASON start task");
             geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
             logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
             geodeContext = new GeodeContext();
-            geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout());
+            geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout(), geodeConnectorConfig.getSecurityClientAuthInit());
 
             batchSize = Integer.parseInt(props.get(BATCH_SIZE));
             int queueSize = Integer.parseInt(props.get(QUEUE_SIZE));
@@ -86,12 +85,9 @@
             boolean loadEntireRegion = geodeConnectorConfig.getLoadEntireRegion();
             installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix, loadEntireRegion);
         } catch (Exception e) {
-            System.out.println("JASON start task failed" + e);
-            e.printStackTrace();
             logger.error("Unable to start source task", e);
             throw e;
         }
-        System.out.println("JASON end task");
 
     }