| = JMS Component |
| :doctitle: JMS |
| :shortname: jms |
| :artifactid: camel-jms |
| :description: Send and receive messages to/from a JMS Queue or Topic. |
| :since: 1.0 |
| :supportlevel: Stable |
| :tabs-sync-option: |
| :component-header: Both producer and consumer are supported |
| //Manually maintained attributes |
| :camel-spring-boot-name: jms |
| |
| *Since Camel {since}* |
| |
| *{component-header}* |
| |
| This component allows messages to be sent to (or consumed from) a |
| http://java.sun.com/products/jms/[JMS] Queue or Topic. It uses Spring's |
| JMS support for declarative transactions, including Spring's |
| `JmsTemplate` for sending and a `MessageListenerContainer` for |
| consuming. |
| |
| Maven users will need to add the following dependency to their `pom.xml` |
| for this component: |
| |
| [source,xml] |
| ------------------------------------------------------------ |
| <dependency> |
| <groupId>org.apache.camel</groupId> |
| <artifactId>camel-jms</artifactId> |
| <version>x.x.x</version> |
| <!-- use the same version as your Camel core version --> |
| </dependency> |
| ------------------------------------------------------------ |
| |
| [TIP] |
| ==== |
| *Using ActiveMQ* |
| |
| If you are using http://activemq.apache.org/[Apache ActiveMQ], you |
| should prefer the ActiveMQ component as it has been |
| optimized for ActiveMQ. All the options and |
| samples on this page are also valid for the ActiveMQ |
| component. |
| ==== |
| |
| [NOTE] |
| ==== |
| *Transacted and caching* |
| |
| See section _Transactions and Cache Levels_ below if you are using |
| transactions with xref:jms-component.adoc[JMS] as it can impact performance. |
| ==== |
| |
| [NOTE] |
| ==== |
| *Request/Reply over JMS* |
| |
| Make sure to read the section _Request-reply over JMS_ further below on |
| this page for important notes about request/reply, as Camel offers a |
| number of options to configure for performance, and clustered |
| environments. |
| ==== |
| |
| == URI format |
| |
| -------------------------------------------- |
| jms:[queue:|topic:]destinationName[?options] |
| -------------------------------------------- |
| |
| Where `destinationName` is a JMS queue or topic name. By default, the |
| `destinationName` is interpreted as a queue name. For example, to |
| connect to the queue `FOO.BAR`, use: |
| |
| ----------- |
| jms:FOO.BAR |
| ----------- |
| |
| You can include the optional `queue:` prefix, if you prefer: |
| |
| ----------------- |
| jms:queue:FOO.BAR |
| ----------------- |
| |
| To connect to a topic, you _must_ include the `topic:` prefix. For |
| example, to connect to the topic `Stocks.Prices`, use: |
| |
| ----------------------- |
| jms:topic:Stocks.Prices |
| ----------------------- |
| |
| You append query options to the URI using the following format: |
| `?option=value&option=value&...` |
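To make the URI shape concrete, here is a minimal sketch that splits a `jms:` URI into its destination type, destination name and query options. The class `JmsUriSketch` is hypothetical and is not how Camel parses endpoint URIs; it only mirrors the format described above and performs no URL decoding.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: splits jms:[queue:|topic:]destinationName[?options]. */
final class JmsUriSketch {
    final boolean topic;                 // false means queue (the default)
    final String destinationName;
    final Map<String, String> options;   // query options in URI order

    private JmsUriSketch(boolean topic, String destinationName, Map<String, String> options) {
        this.topic = topic;
        this.destinationName = destinationName;
        this.options = options;
    }

    static JmsUriSketch parse(String uri) {
        if (!uri.startsWith("jms:")) {
            throw new IllegalArgumentException("Not a jms: URI: " + uri);
        }
        String rest = uri.substring("jms:".length());

        // Split off the optional ?option=value&option=value part.
        Map<String, String> options = new LinkedHashMap<>();
        int query = rest.indexOf('?');
        if (query >= 0) {
            for (String pair : rest.substring(query + 1).split("&")) {
                if (pair.isEmpty()) {
                    continue;
                }
                int eq = pair.indexOf('=');
                String key = eq < 0 ? pair : pair.substring(0, eq);
                String value = eq < 0 ? "" : pair.substring(eq + 1);
                options.put(key, value);
            }
            rest = rest.substring(0, query);
        }

        // No prefix, or an explicit queue: prefix, means a queue.
        boolean topic = false;
        if (rest.startsWith("topic:")) {
            topic = true;
            rest = rest.substring("topic:".length());
        } else if (rest.startsWith("queue:")) {
            rest = rest.substring("queue:".length());
        }
        return new JmsUriSketch(topic, rest, options);
    }
}
```

For example, `JmsUriSketch.parse("jms:FOO.BAR")` yields a queue named `FOO.BAR`, while `jms:topic:Stocks.Prices` yields a topic.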
| |
| == Notes |
| |
| === Using ActiveMQ |
| |
| The JMS component reuses Spring 2's `JmsTemplate` for sending messages. |
| This is not ideal for use in a non-J2EE container and typically requires |
| some caching in the JMS provider to avoid |
| http://activemq.apache.org/jmstemplate-gotchas.html[poor performance]. |
| |
| If you intend to use http://activemq.apache.org/[Apache ActiveMQ] as |
| your message broker, the recommendation is that you do one of the |
| following: |
| |
| * Use the ActiveMQ component, which is already |
| optimized to use ActiveMQ efficiently |
| * Use the `PooledConnectionFactory` in ActiveMQ. |
| |
| === Transactions and Cache Levels |
| |
| If you are consuming messages and using transactions |
| (`transacted=true`) then the default settings for cache level can impact |
| performance. |
| |
| If you are using XA transactions, then you cannot cache, as caching can |
| prevent the XA transaction from working properly. |
| |
| If you are *not* using XA, then you should consider caching, as it |
| improves performance, for example by setting `cacheLevelName=CACHE_CONSUMER`. |
| |
| The default setting for `cacheLevelName` is |
| `CACHE_AUTO`. This default auto-detects the mode and sets the cache |
| level accordingly to: |
| |
| * `CACHE_CONSUMER` if `transacted=false` |
| * `CACHE_NONE` if `transacted=true` |
| |
| The default setting is therefore conservative. Consider using |
| `cacheLevelName=CACHE_CONSUMER` if you are using non-XA transactions. |
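The auto-detection rule can be written down as a small function. This is a sketch of the rule as described in this section, not Camel's or Spring's implementation; the class and method names are hypothetical.

```java
/** Illustrative only: how CACHE_AUTO resolves according to the rule above. */
final class CacheLevelSketch {
    static String resolve(String cacheLevelName, boolean transacted) {
        // An explicitly chosen cache level is used as-is.
        if (!"CACHE_AUTO".equals(cacheLevelName)) {
            return cacheLevelName;
        }
        // CACHE_AUTO: be conservative when transactions are in use.
        return transacted ? "CACHE_NONE" : "CACHE_CONSUMER";
    }
}
```

With `transacted=true` and the default `CACHE_AUTO`, the sketch returns `CACHE_NONE`, which is why setting `CACHE_CONSUMER` explicitly is worth considering for non-XA transactions.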
| |
| === Durable Subscriptions |
| |
| ==== Durable Subscriptions with JMS 2.0 |
| |
| If you wish to use durable topic subscriptions, you need to specify the `durableSubscriptionName`. |
| |
| ==== Durable Subscriptions with JMS 1.1 |
| |
| If you wish to use durable topic subscriptions, you need to specify both |
| `clientId` and `durableSubscriptionName`. The value of the `clientId` |
| must be unique and can only be used by a single JMS connection instance |
| in your entire network. |
| |
| [NOTE] |
| ==== |
| If you are using the https://activemq.apache.org/components/classic/[Apache ActiveMQ Classic] or https://activemq.apache.org/components/artemis/[Apache ActiveMQ Artemis], |
| you may prefer to use a feature called Virtual Topic. This should remove the necessity of having a unique `clientId`. |
| |
| You can consult the specific documentation for https://activemq.apache.org/components/artemis/migration-documentation/VirtualTopics.html[Artemis] |
| or for https://activemq.apache.org/virtual-destinations.html[ActiveMQ Classic] for details about how to leverage this feature. |
| |
| You can find more details about durable messaging for ActiveMQ Classic http://activemq.apache.org/how-do-durable-queues-and-topics-work.html[here]. |
| ==== |
| |
| === Message Header Mapping |
| |
| When using message headers, the JMS specification states that header |
| names must be valid Java identifiers. So try to name your headers to be |
| valid Java identifiers. One benefit of doing this is that you can then |
| use your headers inside a JMS Selector (whose SQL92 syntax mandates Java |
| identifier syntax for headers). |
| |
| A simple strategy for mapping header names is used by default. The |
| strategy is to replace any dots and hyphens in the header name as shown |
| below and to reverse the replacement when the header name is restored |
| from a JMS message sent over the wire. What does this mean? No more |
| losing method names to invoke on a bean component, no more losing the |
| filename header for the File Component, and so on. |
| |
| The current header name strategy for accepting header names in Camel is |
| as follows: |
| |
| * Dots are replaced by `\_DOT_` and the replacement is reversed when |
| Camel consumes the message |
| * Hyphens are replaced by `\_HYPHEN_` and the replacement is reversed when |
| Camel consumes the message |
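The replacement rules above can be sketched as a pair of plain string transformations. The class `HeaderKeySketch` is hypothetical and only illustrates the documented rules; it is not the strategy class that Camel itself uses.

```java
/** Illustrative only: the dot/hyphen replacement described above. */
final class HeaderKeySketch {
    /** Applied when sending: makes the key a valid Java identifier. */
    static String encode(String key) {
        return key.replace(".", "_DOT_").replace("-", "_HYPHEN_");
    }

    /** Applied when consuming: restores the original key. */
    static String decode(String key) {
        return key.replace("_DOT_", ".").replace("_HYPHEN_", "-");
    }
}
```

For example, `encode("Content-Type")` gives `Content_HYPHEN_Type`, and `decode` turns it back into `Content-Type`.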
| |
| You can configure many different properties on the JMS endpoint, which |
| map to properties on the `JMSConfiguration` object. |
| |
| [WARNING] |
| ==== |
| *Mapping to Spring JMS* |
| |
| Many of these properties map to properties on Spring JMS, which Camel |
| uses for sending and receiving messages. So you can get more information |
| about these properties by consulting the relevant Spring documentation. |
| ==== |
| |
| |
| // component-configure options: START |
| |
| // component-configure options: END |
| |
| // component options: START |
| include::partial$component-configure-options.adoc[] |
| include::partial$component-endpoint-options.adoc[] |
| // component options: END |
| |
| // endpoint options: START |
| |
| // endpoint options: END |
| |
| // component headers: START |
| include::partial$component-endpoint-headers.adoc[] |
| // component headers: END |
| |
| == Samples |
| |
| JMS is used in many examples for other components as well. But we |
| provide a few samples below to get started. |
| |
| === Receiving from JMS |
| |
| In the following sample, we configure a route that receives JMS messages |
| and routes the message to a POJO: |
| |
| [source,java] |
| -------------------------------- |
| from("jms:queue:foo"). |
| to("bean:myBusinessLogic"); |
| -------------------------------- |
| |
| You can use any of the EIP patterns so the route can be |
| context based. For example, here's how to filter an order topic for the |
| big spenders: |
| |
| [source,java] |
| ---------------------------------------------- |
| from("jms:topic:OrdersTopic"). |
| filter().method("myBean", "isGoldCustomer"). |
| to("jms:queue:BigSpendersQueue"); |
| ---------------------------------------------- |
| |
| === Sending to JMS |
| |
| In the sample below, we poll a file folder and send the file content to a |
| JMS topic. As we want the content of the file as a `TextMessage` instead |
| of a `BytesMessage`, we need to convert the body to a `String`: |
| |
| [source,java] |
| ------------------------------ |
| from("file://orders"). |
| convertBodyTo(String.class). |
| to("jms:topic:OrdersTopic"); |
| ------------------------------ |
| |
| === Using Annotations |
| |
| Camel also has annotations, so you can use xref:manual::pojo-consuming.adoc[POJO Consuming] and xref:manual::pojo-producing.adoc[POJO Producing]. |
| |
| === Spring DSL sample |
| |
| The preceding examples use the Java DSL. Camel also supports Spring XML |
| DSL. Here is the big spender sample using Spring DSL: |
| |
| [source,xml] |
| --------------------------------------------------- |
| <route> |
| <from uri="jms:topic:OrdersTopic"/> |
| <filter> |
| <method ref="myBean" method="isGoldCustomer"/> |
| <to uri="jms:queue:BigSpendersQueue"/> |
| </filter> |
| </route> |
| --------------------------------------------------- |
| |
| === Other samples |
| |
| JMS appears in many of the examples for other components and EIP |
| patterns, as well as elsewhere in this Camel documentation. So feel free to browse |
| the documentation. |
| |
| === Using JMS as a Dead Letter Queue storing Exchange |
| |
| Normally, when using xref:jms-component.adoc[JMS] as the transport, it only |
| transfers the body and headers as the payload. If you want to use |
| xref:jms-component.adoc[JMS] with a xref:eips:dead-letter-channel.adoc[Dead Letter |
| Channel], using a JMS queue as the Dead Letter Queue, then normally the |
| exception that caused the failure is not stored in the JMS message. You can, however, use |
| the `transferExchange` option on the JMS dead letter queue to instruct |
| Camel to store the entire Exchange in the queue as a |
| `javax.jms.ObjectMessage` that holds a |
| `org.apache.camel.support.DefaultExchangeHolder`. This allows you to |
| consume from the Dead Letter Queue and retrieve the causing exception |
| from the Exchange property with the key `Exchange.EXCEPTION_CAUGHT`. The |
| demo below illustrates this: |
| |
| [source,java] |
| ------------------------------------------------------------------------ |
| // setup error handler to use JMS as queue and store the entire Exchange |
| errorHandler(deadLetterChannel("jms:queue:dead?transferExchange=true")); |
| ------------------------------------------------------------------------ |
| |
| Then you can consume from the JMS queue and analyze the problem: |
| |
| [source,java] |
| ----------------------------------------------------------------------------------- |
| from("jms:queue:dead").to("bean:myErrorAnalyzer"); |
| |
| // and in our bean |
| String body = exchange.getIn().getBody(String.class); |
| Exception cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class); |
| // the cause message is |
| String problem = cause.getMessage(); |
| ----------------------------------------------------------------------------------- |
| |
| === Using JMS as a Dead Letter Channel storing error only |
| |
| You can use JMS to store the cause error message or to store a custom |
| body, which you can initialize yourself. The following example uses the |
| Message Translator EIP to do a |
| transformation on the failed exchange before it is moved to the |
| xref:jms-component.adoc[JMS] dead letter queue: |
| |
| [source,java] |
| -------------------------------------------------------------------------------------------------- |
| // we send it to a seda dead queue first |
| errorHandler(deadLetterChannel("seda:dead")); |
| |
| // and on the seda dead queue we can do the custom transformation before it is sent to the JMS queue |
| from("seda:dead").transform(exceptionMessage()).to("jms:queue:dead"); |
| -------------------------------------------------------------------------------------------------- |
| |
| Here we only store the original cause error message in the transform. |
| You can, however, use any Expression to send |
| whatever you like. For example, you can invoke a method on a Bean or use |
| a custom processor. |
| |
| |
| == Message Mapping between JMS and Camel |
| |
| Camel automatically maps messages between `javax.jms.Message` and |
| `org.apache.camel.Message`. |
| |
| When sending a JMS message, Camel converts the message body to the |
| following JMS message types: |
| |
| [width="100%",cols="10%,10%,80%",options="header",] |
| |======================================================================= |
| |Body Type |JMS Message |Comment |
| |`String` |`javax.jms.TextMessage` | |
| |
| |`org.w3c.dom.Node` |`javax.jms.TextMessage` |The DOM will be converted |
| to `String`. |
| |
| |`Map` |`javax.jms.MapMessage` | |
| |
| |`java.io.Serializable` |`javax.jms.ObjectMessage` | |
| |
| |`byte[]` |`javax.jms.BytesMessage` | |
| |
| |`java.io.File` |`javax.jms.BytesMessage` | |
| |
| |`java.io.Reader` |`javax.jms.BytesMessage` | |
| |
| |`java.io.InputStream` |`javax.jms.BytesMessage` | |
| |
| |`java.nio.ByteBuffer` |`javax.jms.BytesMessage` | |
| |======================================================================= |
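The table above can be read as an ordered series of type checks. The following sketch shows one possible ordering; the class `BodyTypeSketch` is hypothetical and is not Camel's binding code. The order matters because `String`, `File` and most `Map` implementations are also `Serializable`, so the `Serializable` check has to come last.

```java
import java.io.File;
import java.io.InputStream;
import java.io.Reader;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Map;

/** Illustrative only: picks a JMS message type name for a body, following the table. */
final class BodyTypeSketch {
    /** Returns the simple name of the JMS message type, or null if no row matches. */
    static String jmsTypeFor(Object body) {
        if (body instanceof String || body instanceof org.w3c.dom.Node) {
            return "TextMessage";
        }
        if (body instanceof Map) {
            return "MapMessage";
        }
        if (body instanceof byte[] || body instanceof File || body instanceof Reader
                || body instanceof InputStream || body instanceof ByteBuffer) {
            return "BytesMessage";
        }
        // Checked last: many of the types above are Serializable too.
        if (body instanceof Serializable) {
            return "ObjectMessage";
        }
        return null;
    }
}
```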
| |
| When receiving a JMS message, Camel converts the JMS message to the |
| following body type: |
| |
| [width="100%",cols="50%,50%",options="header",] |
| |============================================= |
| |JMS Message |Body Type |
| |`javax.jms.TextMessage` |`String` |
| |`javax.jms.BytesMessage` |`byte[]` |
| |`javax.jms.MapMessage` |`Map<String, Object>` |
| |`javax.jms.ObjectMessage` |`Object` |
| |============================================= |
| |
| === Disabling auto-mapping of JMS messages |
| |
| You can use the `mapJmsMessage` option to disable the auto-mapping |
| above. If disabled, Camel will not try to map the received JMS message, |
| but instead uses it directly as the payload. This allows you to avoid |
| the overhead of mapping and let Camel just pass through the JMS message. |
| For instance, it even allows you to route `javax.jms.ObjectMessage` JMS |
| messages with classes you do *not* have on the classpath. |
| |
| === Using a custom MessageConverter |
| |
| You can use the `messageConverter` option to do the mapping yourself in |
| a Spring `org.springframework.jms.support.converter.MessageConverter` |
| class. |
| |
| For example, in the route below, we use a custom message converter when |
| sending a message to the JMS order queue: |
| |
| [source,java] |
| ---------------------------------------------------------------------------------------- |
| from("file://inbox/order").to("jms:queue:order?messageConverter=#myMessageConverter"); |
| ---------------------------------------------------------------------------------------- |
| |
| You can also use a custom message converter when consuming from a JMS |
| destination. |
| |
| === Controlling the mapping strategy selected |
| |
| You can use the `jmsMessageType` option on the endpoint URL to force a |
| specific message type for all messages. |
| |
| In the route below, we poll files from a folder and send them as |
| `javax.jms.TextMessage` as we have forced the JMS producer endpoint to |
| use text messages: |
| |
| [source,java] |
| ----------------------------------------------------------------------- |
| from("file://inbox/order").to("jms:queue:order?jmsMessageType=Text"); |
| ----------------------------------------------------------------------- |
| |
| You can also specify the message type to use for each message by setting |
| the header with the key `CamelJmsMessageType`. For example: |
| |
| [source,java] |
| --------------------------------------------------------------------------------------------------------- |
| from("file://inbox/order").setHeader("CamelJmsMessageType", JmsMessageType.Text).to("jms:queue:order"); |
| --------------------------------------------------------------------------------------------------------- |
| |
| The possible values are defined in the `enum` class, |
| `org.apache.camel.component.jms.JmsMessageType`. |
| |
| == Message format when sending |
| |
| The exchange sent over the JMS wire must conform to the |
| http://java.sun.com/j2ee/1.4/docs/api/javax/jms/Message.html[JMS Message |
| spec]. |
| |
| For the `exchange.in.headers`, the following rules apply for the header |
| **keys**: |
| |
| * Keys starting with `JMS` or `JMSX` are reserved. |
| * `exchange.in.headers` keys must be literals and all be valid Java |
| identifiers (do not use dots in the key name). |
| * Camel replaces dots and hyphens when sending, and reverses the |
| replacement when consuming JMS messages: + |
| `.` is replaced by `_DOT_`, and the replacement is reversed when Camel |
| consumes the message. + |
| `-` is replaced by `_HYPHEN_`, and the replacement is reversed when Camel |
| consumes the message. |
| * See also the option `jmsKeyFormatStrategy`, which allows use of your |
| own custom strategy for formatting keys. |
| |
| For the `exchange.in.headers`, the following rules apply for the header |
| **values**: |
| |
| * The values must be primitives or their wrapper objects (such as |
| `Integer`, `Long`, `Character`). The types `String`, `CharSequence`, |
| `Date`, `BigDecimal` and `BigInteger` are all converted to their |
| `toString()` representation. All other types are dropped. |
| |
| Camel will log with category `org.apache.camel.component.jms.JmsBinding` |
| at *DEBUG* level if it drops a given header value. For example: |
| |
| ---------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 2008-07-09 06:43:04,046 [main ] DEBUG JmsBinding |
| - Ignoring non primitive header: order of class: org.apache.camel.component.jms.issues.DummyOrder with value: DummyOrder{orderId=333, itemId=4444, quantity=2} |
| ---------------------------------------------------------------------------------------------------------------------------------------------------------------- |
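The rules for header values can be sketched as a filter that either returns the value to send or `null` when the header would be dropped. This follows the wording of this section only; the class `HeaderValueSketch` is hypothetical, and Camel's actual binding code may treat individual types differently.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Date;

/** Illustrative only: which header values survive, per the rules above. */
final class HeaderValueSketch {
    /** Returns the value to set on the JMS message, or null if the header is dropped. */
    static Object toJmsHeaderValue(Object value) {
        // These types are sent as their toString() representation.
        // BigDecimal and BigInteger are checked before the wrapper types below.
        if (value instanceof CharSequence || value instanceof Date
                || value instanceof BigDecimal || value instanceof BigInteger) {
            return value.toString();
        }
        // Wrapper objects of the Java primitives are kept as they are.
        if (value instanceof Boolean || value instanceof Character
                || value instanceof Byte || value instanceof Short
                || value instanceof Integer || value instanceof Long
                || value instanceof Float || value instanceof Double) {
            return value;
        }
        // Everything else (including null) is dropped.
        return null;
    }
}
```

A custom object such as the `DummyOrder` in the log line above falls through to the last branch and is dropped.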
| |
| == Message format when receiving |
| |
| Camel adds the following properties to the `Exchange` when it receives a |
| message: |
| |
| [width="100%",cols="10%,10%,80%",options="header",] |
| |======================================================================= |
| |Property |Type |Description |
| |`org.apache.camel.jms.replyDestination` |`javax.jms.Destination` |The |
| reply destination. |
| |======================================================================= |
| |
| Camel adds the following JMS properties to the In message headers when |
| it receives a JMS message: |
| |
| [width="100%",cols="10%,10%,80%",options="header",] |
| |======================================================================= |
| |Header |Type |Description |
| |`JMSCorrelationID` |`String` |The JMS correlation ID. |
| |
| |`JMSDeliveryMode` |`int` |The JMS delivery mode. |
| |
| |`JMSDestination` |`javax.jms.Destination` |The JMS destination. |
| |
| |`JMSExpiration` |`long` |The JMS expiration. |
| |
| |`JMSMessageID` |`String` |The JMS unique message ID. |
| |
| |`JMSPriority` |`int` |The JMS priority (with 0 as the lowest priority |
| and 9 as the highest). |
| |
| |`JMSRedelivered` |`boolean` |Whether the JMS message is redelivered. |
| |
| |`JMSReplyTo` |`javax.jms.Destination` |The JMS reply-to destination. |
| |
| |`JMSTimestamp` |`long` |The JMS timestamp. |
| |
| |`JMSType` |`String` |The JMS type. |
| |
| |`JMSXGroupID` |`String` |The JMS group ID. |
| |======================================================================= |
| |
| As all the above information is standard JMS, you can check the |
| http://java.sun.com/javaee/5/docs/api/javax/jms/Message.html[JMS |
| documentation] for further details. |
| |
| == About using Camel to send and receive messages and JMSReplyTo |
| |
| The JMS component is complex, and you have to pay close attention to how |
| it works in some cases. So this is a short summary of some |
| areas/pitfalls to look for. |
| |
| When Camel sends a message using its `JmsProducer`, it checks the |
| following conditions: |
| |
| * The message exchange pattern. |
| * Whether a `JMSReplyTo` was set in the endpoint or in the message |
| headers. |
| * Whether any of the following options have been set on the JMS |
| endpoint: `disableReplyTo`, `preserveMessageQos`, `explicitQosEnabled`. |
| |
| All this can be a tad complex to understand and configure to support |
| your use case. |
| |
| === JmsProducer |
| |
| The `JmsProducer` behaves as follows, depending on configuration: |
| |
| [width="100%",cols="10%,10%,80%",options="header",] |
| |======================================================================= |
| |Exchange Pattern |Other options |Description |
| |_InOut_ |- |Camel will expect a reply, set a temporary `JMSReplyTo`, |
| and after sending the message, it will start to listen for the reply |
| message on the temporary queue. |
| |
| |_InOut_ |`JMSReplyTo` is set |Camel will expect a reply and, after |
| sending the message, it will start to listen for the reply message on |
| the specified `JMSReplyTo` queue. |
| |
| |_InOnly_ |- |Camel will send the message and *not* expect a reply. |
| |
| |_InOnly_ |`JMSReplyTo` is set |By default, Camel discards the |
| `JMSReplyTo` destination and clears the `JMSReplyTo` header before |
| sending the message. Camel then sends the message and does *not* expect |
| a reply. Camel logs this at `WARN` level (changed to `DEBUG` |
| level from *Camel 2.6* onwards). You can use `preserveMessageQos=true` to |
| instruct Camel to keep the `JMSReplyTo`. In all situations the |
| `JmsProducer` does *not* expect any reply and thus continues after |
| sending the message. |
| |======================================================================= |
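The producer table can be condensed into a small decision function. This is a sketch of the documented behavior only; `ProducerReplySketch`, its `Decision` type and the `TEMPORARY_QUEUE` marker are hypothetical names, not part of Camel.

```java
/** Illustrative only: the JmsProducer table expressed as a decision function. */
final class ProducerReplySketch {
    enum Pattern { IN_ONLY, IN_OUT }

    /** Marker for "Camel sets a temporary JMSReplyTo". */
    static final String TEMPORARY_QUEUE = "<temporary queue>";

    static final class Decision {
        final boolean waitsForReply;
        final String replyToSent; // null means no JMSReplyTo on the sent message

        Decision(boolean waitsForReply, String replyToSent) {
            this.waitsForReply = waitsForReply;
            this.replyToSent = replyToSent;
        }
    }

    static Decision decide(Pattern pattern, String jmsReplyTo, boolean preserveMessageQos) {
        if (pattern == Pattern.IN_OUT) {
            // InOut: always wait, on the given queue or on a temporary one.
            return new Decision(true, jmsReplyTo != null ? jmsReplyTo : TEMPORARY_QUEUE);
        }
        // InOnly: never wait; JMSReplyTo is kept only with preserveMessageQos=true.
        return new Decision(false, preserveMessageQos ? jmsReplyTo : null);
    }
}
```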
| |
| === JmsConsumer |
| |
| The `JmsConsumer` behaves as follows, depending on configuration: |
| |
| [width="100%",cols="10%,10%,80%",options="header",] |
| |======================================================================= |
| |Exchange Pattern |Other options |Description |
| |_InOut_ |- |Camel will send the reply back to the `JMSReplyTo` queue. |
| |
| |_InOnly_ |- |Camel will not send a reply back, as the pattern is |
| __InOnly__. |
| |
| |- |`disableReplyTo=true` |This option suppresses replies. |
| |======================================================================= |
| |
| So pay attention to the message exchange pattern set on your exchanges. |
| |
| If you send a message to a JMS destination in the middle of your route, |
| you can specify the exchange pattern to use; see Request Reply for |
| more details. + |
| This is useful if you want to send an `InOnly` message to a JMS topic: |
| |
| [source,java] |
| ------------------------------------------------------ |
| from("activemq:queue:in") |
| .to("bean:validateOrder") |
| .to(ExchangePattern.InOnly, "activemq:topic:order") |
| .to("bean:handleOrder"); |
| ------------------------------------------------------ |
| |
| == Reuse endpoint and send to different destinations computed at runtime |
| |
| If you need to send messages to a lot of different JMS destinations, it |
| makes sense to reuse a JMS endpoint and specify the real destination in |
| a message header. This allows Camel to reuse the same endpoint, but send |
| to different destinations. This greatly reduces the number of endpoints |
| created and economizes on memory and thread resources. |
| |
| You can specify the destination in the following headers: |
| |
| [width="100%",cols="10%,10%,80%",options="header",] |
| |===================================================================== |
| |Header |Type |Description |
| |`CamelJmsDestination` |`javax.jms.Destination` |A destination object. |
| |`CamelJmsDestinationName` |`String` |The destination name. |
| |===================================================================== |
| |
| For example, the following route shows how you can compute a destination |
| at run time and use it to override the destination appearing in the JMS |
| URL: |
| |
| [source,java] |
| -------------------------------- |
| from("file://inbox") |
| .to("bean:computeDestination") |
| .to("activemq:queue:dummy"); |
| -------------------------------- |
| |
| The queue name, `dummy`, is just a placeholder. It must be provided as |
| part of the JMS endpoint URL, but it will be ignored in this example. |
| |
| In the `computeDestination` bean, specify the real destination by |
| setting the `CamelJmsDestinationName` header as follows: |
| |
| [source,java] |
| ------------------------------------------------------------------------- |
| public void setJmsHeader(Exchange exchange) { |
| String id = .... |
| exchange.getIn().setHeader("CamelJmsDestinationName", "order:" + id); |
| } |
| ------------------------------------------------------------------------- |
| |
| Then Camel will read this header and use it as the destination instead |
| of the one configured on the endpoint. So, in this example Camel sends |
| the message to `activemq:queue:order:2`, assuming the `id` value was 2. |
| |
| If both the `CamelJmsDestination` and the `CamelJmsDestinationName` |
| headers are set, `CamelJmsDestination` takes priority. Keep in mind that |
| the JMS producer removes both `CamelJmsDestination` and |
| `CamelJmsDestinationName` headers from the exchange and does not propagate |
| them to the created JMS message, to avoid accidental loops |
| in the routes (in scenarios where the message is forwarded to |
| another JMS endpoint). |
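The priority and clean-up rules can be sketched with a plain header map. The class `DestinationOverrideSketch` is hypothetical; it uses `Object` as a stand-in for `javax.jms.Destination` so that it stays self-contained, and it is not the producer's real code.

```java
import java.util.Map;

/** Illustrative only: destination override priority and header removal. */
final class DestinationOverrideSketch {
    /**
     * Returns the destination to send to and strips both override headers,
     * so that they are not propagated to the outgoing JMS message.
     */
    static Object resolve(Map<String, Object> headers, String endpointDestinationName) {
        // remove() reads the header and deletes it in one step.
        Object destination = headers.remove("CamelJmsDestination");
        Object destinationName = headers.remove("CamelJmsDestinationName");

        if (destination != null) {
            return destination;            // the Destination object takes priority
        }
        if (destinationName != null) {
            return destinationName;        // then the destination name header
        }
        return endpointDestinationName;    // otherwise the endpoint's own destination
    }
}
```

With only `CamelJmsDestinationName=order:2` set and an endpoint configured for `dummy`, the sketch resolves to `order:2` and leaves neither header in the map.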
| |
| == Configuring different JMS providers |
| |
| You can configure your JMS provider in Spring XML as |
| follows: |
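A minimal sketch of such a configuration might look like the following. It assumes an ActiveMQ broker; the `brokerURL` value is a placeholder, and the use of `org.apache.activemq.ActiveMQConnectionFactory` is an assumption for illustration. Substitute the connection factory of your own provider.

```xml
<bean id="activemq" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory">
    <!-- assumed provider-specific connection factory -->
    <bean class="org.apache.activemq.ActiveMQConnectionFactory">
      <!-- placeholder broker URL; replace with your own -->
      <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>
  </property>
</bean>
```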
| |
| You can configure as many JMS component instances as you wish |
| and give them *a unique name using the* `id` **attribute**. The |
| preceding example configures an `activemq` component. You could do the |
| same to configure MQSeries, TibCo, BEA, Sonic and so on. |
| |
| Once you have a named JMS component, you can then refer to endpoints |
| within that component using URIs. For example, for the component name, |
| `activemq`, you can then refer to destinations using the URI format, |
| `activemq:[queue:|topic:]destinationName`. You can use the same approach |
| for all other JMS providers. |
| |
| This works because the `SpringCamelContext` lazily fetches components from the |
| Spring context for the scheme name you use in |
| endpoint URIs, and then has the |
| component resolve the endpoint URIs. |
| |
| === Using JNDI to find the ConnectionFactory |
| |
| If you are using a J2EE container, you might need to look up JNDI to |
| find the JMS `ConnectionFactory` rather than use the usual `<bean>` |
| mechanism in Spring. You can do this using Spring's factory bean or the |
| new Spring XML namespace. For example: |
| |
| [source,xml] |
| ----------------------------------------------------------------------------- |
| <bean id="weblogic" class="org.apache.camel.component.jms.JmsComponent"> |
| <property name="connectionFactory" ref="myConnectionFactory"/> |
| </bean> |
| |
| <jee:jndi-lookup id="myConnectionFactory" jndi-name="jms/connectionFactory"/> |
| ----------------------------------------------------------------------------- |
| |
| See |
| http://static.springsource.org/spring/docs/3.0.x/spring-framework-reference/html/xsd-config.html#xsd-config-body-schemas-jee[The |
| jee schema] in the Spring reference documentation for more details about |
| JNDI lookup. |
| |
| == Concurrent Consuming |
| |
| A common requirement with JMS is to consume messages concurrently in |
| multiple threads to make an application more responsive. You |
| can set the `concurrentConsumers` option to specify the number of |
| threads servicing the JMS endpoint, as follows: |
| |
| [source,java] |
| --------------------------------------------- |
| from("jms:SomeQueue?concurrentConsumers=20"). |
| bean(MyClass.class); |
| --------------------------------------------- |
| |
| You can configure this option in one of the following ways: |
| |
| * On the `JmsComponent` |
| * On the endpoint URI |
| * By invoking `setConcurrentConsumers()` directly on the `JmsEndpoint` |
| |
| === Concurrent Consuming with async consumer |
| |
| Notice that each concurrent consumer will only pick up the next available |
| message from the JMS broker when the current message has been fully |
| processed. You can set the option `asyncConsumer=true` to let the |
| consumer pick up the next message from the JMS queue while the previous |
| message is being processed asynchronously (by the |
| Asynchronous Routing Engine). See the description of the `asyncConsumer` |
| option in the options table on this page for more details. |
| |
| [source,java] |
| ---------------------------------------------------------------- |
| from("jms:SomeQueue?concurrentConsumers=20&asyncConsumer=true"). |
| bean(MyClass.class); |
| ---------------------------------------------------------------- |
| |
| == Request-reply over JMS |
| |
| Camel supports Request Reply over JMS. In |
| essence the MEP of the Exchange should be `InOut` when you send a |
| message to a JMS queue. |
| |
| Camel offers a number of options to configure request/reply over JMS |
| that influence performance and clustered environments. The table below |
| summarizes the options. |
| |
| [width="100%",cols="10%,10%,10%,70%",options="header",] |
| |======================================================================= |
| |Option |Performance |Cluster |Description |
| |`Temporary` |Fast |Yes |A temporary queue is used as the reply queue and is |
| automatically created by Camel. To use this, do *not* specify a `replyTo` queue |
| name. You can optionally configure `replyToType=Temporary` to make |
| it stand out that temporary queues are in use. |
| |
| |`Shared` |Slow |Yes |A shared persistent queue is used as the reply queue. |
| The queue must be created beforehand, although some brokers can create |
| them on the fly, such as Apache ActiveMQ. To use this, you must specify |
| the `replyTo` queue name. You can optionally configure |
| `replyToType=Shared` to make it stand out that shared queues are in use. |
| A shared queue can be used in a clustered environment with multiple |
| nodes running this Camel application at the same time, all of them using the |
| same shared reply queue. This is possible because JMS Message selectors |
| are used to correlate expected reply messages; this impacts performance, |
| though. JMS Message selectors are slower, so `Shared` queues are not as fast as |
| `Temporary` or `Exclusive` queues. See further below how to tweak this |
| for better performance. |
| |
| |`Exclusive` |Fast |No (*Yes) |An exclusive persistent queue is used as the |
| reply queue. The queue must be created beforehand, although some brokers |
| can create them on the fly, such as Apache ActiveMQ. To use this, you must |
| specify the `replyTo` queue name. You *must* also configure |
| `replyToType=Exclusive` to instruct Camel to use exclusive queues, as |
| `Shared` is used by default if a `replyTo` queue name was configured. |
| When using exclusive reply queues, JMS Message selectors are *not* |
| in use, and therefore other applications must not use this queue |
| either. An exclusive queue *cannot* be used in a clustered environment |
| with multiple nodes running this Camel application at the same time, as |
| there is no control over whether the reply message comes back to the same node |
| that sent the request message; that is why shared queues use JMS Message |
| selectors to make sure of this. *However*, if you configure each Exclusive |
| reply queue with a unique name per node, then you can run this in a |
| clustered environment, because the reply message is then sent back to |
| the queue of the node that awaits the reply message. |
| |
| |`concurrentConsumers` |Fast |Yes |Allows reply messages to be processed |
| concurrently using concurrent message listeners. |
| You can specify a range using the `concurrentConsumers` and |
| `maxConcurrentConsumers` options. *Notice:* `Shared` reply |
| queues may not work as well with concurrent listeners, so use this |
| option with care. |
| |
| |`maxConcurrentConsumers` |Fast |Yes |Allows reply messages to be processed |
| concurrently using concurrent message listeners. |
| You can specify a range using the `concurrentConsumers` and |
| `maxConcurrentConsumers` options. *Notice:* `Shared` reply |
| queues may not work as well with concurrent listeners, so use this |
| option with care. |
| |======================================================================= |
| |
| The `JmsProducer` detects the `InOut` exchange pattern and provides a `JMSReplyTo` header |
| with the reply destination to be used. By default, Camel uses a temporary |
| queue, but you can use the `replyTo` option on the endpoint to specify a |
| fixed reply queue (see more below about fixed reply queues). |
| |
| Camel automatically sets up a consumer that listens on the reply queue, |
| so you do *not* need to do anything. + |
| This consumer is a Spring `DefaultMessageListenerContainer` that |
| listens for replies. By default, it is fixed to one concurrent consumer. + |
| That means replies are processed in sequence, as there is only one |
| thread to process them. You can configure the listener to use |
| concurrent threads using the `concurrentConsumers` and |
| `maxConcurrentConsumers` options, which are set on the endpoint |
| in Camel as shown below: |
| |
| [source,java] |
| ------------------------------------------------------- |
| from(xxx) |
| .inOut().to("activemq:queue:foo?concurrentConsumers=5") |
| .to(yyy) |
| .to(zzz); |
| ------------------------------------------------------- |
| |
| In this route, we instruct Camel to route replies |
| asynchronously using a thread pool with five threads. |
| |
| === Request-reply over JMS and using a shared fixed reply queue |
| |
| If you use a fixed reply queue when doing |
| Request Reply over JMS as shown in the example |
| below, then pay attention. |
| |
| [source,java] |
| --------------------------------------------- |
| from(xxx) |
| .inOut().to("activemq:queue:foo?replyTo=bar") |
| .to(yyy); |
| --------------------------------------------- |
| |
| In this example, the fixed reply queue named "bar" is used. By default, |
| Camel assumes the queue is shared when using fixed reply queues, and |
| therefore it uses a `JMSSelector` to only pick up the expected reply |
| messages (e.g., based on the `JMSCorrelationID`). That means it's not as |
| fast as temporary queues. See the next section for exclusive fixed reply |
| queues. You can speed up how often Camel polls for reply messages |
| using the `receiveTimeout` option. By default, it is 1000 milliseconds. To |
| make it faster, you can set it to 250 milliseconds to poll 4 times per second |
| as shown: |
| |
| [source,java] |
| ---------------------------------------------------------------- |
| from(xxx) |
| .inOut().to("activemq:queue:foo?replyTo=bar&receiveTimeout=250") |
| .to(yyy); |
| ---------------------------------------------------------------- |
| |
| Notice this causes Camel to poll the message |
| broker more frequently, and thus generates more network traffic. + |
| It is generally recommended to use temporary queues if possible. |
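
To illustrate why a shared reply queue needs correlation, here is a minimal conceptual sketch in plain Java. It is not Camel's actual implementation: the `ReplyCorrelator` class and its method names are hypothetical. It only models the idea that each pending request is tracked by its correlation id, and that a reply with an unknown id (for example, one meant for another node on the same queue) is left untouched.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of reply correlation on a shared reply queue.
final class ReplyCorrelator {
    // Requests that were sent and still await a reply, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Register interest in a reply before the request is sent.
    CompletableFuture<String> expectReply(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Offer an incoming reply; returns false when nobody here is waiting for it.
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false; // reply belongs to another consumer of the shared queue
        }
        future.complete(body);
        return true;
    }
}
```

With an exclusive or temporary queue, every message on the queue is a reply for this consumer, so the broker-side filtering that a selector performs is not needed.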
| |
| === Request-reply over JMS and using an exclusive fixed reply queue |
| |
| In the previous example, Camel assumed the fixed reply queue |
| named "bar" was shared, and thus used a `JMSSelector` to consume only the |
| reply messages it expects. However, there is a drawback to doing this, |
| as the JMS selector is slower. Also, the consumer on the reply queue is |
| slow to update with new JMS selector ids. In fact, it only updates when |
| the `receiveTimeout` option times out, which by default is 1 second. So |
| in theory, a reply message could take up to about 1 second to be |
| detected. On the other hand, if the fixed reply queue is exclusive to the |
| Camel reply consumer, then the JMS selectors can be avoided, which makes it |
| more performant: in fact, as fast as using temporary queues. You can set |
| the `replyToType` option to `Exclusive` |
| to tell Camel that the reply queue is exclusive, as shown in the example |
| below: |
| |
| [source,java] |
| ------------------------------------------------------------------- |
| from(xxx) |
| .inOut().to("activemq:queue:foo?replyTo=bar&replyToType=Exclusive") |
| .to(yyy); |
| ------------------------------------------------------------------- |
| |
| Mind that the queue must be exclusive to each and every endpoint. So if |
| you have two routes, then they each need a unique reply queue as shown |
| in the next example: |
| |
| [source,java] |
| ----------------------------------------------------------------------------- |
| from(xxx) |
| .inOut().to("activemq:queue:foo?replyTo=bar&replyToType=Exclusive") |
| .to(yyy); |
| |
| from(aaa) |
| .inOut().to("activemq:queue:order?replyTo=order.reply&replyToType=Exclusive") |
| .to(bbb); |
| ----------------------------------------------------------------------------- |
| |
| The same applies if you run in a clustered environment: each node |
| in the cluster must use a unique reply queue name. Otherwise, a |
| node in the cluster may pick up messages intended as a reply for |
| another node. For clustered environments, it's recommended to use shared |
| reply queues instead. |
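
If you do choose exclusive reply queues in a cluster, the per-node naming could be sketched as below. This is only an illustration: the `NodeReplyQueues` helper and the `nodeId` value are hypothetical, and how a node obtains its id (host name, environment variable, and so on) depends on your deployment. The sketch only builds the endpoint URI string from options already described on this page.

```java
// Hypothetical helper that derives a node-specific exclusive reply queue name.
final class NodeReplyQueues {
    private NodeReplyQueues() {
    }

    // Builds an endpoint URI whose reply queue is unique for the given node.
    static String exclusiveReplyUri(String requestQueue, String replyQueueBase, String nodeId) {
        if (nodeId == null || nodeId.isEmpty()) {
            throw new IllegalArgumentException("nodeId must be a non-empty, cluster-unique value");
        }
        return "activemq:queue:" + requestQueue
                + "?replyTo=" + replyQueueBase + "." + nodeId
                + "&replyToType=Exclusive";
    }
}
```

For example, `exclusiveReplyUri("foo", "bar", "node1")` yields an endpoint that sends to `foo` and listens for replies on `bar.node1`.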
| |
| == Synchronizing clocks between senders and receivers |
| |
| When doing messaging between systems, it is desirable that the systems |
| have synchronized clocks. For example, when sending a xref:jms-component.adoc[JMS] |
| message, you can set a time to live value on the message. The |
| receiver can then inspect this value and determine if the message has already |
| expired, and thus drop the message instead of consuming and processing it. |
| However, this requires that both sender and receiver have synchronized |
| clocks. |
| |
| [NOTE] |
| ==== |
| If you are using http://activemq.apache.org/[ActiveMQ], then you |
| can use the http://activemq.apache.org/timestampplugin.html[timestamp |
| plugin] to synchronize clocks. |
| ==== |
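
The effect of clock skew can be shown with a small worked example in plain Java. It assumes the usual JMS convention that the expiration timestamp is the sender's clock plus the time to live, and that an expiration of 0 means the message never expires. The `MessageExpiry` class is hypothetical and only does the arithmetic.

```java
// Hypothetical helper that models message expiry across two clocks.
final class MessageExpiry {
    private MessageExpiry() {
    }

    // Expiration as computed on the sender: its own clock plus the time to live.
    // A time to live of 0 (or less, in this sketch) means "never expires".
    static long expirationAt(long senderClockMillis, long timeToLiveMillis) {
        return timeToLiveMillis <= 0 ? 0 : senderClockMillis + timeToLiveMillis;
    }

    // The receiver compares the expiration with its OWN clock.
    static boolean isExpired(long expirationMillis, long receiverClockMillis) {
        return expirationMillis != 0 && receiverClockMillis >= expirationMillis;
    }
}
```

With a time to live of 5 seconds, a receiver whose clock runs 10 seconds ahead of the sender's sees the message as expired the moment it arrives, even though it was just sent.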
| |
| == About time to live |
| |
| See the section above about synchronized clocks first. |
| |
| When you do request/reply (InOut) over xref:jms-component.adoc[JMS] with Camel, |
| Camel uses a timeout on the sender side, which defaults to 20 |
| seconds and is set with the `requestTimeout` option. You can control this by |
| setting a higher/lower value. However, the time to live value is still |
| set on the xref:jms-component.adoc[JMS] message being sent, which requires the |
| clocks to be synchronized between the systems. If they are not, then you |
| may want to disable the time to live value being set. This is |
| possible using the `disableTimeToLive` option from *Camel 2.8* onwards. |
| If you set `disableTimeToLive=true`, then Camel does |
| *not* set any time to live value when sending xref:jms-component.adoc[JMS] |
| messages. *But* the request timeout is still active. So for example, if |
| you do request/reply over xref:jms-component.adoc[JMS] and have disabled time to |
| live, then Camel will still use a timeout of 20 seconds (the |
| `requestTimeout` option). That option can also be configured. |
| So the two options `requestTimeout` and `disableTimeToLive` give you |
| fine-grained control when doing request/reply. |
| |
| You can also provide a header in the message |
| that overrides the endpoint-configured request timeout value. |
| First, consider a route that only uses the endpoint-configured value: |
| |
| [source,java] |
| -------------------------------------------------------- |
| from("direct:someWhere") |
| .to("jms:queue:foo?replyTo=bar&requestTimeout=30s") |
| .to("bean:processReply"); |
| -------------------------------------------------------- |
| |
| In the route above, the endpoint is configured with a `requestTimeout` of 30 |
| seconds. So Camel will wait up to 30 seconds for the reply message to |
| come back on the bar queue. If no reply message is received, then an |
| `org.apache.camel.ExchangeTimedOutException` is set on the |
| Exchange, and Camel continues routing the message, |
| which then fails due to the exception, and Camel's error handler |
| reacts. |
| |
| If you want to use a per message timeout value, you can set the header |
| with key |
| `org.apache.camel.component.jms.JmsConstants#JMS_REQUEST_TIMEOUT` which |
| has constant value `"CamelJmsRequestTimeout"` with a timeout value as |
| a long type. |
| |
| For example, we can use a bean to compute the timeout value per |
| individual message, such as calling the `"whatIsTheTimeout"` method on |
| the service bean as shown below: |
| |
| [source,java] |
| ---------------------------------------------------------------------------------------- |
| from("direct:someWhere") |
| .setHeader("CamelJmsRequestTimeout", method(ServiceBean.class, "whatIsTheTimeout")) |
| .to("jms:queue:foo?replyTo=bar&requestTimeout=30s") |
| .to("bean:processReply"); |
| ---------------------------------------------------------------------------------------- |
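
The precedence between the header and the endpoint option can be modeled in a few lines of plain Java. This is a conceptual sketch, not Camel's code: the `RequestTimeouts` class is hypothetical, and it assumes the header carries a numeric value in milliseconds.

```java
import java.util.Map;

// Hypothetical model of "the header wins over the endpoint option".
final class RequestTimeouts {
    // Constant value of JmsConstants#JMS_REQUEST_TIMEOUT, as stated above.
    static final String HEADER = "CamelJmsRequestTimeout";

    private RequestTimeouts() {
    }

    // Returns the per-message timeout if the header is present, else the endpoint value.
    static long effectiveTimeout(Map<String, Object> headers, long endpointTimeoutMillis) {
        Object value = headers.get(HEADER);
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        return endpointTimeoutMillis;
    }
}
```

In the route above, a message for which the bean returns `5000` would therefore wait 5 seconds for its reply, while a message without the header falls back to the 30 seconds configured on the endpoint.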
| |
| When you do fire and forget (InOnly) over xref:jms-component.adoc[JMS] with Camel, |
| Camel by default does *not* set any time to live value on the |
| message. You can configure a value by using the `timeToLive` option. For |
| example, to set a time to live of 5 seconds, use `timeToLive=5000`. The option |
| `disableTimeToLive` can be used to force disabling the time to live, |
| also for InOnly messaging. The `requestTimeout` option is not used |
| for InOnly messaging. |
| |
| == Enabling Transacted Consumption |
| |
| A common requirement is to consume from a queue in a transaction and |
| then process the message using the Camel route. To do this, just ensure |
| that you set the following properties on the component/endpoint: |
| |
| * `transacted` = true |
| * `transactionManager` = a _Transaction Manager_ - typically the |
| `JmsTransactionManager` |
| |
| See the Transactional Client EIP pattern |
| for further details. |
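
As a rough sketch, the two properties can be set programmatically when creating the component. The example assumes a `ConnectionFactory` is already available and uses the `javax.jms` package mentioned elsewhere on this page (newer Camel versions use `jakarta.jms` instead); the `TransactedJmsSetup` class name is hypothetical.

```java
import javax.jms.ConnectionFactory;

import org.apache.camel.component.jms.JmsComponent;
import org.springframework.jms.connection.JmsTransactionManager;

// Hypothetical factory for a JMS component that consumes in a transaction.
public final class TransactedJmsSetup {
    private TransactedJmsSetup() {
    }

    public static JmsComponent transactedJms(ConnectionFactory connectionFactory) {
        // Spring's local JMS transaction manager, bound to the same connection factory.
        JmsTransactionManager transactionManager = new JmsTransactionManager(connectionFactory);
        // Creates a component with transacted=true and the given transaction manager.
        return JmsComponent.jmsComponentTransacted(connectionFactory, transactionManager);
    }
}
```

The returned component can then be registered under a name of your choice, for example with `camelContext.addComponent("jms", ...)`, so that `jms:` endpoints consume transactionally.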
| |
| === Transactions and Request Reply over JMS |
| |
| When using Request Reply over JMS, you cannot |
| use a single transaction; JMS will not send any messages until a commit |
| is performed, so the server side won't receive anything at all until the |
| transaction commits. Therefore, to use xref:eips:requestReply-eip.adoc[Request |
| Reply], you must commit a transaction after sending the request and then |
| use a separate transaction for receiving the response. |
| |
| To address this issue, the JMS component uses different properties to |
| specify transaction use for one-way messaging and request/reply |
| messaging: |
| |
| The `transacted` property applies *only* to the InOnly message |
| Exchange Pattern (MEP). |
| |
| You can leverage the |
| http://static.springsource.org/spring/docs/3.0.x/javadoc-api/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.html#setSessionTransacted(boolean)[DMLC |
| transacted session API] using the following properties on |
| component/endpoint: |
| |
| * `transacted` = true |
| * `lazyCreateTransactionManager` = false |
| |
| The benefit of doing so is that the cacheLevel setting will be honored |
| when using local transactions without a configured TransactionManager. |
| When a TransactionManager is configured, no caching happens at DMLC |
| level, and it is necessary to rely on a pooled connection factory. For more |
| details about this kind of setup, see |
| http://tmielke.blogspot.com/2012/03/camel-jms-with-transactions-lessons.html[here] |
| and |
| http://forum.springsource.org/showthread.php?123631-JMS-DMLC-not-caching%20connection-when-using-TX-despite-cacheLevel-CACHE_CONSUMER&p=403530&posted=1#post403530[here]. |
| |
| == Using JMSReplyTo for late replies |
| |
| When using Camel as a JMS listener, it stores the |
| ReplyTo `javax.jms.Destination` object as a message header under the key |
| `JmsConstants.JMS_REPLY_DESTINATION`. You can obtain this `Destination` as follows: |
| |
| [source,java] |
| ----------------------------------------------------------------------------------------------------------------- |
| Destination replyDestination = exchange.getIn().getHeader(JmsConstants.JMS_REPLY_DESTINATION, Destination.class); |
| ----------------------------------------------------------------------------------------------------------------- |
| |
| And then later use it to send a reply using regular JMS or Camel. |
| |
| [source,java] |
| ---------------------------------------------------------------------------------------- |
| // we need to pass in the JMS component, and in this sample we use ActiveMQ |
| JmsEndpoint endpoint = JmsEndpoint.newInstance(replyDestination, activeMQComponent); |
| // now we have the endpoint we can use regular Camel API to send a message to it |
| template.sendBody(endpoint, "Here is the late reply."); |
| ---------------------------------------------------------------------------------------- |
| |
| A different solution to sending a reply is to provide the |
| `replyDestination` object in the `JmsConstants.JMS_DESTINATION` header when sending. |
| Camel will then pick up this header and use it as the real |
| destination. The endpoint URI must include a dummy destination, however. |
| For example: |
| |
| [source,java] |
| ---------------------------------------------------------------------------------------------------------------------------------------- |
| // we pretend to send it to some non-existing dummy queue |
| template.send("activemq:queue:dummy", new Processor() { |
|     public void process(Exchange exchange) throws Exception { |
|         // and here we override the destination with the ReplyTo destination object so the message is sent to there instead of dummy |
|         exchange.getIn().setHeader(JmsConstants.JMS_DESTINATION, replyDestination); |
|         exchange.getIn().setBody("Here is the late reply."); |
|     } |
| }); |
| ---------------------------------------------------------------------------------------------------------------------------------------- |
| |
| == Using a request timeout |
| |
| In the sample below we send a Request Reply |
| style message Exchange (we use the `requestBody` |
| method = `InOut`) to the slow queue for further processing in Camel, and |
| we wait for a return reply: |
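
The original sample is not included in this section. As a stand-in, the following is a minimal sketch of what such a call could look like, assuming an existing `ProducerTemplate`, an `activemq` component, and a queue named `slow`. The `SlowQueueClient` class and the 5 second timeout are illustrative choices, not values taken from the original sample.

```java
import org.apache.camel.CamelExecutionException;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.ProducerTemplate;

// Hypothetical client that does request/reply against a slow queue.
public final class SlowQueueClient {
    private SlowQueueClient() {
    }

    // Returns the reply body, or null if no reply arrived within the timeout.
    public static String requestWithTimeout(ProducerTemplate template) {
        try {
            // requestBody uses the InOut pattern and blocks until a reply or a timeout
            return template.requestBody(
                    "activemq:queue:slow?requestTimeout=5000", "Hello World", String.class);
        } catch (CamelExecutionException e) {
            if (e.getCause() instanceof ExchangeTimedOutException) {
                return null; // the reply did not arrive in time
            }
            throw e;
        }
    }
}
```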
| |
| |
| == Sending an InOnly message and keeping the JMSReplyTo header |
| |
| When sending to a xref:jms-component.adoc[JMS] destination using *camel-jms*, the |
| producer uses the MEP to detect whether it is _InOnly_ or _InOut_ messaging. |
| However, there can be times when you want to send an _InOnly_ message but |
| keep the `JMSReplyTo` header. To do so, you have to instruct Camel to |
| keep it; otherwise the `JMSReplyTo` header will be dropped. |
| |
| For example, to send an _InOnly_ message to the foo queue, but with a |
| `JMSReplyTo` pointing to the bar queue, you can do as follows: |
| |
| [source,java] |
| ------------------------------------------------------------------------------------- |
| template.send("activemq:queue:foo?preserveMessageQos=true", new Processor() { |
| public void process(Exchange exchange) throws Exception { |
| exchange.getIn().setBody("World"); |
| exchange.getIn().setHeader("JMSReplyTo", "bar"); |
| } |
| }); |
| ------------------------------------------------------------------------------------- |
| |
| Notice we use `preserveMessageQos=true` to instruct Camel to keep the |
| `JMSReplyTo` header. |
| |
| == Setting JMS provider options on the destination |
| |
| Some JMS providers, like IBM's WebSphere MQ, need options to be set on |
| the JMS destination. For example, you may need to specify the |
| `targetClient` option. Since `targetClient` is a WebSphere MQ option and not |
| a Camel URI option, you need to set that on the JMS destination name |
| like so: |
| |
| [source,java] |
| ----------------------------------------------------------------------------------- |
| // ... |
| .setHeader("CamelJmsDestinationName", constant("queue:///MY_QUEUE?targetClient=1")) |
| .to("wmq:queue:MY_QUEUE?useMessageIDAsCorrelationID=true"); |
| ----------------------------------------------------------------------------------- |
| |
| Some versions of WMQ won't accept this option on the destination name, |
| and you will get an exception like: |
| |
| ---------------------------------------------------------------------------------------------------------------------------------- |
| com.ibm.msg.client.jms.DetailedJMSException: JMSCC0005: The specified |
| value 'MY_QUEUE?targetClient=1' is not allowed for |
| 'XMSC_DESTINATION_NAME' |
| ---------------------------------------------------------------------------------------------------------------------------------- |
| |
| A workaround is to use a custom DestinationResolver: |
| |
| [source,java] |
| ---------------------------------------------------------------------------------------------------------------------------------- |
| JmsComponent wmq = new JmsComponent(connectionFactory); |
| |
| wmq.setDestinationResolver(new DestinationResolver() { |
| public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException { |
| MQQueueSession wmqSession = (MQQueueSession) session; |
| return wmqSession.createQueue("queue:///" + destinationName + "?targetClient=1"); |
| } |
| }); |
| ---------------------------------------------------------------------------------------------------------------------------------- |
| |
| |
| |
| include::spring-boot:partial$starter.adoc[] |