moved cache creation errors to GeodeContext
added info logging
removed some debug logging
diff --git a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
index 586817a..3c218d4 100644
--- a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
@@ -66,7 +66,6 @@
     securityUserName = getString(SECURITY_USER);
     securityPassword = getPassword(SECURITY_PASSWORD);
     securityClientAuthInit = getPassword(SECURITY_CLIENT_AUTH_INIT);
-//    System.out.println(securityUserName + "NABA " + securityPassword.value() + "NABA" + securityClientAuthInit.value());
     // if we registered a username/password instead of auth init, we should use the default auth
     // init if one isn't specified
     if (usesSecurity()) {
diff --git a/src/main/java/org/apache/geode/kafka/GeodeContext.java b/src/main/java/org/apache/geode/kafka/GeodeContext.java
index 418a476..5e377e3 100644
--- a/src/main/java/org/apache/geode/kafka/GeodeContext.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeContext.java
@@ -61,28 +61,33 @@
   public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName,
                                        String durableClientTimeOut, String securityAuthInit, String securityUserName,
                                        String securityPassword, boolean usesSecurity) {
-    ClientCacheFactory ccf = new ClientCacheFactory();
+    try {
+      ClientCacheFactory ccf = new ClientCacheFactory();
 
-    ccf.setPdxReadSerialized(true);
-    if (usesSecurity) {
-      if (securityUserName != null && securityPassword != null) {
-        ccf.set(SECURITY_USER, securityUserName);
-        ccf.set(SECURITY_PASSWORD, securityPassword);
+      ccf.setPdxReadSerialized(true);
+      if (usesSecurity) {
+        if (securityUserName != null && securityPassword != null) {
+          ccf.set(SECURITY_USER, securityUserName);
+          ccf.set(SECURITY_PASSWORD, securityPassword);
+        }
+        ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
       }
-      ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
-    }
-    if (!durableClientName.equals("")) {
-      ccf.set("durable-client-id", durableClientName)
-          .set("durable-client-timeout", durableClientTimeOut);
-    }
-    // currently we only allow using the default pool.
-    // If we ever want to allow adding multiple pools we'll have to configure pool factories
-    ccf.setPoolSubscriptionEnabled(true);
+      if (!durableClientName.equals("")) {
+        ccf.set("durable-client-id", durableClientName)
+                .set("durable-client-timeout", durableClientTimeOut);
+      }
+      // currently we only allow using the default pool.
+      // If we ever want to allow adding multiple pools we'll have to configure pool factories
+      ccf.setPoolSubscriptionEnabled(true);
 
-    for (LocatorHostPort locator : locators) {
-      ccf.addPoolLocator(locator.getHostName(), locator.getPort());
+      for (LocatorHostPort locator : locators) {
+        ccf.addPoolLocator(locator.getHostName(), locator.getPort());
+      }
+      return ccf.create();
+    } catch (Exception e) {
+        throw new ConnectException(
+                "Unable to create an client cache connected to Apache Geode cluster");
     }
-    return ccf.create();
   }
 
   public CqQuery newCq(String name, String query, CqAttributes cqAttributes, boolean isDurable)
@@ -106,4 +111,8 @@
       throw new ConnectException(e);
     }
   }
+
+  public void close(boolean keepAlive) {
+    clientCache.close(keepAlive);
+  }
 }
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
index 1688eb7..ef95450 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -61,16 +61,11 @@
       GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props);
       configure(geodeConnectorConfig);
       geodeContext = new GeodeContext();
-      final ClientCache clientCache =
-          geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
+      geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
               geodeConnectorConfig.getSecurityClientAuthInit(),
               geodeConnectorConfig.getSecurityUserName(),
               geodeConnectorConfig.getSecurityPassword(),
               geodeConnectorConfig.usesSecurity());
-      if (clientCache == null) {
-        throw new ConnectException(
-            "Unable to create a client cache connected to the Apache Geode cluster");
-      }
       regionNameToRegion = createProxyRegions(topicToRegions.values());
     } catch (Exception e) {
       logger.error("Unable to start sink task", e);
@@ -149,7 +144,7 @@
 
   @Override
   public void stop() {
-    geodeContext.getClientCache().close(false);
+    geodeContext.close(false);
   }
 
 }
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java
index 8a54766..f2f3142 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java
@@ -51,7 +51,7 @@
               TimeUnit.SECONDS))
             break;
         } catch (InterruptedException ex) {
-          ex.printStackTrace();
+          logger.info("Thread interrupted while updating buffer", ex);
         }
         logger.info("GeodeKafkaSource Queue is full");
       }
@@ -66,6 +66,7 @@
   @Override
   public void onCqDisconnected() {
     // we should probably redistribute or reconnect
+    logger.info("cq has been disconnected");
   }
 
   @Override
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
index 24cd531..a04ea5e 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -29,7 +29,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.query.CqAttributes;
 import org.apache.geode.cache.query.CqAttributesFactory;
 import org.apache.geode.cache.query.CqQuery;
@@ -68,22 +67,17 @@
   @Override
   public void start(Map<String, String> props) {
     try {
-      System.out.println("NABA ::" + props);
       GeodeSourceConnectorConfig geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
       logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
       geodeContext = new GeodeContext();
-      final ClientCache clientCache =
-          geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
+      geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
               geodeConnectorConfig.getDurableClientId(),
               geodeConnectorConfig.getDurableClientTimeout(),
               geodeConnectorConfig.getSecurityClientAuthInit(),
               geodeConnectorConfig.getSecurityUserName(),
               geodeConnectorConfig.getSecurityPassword(),
               geodeConnectorConfig.usesSecurity());
-      if (clientCache == null) {
-        throw new ConnectException(
-            "Unable to create an client cache connected to Apache Geode cluster");
-      }
+
       batchSize = geodeConnectorConfig.getBatchSize();
       eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize());
 
@@ -122,7 +116,7 @@
 
   @Override
   public void stop() {
-    geodeContext.getClientCache().close(true);
+    geodeContext.close(true);
   }
 
   void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext,