| = Kafka Component |
| :doctitle: Kafka |
| :shortname: kafka |
| :artifactid: camel-kafka |
:description: Send and receive messages to/from an Apache Kafka broker.
| :since: 2.13 |
| :supportlevel: Stable |
| :tabs-sync-option: |
| :component-header: Both producer and consumer are supported |
| //Manually maintained attributes |
| :camel-spring-boot-name: kafka |
| |
| *Since Camel {since}* |
| |
| *{component-header}* |
| |
The Kafka component is used for communicating with the
http://kafka.apache.org/[Apache Kafka] message broker.
| |
| Maven users will need to add the following dependency to their `pom.xml` |
| for this component. |
| |
| [source,xml] |
| ------------------------------------------------------------ |
| <dependency> |
| <groupId>org.apache.camel</groupId> |
| <artifactId>camel-kafka</artifactId> |
| <version>x.x.x</version> |
| <!-- use the same version as your Camel core version --> |
| </dependency> |
| ------------------------------------------------------------ |
| |
| |
| == URI format |
| |
| --------------------------- |
| kafka:topic[?options] |
| --------------------------- |
| |
| |
| // component-configure options: START |
| |
| // component-configure options: END |
| |
| // component options: START |
| include::partial$component-configure-options.adoc[] |
| include::partial$component-endpoint-options.adoc[] |
| // component options: END |
| |
| // endpoint options: START |
| |
| // endpoint options: END |
| |
For more information about producer/consumer configuration, see:
| |
| http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs] |
| http://kafka.apache.org/documentation.html#producerconfigs[http://kafka.apache.org/documentation.html#producerconfigs] |
| |
| // component headers: START |
| include::partial$component-endpoint-headers.adoc[] |
| // component headers: END |
| |
If you want to send a message to a dynamic topic, use the `KafkaConstants.OVERRIDE_TOPIC` header. It is a one-time header that is not sent along with the message, and is removed by the producer.
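
The following minimal sketch shows how this could look; the topic names and broker address are only illustrative:

[source,java]
----
from("direct:start")
    .setBody(constant("Message from Camel"))
    // this one-time header overrides the topic from the endpoint URI ("test")
    // and is removed by the producer before the record is sent
    .setHeader(KafkaConstants.OVERRIDE_TOPIC, constant("dynamic-topic"))
    .to("kafka:test?brokers=localhost:9092");
----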
| |
| == Consumer error handling |
| |
While the Kafka consumer is polling messages from the Kafka broker, errors can happen. This section describes what happens and what
you can configure.
| |
The consumer may throw an exception when invoking the Kafka `poll` API, for example, if a message cannot be deserialized due to invalid data,
among many other kinds of errors. Those errors come in the form of `KafkaException` and are either _retriable_ or not. Exceptions
that can be retried (`RetriableException`) will be retried (with a poll timeout in between). All other kinds of exceptions are
handled according to the _pollOnError_ configuration. This configuration has the following values:
| |
| * DISCARD will discard the message and continue to poll the next message. |
| * ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll the next message. |
| * RECONNECT will re-connect the consumer and try to poll the message again. |
* RETRY will let the consumer retry polling the same message again.
| * STOP will stop the consumer (it has to be manually started/restarted if the consumer should be able to consume messages again). |
| |
The default is *ERROR_HANDLER*, which will let Camel's error handler (if any is configured) process the caused exception,
and afterwards continue to poll the next message. This behavior is similar to the _bridgeErrorHandler_ option that
| Camel components have. |
| |
For advanced control, a custom implementation of `org.apache.camel.component.kafka.PollExceptionStrategy` can be configured
| on the component level, which allows controlling which of the strategies to use for each exception. |
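
The _pollOnError_ strategy can also be set directly as an endpoint option; a minimal sketch (the topic and broker address are illustrative):

[source,java]
----
from("kafka:test?brokers=localhost:9092&pollOnError=RECONNECT")
    .log("Message received from Kafka : ${body}");
----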
| |
| == Consumer error handling (advanced) |
| |
| By default, Camel will poll using the *ERROR_HANDLER* to process exceptions. |
| How Camel handles a message that results in an exception can be altered using the `breakOnFirstError` attribute in the configuration. |
Instead of continuing to poll the next message, Camel will commit the offset so that the message that caused the exception is retried.
| This is similar to the *RETRY* polling strategy above. |
| |
| [source,java] |
| ---- |
| KafkaComponent kafka = new KafkaComponent(); |
| kafka.setBreakOnFirstError(true); |
| ... |
| camelContext.addComponent("kafka", kafka); |
| ---- |
| |
| It is recommended that you read the section below "Using manual commit with Kafka consumer" to understand how `breakOnFirstError` |
| will work based on the `CommitManager` that is configured. |
| |
| == Samples |
| |
| === Consuming messages from Kafka |
| |
| Here is the minimal route you need to read messages from Kafka. |
| |
| [source,java] |
| ---- |
| from("kafka:test?brokers=localhost:9092") |
| .log("Message received from Kafka : ${body}") |
| .log(" on the topic ${headers[kafka.TOPIC]}") |
| .log(" on the partition ${headers[kafka.PARTITION]}") |
| .log(" with the offset ${headers[kafka.OFFSET]}") |
| .log(" with the key ${headers[kafka.KEY]}") |
| ---- |
| |
| If you need to consume messages from multiple topics, you can use a comma separated list of topic names. |
| |
| [source,java] |
| ---- |
| from("kafka:test,test1,test2?brokers=localhost:9092") |
| .log("Message received from Kafka : ${body}") |
| .log(" on the topic ${headers[kafka.TOPIC]}") |
| .log(" on the partition ${headers[kafka.PARTITION]}") |
| .log(" with the offset ${headers[kafka.OFFSET]}") |
| .log(" with the key ${headers[kafka.KEY]}") |
| ---- |
| |
It's also possible to subscribe to multiple topics by giving a pattern as the topic name and enabling the `topicIsPattern` option.
| |
| [source,java] |
| ---- |
| from("kafka:test*?brokers=localhost:9092&topicIsPattern=true") |
| .log("Message received from Kafka : ${body}") |
| .log(" on the topic ${headers[kafka.TOPIC]}") |
| .log(" on the partition ${headers[kafka.PARTITION]}") |
| .log(" with the offset ${headers[kafka.OFFSET]}") |
| .log(" with the key ${headers[kafka.KEY]}") |
| ---- |
| |
| When consuming messages from Kafka, you can use your own offset management and not delegate this management to Kafka. |
| To keep the offsets, the component needs a `StateRepository` implementation such as `FileStateRepository`. |
| This bean should be available in the registry. |
Here is how to use it:
| |
| [source,java] |
| ---- |
| // Create the repository in which the Kafka offsets will be persisted |
| FileStateRepository repository = FileStateRepository.fileStateRepository(new File("/path/to/repo.dat")); |
| |
| // Bind this repository into the Camel registry |
| Registry registry = createCamelRegistry(); |
| registry.bind("offsetRepo", repository); |
| |
| // Configure the camel context |
| DefaultCamelContext camelContext = new DefaultCamelContext(registry); |
| camelContext.addRoutes(new RouteBuilder() { |
| @Override |
| public void configure() throws Exception { |
| from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" + |
| // Set up the topic and broker address |
| "&groupId=A" + |
| // The consumer processor group ID |
| "&autoOffsetReset=earliest" + |
| // Ask to start from the beginning if we have unknown offset |
| "&offsetRepository=#offsetRepo") |
| // Keep the offsets in the previously configured repository |
| .to("mock:result"); |
| } |
| }); |
| ---- |
| |
| |
| === Producing messages to Kafka |
| |
| Here is the minimal route you need in order to write messages to Kafka. |
| |
| [source,java] |
| ---- |
| from("direct:start") |
| .setBody(constant("Message from Camel")) // Message to send |
| .setHeader(KafkaConstants.KEY, constant("Camel")) // Key of the message |
| .to("kafka:test?brokers=localhost:9092"); |
| ---- |
| |
| == SSL configuration |
| |
There are two different ways to configure SSL communication on the Kafka component.
| |
| The first way is through the many SSL endpoint parameters: |
| |
| [source,java] |
| ---- |
| from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" + |
| "&groupId=A" + |
| "&sslKeystoreLocation=/path/to/keystore.jks" + |
| "&sslKeystorePassword=changeit" + |
| "&sslKeyPassword=changeit" + |
| "&securityProtocol=SSL") |
| .to("mock:result"); |
| ---- |
| |
| The second way is to use the `sslContextParameters` endpoint parameter: |
| |
| [source,java] |
| ---- |
| // Configure the SSLContextParameters object |
| KeyStoreParameters ksp = new KeyStoreParameters(); |
| ksp.setResource("/path/to/keystore.jks"); |
| ksp.setPassword("changeit"); |
| KeyManagersParameters kmp = new KeyManagersParameters(); |
| kmp.setKeyStore(ksp); |
| kmp.setKeyPassword("changeit"); |
| SSLContextParameters scp = new SSLContextParameters(); |
| scp.setKeyManagers(kmp); |
| |
| // Bind this SSLContextParameters into the Camel registry |
| Registry registry = createCamelRegistry(); |
| registry.bind("ssl", scp); |
| |
| // Configure the camel context |
| DefaultCamelContext camelContext = new DefaultCamelContext(registry); |
| camelContext.addRoutes(new RouteBuilder() { |
| @Override |
| public void configure() throws Exception { |
| from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" + |
| // Set up the topic and broker address |
| "&groupId=A" + |
| // The consumer processor group ID |
| "&sslContextParameters=#ssl" + |
// Reference the SSL configuration
"&securityProtocol=SSL")
// The security protocol
| .to("mock:result"); |
| } |
| }); |
| ---- |
| |
| == Using the Kafka idempotent repository |
| |
The `camel-kafka` library provides a Kafka topic-based idempotent repository. This repository broadcasts all changes to the idempotent state (add/remove) on a Kafka topic, and populates a local in-memory cache for each repository's process instance through event sourcing.
| The topic used must be unique per idempotent repository instance. The mechanism does not have any requirements about the number of topic partitions, as the repository consumes from all partitions at the same time. It also does not have any requirements about the replication factor of the topic. |
Each repository instance that uses the topic (typically on different machines running in parallel) controls its own consumer group, so in a cluster of 10 Camel processes using the same topic, each will control its own offset.
| On startup, the instance subscribes to the topic, rewinds the offset to the beginning and rebuilds the cache to the latest state. The cache will not be considered warmed up until one poll of `pollDurationMs` in length returns 0 records. Startup will not be completed until either the cache has warmed up, or 30 seconds go by; if the latter happens, the idempotent repository may be in an inconsistent state until its consumer catches up to the end of the topic. |
Be mindful of the format of the header used for the uniqueness check. By default, it uses `String` as the data type. When using primitive numeric formats, the header must be deserialized accordingly. Check the samples below for examples.
| |
| A `KafkaIdempotentRepository` has the following properties: |
| [width="100%",cols="2m,5",options="header"] |
| |=== |
| | Property | Description |
| | topic | The name of the Kafka topic to use to broadcast changes. (required) |
| | bootstrapServers | The `bootstrap.servers` property on the internal Kafka producer and consumer. Use this as shorthand if not setting `consumerConfig` and `producerConfig`. If used, this component will apply sensible default configurations for the producer and consumer. |
| producerConfig | Sets the properties that will be used by the Kafka producer that broadcasts changes. Overrides `bootstrapServers`, so must define the Kafka `bootstrap.servers` property itself.
| consumerConfig | Sets the properties that will be used by the Kafka consumer that populates the cache from the topic. Overrides `bootstrapServers`, so must define the Kafka `bootstrap.servers` property itself.
| | maxCacheSize | How many of the most recently used keys should be stored in memory (default 1000). |
| | pollDurationMs | The poll duration of the Kafka consumer. The local caches are updated immediately. This value will affect how far behind other peers that update their caches from the topic are relative to the idempotent consumer instance that sent the cache action message. The default value of this is 100 ms. + |
| If setting this value explicitly, be aware that there is a tradeoff between the remote cache liveness and the volume of network traffic between this repository's consumer and the Kafka brokers. The cache warmup process also depends on there being one poll that fetches nothing - this indicates that the stream has been consumed up to the current point. If the poll duration is excessively long for the rate at which messages are sent on the topic, there exists a possibility that the cache cannot be warmed up and will operate in an inconsistent state relative to its peers until it catches up. |
| | groupId | The groupId to assign to the idempotent consumer. If not specified, it will be randomized. |
| |=== |
| |
The repository can be instantiated by defining the `topic` and `bootstrapServers`, or the `producerConfig` and `consumerConfig` property sets can be explicitly defined to enable features such as SSL/SASL.
To use it, the repository must be placed in the Camel registry, either manually or by registration as a bean in Spring/Blueprint, as it is `CamelContext` aware.
| |
| Sample usage is as follows: |
| |
| [source,java] |
| ---- |
| KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091"); |
| |
| SimpleRegistry registry = new SimpleRegistry(); |
registry.bind("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new DefaultCamelContext(registry);
| |
| // later in RouteBuilder... |
| from("direct:performInsert") |
| .idempotentConsumer(header("id")).idempotentRepository("insertDbIdemRepo") |
| // once-only insert into the database |
| .end() |
| ---- |
| |
| In XML: |
| |
| [source,xml] |
| ---- |
| <!-- simple --> |
| <bean id="insertDbIdemRepo" |
| class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository"> |
| <property name="topic" value="idempotent-db-inserts"/> |
| <property name="bootstrapServers" value="localhost:9091"/> |
| </bean> |
| |
| <!-- complex --> |
| <bean id="insertDbIdemRepo" |
| class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository"> |
| <property name="topic" value="idempotent-db-inserts"/> |
| <property name="maxCacheSize" value="10000"/> |
| <property name="consumerConfig"> |
| <props> |
| <prop key="bootstrap.servers">localhost:9091</prop> |
| </props> |
| </property> |
| <property name="producerConfig"> |
| <props> |
| <prop key="bootstrap.servers">localhost:9091</prop> |
| </props> |
| </property> |
| </bean> |
| ---- |
| |
There are three alternatives to choose from when using idempotency with numeric identifiers. The first one is to use the static `numericHeader` method from `org.apache.camel.component.kafka.serde.KafkaSerdeHelper` to perform the conversion for you:
| |
| [source,java] |
| ---- |
| from("direct:performInsert") |
| .idempotentConsumer(numericHeader("id")).idempotentRepository("insertDbIdemRepo") |
| // once-only insert into the database |
| .end() |
| ---- |
| |
Alternatively, it is possible to use a custom header deserializer, configured via the route URI, to perform the conversion:
| |
| [source,java] |
| ---- |
| public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer { |
| private static final Logger LOG = LoggerFactory.getLogger(CustomHeaderDeserializer.class); |
| |
| @Override |
| public Object deserialize(String key, byte[] value) { |
| if (key.equals("id")) { |
| BigInteger bi = new BigInteger(value); |
| |
| return String.valueOf(bi.longValue()); |
| } else { |
| return super.deserialize(key, value); |
| } |
| } |
| } |
| ---- |
| |
| Lastly, it is also possible to do so in a processor: |
| |
| [source,java] |
| ---- |
| from(from).routeId("foo") |
| .process(exchange -> { |
| byte[] id = exchange.getIn().getHeader("id", byte[].class); |
| |
| BigInteger bi = new BigInteger(id); |
| exchange.getIn().setHeader("id", String.valueOf(bi.longValue())); |
| }) |
| .idempotentConsumer(header("id")) |
| .idempotentRepository("kafkaIdempotentRepository") |
| .to(to); |
| ---- |
| |
| == Using manual commit with Kafka consumer |
| |
| By default, the Kafka consumer will use auto commit, where the offset will be committed automatically in the background using a given interval. |
| |
If you want to force manual commits, you can use the `KafkaManualCommit` API from the Camel `Exchange`, stored on the message header.
This requires turning on manual commits by setting the option `allowManualCommit` to `true` on either the `KafkaComponent`
or the endpoint, for example:
| |
| [source,java] |
| ---- |
| KafkaComponent kafka = new KafkaComponent(); |
| kafka.setAutoCommitEnable(false); |
| kafka.setAllowManualCommit(true); |
| ... |
| camelContext.addComponent("kafka", kafka); |
| ---- |
| |
By default, it uses the `NoopCommitManager` behind the scenes. To commit an offset, you need
to use the `KafkaManualCommit` from Java code, such as in a Camel `Processor`:
| |
| [source,java] |
| ---- |
| public void process(Exchange exchange) { |
| KafkaManualCommit manual = |
| exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); |
| manual.commit(); |
| } |
| ---- |
| |
The `KafkaManualCommit` forces a synchronous commit that blocks until the commit is acknowledged by Kafka; if the commit fails, an exception is thrown.
You can use an asynchronous commit as well by configuring the `KafkaManualCommitFactory` with the `DefaultKafkaManualAsyncCommitFactory` implementation.

The commit will then be done in the next consumer loop using the Kafka asynchronous commit API.
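
A minimal sketch of configuring the asynchronous factory on the endpoint, assuming the async factory lives in the same `org.apache.camel.component.kafka.consumer` package as the default factory shown later on this page:

[source,java]
----
from("kafka:topic?groupId=myGroup&autoCommitEnable=false&allowManualCommit=true"
        + "&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualAsyncCommitFactory")
    .process(this::process) // the processor performs the manual commit as shown above
    .to("mock:result");
----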
| |
| If you want to use a custom implementation of `KafkaManualCommit` then you can configure a custom `KafkaManualCommitFactory` |
| on the `KafkaComponent` that creates instances of your custom implementation. |
| |
When configuring a consumer to use manual commit and a specific `CommitManager`, it is important to understand how these influence the behavior
of `breakOnFirstError`:
| |
| [source,java] |
| ---- |
| KafkaComponent kafka = new KafkaComponent(); |
| kafka.setAutoCommitEnable(false); |
| kafka.setAllowManualCommit(true); |
| kafka.setBreakOnFirstError(true); |
| kafka.setKafkaManualCommitFactory(new DefaultKafkaManualCommitFactory()); |
| ... |
| camelContext.addComponent("kafka", kafka); |
| ---- |
| |
When the `CommitManager` is left as the default `NoopCommitManager`, `breakOnFirstError` will not automatically commit the offset so that the
message with an error is retried. The route must manage that itself using `KafkaManualCommit`.
| |
| |
When the `CommitManager` is changed to either the synchronous or asynchronous manager, `breakOnFirstError` will automatically commit the offset so that the
message with an error is retried. This message will be continually retried until it can be processed without an error.
| |
*Note 1*: records from a partition must be processed and committed by the same thread as the consumer. This means that certain EIPs, as well as async or concurrent operations
in the DSL, may cause the commit to fail. In such circumstances, trying to commit the transaction will cause the Kafka client to throw a `java.util.ConcurrentModificationException`
exception with the message `KafkaConsumer is not safe for multi-threaded access`. To prevent this from happening, redesign your route to avoid those operations.
| |
*Note 2*: this is mostly useful with aggregation's completion timeout strategies.
| |
| == Pausable Consumers |
| |
| The Kafka component supports pausable consumers. This type of consumer can pause consuming data based on |
| conditions external to the component itself, such as an external system being unavailable or other transient conditions. |
| |
| [source,java] |
| ---- |
| from("kafka:topic") |
| .pausable(new KafkaConsumerListener(), () -> canContinue()) // the pausable check gets called if the exchange fails to be processed ... |
| .routeId("pausable-route") |
| .process(this::process) // Kafka consumer will be paused if this one throws an exception ... |
| .to("some:destination"); // or this one |
| ---- |
| |
In this example, consuming messages can pause (by calling the Kafka Consumer's pause method) if the result of `canContinue` is false.
| |
| The pausable EIP is meant to be used as a support mechanism when there is an exception somewhere in the route that prevents the exchange from being processed. More specifically, |
| the check called by the `pausable` EIP should be used to test for transient conditions preventing the exchange from being processed. |
| |
| == Kafka Headers propagation |
| |
When consuming messages from Kafka, headers are propagated to Camel exchange headers automatically.
The producing flow follows the same behaviour: the Camel headers of a particular exchange are propagated to the Kafka message headers.

Since Kafka headers only allow `byte[]` values, a Camel exchange header value must be serializable to `byte[]` in order to be propagated;
otherwise, the header will be skipped.
The following header value types are supported: `String`, `Integer`, `Long`, `Double`, `Boolean`, `byte[]`.
Note: all headers propagated *from* Kafka *to* the Camel exchange contain a `byte[]` value by default.
To override the default behaviour, these URI parameters can be set: `headerDeserializer` for the `from` route and `headerSerializer` for the `to` route. For example:
| |
| [source,java] |
| ---- |
| from("kafka:my_topic?headerDeserializer=#myDeserializer") |
| ... |
| .to("kafka:my_topic?headerSerializer=#mySerializer") |
| ---- |
| |
By default, all headers are filtered by the `KafkaHeaderFilterStrategy`.
The strategy filters out headers that start with the `Camel` or `org.apache.camel` prefixes.
The default strategy can be overridden by using the `headerFilterStrategy` URI parameter in both `to` and `from` routes:
| |
| [source,java] |
| ---- |
| from("kafka:my_topic?headerFilterStrategy=#myStrategy") |
| ... |
| .to("kafka:my_topic?headerFilterStrategy=#myStrategy") |
| ---- |
| |
The `myStrategy` object should be an implementation of `HeaderFilterStrategy` and must be placed in the Camel registry, either manually or by registration as a bean in Spring/Blueprint, as it is `CamelContext` aware.
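
A minimal sketch of such a strategy, assuming it extends Camel's `DefaultHeaderFilterStrategy` from `camel-support` (the extra filtered prefix is only an example):

[source,java]
----
public class MyKafkaHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
    public MyKafkaHeaderFilterStrategy() {
        // filter out Camel internal headers as well as a custom "x-internal" prefix (example only)
        setOutFilterPattern("(?i)(Camel|org\\.apache\\.camel|x-internal)[\\.|a-z|A-Z|0-9]*");
        setInFilterPattern("(?i)(Camel|org\\.apache\\.camel|x-internal)[\\.|a-z|A-Z|0-9]*");
    }
}

// register it so it can be referenced as #myStrategy in the endpoint URIs above
camelContext.getRegistry().bind("myStrategy", new MyKafkaHeaderFilterStrategy());
----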
| |
| == Kafka Transaction |
| |
You need to add `transactional.id`, `enable.idempotence` and `retries` in `additional-properties` to enable Kafka transactions with the producer.
| [source,java] |
| ---- |
| from("direct:transaction") |
| .to("kafka:my_topic?additional-properties[transactional.id]=1234&additional-properties[enable.idempotence]=true&additional-properties[retries]=5"); |
| ---- |
At the end of exchange routing, the Kafka producer commits the transaction, or aborts it if an exception was thrown or the exchange is marked `RollbackOnly`. Since Kafka does not support transactions across multiple threads, it will throw a `ProducerFencedException` if another producer with the same `transaction.id` makes a transactional request.
| |
It works with JTA (`camel-jta`) by using `transacted()`: if the route involves resources (SQL or JMS) that support XA, they work in tandem, and both will either commit or roll back at the end of the exchange routing. However, there is still a risk to data consistency: if the JTA transaction manager fails to commit during the 2PC processing while the Kafka transaction has already been committed, there is no way to roll back the Kafka changes, since Kafka transactions do not support the JTA/XA spec.
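
A minimal sketch of combining `transacted()` with the transactional producer; it assumes a JTA transaction policy from `camel-jta` is available in the registry:

[source,java]
----
from("direct:transaction")
    .transacted() // join the JTA transaction; requires camel-jta and a configured transaction manager
    .to("kafka:my_topic?additional-properties[transactional.id]=1234&additional-properties[enable.idempotence]=true&additional-properties[retries]=5");
----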
| |
| == Setting Kerberos config file |
| |
Configure the `krb5.conf` file location directly through the API:
| [source,java] |
| ---- |
| static { |
| KafkaComponent.setKerberosConfigLocation("path/to/config/file"); |
| } |
| ---- |
| |
| == Batching Consumer |
| |
| To use a Kafka batching consumer with Camel, an application has to set the configuration `batching` to `true`. |
| |
| |
The received records are stored in a list in the exchange used in the pipeline. As such, it is possible to commit every record
individually, or the whole batch at once by committing the last exchange in the list.
| |
| The size of the batch is controlled by the option `maxPollRecords`. |
| |
To avoid blocking for too long while waiting for the whole set of records to fill the batch, it is possible to use the `pollTimeoutMs` option to set a timeout for the polling. In this case, the batch may contain fewer messages than set in `maxPollRecords`.
| |
| === Automatic Commits |
| |
| By default, Camel uses automatic commits when using batch processing. In this case, Camel automatically commits the records after they have been successfully processed by the application. |
| |
| In case of failures, the records will not be processed. |
| |
| The code below provides an example of this approach: |
| [source,java] |
| ---- |
| public void configure() { |
| from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest").process(e -> { |
| // The received records are stored as exchanges in a list. This gets the list of those exchanges |
| final List<?> exchanges = e.getMessage().getBody(List.class); |
| |
| // Ensure we are actually receiving what we are asking for |
| if (exchanges == null || exchanges.isEmpty()) { |
| return; |
| } |
| |
| // The records from the batch are stored in a list of exchanges in the original exchange. To process, we iterate over that list |
| for (Object obj : exchanges) { |
| if (obj instanceof Exchange exchange) { |
| LOG.info("Processing exchange with body {}", exchange.getMessage().getBody(String.class)); |
| } |
| } |
| }).to(KafkaTestUtil.MOCK_RESULT); |
| } |
| ---- |
| |
| ==== Handling Errors with Automatic Commits |
| |
| When using automatic commits, Camel will not commit records if there is a failure in processing. Because of this, there is a risk that records could be reprocessed multiple times. |
| |
It is recommended to implement appropriate error handling mechanisms and patterns (such as dead-letter queues) to prevent failed records from blocking processing progress.
| |
| The code below provides an example of handling errors with automatic commits: |
| |
| [source,java] |
| ---- |
| public void configure() { |
| /* |
| We want to use continued here, so that Camel auto-commits the batch even though part of it has failed. In a |
| production scenario, applications should probably send these records to a separate topic or fix the condition |
that led to the failure
| */ |
| onException(IllegalArgumentException.class).process(exchange -> { |
| LOG.warn("Failed to process batch {}", exchange.getMessage().getBody()); |
| LOG.warn("Failed to process due to {}", exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class).getMessage()); |
| }).continued(true); |
| |
| from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest").process(e -> { |
| // The received records are stored as exchanges in a list. This gets the list of those exchanges |
| final List<?> exchanges = e.getMessage().getBody(List.class); |
| |
| // Ensure we are actually receiving what we are asking for |
| if (exchanges == null || exchanges.isEmpty()) { |
| return; |
| } |
| |
| // The records from the batch are stored in a list of exchanges in the original exchange. |
| int i = 0; |
| for (Object o : exchanges) { |
| if (o instanceof Exchange exchange) { |
| i++; |
| LOG.info("Processing exchange with body {}", exchange.getMessage().getBody(String.class)); |
| |
| if (i == 4) { |
| throw new IllegalArgumentException("Failed to process record"); |
| } |
| } |
| } |
| }).to(KafkaTestUtil.MOCK_RESULT); |
| } |
| ---- |
| |
=== Manual Commits
| |
| When working with batch processing with manual commits, it's up to the application to commit the records, and handle the outcome of potentially invalid records. |
| |
| The code below provides an example of this approach: |
| |
| [source,java] |
| ---- |
| public void configure() { |
| from("kafka:topic?batching=true&allowManualCommit=true&maxPollRecords=100&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory") |
| .process(e -> { |
| // The received records are stored as exchanges in a list. This gets the list of those exchanges |
| final List<?> exchanges = e.getMessage().getBody(List.class); |
| |
| // Ensure we are actually receiving what we are asking for |
| if (exchanges == null || exchanges.isEmpty()) { |
| return; |
| } |
| |
| /* |
| Every exchange in that list should contain a reference to the manual commit object. We use the reference |
| for the last exchange in the list to commit the whole batch |
| */ |
| final Object tmp = exchanges.getLast(); |
| if (tmp instanceof Exchange exchange) { |
| KafkaManualCommit manual = |
| exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); |
| LOG.debug("Performing manual commit"); |
| manual.commit(); |
| LOG.debug("Done performing manual commit"); |
| } |
| }); |
| } |
| ---- |
| |
| ==== Dealing with long polling timeouts |
| |
| In some cases, applications may want the polling process to have a long timeout (see: `pollTimeoutMs`). |
| |
| To properly do so, first make sure to have a max polling interval that is higher than the polling timeout (see: `maxPollIntervalMs`). |
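
For example, a sketch with illustrative values where the max poll interval exceeds the poll timeout:

[source,java]
----
from("kafka:topic?groupId=myGroup&batching=true&maxPollRecords=100"
        + "&pollTimeoutMs=30000&maxPollIntervalMs=60000")
    .to("mock:result");
----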
| |
| Then, increase the shutdown timeout to ensure that committing, closing and other Kafka operations are not abruptly aborted. For instance: |
| |
| [source,java] |
| ---- |
| public void configure() { |
| // Note that this can be configured in other ways |
| getCamelContext().getShutdownStrategy().setTimeout(10000); |
| |
| // route setup ... |
| } |
| ---- |
| |
| == Custom Subscription Adapters |
| |
Applications with complex subscription logic may provide a custom bean to handle the subscription process. To do so, it is
necessary to implement the `SubscribeAdapter` interface.
| |
| [source,java] |
| .Example subscriber adapter that subscribes to a set of Kafka topics or patterns |
| ---- |
| public class CustomSubscribeAdapter implements SubscribeAdapter { |
| @Override |
| public void subscribe(Consumer<?, ?> consumer, ConsumerRebalanceListener reBalanceListener, TopicInfo topicInfo) { |
| if (topicInfo.isPattern()) { |
| consumer.subscribe(topicInfo.getPattern(), reBalanceListener); |
| } else { |
| consumer.subscribe(topicInfo.getTopics(), reBalanceListener); |
| } |
| } |
| } |
| ---- |
| |
Then, it is necessary to add it as a named bean instance to the registry:
| |
| [source,java] |
| .Add to registry example |
| ---- |
| context.getRegistry().bind(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER, new CustomSubscribeAdapter()); |
| ---- |
| |
| == Interoperability |
| |
| === JMS |
| |
| When interoperating Kafka and JMS, it may be necessary to coerce the JMS headers into their expected type. |
| |
For instance, when consuming messages from Kafka carrying JMS headers and then sending them to a JMS broker, those headers are
first deserialized into a byte array. Then, the `camel-jms` component tries to coerce this byte array into the
type expected by JMS.
However, both the origin endpoint as well as how this was set up in the code itself may affect how the data is serialized and
deserialized. As such, it is not feasible to naively assume the data type of the byte array.
| |
| To address this issue, we provide a custom header deserializer to force Kafka to de-serialize the JMS data according to |
| the JMS specification. This approach ensures that the headers are properly interpreted and processed by the camel-jms component. |
| |
| Due to the inherent complexity of each possible system and endpoint, it may not be possible for this deserializer to cover all |
possible scenarios. As such, it is provided as a model that can be modified and/or adapted to the specific needs of each application.
| |
| To utilize this solution, you need to modify the route URI on the consumer end of the pipeline by including the |
| `headerDeserializer` option. |
| For example: |
| |
| [source,java] |
| .Route snippet |
| ---- |
| from("kafka:topic?headerDeserializer=#class:org.apache.camel.component.kafka.consumer.support.interop.JMSDeserializer") |
| .to("..."); |
| ---- |
| |
| include::spring-boot:partial$starter.adoc[] |
| |
| |
| |