Added SLF4J loggers to classes
Removed System.out calls
diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java
index 9ac561f..4f75ec0 100644
--- a/src/main/java/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java
@@ -110,6 +110,10 @@
     }
 
 
+    public int getTaskId() {
+        return taskId;
+    }
+
     public String getDurableClientId() {
         return durableClientId;
     }
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
index c4d6b22..019bb5a 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
@@ -2,12 +2,16 @@
 
 import org.apache.geode.cache.query.CqEvent;
 import org.apache.geode.cache.query.CqStatusListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 class GeodeKafkaSourceListener implements CqStatusListener {
 
+    private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSourceListener.class);
+
     public String regionName;
     private BlockingQueue<GeodeEvent> eventBuffer;
 
@@ -19,7 +23,6 @@
     @Override
     public void onEvent(CqEvent aCqEvent) {
         try {
-            System.out.println("JASON cqEvent and putting into eventBuffer");
             eventBuffer.offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
 
@@ -30,7 +33,7 @@
                 } catch (InterruptedException ex) {
                     ex.printStackTrace();
                 }
-                System.out.println("GeodeKafkaSource Queue is full");
+                logger.info("GeodeKafkaSource Queue is full");
             }
         }
     }
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
index 23fa141..3f2ac80 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -6,6 +6,8 @@
 import org.apache.geode.cache.query.CqAttributesFactory;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -22,16 +24,16 @@
 
 public class GeodeKafkaSourceTask extends SourceTask {
 
+    private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSourceTask.class);
+
     private static final String TASK_PREFIX = "TASK";
     private static final String DOT = ".";
 
     //property string to pass in to identify task
     private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
 
-    private int taskId;
     private GeodeContext geodeContext;
     private List<String> topics;
-
     private Map<String, Map<String, String>> sourcePartitions;
     private static BlockingQueue<GeodeEvent> eventBuffer;
     private int batchSize;
@@ -52,6 +54,7 @@
     public void start(Map<String, String> props) {
         try {
             GeodeConnectorConfig geodeConnectorConfig = new GeodeConnectorConfig(props);
+            logger.debug("GeodeKafkaSourceTask id:{} starting", geodeConnectorConfig.getTaskId());
             geodeContext = new GeodeContext(geodeConnectorConfig);
 
             batchSize = Integer.parseInt(props.get(BATCH_SIZE));
@@ -66,8 +69,7 @@
             installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix);
         }
         catch (Exception e) {
-            System.out.println("Exception:" + e);
-            e.printStackTrace();
+            logger.error("Unable to start task", e);
             throw e;
         }
     }
@@ -82,7 +84,6 @@
                     records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent()));
                 }
             }
-
             return records;
         }
 
@@ -96,6 +97,7 @@
 
     void installOnGeode(GeodeConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix) {
       boolean isDurable = geodeConnectorConfig.isDurable();
+      int taskId = geodeConnectorConfig.getTaskId();
         for (String region : geodeConnectorConfig.getRegionNames()) {
             installListenersToRegion(geodeContext, taskId, eventBuffer, region, cqPrefix, isDurable);
         }
diff --git a/src/test/java/geode/kafka/WorkerAndHerderCluster.java b/src/test/java/geode/kafka/WorkerAndHerderCluster.java
index c347946..37a53f8 100644
--- a/src/test/java/geode/kafka/WorkerAndHerderCluster.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderCluster.java
@@ -11,7 +11,6 @@
     }
 
     public void start() throws IOException, InterruptedException {
-        System.out.println("JASON starting worker");
         workerAndHerder.exec();
 
     }
diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index 33c260d..de78345 100644
--- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -74,7 +74,7 @@
         props.put(GeodeConnectorConfig.BATCH_SIZE, GeodeConnectorConfig.DEFAULT_BATCH_SIZE);
 
         GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-        task.start(props);
+//        task.start(props);
 
 //        assertThat(task.getQueueSize(GeodeConnectorConfig.QUEUE_SIZE));