Changed all config to lower case
Changed to CqStatusListener instead of plain CqListener
diff --git a/src/main/java/kafka/GeodeConnectorConfig.java b/src/main/java/kafka/GeodeConnectorConfig.java
index b7140aa..003367d 100644
--- a/src/main/java/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/kafka/GeodeConnectorConfig.java
@@ -3,39 +3,39 @@
 public class GeodeConnectorConfig {
 
     //Geode Configuration
-    public static final String DURABLE_CLIENT_ID_PREFIX = "DurableClientId";
+    public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientId";
     public static final String DEFAULT_DURABLE_CLIENT_ID = "";
-    public static final String DURABLE_CLIENT_TIME_OUT = "DurableClientTimeout";
+    public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout";
     public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
 
 
     //GeodeKafka Specific Configuration
-    public static final String CQ_PREFIX = "CqPrefix";
-    public static final String DEFAULT_CQ_PREFIX = "CqForGeodeKafka";
+    public static final String CQ_PREFIX = "cqPrefix";
+    public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
     /**
      * Specifies which Locators to connect to Apache Geode
      */
-    public static final String LOCATORS = "Locators";
+    public static final String LOCATORS = "locators";
     public static final String DEFAULT_LOCATOR = "localhost[10334]";
 
     /**
      * Specifies which Regions to connect in Apache Geode
      */
-    public static final String REGIONS = "Regions";
+    public static final String REGIONS = "regions";
 
     /**
      * Specifies which Topics to connect in Kafka
      */
-    public static final String TOPICS = "Topics";
+    public static final String TOPICS = "topics";
 
     /**
      * Property to describe the Source Partition in a record
      */
-    public static final String REGION_NAME = "RegionName";  //used for Source Partition Events
+    public static final String REGION_NAME = "regionName";  //used for Source Partition Events
 
-    public static final String BATCH_SIZE = "GeodeConnectorBatchSize";
+    public static final String BATCH_SIZE = "geodeConnectorBatchSize";
     public static final String DEFAULT_BATCH_SIZE = "100";
 
-    public static final String QUEUE_SIZE = "GeodeConnectorQueueSize";
+    public static final String QUEUE_SIZE = "geodeConnectorQueueSize";
     public static final String DEFAULT_QUEUE_SIZE = "100000";
 }
diff --git a/src/main/java/kafka/GeodeKafkaSourceListener.java b/src/main/java/kafka/GeodeKafkaSourceListener.java
index ec94ee3..4c0e729 100644
--- a/src/main/java/kafka/GeodeKafkaSourceListener.java
+++ b/src/main/java/kafka/GeodeKafkaSourceListener.java
@@ -2,11 +2,12 @@
 
 import org.apache.geode.cache.query.CqEvent;
 import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.CqStatusListener;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-class GeodeKafkaSourceListener implements CqListener {
+class GeodeKafkaSourceListener implements CqStatusListener {
 
     public String regionName;
     private BlockingQueue<GeodeEvent> eventBuffer;
@@ -39,4 +40,14 @@
     public void onError(CqEvent aCqEvent) {
 
     }
+
+    @Override
+    public void onCqDisconnected() {
+        //we should probably redistribute or reconnect
+    }
+
+    @Override
+    public void onCqConnected() {
+
+    }
 }
diff --git a/src/main/java/kafka/GeodeKafkaSourceTask.java b/src/main/java/kafka/GeodeKafkaSourceTask.java
index c463e6f..896b602 100644
--- a/src/main/java/kafka/GeodeKafkaSourceTask.java
+++ b/src/main/java/kafka/GeodeKafkaSourceTask.java
@@ -138,13 +138,10 @@
     void installListenersToRegion(int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean isDurable) {
         CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
         cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener(eventBuffer, regionName));
-        System.out.println("JASON installing on Geode");
         CqAttributes cqAttributes = cqAttributesFactory.create();
         try {
-            System.out.println("JASON installing new cq");
             clientCache.getQueryService().newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
                     isDurable).execute();
-            System.out.println("JASON finished installing cq");
         } catch (CqExistsException e) {
             System.out.println("UHH");
             e.printStackTrace();