# Kamelets Binding Error Handler example
This example shows how to create a simple _source_ `Kamelet` that periodically sends events (and occasional failures). The events are consumed by a log _sink_ in a `KameletBinding`. With the support of the `ErrorHandler`, we redirect all errors to an _error-handler_ `Kamelet` used as **Sink**, which first stores the failed events in a `Kafka` topic and then logs a message notifying us about the error that occurred.
## Additional Requirements for running this example
- A Kafka broker with a Kafka topic, and a Service Account with access to that topic. In this example, the topic is named `my-first-test`.
## Incremental ID Source Kamelet
First of all, you must install the _incremental-id-source_ Kamelet defined in the `incremental-id-source.kamelet.yaml` file. This source emits an event every second with an auto-incrementing counter and is forced to fail whenever the digit 0 appears in the data. With this trick, we simulate sporadic event faults.
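To give an idea of what such a source might look like, here is a rough sketch (a hypothetical illustration, not the actual file contents: the route structure, the `timer` parameters and the failure condition are assumptions, and the authoritative definition is the one in `incremental-id-source.kamelet.yaml`):
```
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: incremental-id-source
spec:
  template:
    from:
      # emit a tick every second
      uri: timer:tick
      parameters:
        period: 1000
      steps:
        # build the message body from the timer counter
        - set-body:
            simple: "Producing message #${exchangeProperty.CamelTimerCounter}"
        # force a failure whenever the digit 0 appears in the body
        - choice:
            when:
              - simple: "${body} contains '0'"
                steps:
                  - throw-exception:
                      exception-type: "java.lang.RuntimeException"
                      message: "simulated failure"
        # hand the event over to the binding sink
        - to: "kamelet:sink"
```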
```
$ kubectl apply -f incremental-id-source.kamelet.yaml
```
You can check the newly created `Kamelet` by listing the available kamelets:
```
$ kubectl get kamelets
NAME                    PHASE
incremental-id-source   Ready
```
## Log Sink Kamelet
Now it's time to install the _log-sink_ Kamelet defined in the `log-sink.kamelet.yaml` file:
```
$ kubectl apply -f log-sink.kamelet.yaml
```
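Conceptually, a log sink is little more than a route from `kamelet:source` to the `log` component. Here is a minimal sketch, assuming that structure (the actual definition lives in `log-sink.kamelet.yaml`):
```
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: log-sink
spec:
  template:
    from:
      # a sink Kamelet consumes the events handed over by the binding
      uri: kamelet:source
      steps:
        - to: "log:sink"
```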
You can check the newly created `Kamelet` by listing the available kamelets:
```
$ kubectl get kamelets
NAME                    PHASE
log-sink                Ready
incremental-id-source   Ready
```
## Error handler Kamelet
Finally, we install the error handler specified in the `error-handler.kamelet.yaml` file. Let's have a look at how it is configured:
```
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: error-handler
spec:
  definition:
    ...
    properties:
      kafka-brokers:
        ...
      kafka-topic:
        ...
      kafka-service-account-id:
        ...
      kafka-service-account-secret:
        ...
      log-message:
        ...
  template:
    from:
      uri: kamelet:source
      steps:
        # First step: send to the DLC for future processing
        - to:
            uri: kafka:{{kafka-topic}}
            parameters:
              brokers: "{{kafka-brokers}}"
              security-protocol: SASL_SSL
              sasl-mechanism: PLAIN
              sasl-jaas-config: "org.apache.kafka.common.security.plain.PlainLoginModule required username={{kafka-service-account-id}} password={{kafka-service-account-secret}};"
        # Log an error message to notify about the failure
        - set-body:
            constant: "{{log-message}} - worry not, the event is stored in the DLC"
        - to: "log:error-sink"
```
We first send the failed event to a Kafka topic (the dead letter channel, DLC) and then log a simple notification message, just to let the user know that an issue occurred. Let's apply it:
```
$ kubectl apply -f error-handler.kamelet.yaml
```
You can check the newly created `Kamelet` by listing the available kamelets:
```
$ kubectl get kamelets
NAME                    PHASE
error-handler           Ready
log-sink                Ready
incremental-id-source   Ready
```
## Error Handler Kamelet Binding
We can now create a `KameletBinding` that is started by the _incremental-id-source_ `Kamelet` and logs events to the _log-sink_ `Kamelet`. As this will sporadically fail, we configure an _errorHandler_ with the _error-handler_ `Kamelet` as **Sink**. We also configure a redelivery policy (1 retry, with a 2000 millisecond delay). We can declare it as in the `kamelet-binding-error-handler.yaml` file:
```
...
  errorHandler:
    sink:
      endpoint:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: error-handler
        properties:
          message: "ERROR!"
          ...
      parameters:
        maximumRedeliveries: 1
        redeliveryDelay: 2000
```
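For completeness, the parts elided above simply bind the two Kamelets installed earlier as source and sink. A rough sketch of the whole resource, assuming those names (the metadata name below is hypothetical; the authoritative version is in `kamelet-binding-error-handler.yaml`):
```
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
  # hypothetical name, see kamelet-binding-error-handler.yaml for the real one
  name: error-handler-binding
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: incremental-id-source
  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: log-sink
  errorHandler:
    # ... the error handler configuration shown in the snippet above
```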
Execute the following command to start the `Integration`:
```
$ kubectl apply -f kamelet-binding-error-handler.yaml
```
As soon as the `Integration` starts, it will log successful events on the `sink` log channel and errors on the `error-sink` log channel:
```
[1] 2021-04-29 08:35:08,875 INFO [sink] (Camel (camel-1) thread #0 - timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: Producing message #49]
[1] 2021-04-29 08:35:11,878 INFO [sink] (Camel (camel-1) thread #0 - timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: Producing message #51]
[1] 2021-04-29 08:35:12,088 INFO [error-sink] (Camel (camel-1) thread #9 - KafkaProducer[my-first-test]) Exchange[ExchangePattern: InOnly, BodyType: String, Body: ERROR! - worry not, the event is stored in the DLC]
[1] 2021-04-29 08:35:12,877 INFO [sink] (Camel (camel-1) thread #0 - timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: Producing message #52]
```
### Recover the errors from the DLC
If you're curious to see what is going on on the DLC side, you can use the example found in [kafka sasl ssl consumer](../../kafka/sasl_ssl/):
```
$ kamel run --config secret:kafka-props SaslSSLKafkaConsumer.java --dev
...
[1] 2021-04-29 08:57:08,636 INFO [org.apa.kaf.com.uti.AppInfoParser] (Camel (camel-1) thread #0 - KafkaConsumer[my-first-test]) Kafka commitId: 448719dc99a19793
[1] 2021-04-29 08:57:08,636 INFO [org.apa.kaf.com.uti.AppInfoParser] (Camel (camel-1) thread #0 - KafkaConsumer[my-first-test]) Kafka startTimeMs: 1619686628635
[1] 2021-04-29 08:57:08,637 INFO [org.apa.cam.com.kaf.KafkaConsumer] (Camel (camel-1) thread #0 - KafkaConsumer[my-first-test]) Subscribing my-first-test-Thread 0 to topic my-first-test
...
[1] 2021-04-29 08:35:02,894 INFO [FromKafka] (Camel (camel-1) thread #0 - KafkaConsumer[my-first-test]) Producing message #40
[1] 2021-04-29 08:35:12,995 INFO [FromKafka] (Camel (camel-1) thread #0 - KafkaConsumer[my-first-test]) Producing message #50
[1] 2021-04-29 08:35:22,879 INFO [FromKafka] (Camel (camel-1) thread #0 - KafkaConsumer[my-first-test]) Producing message #60
...
```