This example shows how to create a simple source Kamelet that periodically emits events, some of which are forced to fail. The events are consumed by a log sink in a KameletBinding. Using the ErrorHandler support, we redirect every failed event to an error-handler Kamelet used as a sink, which first stores the event in a Kafka topic and then logs a message notifying us about the error that occurred.
First of all, you must install the incremental-id-source Kamelet defined in the incremental-id-source.kamelet.yaml file. This source emits an event every second carrying an auto-incrementing counter, and it is forced to fail whenever the number 0 is present in the data. With this trick, we simulate faulty events.
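To make the failure rule concrete, here is a minimal shell sketch that lists which counter values would be rejected. It assumes the rule means "the decimal representation of the counter contains the digit 0"; that reading, and the function names `would_fail` and `failing_ids`, are illustrative assumptions and are not taken from the Kamelet definition.

```shell
# Hypothetical helper: succeeds (exit 0) when the counter contains the digit 0.
# ASSUMPTION: this is how "the number 0 is present in the data" is interpreted.
would_fail() {
  case "$1" in
    *0*) return 0 ;;
    *)   return 1 ;;
  esac
}

# Print, one per line, the counters from 1 up to the given limit that would fail.
failing_ids() {
  i=1
  while [ "$i" -le "$1" ]; do
    if would_fail "$i"; then
      echo "$i"
    fi
    i=$((i + 1))
  done
}

failing_ids 30
# prints 10, 20 and 30, one per line
```

Under this assumption roughly one event in ten fails at first, which is enough to exercise the error handler without flooding it.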
$ kubectl apply -f incremental-id-source.kamelet.yaml
You can verify that the Kamelet was created by listing the available Kamelets:
$ kubectl get kamelets
NAME                    PHASE
incremental-id-source   Ready
Next, install the log-sink Kamelet defined in the log-sink.kamelet.yaml file:
$ kubectl apply -f log-sink.kamelet.yaml
You can verify that the Kamelet was created by listing the available Kamelets:
$ kubectl get kamelets
NAME                    PHASE
log-sink                Ready
incremental-id-source   Ready
Finally, we install an error handler as specified in the error-handler.kamelet.yaml file. Let's have a look at how it is configured:
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: error-handler
spec:
  definition:
    ...
    properties:
      kafka-brokers:
        ...
      kafka-topic:
        ...
      kafka-service-account-id:
        ...
      kafka-service-account-secret:
        ...
      log-message:
        ...
  template:
    from:
      uri: kamelet:source
      steps:
      # First step: send to the DLC for future processing
      - to:
          uri: kafka:{{kafka-topic}}
          parameters:
            brokers: "{{kafka-brokers}}"
            security-protocol: SASL_SSL
            sasl-mechanism: PLAIN
            sasl-jaas-config: "org.apache.kafka.common.security.plain.PlainLoginModule required username={{kafka-service-account-id}} password={{kafka-service-account-secret}};"
      # Log an error message to notify about the failure
      - set-body:
          constant: "{{log-message}} - worry not, the event is stored in the DLC"
      - to: "log:error-sink"
We first send the failed event to a Kafka topic, and then we write a simple notification message to the log, just to let the user know that something went wrong. Let's apply it:
$ kubectl apply -f error-handler.kamelet.yaml
You can verify that the Kamelet was created by listing the available Kamelets:
$ kubectl get kamelets
NAME                    PHASE
error-handler           Ready
log-sink                Ready
incremental-id-source   Ready
We can now create a KameletBinding that uses the incremental-id-source Kamelet as its source and logs events through the log-sink Kamelet. Since the source will sporadically fail, we configure an errorHandler with the error-handler Kamelet as its sink. We also configure a redelivery policy (1 retry, with a 2000 millisecond delay). We declare all of this in the kamelet-binding-error-handler.yaml file:
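Before creating the binding, it can help to confirm that no Kamelet is stuck in a phase other than Ready. The following is a small sketch that filters `kubectl get kamelets` style output; the function name `not_ready_kamelets` is hypothetical, and the sample input is simply the listing shown above.

```shell
# Print the names of Kamelets whose PHASE column is not "Ready".
# Expects `kubectl get kamelets` style output on stdin, header line first.
not_ready_kamelets() {
  awk 'NR > 1 && $2 != "Ready" { print $1 }'
}

# Sample input copied from the listing above; all three Kamelets are Ready,
# so nothing is printed.
printf '%s\n' \
  'NAME PHASE' \
  'error-handler Ready' \
  'log-sink Ready' \
  'incremental-id-source Ready' | not_ready_kamelets
```

Against a live cluster the same filter could be fed directly, for example `kubectl get kamelets | not_ready_kamelets`; empty output means every Kamelet is Ready.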
...
  errorHandler:
    sink:
      endpoint:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: error-handler
        properties:
          message: "ERROR!"
          ...
      parameters:
        maximumRedeliveries: 1
        redeliveryDelay: 2000
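As a quick sanity check on the redelivery parameters above, this sketch works out how many times a failing event is attempted and how long it waits before reaching the error handler. It assumes a constant delay between attempts (no exponential back-off); the variable names are illustrative only.

```shell
# Values copied from the errorHandler parameters above.
maximum_redeliveries=1
redelivery_delay_ms=2000

# Total delivery attempts: the initial attempt plus each redelivery.
total_attempts=$((1 + maximum_redeliveries))

# Time spent waiting between attempts.
# ASSUMPTION: the delay is constant, with no back-off multiplier.
min_wait_ms=$((maximum_redeliveries * redelivery_delay_ms))

echo "attempts=$total_attempts wait_ms=$min_wait_ms"
# prints: attempts=2 wait_ms=2000
```

In other words, with these settings a failing event is tried twice and is handed to the error-handler sink about two seconds after the first failure.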
Execute the following command to start the Integration:
$ kubectl apply -f kamelet-binding-error-handler.yaml
As soon as the Integration starts, it logs successful events under the sink log category and errors under the error-sink log category:
[1] 2021-04-29 08:35:08,875 INFO [sink] (Camel (camel-1) thread #0 - timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: Producing message #49]
[1] 2021-04-29 08:35:11,878 INFO [sink] (Camel (camel-1) thread #0 - timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: Producing message #51]
[1] 2021-04-29 08:35:12,088 INFO [error-sink] (Camel (camel-1) thread #9 - KafkaProducer[my-first-test]) Exchange[ExchangePattern: InOnly, BodyType: String, Body: ERROR! - worry not, the event is stored in the DLC]
[1] 2021-04-29 08:35:12,877 INFO [sink] (Camel (camel-1) thread #0 - timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: Producing message #52]
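To tell the two log categories apart when the output gets long, a fixed-string grep on the bracketed category name is usually enough. The sketch below counts lines per category using the four lines from the excerpt above as sample input; the function names `count_category` and `sample_log` are hypothetical.

```shell
# Count the lines on stdin logged under the given category, e.g. "sink" or
# "error-sink". -F makes grep treat "[sink]" as a literal string, not a
# bracket expression.
count_category() {
  grep -c -F "[$1]"
}

# Sample lines copied from the log excerpt above.
sample_log() {
  cat <<'EOF'
[1] 2021-04-29 08:35:08,875 INFO [sink] (Camel (camel-1) thread #0 - timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: Producing message #49]
[1] 2021-04-29 08:35:11,878 INFO [sink] (Camel (camel-1) thread #0 - timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: Producing message #51]
[1] 2021-04-29 08:35:12,088 INFO [error-sink] (Camel (camel-1) thread #9 - KafkaProducer[my-first-test]) Exchange[ExchangePattern: InOnly, BodyType: String, Body: ERROR! - worry not, the event is stored in the DLC]
[1] 2021-04-29 08:35:12,877 INFO [sink] (Camel (camel-1) thread #0 - timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: Producing message #52]
EOF
}

echo "sink: $(sample_log | count_category sink)"
echo "error-sink: $(sample_log | count_category error-sink)"
# prints: sink: 3
#         error-sink: 1
```

The same filter can be applied to the output of `kubectl logs` for the integration pod instead of the embedded sample.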
If you're curious about what is going on in the DLC, you can use the example found in kafka sasl ssl consumer:
$ kamel run --config secret:kafka-props SaslSSLKafkaConsumer.java --dev
...
[1] 2021-04-29 08:57:08,636 INFO [org.apa.kaf.com.uti.AppInfoParser] (Camel (camel-1) thread #0 - KafkaConsumer[my-first-test]) Kafka commitId: 448719dc99a19793
[1] 2021-04-29 08:57:08,636 INFO [org.apa.kaf.com.uti.AppInfoParser] (Camel (camel-1) thread #0 - KafkaConsumer[my-first-test]) Kafka startTimeMs: 1619686628635
[1] 2021-04-29 08:57:08,637 INFO [org.apa.cam.com.kaf.KafkaConsumer] (Camel (camel-1) thread #0 - KafkaConsumer[my-first-test]) Subscribing my-first-test-Thread 0 to topic my-first-test
...
[1] 2021-04-29 08:35:02,894 INFO [FromKafka] (Camel (camel-1) thread #0 - KafkaConsumer[my-first-test]) Producing message #40
[1] 2021-04-29 08:35:12,995 INFO [FromKafka] (Camel (camel-1) thread #0 - KafkaConsumer[my-first-test]) Producing message #50
[1] 2021-04-29 08:35:22,879 INFO [FromKafka] (Camel (camel-1) thread #0 - KafkaConsumer[my-first-test]) Producing message #60
...