Debug logging added
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
index 700688e..1f50ea4 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -85,12 +85,17 @@
@Override
public void put(Collection<SinkRecord> records) {
+ logger.debug("Received " + records.size() + " records.");
put(records, new HashMap<>());
}
void put(Collection<SinkRecord> records, Map<String, BatchRecords> batchRecordsMap) {
// spin off a new thread to handle this operation? Downside is ordering and retries...
for (SinkRecord record : records) {
+ logger.debug("kafka coordinates:(Topic:"
+ + record.topic() +
+ " Partition:" + record.kafkaPartition() + " Offset:" + record.kafkaOffset()
+ + ")");
updateBatchForRegionByTopic(record, batchRecordsMap);
}
batchRecordsMap.forEach(
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
index d53a9e9..13e5b60 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -104,6 +104,7 @@
ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
if (eventBufferSupplier.get().drainTo(events, batchSize) > 0) {
+ logger.debug("Geode events polled :" + events.size());
for (GeodeEvent event : events) {
String regionName = event.getRegionName();
List<String> topics = regionToTopics.get(regionName);