Kafka Batch Log Example: Added a little processor to show records content

Signed-off-by: Andrea Cosentino <ancosen@gmail.com>
diff --git a/jbang/kafka-batch-log/BatchLog.java b/jbang/kafka-batch-log/BatchLog.java
new file mode 100644
index 0000000..54d89bd
--- /dev/null
+++ b/jbang/kafka-batch-log/BatchLog.java
@@ -0,0 +1,32 @@
+package camel.example;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.util.StringHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BatchLog implements Processor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BatchLog.class);
+
+    @Override
+    public void process(Exchange e) throws Exception {
+        final List<?> exchanges = e.getMessage().getBody(List.class);
+
+        // Ensure we are actually receiving what we are asking for
+        if (exchanges == null || exchanges.isEmpty()) {
+            return;
+        }
+
+        // The records from the batch are stored in a list of exchanges in the original exchange. To process, we iterate over that list
+        for (Object obj : exchanges) {
+            if (obj instanceof Exchange) {
+                LOG.info("Processing exchange with body {}", ((Exchange)obj).getMessage().getBody(String.class));
+            }
+        }
+    }
+
+}
diff --git a/jbang/kafka-batch-log/README.adoc b/jbang/kafka-batch-log/README.adoc
index ae47d6e..31715d4 100644
--- a/jbang/kafka-batch-log/README.adoc
+++ b/jbang/kafka-batch-log/README.adoc
@@ -57,7 +57,7 @@
 
 [source,sh]
 ----
-$ jbang -Dcamel.jbang.version=4.4.0-SNAPSHOT camel@apache/camel run --local-kamelet-dir=<path_to_kamelets_repository> kafka-batch-log.yaml
+$ jbang -Dcamel.jbang.version=4.4.0-SNAPSHOT camel@apache/camel run --local-kamelet-dir=<path_to_kamelets_repository> BatchLog.java kafka-batch-log.yaml
 ----
 
 === Consumer running
@@ -105,12 +105,8 @@
 
 [source,sh]
 ----
-2024-02-05 09:42:07.908  INFO 21666 --- [mer[test-topic]] log-sink                            : Exchange[
-  ExchangePattern: InOnly
-  Headers: {}
-  BodyType: java.util.ArrayList
-  Body: [Exchange[], Exchange[]]
-]
+2024-02-05 09:42:07.908  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
+2024-02-05 09:42:07.909  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
 ----
 
 If you check the situation for the consumer group 'my-group' you could see that the commit happened manually by using the kafka-batch-manual-commit-action.
@@ -134,40 +130,26 @@
 
 [source,sh]
 ----
-2024-02-05 09:50:33.947  INFO 24182 --- [mer[test-topic]] log-sink                            : Exchange[
-  ExchangePattern: InOnly
-  Headers: {}
-  BodyType: java.util.ArrayList
-  Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
-]
-2024-02-05 09:50:44.137  INFO 24182 --- [mer[test-topic]] log-sink                            : Exchange[
-  ExchangePattern: InOnly
-  Headers: {}
-  BodyType: java.util.ArrayList
-  Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
-]
-2024-02-05 09:50:54.324  INFO 24182 --- [mer[test-topic]] log-sink                            : Exchange[
-  ExchangePattern: InOnly
-  Headers: {}
-  BodyType: java.util.ArrayList
-  Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
-]
-2024-02-05 09:51:04.535  INFO 24182 --- [mer[test-topic]] log-sink                            : Exchange[
-  ExchangePattern: InOnly
-  Headers: {}
-  BodyType: java.util.ArrayList
-  Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
-]
-2024-02-05 09:51:14.747  INFO 24182 --- [mer[test-topic]] log-sink                            : Exchange[
-  ExchangePattern: InOnly
-  Headers: {}
-  BodyType: java.util.ArrayList
-  Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
-]
+.
+.
+.
+.
+2024-02-05 09:42:40.908  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
+2024-02-05 09:42:40.909  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
+2024-02-05 09:42:40.913  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
+2024-02-05 09:42:40.914  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
+2024-02-05 09:42:40.920  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
+2024-02-05 09:42:40.928  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
+2024-02-05 09:42:40.930  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
+2024-02-05 09:42:40.940  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
+2024-02-05 09:42:40.950  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
+2024-02-05 09:42:40.955  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
+.
+.
+.
+.
 ----
 
-For the aim of this example the payload of the records is not important.
-
 If you check again the offset for the consumers of my-group group you'll notice we are at offset 52 now.
 
 [source,sh]
diff --git a/jbang/kafka-batch-log/kafka-batch-log.yaml b/jbang/kafka-batch-log/kafka-batch-log.yaml
index 11c25df..1a779bd 100644
--- a/jbang/kafka-batch-log/kafka-batch-log.yaml
+++ b/jbang/kafka-batch-log/kafka-batch-log.yaml
@@ -17,6 +17,10 @@
 
 # camel-k: dependency=camel:kafka
 
+- beans:
+  - name: batchLog
+    type: "#class:camel.example.BatchLog"
+
 - route:
     id: "kafka-to-log"
     from:
@@ -31,11 +35,7 @@
         autoCommitEnable: false
         allowManualCommit: true
       steps:
-        - to:
-            uri: "kamelet:log-sink"
-            parameters:
-              showStreams: true
-              showHeaders: true
-              multiline: true
+        - bean:
+            ref: batchLog
         - to:
             uri: "kamelet:kafka-batch-manual-commit-action"