| [[ContentEnricher-ContentEnricher]] |
| = Content Enricher |
| |
| Camel supports the |
| http://www.enterpriseintegrationpatterns.com/DataEnricher.html[Content |
| Enricher] from the xref:enterprise-integration-patterns.adoc[EIP |
| patterns] using a xref:message-translator.adoc[Message Translator], an |
| arbitrary xref:processor.adoc[Processor] in the routing logic, or using |
| the xref:content-enricher.adoc[enrich] DSL element to enrich the |
| message. |
| |
| image::eip/DataEnricher.gif[image] |
| |
| [[ContentEnricher-ContentenrichmentusingaMessageTranslatororaProcessor]] |
| == Content enrichment using a Message Translator or a Processor |
| |
| *Using the* *xref:fluent-builders.adoc[Fluent Builders]* |
| |
| You can use xref:templating.adoc[Templating] to consume a message from |
| one destination, transform it with something like |
| xref:components::velocity-component.adoc[Velocity] or xref:components::xquery-component.adoc[XQuery], and then send |
| it on to another destination. For example using InOnly (one way |
| messaging) |
| |
| [source,java] |
| ---- |
| from("activemq:My.Queue") |
| .to("velocity:com/acme/MyResponse.vm") |
| .to("activemq:Another.Queue"); |
| ---- |
| |
| If you want to use InOut (request-reply) semantics to process requests |
| on the *My.Queue* queue on xref:components::activemq-component.adoc[ActiveMQ] with a template |
| generated response, then sending responses back to the JMSReplyTo |
| Destination you could use this: |
| |
| [source,java] |
| ---- |
| from("activemq:My.Queue") |
| .to("velocity:com/acme/MyResponse.vm"); |
| ---- |
| |
| Here is a simple example using the xref:dsl.adoc[DSL] directly to |
| transform the message |
| |
| [source,java] |
| ---- |
| include::{examplesdir}/core/camel-core/src/test/java/org/apache/camel/processor/TransformViaDSLTest.java[tags=example] |
| ---- |
| |
| In this example we add our own xref:processor.adoc[Processor] using |
| explicit Java |
| |
| [source,java] |
| ---- |
| include::{examplesdir}/core/camel-core/src/test/java/org/apache/camel/processor/TransformTest.java[tags=example] |
| ---- |
| |
| we can use xref:bean-integration.adoc[Bean Integration] to use any Java |
| method on any bean to act as the transformer |
| |
| [source,java] |
| ---- |
| from("activemq:My.Queue") |
| .beanRef("myBeanName", "myMethodName") |
| .to("activemq:Another.Queue"); |
| ---- |
| |
| For further examples of this pattern in use you could look at one of the |
| JUnit tests |
| |
| * https://github.com/apache/camel/blob/master/core/camel-core/src/test/java/org/apache/camel/processor/TransformTest.java[TransformTest] |
| * https://github.com/apache/camel/blob/master/core/camel-core/src/test/java/org/apache/camel/processor/TransformViaDSLTest.java[TransformViaDSLTest] |
| |
| *Using Spring XML* |
| |
| [source,xml] |
| ---- |
| <route> |
| <from uri="activemq:Input"/> |
| <bean ref="myBeanName" method="doTransform"/> |
| <to uri="activemq:Output"/> |
| </route> |
| ---- |
| |
| [[ContentEnricher-ContentenrichmentusingtheenrichDSLelement]] |
| == Content enrichment using the `enrich` DSL element |
| |
| Camel comes with two flavors of content enricher in the DSL |
| |
| * `enrich` |
| * `pollEnrich` |
| |
| `enrich` uses a `Producer` to obtain the additional data. It is usually |
| used for xref:requestReply-eip.adoc[Request Reply] messaging, for instance |
| to invoke an external web service. + |
| `pollEnrich` on the other hand uses a xref:polling-consumer.adoc[Polling |
| Consumer] to obtain the additional data. It is usually used for |
| xref:event-message.adoc[Event Message] messaging, for instance to read a |
| file or download a xref:components::ftp-component.adoc[FTP] file. |
| |
| Camel 2.15 or older - Data from current Exchange not used |
| |
| `pollEnrich` or `enrich` does *not* access any data from the current |
| xref:exchange.adoc[Exchange] which means when polling it cannot use any |
| of the existing headers you may have set on the |
| xref:exchange.adoc[Exchange]. For example you cannot set a filename in |
| the `Exchange.FILE_NAME` header and use `pollEnrich` to consume only |
| that file. For that you *must* set the filename in the endpoint URI. |
| |
| Instead of using `enrich` you can use xref:recipientList-eip.adoc[Recipient |
| List] and have dynamic endpoints and define an `AggregationStrategy` on |
| the xref:recipientList-eip.adoc[Recipient List] which then would work as a |
| `enrich` would do. |
| |
| pollEnrich only accept one message as response. That means that if you |
| target to enrich your original message with the enricher collecting |
| messages from a seda, ... components using an aggregation strategy. Only |
| one response message will be aggregated with the original message. |
| |
| From *Camel 2.16* onwards both enrich and pollEnrich supports dynamic |
| endpoints that uses an xref:expression.adoc[Expression] to compute the |
| uri, which allows to use data from the current |
| xref:exchange.adoc[Exchange]. In other words all what is told above no |
| longer apply and it just works. |
| |
| [[ContentEnricher-EnrichOptions]] |
| == Enrich Options |
| |
| confluenceTableSmall |
| |
| [width="100%",cols="34%,33%,33%",options="header",] |
| |======================================================================= |
| |Name |Default Value |Description |
| |`uri` | |The endpoint uri for the external service to enrich from. You |
| must use either `uri` or `ref`. *Important:* From Camel 2.16 onwards, |
| this option is removed, and you use an xref:expression.adoc[Expression] |
| to configure the uri, such as xref:simple-language.adoc[Simple] or |
| xref:constant-language.adoc[Constant] or any other dynamic language that can |
| compute the uri dynamically using values from the current |
| xref:exchange.adoc[Exchange]. |
| |
| |`ref` | |Refers to the endpoint for the external service to enrich |
| from. You must use either `uri` or `ref`. **Important:** From Camel |
| 2.16 onwards, this option is removed, and you use an |
| xref:expression.adoc[Expression] to configure the uri, such as |
| xref:simple-language.adoc[Simple] or xref:constant-language.adoc[Constant] or any other |
| dynamic language that can compute the uri dynamically using values from |
| the current xref:exchange.adoc[Exchange]. |
| |
| |expression | |*Camel 2.16:* Mandatory. |
| The xref:expression.adoc[Expression] to configure the uri, such as |
| xref:simple-language.adoc[Simple] or xref:constant-language.adoc[Constant] or any other |
| dynamic language that can compute the uri dynamically using values from |
| the current xref:exchange.adoc[Exchange]. |
| |
| |`strategyRef` | |Refers to an |
| https://github.com/apache/camel/blob/master/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java[AggregationStrategy] |
| to be used to merge the reply from the external service, into a single |
| outgoing message. By default Camel will use the reply from the external |
| service as outgoing message. From *Camel 2.12* onwards you can also use |
| a POJO as the `AggregationStrategy`, see the |
| xref:aggregate-eip.adoc[Aggregate] page for more details. |
| |
| |`strategyMethodName` | |*Camel 2.12:* This option can be used to |
| explicit declare the method name to use, when using POJOs as the |
| `AggregationStrategy`. See the xref:aggregate-eip.adoc[Aggregate] page for |
| more details. |
| |
| |`strategyMethodAllowNull` |`false` |*Camel 2.12:* If this option is |
| `false` then the aggregate method is not used if there was no data to |
| enrich. If this option is `true` then `null` values is used as the |
| `oldExchange` (when no data to enrich), when using POJOs as the |
| `AggregationStrategy`. See the xref:aggregate-eip.adoc[Aggregate] page for |
| more details. |
| |
| |`aggregateOnException` |`false` |*Camel 2.14:* If this option is |
| `false` then the aggregate method is *not* used if there was an |
| exception thrown while trying to retrieve the data to enrich from the |
| resource. Setting this option to `true` allows end users to control what |
| to do if there was an exception in the `aggregate` method. For example |
| to suppress the exception or set a custom message body etc. |
| |
| |`shareUnitOfWork` |`false` |*Camel 2.16:* Shares the unit of work with |
| the parent and the resource exchange. Enrich will by default not share |
| unit of work between the parent exchange and the resource exchange. This |
| means the resource exchange has its own individual unit of work. See |
| xref:split-eip.adoc[Splitter] for more information and example. |
| |
| |`cacheSize` | |*Camel 2.16:* Allows to configure the cache size for |
| the `ProducerCache` which caches producers for reuse in the enrich. Will |
| by default use the default cache size which is 1000. Setting the value |
| to -1 allows to turn off the cache all together. |
| |
| |`ignoreInvalidEndpoint` |`false` |**Camel 2.16:** Whether to ignore an |
| endpoint URI that could not be resolved. If disabled, Camel will throw |
| an exception identifying the invalid endpoint URI. |
| |======================================================================= |
| |
| *Using the* *xref:fluent-builders.adoc[Fluent Builders]* |
| |
| |
| [source,java] |
| ---- |
| AggregationStrategy aggregationStrategy = ... |
| |
| from("direct:start") |
| .enrich("direct:resource", aggregationStrategy) |
| .to("direct:result"); |
| |
| from("direct:resource") ... |
| ---- |
| |
| The content enricher (`enrich`) retrieves additional data from a |
| _resource endpoint_ in order to enrich an incoming message (contained in |
| the _original exchange_). An aggregation strategy is used to combine the |
| original exchange and the _resource exchange_. The first parameter of |
| the `AggregationStrategy.aggregate(Exchange, Exchange)` method |
| corresponds to the the original exchange, the second parameter the |
| resource exchange. The results from the resource endpoint are stored in |
| the resource exchange's out-message. Here's an example template for |
| implementing an aggregation strategy: |
| |
| [source,java] |
| ---- |
| public class ExampleAggregationStrategy implements AggregationStrategy { |
| |
| public Exchange aggregate(Exchange original, Exchange resource) { |
| Object originalBody = original.getIn().getBody(); |
| Object resourceResponse = resource.getIn().getBody(); |
| Object mergeResult = ... // combine original body and resource response |
| |
| if (original.getPattern().isOutCapable()) { |
| original.getOut().setBody(mergeResult); |
| } else { |
| original.getIn().setBody(mergeResult); |
| } |
| |
| return original; |
| } |
| } |
| ---- |
| |
| Using this template the original exchange can be of any pattern. The |
| resource exchange created by the enricher is always an in-out exchange. |
| |
| *Using Spring XML* |
| |
| The same example in the Spring DSL |
| |
| [source,xml] |
| ---- |
| <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> |
| <route> |
| <from uri="direct:start"/> |
| <enrich strategyRef="aggregationStrategy"> |
| <constant>direct:resource</constant> |
| </enrich> |
| <to uri="direct:result"/> |
| </route> |
| <route> |
| <from uri="direct:resource"/> |
| ... |
| </route> |
| </camelContext> |
| |
| <bean id="aggregationStrategy" class="..." /> |
| ---- |
| |
| [[ContentEnricher-Aggregationstrategyisoptional]] |
| === Aggregation strategy is optional |
| |
| The aggregation strategy is optional. If you do not provide it Camel |
| will by default just use the body obtained from the resource. |
| |
| [source,java] |
| ---- |
| from("direct:start") |
| .enrich("direct:resource") |
| .to("direct:result"); |
| ---- |
| |
| In the route above the message sent to the `direct:result` endpoint will |
| contain the output from the `direct:resource` as we do not use any |
| custom aggregation. |
| |
| And for Spring DSL: |
| |
| [source,xml] |
| ---- |
| <route> |
| <from uri="direct:start"/> |
| <enrich> |
| <constant>direct:resource</constant> |
| </enrich> |
| <to uri="direct:result"/> |
| </route> |
| ---- |
| |
| [[ContentEnricher-Usingdynamicuris]] |
| === Using dynamic uris |
| |
| *Since Camel 2.16* |
| |
| From Camel 2.16 onwards enrich and pollEnrich supports using dynamic |
| uris computed based on information from the |
| current xref:exchange.adoc[Exchange]. For example to enrich from |
| a xref:components::http-component.adoc[HTTP] endpoint where the header with key orderId is |
| used as part of the content-path of the HTTP url: |
| |
| [source,java] |
| ---- |
| from("direct:start") |
| .enrich().simple("http:myserver/$\{header.orderId}/order") |
| .to("direct:result"); |
| ---- |
| |
| And in XML DSL |
| |
| [source,xml] |
| ---- |
| <route> |
| <from uri="direct:start"/> |
| <enrich> |
| <simple>http:myserver/$\{header.orderId}/order</simple> |
| </enrich> |
| <to uri="direct:result"/> |
| </route> |
| ---- |
| |
| [[ContentEnricher-ContentenrichmentusingpollEnrich]] |
| === Content enrichment using `pollEnrich` |
| |
| The `pollEnrich` works just as the `enrich` however as it uses a |
| xref:polling-consumer.adoc[Polling Consumer] we have 3 methods when |
| polling |
| |
| * receive |
| * receiveNoWait |
| * receive(timeout) |
| |
| [[ContentEnricher-PollEnrichOptions]] |
| === PollEnrich Options |
| |
| [width="100%",cols="34%,33%,33%",options="header",] |
| |======================================================================= |
| |Name |Default Value |Description |
| |`uri` | |The endpoint uri for the external service to enrich from. You |
| must use either `uri` or `ref`. **Important:** From Camel 2.16 onwards, |
| this option is removed, and you use an xref:expression.adoc[Expression] |
| to configure the uri, such as xref:simple-language.adoc[Simple] or |
| xref:constant-language.adoc[Constant] or any other dynamic language that can |
| compute the uri dynamically using values from the current |
| xref:exchange.adoc[Exchange]. |
| |
| |`ref` | |Refers to the endpoint for the external service to enrich |
| from. You must use either `uri` or `ref`. **Important:** From Camel 2.16 |
| onwards, this option is removed, and you use an |
| xref:expression.adoc[Expression] to configure the uri, such as |
| xref:simple-language.adoc[Simple] or xref:constant-language.adoc[Constant] or any other |
| dynamic language that can compute the uri dynamically using values from |
| the current xref:exchange.adoc[Exchange]. |
| |
| |`expression` | |**Camel 2.16:** Mandatory. |
| The xref:expression.adoc[Expression] to configure the uri, such as |
| xref:simple-language.adoc[Simple] or xref:constant-language.adoc[Constant] or any other |
| dynamic language that can compute the uri dynamically using values from |
| the current xref:exchange.adoc[Exchange]. |
| |
| |`strategyRef` | |Refers to an |
| https://github.com/apache/camel/blob/master/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java[AggregationStrategy] |
| to be used to merge the reply from the external service, into a single |
| outgoing message. By default Camel will use the reply from the external |
| service as outgoing message. From *Camel 2.12* onwards you can also use |
| a POJO as the `AggregationStrategy`, see the |
| xref:aggregate-eip.adoc[Aggregate] page for more details. |
| |
| |`strategyMethodName` | |*Camel 2.12:* This option can be used to |
| explicit declare the method name to use, when using POJOs as the |
| `AggregationStrategy`. See the xref:aggregate-eip.adoc[Aggregate] page for |
| more details. |
| |
| |`strategyMethodAllowNull` |`false` |*Camel 2.12:* If this option is |
| `false` then the aggregate method is not used if there was no data to |
| enrich. If this option is `true` then `null` values is used as the |
| `oldExchange` (when no data to enrich), when using POJOs as the |
| `AggregationStrategy`. See the xref:aggregate-eip.adoc[Aggregate] page for |
| more details. |
| |
| |`timeout` |`-1` |Timeout in millis when polling from the external |
| service. See below for important details about the timeout. |
| |
| |`aggregateOnException` |`false` |*Camel 2.14:* If this option is |
| `false` then the aggregate method is *not* used if there was an |
| exception thrown while trying to retrieve the data to enrich from the |
| resource. Setting this option to `true` allows end users to control what |
| to do if there was an exception in the `aggregate` method. For example |
| to suppress the exception or set a custom message body etc. |
| |
| |`cacheSize` | |*Camel 2.16:* Allows to configure the cache size for |
| the `ConsumerCache` which caches consumers for reuse in the pollEnrich. |
| Will by default use the default cache size which is 1000. Setting the |
| value to -1 allows to turn off the cache all together. |
| |
| |`ignoreInvalidEndpoint` |`false` |**Camel 2.16:** Whether to ignore an |
| endpoint URI that could not be resolved. If disabled, Camel will throw |
| an exception identifying the invalid endpoint URI. |
| |======================================================================= |
| |
| Good practice to use timeout value |
| |
| By default Camel will use the `receive`. Which may block until there is |
| a message available. It is therefore recommended to always provide a |
| timeout value, to make this clear that we may wait for a message, until |
| the timeout is hit. |
| |
| If there is no data then the `newExchange` in the aggregation strategy |
| is `null`. |
| |
| You can pass in a timeout value that determines which method to use |
| |
| * if timeout is -1 or other negative number then `receive` is selected |
| (*Important:* the `receive` method may block if there is no message) |
| * if timeout is 0 then `receiveNoWait` is selected |
| * otherwise `receive(timeout)` is selected |
| |
| The timeout values is in millis. |
| |
| Camel 2.15 or older - Data from current Exchange not used |
| |
| `pollEnrich` does *not* access any data from the current |
| xref:exchange.adoc[Exchange] which means when polling it cannot use any |
| of the existing headers you may have set on the |
| xref:exchange.adoc[Exchange]. For example you cannot set a filename in |
| the `Exchange.FILE_NAME` header and use `pollEnrich` to consume only |
| that file. For that you *must* set the filename in the endpoint URI. |
| |
| From **Camel 2.16** onwards both enrich and pollEnrich supports dynamic |
| endpoints that uses an xref:expression.adoc[Expression] to compute the |
| uri, which allows to use data from the current |
| xref:exchange.adoc[Exchange]. In other words all what is told above no |
| longer apply and it just works. |
| |
| [[ContentEnricher-Example]] |
| ==== Example |
| |
| In this example we enrich the message by loading the content from the |
| file named inbox/data.txt. |
| |
| [source,java] |
| ---- |
| from("direct:start") |
| .pollEnrich("file:inbox?fileName=data.txt") |
| .to("direct:result"); |
| ---- |
| |
| And in XML DSL you do: |
| |
| [source,xml] |
| ---- |
| <route> |
| <from uri="direct:start"/> |
| <pollEnrich> |
| <constant>file:inbox?fileName=data.txt</constant> |
| </pollEnrich> |
| <to uri="direct:result"/> |
| </route> |
| ---- |
| |
| If there is no file then the message is empty. We can use a timeout to |
| either wait (potentially forever) until a file exists, or use a timeout |
| to wait a certain period. |
| |
| For example to wait up to 5 seconds you can do: |
| |
| [source,xml] |
| ---- |
| <route> |
| <from uri="direct:start"/> |
| <pollEnrich timeout="5000"> |
| <constant>file:inbox?fileName=data.txt</constant> |
| </pollEnrich> |
| <to uri="direct:result"/> |
| </route> |
| ---- |
| |
| [[ContentEnricher-Usingdynamicuris.1]] |
| ==== Using dynamic uris |
| |
| *Since Camel 2.16* |
| |
| From Camel 2.16 onwards enrich and pollEnrich supports using dynamic |
| uris computed based on information from the |
| current xref:exchange.adoc[Exchange]. For example to pollEnrich from an |
| endpoint that uses a header to indicate a xref:components::seda-component.adoc[SEDA] queue |
| name: |
| |
| [source,java] |
| ---- |
| from("direct:start") |
| .pollEnrich() |
| .simple("seda:$\{header.name}") |
| .to("direct:result"); |
| ---- |
| |
| And in XML DSL |
| |
| [source,xml] |
| ---- |
| <route> |
| <from uri="direct:start"/> |
| <pollEnrich> |
| <simple>seda:$\{header.name}</simple> |
| </pollEnrich> |
| <to uri="direct:result"/> |
| </route> |
| ---- |
| |