| [[Aggregation-Aggregation]] |
| = Aggregation |
| |
| In a sink connector scenario, an end user sometimes wants to aggregate Kafka records before sending them to an external system. |
| Usually this is done by defining a batch size or a timeout; once the aggregation completes, the aggregated collection of records is sent to the external system. |
| |
| Apache Camel already provides an implementation of the xref:latest@components:eips:aggregate-eip.adoc[Aggregate EIP], and in camel-kafka-connector we wanted to leverage what already exists in the plain Apache Camel project. |
| |
| For this purpose, we introduced the following options in the sink connector configuration: |
| |
| [source,bash] |
| ---- |
| camel.beans.aggregate=#class:org.apache.camel.kafkaconnector.aggregator.StringAggregator |
| camel.beans.aggregation.size=10 |
| camel.beans.aggregation.timeout=5000 |
| ---- |
| |
| You can define your own aggregation logic by implementing Camel's AggregationStrategy interface, or you can use one of the aggregators provided by Camel. |
| |
| We are working on adding some out-of-the-box aggregators to camel-kafka-connector. |
| |
| [[HowDoesAnAggregatorLookLike-HowDoesAnAggregatorLookLike]] |
| == What does an aggregator look like? |
| |
| An Aggregator is something like the following: |
| |
| [source,java] |
| ---- |
| package org.apache.camel.kafkaconnector.aggregator; |
| |
| import org.apache.camel.AggregationStrategy; |
| import org.apache.camel.Exchange; |
| import org.apache.camel.Message; |
| |
| public class StringAggregator implements AggregationStrategy { |
| |
|     @Override |
|     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { |
|         // First record of a batch: there is nothing to merge with yet |
|         if (oldExchange == null) { |
|             return newExchange; |
|         } |
| |
|         // Body aggregated so far |
|         String body = oldExchange.getIn().getBody(String.class); |
|         if (body != null) { |
|             Message newIn = newExchange.getIn(); |
|             String newBody = newIn.getBody(String.class); |
|             if (newBody != null) { |
|                 // Append the incoming body on a new line |
|                 body += System.lineSeparator() + newBody; |
|             } |
| |
|             newIn.setBody(body); |
|         } |
|         return newExchange; |
|     } |
| } |
| ---- |
| |
| You can think of oldExchange and newExchange as records arriving at the aggregator: oldExchange carries what has been aggregated so far, and newExchange is the incoming record. |
| |
| In this case, each newExchange body is concatenated to the oldExchange body, separated by the system line separator. |
| |
| This process continues until the batch size is reached or the timeout expires. |
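
To make the pairwise folding more concrete, here is a minimal, Camel-free sketch of size-based batching. It is not how camel-kafka-connector is implemented: the class `BatchFoldSketch`, the `aggregate` helper and the `APPEND_LINE` strategy are hypothetical names used only for illustration, records are assumed to be non-null strings, and the timeout is only modelled by flushing the trailing partial batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical, Camel-free model of size-based batching (illustration only).
class BatchFoldSketch {

    // Mirrors the body handling of StringAggregator for non-null records:
    // the first record passes through, later ones are appended on a new line.
    static final BinaryOperator<String> APPEND_LINE = (oldBody, newBody) ->
            oldBody == null ? newBody : oldBody + System.lineSeparator() + newBody;

    // Folds records pairwise and emits one aggregate every batchSize records.
    static List<String> aggregate(List<String> records, int batchSize,
                                  BinaryOperator<String> strategy) {
        List<String> emitted = new ArrayList<>();
        String current = null; // plays the role of oldExchange
        int count = 0;
        for (String record : records) { // each record plays the role of newExchange
            current = strategy.apply(current, record);
            count++;
            if (count == batchSize) {
                emitted.add(current);
                current = null;
                count = 0;
            }
        }
        // A trailing partial batch stands in for what a timeout would flush.
        if (count > 0) {
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> out = aggregate(List.of("a", "b", "c", "d", "e"), 2, APPEND_LINE);
        out.forEach(batch -> System.out.println("[" + batch + "]"));
    }
}
```

With a batch size of 2 and five records, the sketch emits three aggregates: "a" and "b" joined by the line separator, then "c" and "d", and finally the leftover "e" on its own.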
| |
| [[ExtendingAConnector-ExtendingAConnector]] |
| == How can I extend an existing connector to add an aggregator? |
| |
| Take a look at the xref:archetypes.adoc[Archetype Documentation]. |