| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.camel.component.jms; |
| |
| import java.util.Map; |
| import java.util.concurrent.ExecutorService; |
| import javax.jms.ConnectionFactory; |
| import javax.jms.ExceptionListener; |
| import javax.jms.Session; |
| |
| import org.apache.camel.CamelContext; |
| import org.apache.camel.Endpoint; |
| import org.apache.camel.LoggingLevel; |
| import org.apache.camel.impl.UriEndpointComponent; |
| import org.apache.camel.spi.HeaderFilterStrategy; |
| import org.apache.camel.spi.HeaderFilterStrategyAware; |
| import org.springframework.beans.BeansException; |
| import org.springframework.context.ApplicationContext; |
| import org.springframework.context.ApplicationContextAware; |
| import org.springframework.core.task.TaskExecutor; |
| import org.springframework.jms.connection.JmsTransactionManager; |
| import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter; |
| import org.springframework.jms.core.JmsOperations; |
| import org.springframework.jms.support.converter.MessageConverter; |
| import org.springframework.jms.support.destination.DestinationResolver; |
| import org.springframework.transaction.PlatformTransactionManager; |
| import org.springframework.util.ErrorHandler; |
| |
| import static org.apache.camel.util.ObjectHelper.removeStartingCharacters; |
| |
| /** |
| * A <a href="http://activemq.apache.org/jms.html">JMS Component</a> |
| * |
| * @version |
| */ |
| public class JmsComponent extends UriEndpointComponent implements ApplicationContextAware, HeaderFilterStrategyAware { |
| |
| private static final String KEY_FORMAT_STRATEGY_PARAM = "jmsKeyFormatStrategy"; /* NOTE(review): presumably an endpoint option name for the key format strategy; its usage is outside this view - confirm */ |
| private JmsConfiguration configuration; /* assigned by a constructor or setConfiguration; otherwise created on the first getConfiguration() call */ |
| private ApplicationContext applicationContext; /* when non-null, getConfiguration() looks up ConnectionFactory and DestinationResolver beans in it */ |
| private QueueBrowseStrategy queueBrowseStrategy; |
| private HeaderFilterStrategy headerFilterStrategy; |
| private ExecutorService asyncStartStopExecutorService; |
| private MessageCreatedStrategy messageCreatedStrategy; |
| |
| /** |
| * Creates a JMS component, handing {@link JmsEndpoint} to the parent component as its endpoint class. |
| */ |
| public JmsComponent() { |
| super(JmsEndpoint.class); |
| } |
| |
| /** |
| * Creates a JMS component that uses a caller-supplied endpoint class. |
| * |
| * @param endpointClass the endpoint class handed to the parent component |
| */ |
| public JmsComponent(Class<? extends Endpoint> endpointClass) { |
| super(endpointClass); |
| } |
| |
| /** |
| * Creates a JMS component for the given context, handing {@link JmsEndpoint} |
| * to the parent component as its endpoint class. |
| * |
| * @param context the Camel context handed to the parent component |
| */ |
| public JmsComponent(CamelContext context) { |
| super(context, JmsEndpoint.class); |
| } |
| |
| /** |
| * Creates a JMS component for the given context that uses a caller-supplied endpoint class. |
| * |
| * @param context the Camel context handed to the parent component |
| * @param endpointClass the endpoint class handed to the parent component |
| */ |
| public JmsComponent(CamelContext context, Class<? extends Endpoint> endpointClass) { |
| super(context, endpointClass); |
| } |
| |
| /** |
| * Creates a JMS component that starts out with the given configuration. |
| * <p/> |
| * The no-argument constructor runs first; the configuration is then stored on this component. |
| * When it is non-null, {@link #getConfiguration()} returns it instead of creating a new one. |
| * |
| * @param configuration the configuration to store on this component |
| */ |
| public JmsComponent(JmsConfiguration configuration) { |
| this(); |
| this.configuration = configuration; |
| } |
| |
| /** |
| * Static builder method |
| */ |
| public static JmsComponent jmsComponent() { |
| return new JmsComponent(); |
| } |
| |
| /** |
| * Static builder method |
| */ |
| public static JmsComponent jmsComponent(JmsConfiguration configuration) { |
| return new JmsComponent(configuration); |
| } |
| |
| /** |
| * Static builder method |
| */ |
| public static JmsComponent jmsComponent(ConnectionFactory connectionFactory) { |
| return jmsComponent(new JmsConfiguration(connectionFactory)); |
| } |
| |
| /** |
| * Static builder method that creates a component whose configuration uses |
| * the {@link Session#CLIENT_ACKNOWLEDGE} acknowledgement mode. |
| * |
| * @param connectionFactory the connection factory handed to the new configuration |
| * @return a new {@link JmsComponent} using that configuration |
| */ |
| public static JmsComponent jmsComponentClientAcknowledge(ConnectionFactory connectionFactory) { |
| JmsConfiguration config = new JmsConfiguration(connectionFactory); |
| config.setAcknowledgementMode(Session.CLIENT_ACKNOWLEDGE); |
| JmsComponent component = jmsComponent(config); |
| return component; |
| } |
| |
| /** |
| * Static builder method that creates a component whose configuration uses |
| * the {@link Session#AUTO_ACKNOWLEDGE} acknowledgement mode. |
| * |
| * @param connectionFactory the connection factory handed to the new configuration |
| * @return a new {@link JmsComponent} using that configuration |
| */ |
| public static JmsComponent jmsComponentAutoAcknowledge(ConnectionFactory connectionFactory) { |
| JmsConfiguration config = new JmsConfiguration(connectionFactory); |
| config.setAcknowledgementMode(Session.AUTO_ACKNOWLEDGE); |
| JmsComponent component = jmsComponent(config); |
| return component; |
| } |
| |
| /** |
| * Static builder method that creates a transacted component. |
| * <p/> |
| * A new {@link JmsTransactionManager} is created for the given connection factory and |
| * passed to {@link #jmsComponentTransacted(ConnectionFactory, PlatformTransactionManager)}. |
| * |
| * @param connectionFactory the connection factory used by both the transaction manager and the configuration |
| * @return a new transacted {@link JmsComponent} |
| */ |
| public static JmsComponent jmsComponentTransacted(ConnectionFactory connectionFactory) { |
| JmsTransactionManager jmsTransactionManager = new JmsTransactionManager(); |
| jmsTransactionManager.setConnectionFactory(connectionFactory); |
| JmsComponent component = jmsComponentTransacted(connectionFactory, jmsTransactionManager); |
| return component; |
| } |
| |
| /** |
| * Static builder method that creates a transacted component using the given transaction manager. |
| * <p/> |
| * The new configuration gets the transaction manager, and has both its transacted and |
| * transactedInOut flags set to true. |
| * |
| * @param connectionFactory the connection factory handed to the new configuration |
| * @param transactionManager the transaction manager set on the new configuration |
| * @return a new transacted {@link JmsComponent} |
| */ |
| @SuppressWarnings("deprecation") |
| public static JmsComponent jmsComponentTransacted(ConnectionFactory connectionFactory, |
| PlatformTransactionManager transactionManager) { |
| JmsConfiguration config = new JmsConfiguration(connectionFactory); |
| config.setTransactionManager(transactionManager); |
| config.setTransacted(true); |
| config.setTransactedInOut(true); |
| JmsComponent component = jmsComponent(config); |
| return component; |
| } |
| |
| // Properties |
| // ------------------------------------------------------------------------- |
| |
| /** |
| * Returns the configuration of this component, creating it on first use. |
| * <p/> |
| * If no configuration has been set, one is obtained from createConfiguration(). When a Spring |
| * ApplicationContext is available, the first ConnectionFactory bean and the first |
| * DestinationResolver bean found in it (if any) are applied to the newly created configuration. |
| * |
| * @return the existing or newly created configuration |
| */ |
| public JmsConfiguration getConfiguration() { |
| if (configuration != null) { |
| return configuration; |
| } |
| |
| configuration = createConfiguration(); |
| |
| // Nothing more to do unless we are being configured with spring |
| if (applicationContext == null) { |
| return configuration; |
| } |
| |
| Map<String, ConnectionFactory> connectionFactories = applicationContext.getBeansOfType(ConnectionFactory.class); |
| if (!connectionFactories.isEmpty()) { |
| // use whichever ConnectionFactory bean the map yields first |
| configuration.setConnectionFactory(connectionFactories.values().iterator().next()); |
| } |
| |
| Map<String, DestinationResolver> destinationResolvers = applicationContext.getBeansOfType(DestinationResolver.class); |
| if (!destinationResolvers.isEmpty()) { |
| // use whichever DestinationResolver bean the map yields first |
| configuration.setDestinationResolver(destinationResolvers.values().iterator().next()); |
| } |
| |
| return configuration; |
| } |
| |
| /** |
| * Sets the JMS configuration of this component, for example to use a shared JMS configuration. |
| * |
| * @param configuration the configuration to store on this component |
| */ |
| public void setConfiguration(JmsConfiguration configuration) { |
| this.configuration = configuration; |
| } |
| |
| /** |
| * Specifies whether the consumer accept messages while it is stopping. |
| * You may consider enabling this option, if you start and stop JMS routes at runtime, while there are still messages |
| * enqued on the queue. If this option is false, and you stop the JMS route, then messages may be rejected, |
| * and the JMS broker would have to attempt redeliveries, which yet again may be rejected, and eventually the message |
| * may be moved at a dead letter queue on the JMS broker. To avoid this its recommended to enable this option. |
| */ |
| public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) { |
| getConfiguration().setAcceptMessagesWhileStopping(acceptMessagesWhileStopping); |
| } |
| |
| /** |
| * The JMS acknowledgement mode defined as an Integer. |
| * Allows you to set vendor-specific extensions to the acknowledgment mode. |
| * For the regular modes, it is preferable to use the acknowledgementModeName instead. |
| */ |
| public void setAcknowledgementMode(int consumerAcknowledgementMode) { |
| getConfiguration().setAcknowledgementMode(consumerAcknowledgementMode); |
| } |
| |
| /** |
| * Enables eager loading of JMS properties as soon as a message is loaded |
| * which generally is inefficient as the JMS properties may not be required |
| * but sometimes can catch early any issues with the underlying JMS provider |
| * and the use of JMS properties |
| */ |
| public void setEagerLoadingOfProperties(boolean eagerLoadingOfProperties) { |
| getConfiguration().setEagerLoadingOfProperties(eagerLoadingOfProperties); |
| } |
| |
| /** |
| * The JMS acknowledgement name, which is one of: SESSION_TRANSACTED, CLIENT_ACKNOWLEDGE, AUTO_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE |
| */ |
| public void setAcknowledgementModeName(String consumerAcknowledgementMode) { |
| getConfiguration().setAcknowledgementModeName(consumerAcknowledgementMode); |
| } |
| |
| /** |
| * Specifies whether the consumer container should auto-startup. |
| */ |
| public void setAutoStartup(boolean autoStartup) { |
| getConfiguration().setAutoStartup(autoStartup); |
| } |
| |
| /** |
| * Sets the cache level by ID for the underlying JMS resources. See cacheLevelName option for more details. |
| */ |
| public void setCacheLevel(int cacheLevel) { |
| getConfiguration().setCacheLevel(cacheLevel); |
| } |
| |
| /** |
| * Sets the cache level by name for the underlying JMS resources. |
| * Possible values are: CACHE_AUTO, CACHE_CONNECTION, CACHE_CONSUMER, CACHE_NONE, and CACHE_SESSION. |
| * The default setting is CACHE_AUTO. See the Spring documentation and Transactions Cache Levels for more information. |
| */ |
| public void setCacheLevelName(String cacheName) { |
| getConfiguration().setCacheLevelName(cacheName); |
| } |
| |
| /** |
| * Sets the cache level by name for the reply consumer when doing request/reply over JMS. |
| * This option only applies when using fixed reply queues (not temporary). |
| * Camel will by default use: CACHE_CONSUMER for exclusive or shared w/ replyToSelectorName. |
| * And CACHE_SESSION for shared without replyToSelectorName. Some JMS brokers such as IBM WebSphere |
| * may require to set the replyToCacheLevelName=CACHE_NONE to work. |
| * Note: If using temporary queues then CACHE_NONE is not allowed, |
| * and you must use a higher value such as CACHE_CONSUMER or CACHE_SESSION. |
| */ |
| public void setReplyToCacheLevelName(String cacheName) { |
| getConfiguration().setReplyToCacheLevelName(cacheName); |
| } |
| |
| /** |
| * Sets the JMS client ID to use. Note that this value, if specified, must be unique and can only be used by a single JMS connection instance. |
| * It is typically only required for durable topic subscriptions. |
| * <p/> |
| * If using Apache ActiveMQ you may prefer to use Virtual Topics instead. |
| */ |
| public void setClientId(String consumerClientId) { |
| getConfiguration().setClientId(consumerClientId); |
| } |
| |
| /** |
| * Specifies the default number of concurrent consumers when consuming from JMS (not for request/reply over JMS). |
| * See also the maxMessagesPerTask option to control dynamic scaling up/down of threads. |
| * <p/> |
| * When doing request/reply over JMS then the option replyToConcurrentConsumers is used to control number |
| * of concurrent consumers on the reply message listener. |
| */ |
| public void setConcurrentConsumers(int concurrentConsumers) { |
| getConfiguration().setConcurrentConsumers(concurrentConsumers); |
| } |
| |
| /** |
| * Specifies the default number of concurrent consumers when doing request/reply over JMS. |
| * See also the maxMessagesPerTask option to control dynamic scaling up/down of threads. |
| */ |
| public void setReplyToConcurrentConsumers(int concurrentConsumers) { |
| getConfiguration().setReplyToConcurrentConsumers(concurrentConsumers); |
| } |
| |
| /** |
| * Sets the default connection factory to be use |
| */ |
| public void setConnectionFactory(ConnectionFactory connectionFactory) { |
| getConfiguration().setConnectionFactory(connectionFactory); |
| } |
| |
| /** |
| * Specifies whether persistent delivery is used by default. |
| */ |
| public void setDeliveryPersistent(boolean deliveryPersistent) { |
| getConfiguration().setDeliveryPersistent(deliveryPersistent); |
| } |
| |
| /** |
| * Specifies the delivery mode to be used. Possible values are |
| * Possibles values are those defined by javax.jms.DeliveryMode. |
| * NON_PERSISTENT = 1 and PERSISTENT = 2. |
| */ |
| public void setDeliveryMode(Integer deliveryMode) { |
| getConfiguration().setDeliveryMode(deliveryMode); |
| } |
| |
| /** |
| * The durable subscriber name for specifying durable topic subscriptions. The clientId option must be configured as well. |
| */ |
| public void setDurableSubscriptionName(String durableSubscriptionName) { |
| getConfiguration().setDurableSubscriptionName(durableSubscriptionName); |
| } |
| |
| /** |
| * Specifies the JMS Exception Listener that is to be notified of any underlying JMS exceptions. |
| */ |
| public void setExceptionListener(ExceptionListener exceptionListener) { |
| getConfiguration().setExceptionListener(exceptionListener); |
| } |
| |
| /** |
| * Specifies a org.springframework.util.ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. |
| * By default these exceptions will be logged at the WARN level, if no errorHandler has been configured. |
| * You can configure logging level and whether stack traces should be logged using errorHandlerLoggingLevel and errorHandlerLogStackTrace options. |
| * This makes it much easier to configure, than having to code a custom errorHandler. |
| */ |
| public void setErrorHandler(ErrorHandler errorHandler) { |
| getConfiguration().setErrorHandler(errorHandler); |
| } |
| |
| /** |
| * Allows to configure the default errorHandler logging level for logging uncaught exceptions. |
| */ |
| public void setErrorHandlerLoggingLevel(LoggingLevel errorHandlerLoggingLevel) { |
| getConfiguration().setErrorHandlerLoggingLevel(errorHandlerLoggingLevel); |
| } |
| |
| /** |
| * Allows to control whether stacktraces should be logged or not, by the default errorHandler. |
| */ |
| public void setErrorHandlerLogStackTrace(boolean errorHandlerLogStackTrace) { |
| getConfiguration().setErrorHandlerLogStackTrace(errorHandlerLogStackTrace); |
| } |
| |
| /** |
| * Set if the deliveryMode, priority or timeToLive qualities of service should be used when sending messages. |
| * This option is based on Spring's JmsTemplate. The deliveryMode, priority and timeToLive options are applied to the current endpoint. |
| * This contrasts with the preserveMessageQos option, which operates at message granularity, |
| * reading QoS properties exclusively from the Camel In message headers. |
| */ |
| public void setExplicitQosEnabled(boolean explicitQosEnabled) { |
| getConfiguration().setExplicitQosEnabled(explicitQosEnabled); |
| } |
| |
| /** |
| * Specifies whether the listener session should be exposed when consuming messages. |
| */ |
| public void setExposeListenerSession(boolean exposeListenerSession) { |
| getConfiguration().setExposeListenerSession(exposeListenerSession); |
| } |
| |
| /** |
| * Specifies the limit for idle executions of a receive task, not having received any message within its execution. |
| * If this limit is reached, the task will shut down and leave receiving to other executing tasks |
| * (in the case of dynamic scheduling; see the maxConcurrentConsumers setting). |
| * There is additional doc available from Spring. |
| */ |
| public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) { |
| getConfiguration().setIdleTaskExecutionLimit(idleTaskExecutionLimit); |
| } |
| |
| /** |
| * Specify the limit for the number of consumers that are allowed to be idle at any given time. |
| */ |
| public void setIdleConsumerLimit(int idleConsumerLimit) { |
| getConfiguration().setIdleConsumerLimit(idleConsumerLimit); |
| } |
| |
| /** |
| * Specifies the maximum number of concurrent consumers when consuming from JMS (not for request/reply over JMS). |
| * See also the maxMessagesPerTask option to control dynamic scaling up/down of threads. |
| * <p/> |
| * When doing request/reply over JMS then the option replyToMaxConcurrentConsumers is used to control number |
| * of concurrent consumers on the reply message listener. |
| */ |
| public void setMaxConcurrentConsumers(int maxConcurrentConsumers) { |
| getConfiguration().setMaxConcurrentConsumers(maxConcurrentConsumers); |
| } |
| |
| /** |
| * Specifies the maximum number of concurrent consumers when using request/reply over JMS. |
| * See also the maxMessagesPerTask option to control dynamic scaling up/down of threads. |
| */ |
| public void setReplyToMaxConcurrentConsumers(int maxConcurrentConsumers) { |
| getConfiguration().setReplyToMaxConcurrentConsumers(maxConcurrentConsumers); |
| } |
| |
| /** |
| * The number of messages per task. -1 is unlimited. |
| * If you use a range for concurrent consumers (eg min < max), then this option can be used to set |
| * a value to eg 100 to control how fast the consumers will shrink when less work is required. |
| */ |
| public void setMaxMessagesPerTask(int maxMessagesPerTask) { |
| getConfiguration().setMaxMessagesPerTask(maxMessagesPerTask); |
| } |
| |
| /** |
| * To use a custom Spring org.springframework.jms.support.converter.MessageConverter so you can be in control |
| * how to map to/from a javax.jms.Message. |
| */ |
| public void setMessageConverter(MessageConverter messageConverter) { |
| getConfiguration().setMessageConverter(messageConverter); |
| } |
| |
| /** |
| * Specifies whether Camel should auto map the received JMS message to a suited payload type, such as javax.jms.TextMessage to a String etc. |
| * See section about how mapping works below for more details. |
| */ |
| public void setMapJmsMessage(boolean mapJmsMessage) { |
| getConfiguration().setMapJmsMessage(mapJmsMessage); |
| } |
| |
| /** |
| * When sending, specifies whether message IDs should be added. |
| */ |
| public void setMessageIdEnabled(boolean messageIdEnabled) { |
| getConfiguration().setMessageIdEnabled(messageIdEnabled); |
| } |
| |
| /** |
| * Specifies whether timestamps should be enabled by default on sending messages. |
| */ |
| public void setMessageTimestampEnabled(boolean messageTimestampEnabled) { |
| getConfiguration().setMessageTimestampEnabled(messageTimestampEnabled); |
| } |
| |
| /** |
| * If true, Camel will always make a JMS message copy of the message when it is passed to the producer for sending. |
| * Copying the message is needed in some situations, such as when a replyToDestinationSelectorName is set |
| * (incidentally, Camel will set the alwaysCopyMessage option to true, if a replyToDestinationSelectorName is set) |
| */ |
| public void setAlwaysCopyMessage(boolean alwaysCopyMessage) { |
| getConfiguration().setAlwaysCopyMessage(alwaysCopyMessage); |
| } |
| |
| /** |
| * Specifies whether JMSMessageID should always be used as JMSCorrelationID for InOut messages. |
| */ |
| public void setUseMessageIDAsCorrelationID(boolean useMessageIDAsCorrelationID) { |
| getConfiguration().setUseMessageIDAsCorrelationID(useMessageIDAsCorrelationID); |
| } |
| |
| /** |
| * Values greater than 1 specify the message priority when sending (where 0 is the lowest priority and 9 is the highest). |
| * The explicitQosEnabled option must also be enabled in order for this option to have any effect. |
| */ |
| public void setPriority(int priority) { |
| getConfiguration().setPriority(priority); |
| } |
| |
| /** |
| * Specifies whether to inhibit the delivery of messages published by its own connection. |
| */ |
| public void setPubSubNoLocal(boolean pubSubNoLocal) { |
| getConfiguration().setPubSubNoLocal(pubSubNoLocal); |
| } |
| |
| /** |
| * The timeout for receiving messages (in milliseconds). |
| */ |
| public void setReceiveTimeout(long receiveTimeout) { |
| getConfiguration().setReceiveTimeout(receiveTimeout); |
| } |
| |
| /** |
| * Specifies the interval between recovery attempts, i.e. when a connection is being refreshed, in milliseconds. |
| * The default is 5000 ms, that is, 5 seconds. |
| */ |
| public void setRecoveryInterval(long recoveryInterval) { |
| getConfiguration().setRecoveryInterval(recoveryInterval); |
| } |
| |
| /** |
| * Deprecated: Enabled by default, if you specify a durableSubscriptionName and a clientId. |
| */ |
| @Deprecated |
| public void setSubscriptionDurable(boolean subscriptionDurable) { |
| getConfiguration().setSubscriptionDurable(subscriptionDurable); |
| } |
| |
| /** |
| * Allows you to specify a custom task executor for consuming messages. |
| */ |
| public void setTaskExecutor(TaskExecutor taskExecutor) { |
| getConfiguration().setTaskExecutor(taskExecutor); |
| } |
| |
| /** |
| * When sending messages, specifies the time-to-live of the message (in milliseconds). |
| */ |
| public void setTimeToLive(long timeToLive) { |
| getConfiguration().setTimeToLive(timeToLive); |
| } |
| |
| /** |
| * Specifies whether to use transacted mode |
| */ |
| public void setTransacted(boolean consumerTransacted) { |
| getConfiguration().setTransacted(consumerTransacted); |
| } |
| |
| /** |
| * If true, Camel will create a JmsTransactionManager, if there is no transactionManager injected when option transacted=true. |
| */ |
| public void setLazyCreateTransactionManager(boolean lazyCreating) { |
| getConfiguration().setLazyCreateTransactionManager(lazyCreating); |
| } |
| |
| /** |
| * The Spring transaction manager to use. |
| */ |
| public void setTransactionManager(PlatformTransactionManager transactionManager) { |
| getConfiguration().setTransactionManager(transactionManager); |
| } |
| |
| /** |
| * The name of the transaction to use. |
| */ |
| public void setTransactionName(String transactionName) { |
| getConfiguration().setTransactionName(transactionName); |
| } |
| |
| /** |
| * The timeout value of the transaction (in seconds), if using transacted mode. |
| */ |
| public void setTransactionTimeout(int transactionTimeout) { |
| getConfiguration().setTransactionTimeout(transactionTimeout); |
| } |
| |
| /** |
| * Specifies whether to test the connection on startup. |
| * This ensures that when Camel starts that all the JMS consumers have a valid connection to the JMS broker. |
| * If a connection cannot be granted then Camel throws an exception on startup. |
| * This ensures that Camel is not started with failed connections. |
| * The JMS producers is tested as well. |
| */ |
| public void setTestConnectionOnStartup(boolean testConnectionOnStartup) { |
| getConfiguration().setTestConnectionOnStartup(testConnectionOnStartup); |
| } |
| |
| /** |
| * Whether to startup the JmsConsumer message listener asynchronously, when starting a route. |
| * For example if a JmsConsumer cannot get a connection to a remote JMS broker, then it may block while retrying |
| * and/or failover. This will cause Camel to block while starting routes. By setting this option to true, |
| * you will let routes startup, while the JmsConsumer connects to the JMS broker using a dedicated thread |
| * in asynchronous mode. If this option is used, then beware that if the connection could not be established, |
| * then an exception is logged at WARN level, and the consumer will not be able to receive messages; |
| * You can then restart the route to retry. |
| */ |
| public void setAsyncStartListener(boolean asyncStartListener) { |
| getConfiguration().setAsyncStartListener(asyncStartListener); |
| } |
| |
| /** |
| * Whether to stop the JmsConsumer message listener asynchronously, when stopping a route. |
| */ |
| public void setAsyncStopListener(boolean asyncStopListener) { |
| getConfiguration().setAsyncStopListener(asyncStopListener); |
| } |
| |
| /** |
| * When using mapJmsMessage=false Camel will create a new JMS message to send to a new JMS destination |
| * if you touch the headers (get or set) during the route. Set this option to true to force Camel to send |
| * the original JMS message that was received. |
| */ |
| public void setForceSendOriginalMessage(boolean forceSendOriginalMessage) { |
| getConfiguration().setForceSendOriginalMessage(forceSendOriginalMessage); |
| } |
| |
| /** |
| * The timeout for waiting for a reply when using the InOut Exchange Pattern (in milliseconds). |
| * The default is 20 seconds. You can include the header "CamelJmsRequestTimeout" to override this endpoint configured |
| * timeout value, and thus have per message individual timeout values. |
| * See also the requestTimeoutCheckerInterval option. |
| */ |
| public void setRequestTimeout(long requestTimeout) { |
| getConfiguration().setRequestTimeout(requestTimeout); |
| } |
| |
| /** |
| * Configures how often Camel should check for timed out Exchanges when doing request/reply over JMS. |
| * By default Camel checks once per second. But if you must react faster when a timeout occurs, |
| * then you can lower this interval, to check more frequently. The timeout is determined by the option requestTimeout. |
| */ |
| public void setRequestTimeoutCheckerInterval(long requestTimeoutCheckerInterval) { |
| getConfiguration().setRequestTimeoutCheckerInterval(requestTimeoutCheckerInterval); |
| } |
| |
| /** |
| * You can transfer the exchange over the wire instead of just the body and headers. |
| * The following fields are transferred: In body, Out body, Fault body, In headers, Out headers, Fault headers, |
| * exchange properties, exchange exception. |
| * This requires that the objects are serializable. Camel will exclude any non-serializable objects and log it at WARN level. |
| * You must enable this option on both the producer and consumer side, so Camel knows the payloads is an Exchange and not a regular payload. |
| */ |
| public void setTransferExchange(boolean transferExchange) { |
| getConfiguration().setTransferExchange(transferExchange); |
| } |
| |
| /** |
| * If enabled and you are using Request Reply messaging (InOut) and an Exchange failed on the consumer side, |
| * then the caused Exception will be send back in response as a javax.jms.ObjectMessage. |
| * If the client is Camel, the returned Exception is rethrown. This allows you to use Camel JMS as a bridge |
| * in your routing - for example, using persistent queues to enable robust routing. |
| * Notice that if you also have transferExchange enabled, this option takes precedence. |
| * The caught exception is required to be serializable. |
| * The original Exception on the consumer side can be wrapped in an outer exception |
| * such as org.apache.camel.RuntimeCamelException when returned to the producer. |
| */ |
| public void setTransferException(boolean transferException) { |
| getConfiguration().setTransferException(transferException); |
| } |
| |
| /** |
| * If enabled and you are using Request Reply messaging (InOut) and an Exchange failed with a SOAP fault (not exception) on the consumer side, |
| * then the fault flag on {@link org.apache.camel.Message#isFault()} will be send back in the response as a JMS header with the key |
| * {@link JmsConstants#JMS_TRANSFER_FAULT}. |
| * If the client is Camel, the returned fault flag will be set on the {@link org.apache.camel.Message#setFault(boolean)}. |
| * <p/> |
| * You may want to enable this when using Camel components that support faults such as SOAP based such as cxf or spring-ws. |
| */ |
| public void setTransferFault(boolean transferFault) { |
| getConfiguration().setTransferFault(transferFault); |
| } |
| |
| /** |
| * Allows you to use your own implementation of the org.springframework.jms.core.JmsOperations interface. |
| * Camel uses JmsTemplate as default. Can be used for testing purpose, but not used much as stated in the spring API docs. |
| */ |
| public void setJmsOperations(JmsOperations jmsOperations) { |
| getConfiguration().setJmsOperations(jmsOperations); |
| } |
| |
| /** |
| * A pluggable org.springframework.jms.support.destination.DestinationResolver that allows you to use your own resolver |
| * (for example, to lookup the real destination in a JNDI registry). |
| */ |
| public void setDestinationResolver(DestinationResolver destinationResolver) { |
| getConfiguration().setDestinationResolver(destinationResolver); |
| } |
| |
| /** |
| * Allows for explicitly specifying which kind of strategy to use for replyTo queues when doing request/reply over JMS. |
| * Possible values are: Temporary, Shared, or Exclusive. |
| * By default Camel will use temporary queues. However if replyTo has been configured, then Shared is used by default. |
| * This option allows you to use exclusive queues instead of shared ones. |
| * See Camel JMS documentation for more details, and especially the notes about the implications if running in a clustered environment, |
| * and the fact that Shared reply queues has lower performance than its alternatives Temporary and Exclusive. |
| */ |
| public void setReplyToType(ReplyToType replyToType) { |
| getConfiguration().setReplyToType(replyToType); |
| } |
| |
| /** |
| * Set to true, if you want to send message using the QoS settings specified on the message, |
| * instead of the QoS settings on the JMS endpoint. The following three headers are considered JMSPriority, JMSDeliveryMode, |
| * and JMSExpiration. You can provide all or only some of them. If not provided, Camel will fall back to use the |
| * values from the endpoint instead. So, when using this option, the headers override the values from the endpoint. |
| * The explicitQosEnabled option, by contrast, will only use options set on the endpoint, and not values from the message header. |
| */ |
| public void setPreserveMessageQos(boolean preserveMessageQos) { |
| getConfiguration().setPreserveMessageQos(preserveMessageQos); |
| } |
| |
| /** |
| * Whether the JmsConsumer processes the Exchange asynchronously. |
| * If enabled then the JmsConsumer may pickup the next message from the JMS queue, |
| * while the previous message is being processed asynchronously (by the Asynchronous Routing Engine). |
| * This means that messages may be processed not 100% strictly in order. If disabled (as default) |
| * then the Exchange is fully processed before the JmsConsumer will pickup the next message from the JMS queue. |
| * Note if transacted has been enabled, then asyncConsumer=true does not run asynchronously, as transaction |
| * must be executed synchronously (Camel 3.0 may support async transactions). |
| */ |
| public void setAsyncConsumer(boolean asyncConsumer) { |
| getConfiguration().setAsyncConsumer(asyncConsumer); |
| } |
| |
| /** |
| * Whether to allow sending messages with no body. If this option is false and the message body is null, then an JMSException is thrown. |
| */ |
| public void setAllowNullBody(boolean allowNullBody) { |
| getConfiguration().setAllowNullBody(allowNullBody); |
| } |
| |
| /** |
| * Only applicable when sending to JMS destination using InOnly (eg fire and forget). |
| * Enabling this option will enrich the Camel Exchange with the actual JMSMessageID |
| * that was used by the JMS client when the message was sent to the JMS destination. |
| */ |
| public void setIncludeSentJMSMessageID(boolean includeSentJMSMessageID) { |
| getConfiguration().setIncludeSentJMSMessageID(includeSentJMSMessageID); |
| } |
| |
| /** |
| * Whether to include all JMSXxxx properties when mapping from JMS to Camel Message. |
| * Setting this to true will include properties such as JMSXAppID, and JMSXUserID etc. |
| * Note: If you are using a custom headerFilterStrategy then this option does not apply. |
| */ |
| public void setIncludeAllJMSXProperties(boolean includeAllJMSXProperties) { |
| getConfiguration().setIncludeAllJMSXProperties(includeAllJMSXProperties); |
| } |
| |
| /** |
| * Specifies what default TaskExecutor type to use in the DefaultMessageListenerContainer, |
| * for both consumer endpoints and the ReplyTo consumer of producer endpoints. |
| * Possible values: SimpleAsync (uses Spring's SimpleAsyncTaskExecutor) or ThreadPool |
| * (uses Spring's ThreadPoolTaskExecutor with optimal values - cached threadpool-like). |
| * If not set, it defaults to the previous behaviour, which uses a cached thread pool |
| * for consumer endpoints and SimpleAsync for reply consumers. |
| * The use of ThreadPool is recommended to reduce "thread trash" in elastic configurations |
| * with dynamically increasing and decreasing concurrent consumers. |
| */ |
| public void setDefaultTaskExecutorType(DefaultTaskExecutorType type) { |
| getConfiguration().setDefaultTaskExecutorType(type); |
| } |
| |
| /** |
| * Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. |
| * Camel provides two implementations out of the box: default and passthrough. |
| * The default strategy will safely marshal dots and hyphens (. and -). The passthrough strategy leaves the key as is. |
| * Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. |
| * You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy |
| * and refer to it using the # notation. |
| */ |
| public void setJmsKeyFormatStrategy(JmsKeyFormatStrategy jmsKeyFormatStrategy) { |
| getConfiguration().setJmsKeyFormatStrategy(jmsKeyFormatStrategy); |
| } |
| |
| /** |
| * Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. |
| * Camel provides two implementations out of the box: default and passthrough. |
| * The default strategy will safely marshal dots and hyphens (. and -). The passthrough strategy leaves the key as is. |
| * Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. |
| * <p> |
| * This variant configures one of the standard strategies by its name, which is simpler |
| * than supplying an instance; a name that is not a standard one is rejected. |
| * |
| * @param jmsKeyFormatStrategyName the name of a standard strategy |
| * @throws IllegalArgumentException if the name is not a standard supported name |
| */ |
| public void setJmsKeyFormatStrategy(String jmsKeyFormatStrategyName) { |
| JmsKeyFormatStrategy resolved = resolveStandardJmsKeyFormatStrategy(jmsKeyFormatStrategyName); |
| if (resolved != null) { |
| getConfiguration().setJmsKeyFormatStrategy(resolved); |
| return; |
| } |
| // nothing was resolved, so the name is not one of the standard ones |
| throw new IllegalArgumentException("JmsKeyFormatStrategy with name " + jmsKeyFormatStrategyName + " is not a standard supported name"); |
| } |
| |
| /** |
| * Sets the Spring ApplicationContext to use. |
| * |
| * @param applicationContext the Spring application context to keep on this component |
| * @throws BeansException declared by the signature; this implementation only stores the reference |
| */ |
| public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { |
| this.applicationContext = applicationContext; |
| } |
| |
| /** |
| * Returns the QueueBrowseStrategy of this component. If none has been set yet, |
| * a DefaultQueueBrowseStrategy is created, remembered and returned. |
| * |
| * @return the strategy, never <tt>null</tt> |
| */ |
| public QueueBrowseStrategy getQueueBrowseStrategy() { |
| if (queueBrowseStrategy != null) { |
| return queueBrowseStrategy; |
| } |
| // lazily fall back to the default strategy |
| queueBrowseStrategy = new DefaultQueueBrowseStrategy(); |
| return queueBrowseStrategy; |
| } |
| |
| /** |
| * To use a custom QueueBrowseStrategy when browsing queues. |
| * |
| * @param queueBrowseStrategy the strategy to use |
| */ |
| public void setQueueBrowseStrategy(QueueBrowseStrategy queueBrowseStrategy) { |
| this.queueBrowseStrategy = queueBrowseStrategy; |
| } |
| |
| /** |
| * Returns the HeaderFilterStrategy held by this component. |
| * |
| * @return the header filter strategy currently set |
| */ |
| public HeaderFilterStrategy getHeaderFilterStrategy() { |
| return this.headerFilterStrategy; |
| } |
| |
| /** |
| * To use a custom HeaderFilterStrategy to filter headers to and from the Camel message. |
| * |
| * @param strategy the header filter strategy to use |
| */ |
| public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) { |
| this.headerFilterStrategy = strategy; |
| } |
| |
| /** |
| * Returns the MessageCreatedStrategy held by this component. |
| * |
| * @return the message created strategy currently set |
| */ |
| public MessageCreatedStrategy getMessageCreatedStrategy() { |
| return this.messageCreatedStrategy; |
| } |
| |
| /** |
| * To use the given MessageCreatedStrategy, which is invoked when Camel creates new instances |
| * of <tt>javax.jms.Message</tt> objects when Camel is sending a JMS message. |
| * |
| * @param messageCreatedStrategy the strategy to use |
| */ |
| public void setMessageCreatedStrategy(MessageCreatedStrategy messageCreatedStrategy) { |
| this.messageCreatedStrategy = messageCreatedStrategy; |
| } |
| |
| // Implementation methods |
| // ------------------------------------------------------------------------- |
| |
| |
| /** |
| * Installs a JmsHeaderFilterStrategy when no header filter strategy has been set, |
| * built from the includeAllJMSXProperties option of the component configuration. |
| */ |
| @Override |
| protected void doStart() throws Exception { |
| if (headerFilterStrategy != null) { |
| // a strategy is already in place, keep it |
| return; |
| } |
| boolean includeAllJmsx = getConfiguration().isIncludeAllJMSXProperties(); |
| headerFilterStrategy = new JmsHeaderFilterStrategy(includeAllJmsx); |
| } |
| |
| /** |
| * Shuts down the executor service used for async start/stop tasks, if one was created, |
| * and then continues with the shutdown of the parent class. |
| */ |
| @Override |
| protected void doShutdown() throws Exception { |
| ExecutorService pool = asyncStartStopExecutorService; |
| if (pool != null) { |
| getCamelContext().getExecutorServiceManager().shutdownNow(pool); |
| // forget the pool so a new one is created if it is requested again |
| asyncStartStopExecutorService = null; |
| } |
| super.doShutdown(); |
| } |
| |
| /** |
| * Returns the executor service for async start/stop tasks, creating it on first use. |
| * |
| * @return the executor service, created via the CamelContext ExecutorServiceManager if needed |
| */ |
| protected synchronized ExecutorService getAsyncStartStopExecutorService() { |
| if (asyncStartStopExecutorService != null) { |
| return asyncStartStopExecutorService; |
| } |
| // async start tasks can run for a while and each one needs a dedicated thread, |
| // so use a cached thread pool which will shrink when no more tasks are running |
| asyncStartStopExecutorService = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "AsyncStartStopListener"); |
| return asyncStartStopExecutorService; |
| } |
| |
| /** |
| * Creates the JMS endpoint for the given uri. |
| * <p> |
| * The kind of destination (queue, topic, temporary queue or temporary topic) is taken from the |
| * prefix of <tt>remaining</tt>; without a known prefix a regular queue endpoint is created. |
| * The endpoint works on its own copy of the component configuration. The connectionFactory, |
| * username, password, key format strategy and message listener container factory options are |
| * handled here, and the parameters left afterwards are set on the endpoint configuration. |
| * |
| * @param uri the endpoint uri, handed over to the created endpoint |
| * @param remaining the destination part of the uri, optionally starting with one of the |
| * destination prefixes of {@link JmsConfiguration} |
| * @param parameters the endpoint options |
| * @return the created endpoint |
| * @throws IllegalArgumentException if only one of username and password is given |
| * @throws Exception if the endpoint cannot be created or configured |
| */ |
| @Override |
| protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) |
| throws Exception { |
| |
| // the prefix decides the kind of destination; it is cut off before the name is used |
| boolean pubSubDomain = false; |
| boolean tempDestination = false; |
| if (remaining.startsWith(JmsConfiguration.QUEUE_PREFIX)) { |
| pubSubDomain = false; |
| remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.QUEUE_PREFIX.length()), '/'); |
| } else if (remaining.startsWith(JmsConfiguration.TOPIC_PREFIX)) { |
| pubSubDomain = true; |
| remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TOPIC_PREFIX.length()), '/'); |
| } else if (remaining.startsWith(JmsConfiguration.TEMP_QUEUE_PREFIX)) { |
| pubSubDomain = false; |
| tempDestination = true; |
| remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TEMP_QUEUE_PREFIX.length()), '/'); |
| } else if (remaining.startsWith(JmsConfiguration.TEMP_TOPIC_PREFIX)) { |
| pubSubDomain = true; |
| tempDestination = true; |
| remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TEMP_TOPIC_PREFIX.length()), '/'); |
| } |
| |
| final String subject = convertPathToActualDestination(remaining, parameters); |
| |
| // lets make sure we copy the configuration as each endpoint can |
| // customize its own version |
| JmsConfiguration newConfiguration = getConfiguration().copy(); |
| JmsEndpoint endpoint; |
| if (pubSubDomain) { |
| if (tempDestination) { |
| endpoint = createTemporaryTopicEndpoint(uri, this, subject, newConfiguration); |
| } else { |
| endpoint = createTopicEndpoint(uri, this, subject, newConfiguration); |
| } |
| } else { |
| QueueBrowseStrategy strategy = getQueueBrowseStrategy(); |
| if (tempDestination) { |
| endpoint = createTemporaryQueueEndpoint(uri, this, subject, newConfiguration, strategy); |
| } else { |
| endpoint = createQueueEndpoint(uri, this, subject, newConfiguration, strategy); |
| } |
| } |
| |
| // resolve any custom connection factory first |
| ConnectionFactory cf = resolveAndRemoveReferenceParameter(parameters, "connectionFactory", ConnectionFactory.class); |
| if (cf != null) { |
| endpoint.getConfiguration().setConnectionFactory(cf); |
| } |
| |
| String username = getAndRemoveParameter(parameters, "username", String.class); |
| String password = getAndRemoveParameter(parameters, "password", String.class); |
| if (username != null && password != null) { |
| // wrap the connection factory of the endpoint so the given credentials are used |
| cf = endpoint.getConfiguration().getConnectionFactory(); |
| UserCredentialsConnectionFactoryAdapter ucfa = new UserCredentialsConnectionFactoryAdapter(); |
| ucfa.setTargetConnectionFactory(cf); |
| ucfa.setPassword(password); |
| ucfa.setUsername(username); |
| endpoint.getConfiguration().setConnectionFactory(ucfa); |
| } else if (username != null || password != null) { |
| // exactly one of username and password was given, which is not allowed; |
| // the situation where both are absent is fine and needs no credentials adapter |
| throw new IllegalArgumentException("The JmsComponent's username or password is null"); |
| } |
| |
| // jms header strategy |
| String strategyVal = getAndRemoveParameter(parameters, KEY_FORMAT_STRATEGY_PARAM, String.class); |
| if (strategyVal != null) { |
| // only touch the strategy of the endpoint when the option was actually given, so no |
| // null value is put into the parameters nor passed on to the endpoint; the endpoint then |
| // keeps whatever its configuration copy holds (NOTE(review): presumably the strategy set on the component - confirm) |
| JmsKeyFormatStrategy strategy = resolveStandardJmsKeyFormatStrategy(strategyVal); |
| if (strategy == null) { |
| // its not a standard, but a reference |
| parameters.put(KEY_FORMAT_STRATEGY_PARAM, strategyVal); |
| strategy = resolveAndRemoveReferenceParameter( |
| parameters, KEY_FORMAT_STRATEGY_PARAM, JmsKeyFormatStrategy.class); |
| } |
| endpoint.setJmsKeyFormatStrategy(strategy); |
| } |
| |
| // the factory can be given under either of the two option names, the Ref variant is tried first |
| MessageListenerContainerFactory messageListenerContainerFactory = resolveAndRemoveReferenceParameter(parameters, |
| "messageListenerContainerFactoryRef", MessageListenerContainerFactory.class); |
| if (messageListenerContainerFactory == null) { |
| messageListenerContainerFactory = resolveAndRemoveReferenceParameter(parameters, |
| "messageListenerContainerFactory", MessageListenerContainerFactory.class); |
| } |
| if (messageListenerContainerFactory != null) { |
| endpoint.setMessageListenerContainerFactory(messageListenerContainerFactory); |
| } |
| |
| // the parameters which are left are set on the configuration of the endpoint |
| setProperties(endpoint.getConfiguration(), parameters); |
| endpoint.setHeaderFilterStrategy(getHeaderFilterStrategy()); |
| |
| return endpoint; |
| } |
| |
| /** |
| * Factory method to create the endpoint for a temporary topic. |
| * |
| * @param uri the endpoint uri |
| * @param component the component the endpoint belongs to |
| * @param subject the destination name |
| * @param configuration the configuration for the endpoint |
| * @return a new JmsTemporaryTopicEndpoint |
| */ |
| protected JmsEndpoint createTemporaryTopicEndpoint(String uri, JmsComponent component, String subject, JmsConfiguration configuration) { |
| return new JmsTemporaryTopicEndpoint( |
| uri, component, subject, configuration); |
| } |
| |
| /** |
| * Factory method to create the endpoint for a topic. |
| * |
| * @param uri the endpoint uri |
| * @param component the component the endpoint belongs to |
| * @param subject the destination name |
| * @param configuration the configuration for the endpoint |
| * @return a new JmsEndpoint |
| */ |
| protected JmsEndpoint createTopicEndpoint(String uri, JmsComponent component, String subject, JmsConfiguration configuration) { |
| // NOTE(review): the true argument is presumably the pub/sub domain flag - confirm against JmsEndpoint |
| return new JmsEndpoint( |
| uri, component, subject, true, configuration); |
| } |
| |
| /** |
| * Factory method to create the endpoint for a temporary queue. |
| * |
| * @param uri the endpoint uri |
| * @param component the component the endpoint belongs to |
| * @param subject the destination name |
| * @param configuration the configuration for the endpoint |
| * @param queueBrowseStrategy the strategy handed over to the endpoint for browsing the queue |
| * @return a new JmsTemporaryQueueEndpoint |
| */ |
| protected JmsEndpoint createTemporaryQueueEndpoint(String uri, JmsComponent component, String subject, JmsConfiguration configuration, QueueBrowseStrategy queueBrowseStrategy) { |
| return new JmsTemporaryQueueEndpoint( |
| uri, component, subject, configuration, queueBrowseStrategy); |
| } |
| |
| /** |
| * Factory method to create the endpoint for a queue. |
| * |
| * @param uri the endpoint uri |
| * @param component the component the endpoint belongs to |
| * @param subject the destination name |
| * @param configuration the configuration for the endpoint |
| * @param queueBrowseStrategy the strategy handed over to the endpoint for browsing the queue |
| * @return a new JmsQueueEndpoint |
| */ |
| protected JmsEndpoint createQueueEndpoint(String uri, JmsComponent component, String subject, JmsConfiguration configuration, QueueBrowseStrategy queueBrowseStrategy) { |
| return new JmsQueueEndpoint( |
| uri, component, subject, configuration, queueBrowseStrategy); |
| } |
| |
| /** |
| * Looks up one of the standard supported {@link JmsKeyFormatStrategy} implementations by its name. |
| * The name is matched ignoring case and can be: |
| * <ul> |
| * <li>default - to use the default strategy</li> |
| * <li>passthrough - to use the passthrough strategy</li> |
| * </ul> |
| * |
| * @param name the name, may be <tt>null</tt> |
| * @return a new instance of the strategy, or <tt>null</tt> if not a standard name. |
| */ |
| private static JmsKeyFormatStrategy resolveStandardJmsKeyFormatStrategy(String name) { |
| if ("default".equalsIgnoreCase(name)) { |
| return new DefaultJmsKeyFormatStrategy(); |
| } |
| if ("passthrough".equalsIgnoreCase(name)) { |
| return new PassThroughJmsKeyFormatStrategy(); |
| } |
| // null or any other value is not a standard name |
| return null; |
| } |
| |
| /** |
| * A strategy method allowing the URI destination to be translated into the |
| * actual JMS destination name (say by looking up in JNDI or something). |
| * This implementation performs no translation and returns the path as is. |
| * |
| * @param path the destination as given in the URI |
| * @param parameters the endpoint parameters, not used by this implementation |
| * @return the actual destination name, here the given path |
| */ |
| protected String convertPathToActualDestination(String path, Map<String, Object> parameters) { |
| return path; |
| } |
| |
| /** |
| * Factory method which creates the default configuration instance. |
| * |
| * @return a new configuration object, which can then be customized further |
| */ |
| protected JmsConfiguration createConfiguration() { |
| return new JmsConfiguration(); |
| } |
| |
| } |