Kafka Batch Log Example: Added a little processor to show records content
Signed-off-by: Andrea Cosentino <ancosen@gmail.com>
diff --git a/jbang/kafka-batch-log/BatchLog.java b/jbang/kafka-batch-log/BatchLog.java
new file mode 100644
index 0000000..54d89bd
--- /dev/null
+++ b/jbang/kafka-batch-log/BatchLog.java
@@ -0,0 +1,32 @@
+package camel.example;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.util.StringHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BatchLog implements Processor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BatchLog.class);
+
+ @Override
+ public void process(Exchange e) throws Exception {
+ final List<?> exchanges = e.getMessage().getBody(List.class);
+
+ // Ensure we are actually receiving what we are asking for
+ if (exchanges == null || exchanges.isEmpty()) {
+ return;
+ }
+
+ // The records from the batch are stored in a list of exchanges in the original exchange. To process, we iterate over that list
+ for (Object obj : exchanges) {
+ if (obj instanceof Exchange) {
+ LOG.info("Processing exchange with body {}", ((Exchange)obj).getMessage().getBody(String.class));
+ }
+ }
+ }
+
+}
diff --git a/jbang/kafka-batch-log/README.adoc b/jbang/kafka-batch-log/README.adoc
index ae47d6e..31715d4 100644
--- a/jbang/kafka-batch-log/README.adoc
+++ b/jbang/kafka-batch-log/README.adoc
@@ -57,7 +57,7 @@
[source,sh]
----
-$ jbang -Dcamel.jbang.version=4.4.0-SNAPSHOT camel@apache/camel run --local-kamelet-dir=<path_to_kamelets_repository> kafka-batch-log.yaml
+$ jbang -Dcamel.jbang.version=4.4.0-SNAPSHOT camel@apache/camel run --local-kamelet-dir=<path_to_kamelets_repository> BatchLog.java kafka-batch-log.yaml
----
=== Consumer running
@@ -105,12 +105,8 @@
[source,sh]
----
-2024-02-05 09:42:07.908 INFO 21666 --- [mer[test-topic]] log-sink : Exchange[
- ExchangePattern: InOnly
- Headers: {}
- BodyType: java.util.ArrayList
- Body: [Exchange[], Exchange[]]
-]
+2024-02-05 09:42:07.908 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
+2024-02-05 09:42:07.909 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
----
If you check the situation for the consumer group 'my-group' you could see that the commit happened manually by using the kafka-batch-manual-commit-action.
@@ -134,40 +130,26 @@
[source,sh]
----
-2024-02-05 09:50:33.947 INFO 24182 --- [mer[test-topic]] log-sink : Exchange[
- ExchangePattern: InOnly
- Headers: {}
- BodyType: java.util.ArrayList
- Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
-]
-2024-02-05 09:50:44.137 INFO 24182 --- [mer[test-topic]] log-sink : Exchange[
- ExchangePattern: InOnly
- Headers: {}
- BodyType: java.util.ArrayList
- Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
-]
-2024-02-05 09:50:54.324 INFO 24182 --- [mer[test-topic]] log-sink : Exchange[
- ExchangePattern: InOnly
- Headers: {}
- BodyType: java.util.ArrayList
- Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
-]
-2024-02-05 09:51:04.535 INFO 24182 --- [mer[test-topic]] log-sink : Exchange[
- ExchangePattern: InOnly
- Headers: {}
- BodyType: java.util.ArrayList
- Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
-]
-2024-02-05 09:51:14.747 INFO 24182 --- [mer[test-topic]] log-sink : Exchange[
- ExchangePattern: InOnly
- Headers: {}
- BodyType: java.util.ArrayList
- Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
-]
+.
+.
+.
+.
+2024-02-05 09:42:40.908 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
+2024-02-05 09:42:40.909 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
+2024-02-05 09:42:40.913 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
+2024-02-05 09:42:40.914 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
+2024-02-05 09:42:40.920 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
+2024-02-05 09:42:40.928 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
+2024-02-05 09:42:40.930 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
+2024-02-05 09:42:40.940 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
+2024-02-05 09:42:40.950 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
+2024-02-05 09:42:40.955 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
+.
+.
+.
+.
----
-For the aim of this example the payload of the records is not important.
-
If you check again the offset for the consumers of my-group group you'll notice we are at offset 52 now.
[source,sh]
diff --git a/jbang/kafka-batch-log/kafka-batch-log.yaml b/jbang/kafka-batch-log/kafka-batch-log.yaml
index 11c25df..1a779bd 100644
--- a/jbang/kafka-batch-log/kafka-batch-log.yaml
+++ b/jbang/kafka-batch-log/kafka-batch-log.yaml
@@ -17,6 +17,10 @@
# camel-k: dependency=camel:kafka
+- beans:
+ - name: batchLog
+ type: "#class:camel.example.BatchLog"
+
- route:
id: "kafka-to-log"
from:
@@ -31,11 +35,7 @@
autoCommitEnable: false
allowManualCommit: true
steps:
- - to:
- uri: "kamelet:log-sink"
- parameters:
- showStreams: true
- showHeaders: true
- multiline: true
+ - bean:
+ ref: batchLog
- to:
uri: "kamelet:kafka-batch-manual-commit-action"