added logger to classes
Removed system outs
diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java
index 9ac561f..4f75ec0 100644
--- a/src/main/java/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java
@@ -110,6 +110,10 @@
}
+ public int getTaskId() {
+ return taskId;
+ }
+
public String getDurableClientId() {
return durableClientId;
}
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
index c4d6b22..019bb5a 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
@@ -2,12 +2,16 @@
import org.apache.geode.cache.query.CqEvent;
import org.apache.geode.cache.query.CqStatusListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
class GeodeKafkaSourceListener implements CqStatusListener {
+ private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSourceListener.class);
+
public String regionName;
private BlockingQueue<GeodeEvent> eventBuffer;
@@ -19,7 +23,6 @@
@Override
public void onEvent(CqEvent aCqEvent) {
try {
- System.out.println("JASON cqEvent and putting into eventBuffer");
eventBuffer.offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
@@ -30,7 +33,7 @@
} catch (InterruptedException ex) {
ex.printStackTrace();
}
- System.out.println("GeodeKafkaSource Queue is full");
+ logger.info("GeodeKafkaSource Queue is full");
}
}
}
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
index 23fa141..3f2ac80 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -6,6 +6,8 @@
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
@@ -22,16 +24,16 @@
public class GeodeKafkaSourceTask extends SourceTask {
+ private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSourceTask.class);
+
private static final String TASK_PREFIX = "TASK";
private static final String DOT = ".";
//property string to pass in to identify task
private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
- private int taskId;
private GeodeContext geodeContext;
private List<String> topics;
-
private Map<String, Map<String, String>> sourcePartitions;
private static BlockingQueue<GeodeEvent> eventBuffer;
private int batchSize;
@@ -52,6 +54,7 @@
public void start(Map<String, String> props) {
try {
GeodeConnectorConfig geodeConnectorConfig = new GeodeConnectorConfig(props);
+ logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
geodeContext = new GeodeContext(geodeConnectorConfig);
batchSize = Integer.parseInt(props.get(BATCH_SIZE));
@@ -66,8 +69,7 @@
installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix);
}
catch (Exception e) {
- System.out.println("Exception:" + e);
- e.printStackTrace();
+ logger.error("Unable to start task", e);
throw e;
}
}
@@ -82,7 +84,6 @@
records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent()));
}
}
-
return records;
}
@@ -96,6 +97,7 @@
void installOnGeode(GeodeConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix) {
boolean isDurable = geodeConnectorConfig.isDurable();
+ int taskId = geodeConnectorConfig.getTaskId();
for (String region : geodeConnectorConfig.getRegionNames()) {
installListenersToRegion(geodeContext, taskId, eventBuffer, region, cqPrefix, isDurable);
}
diff --git a/src/test/java/geode/kafka/WorkerAndHerderCluster.java b/src/test/java/geode/kafka/WorkerAndHerderCluster.java
index c347946..37a53f8 100644
--- a/src/test/java/geode/kafka/WorkerAndHerderCluster.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderCluster.java
@@ -11,7 +11,6 @@
}
public void start() throws IOException, InterruptedException {
- System.out.println("JASON starting worker");
workerAndHerder.exec();
}
diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index 33c260d..de78345 100644
--- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -74,7 +74,7 @@
props.put(GeodeConnectorConfig.BATCH_SIZE, GeodeConnectorConfig.DEFAULT_BATCH_SIZE);
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- task.start(props);
+// task.start(props);
// assertThat(task.getQueueSize(GeodeConnectorConfig.QUEUE_SIZE));