| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.camel.component.jms; |
| |
| import java.util.Map; |
| import java.util.concurrent.ExecutorService; |
| import javax.jms.ConnectionFactory; |
| import javax.jms.ExceptionListener; |
| import javax.jms.Message; |
| import javax.jms.Session; |
| |
| import org.apache.camel.CamelContext; |
| import org.apache.camel.Endpoint; |
| import org.apache.camel.LoggingLevel; |
| import org.apache.camel.impl.HeaderFilterStrategyComponent; |
| import org.apache.camel.spi.Metadata; |
| import org.apache.camel.util.ObjectHelper; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.beans.BeansException; |
| import org.springframework.context.ApplicationContext; |
| import org.springframework.context.ApplicationContextAware; |
| import org.springframework.core.task.TaskExecutor; |
| import org.springframework.jms.connection.JmsTransactionManager; |
| import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter; |
| import org.springframework.jms.core.JmsOperations; |
| import org.springframework.jms.support.converter.MessageConverter; |
| import org.springframework.jms.support.destination.DestinationResolver; |
| import org.springframework.transaction.PlatformTransactionManager; |
| import org.springframework.util.ErrorHandler; |
| |
| import static org.apache.camel.util.StringHelper.removeStartingCharacters; |
| |
| /** |
| * A <a href="http://activemq.apache.org/jms.html">JMS Component</a> |
| * |
| * @version |
| */ |
| public class JmsComponent extends HeaderFilterStrategyComponent implements ApplicationContextAware { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(JmsComponent.class); /* SLF4J logger named after this component class */ |
| |
| private static final String KEY_FORMAT_STRATEGY_PARAM = "jmsKeyFormatStrategy"; /* presumably the name of an endpoint option; its use is outside the visible part of the file - TODO confirm */ |
| |
| private ExecutorService asyncStartStopExecutorService; /* not referenced in the visible part of the file; presumably used for asynchronous start/stop work - TODO confirm */ |
| private ApplicationContext applicationContext; /* Spring context; when non-null, getConfiguration() looks up ConnectionFactory and DestinationResolver beans in it */ |
| |
| @Metadata(label = "advanced", description = "To use a shared JMS configuration") |
| private JmsConfiguration configuration; |
| @Metadata(label = "advanced", description = "To use a custom QueueBrowseStrategy when browsing queues") |
| private QueueBrowseStrategy queueBrowseStrategy; |
| @Metadata(label = "advanced", description = "To use the given MessageCreatedStrategy which are invoked when Camel creates new instances" |
| + " of javax.jms.Message objects when Camel is sending a JMS message.") |
| private MessageCreatedStrategy messageCreatedStrategy; |
| |
| /** |
| * Creates a component, passing {@link JmsEndpoint} as the endpoint class |
| * to the parent constructor. |
| */ |
| public JmsComponent() { |
| super(JmsEndpoint.class); |
| } |
| |
| /** |
| * Creates a component, passing the given endpoint class to the parent constructor. |
| * |
| * @param endpointClass the endpoint class handed to the parent constructor |
| */ |
| public JmsComponent(Class<? extends Endpoint> endpointClass) { |
| super(endpointClass); |
| } |
| |
| /** |
| * Creates a component for the given Camel context, passing {@link JmsEndpoint} |
| * as the endpoint class to the parent constructor. |
| * |
| * @param context the Camel context handed to the parent constructor |
| */ |
| public JmsComponent(CamelContext context) { |
| super(context, JmsEndpoint.class); |
| } |
| |
| /** |
| * Creates a component for the given Camel context and endpoint class; |
| * both are handed unchanged to the parent constructor. |
| * |
| * @param context the Camel context |
| * @param endpointClass the endpoint class |
| */ |
| public JmsComponent(CamelContext context, Class<? extends Endpoint> endpointClass) { |
| super(context, endpointClass); |
| } |
| |
| /** |
| * Creates a component through the no-arg constructor and then stores the |
| * given configuration as this component's configuration. |
| * |
| * @param configuration the JMS configuration to use |
| */ |
| public JmsComponent(JmsConfiguration configuration) { |
| this(); |
| this.configuration = configuration; |
| } |
| |
| /** |
| * Static builder method |
| */ |
| public static JmsComponent jmsComponent() { |
| return new JmsComponent(); |
| } |
| |
| /** |
| * Static builder method |
| */ |
| public static JmsComponent jmsComponent(JmsConfiguration configuration) { |
| return new JmsComponent(configuration); |
| } |
| |
| /** |
| * Static builder method |
| */ |
| public static JmsComponent jmsComponent(ConnectionFactory connectionFactory) { |
| return jmsComponent(new JmsConfiguration(connectionFactory)); |
| } |
| |
| /** |
| * Static builder method that creates a component whose configuration has the |
| * acknowledgement mode set to {@link Session#CLIENT_ACKNOWLEDGE}. |
| * |
| * @param connectionFactory the connection factory for the new configuration |
| * @return a new component instance |
| */ |
| public static JmsComponent jmsComponentClientAcknowledge(ConnectionFactory connectionFactory) { |
| JmsConfiguration config = new JmsConfiguration(connectionFactory); |
| config.setAcknowledgementMode(Session.CLIENT_ACKNOWLEDGE); |
| JmsComponent component = jmsComponent(config); |
| return component; |
| } |
| |
| /** |
| * Static builder method that creates a component whose configuration has the |
| * acknowledgement mode set to {@link Session#AUTO_ACKNOWLEDGE}. |
| * |
| * @param connectionFactory the connection factory for the new configuration |
| * @return a new component instance |
| */ |
| public static JmsComponent jmsComponentAutoAcknowledge(ConnectionFactory connectionFactory) { |
| JmsConfiguration config = new JmsConfiguration(connectionFactory); |
| config.setAcknowledgementMode(Session.AUTO_ACKNOWLEDGE); |
| JmsComponent component = jmsComponent(config); |
| return component; |
| } |
| |
| /** |
| * Static builder method that creates a transacted component. A new |
| * {@link JmsTransactionManager} is created for the connection factory and the |
| * call is forwarded to the two-argument overload. |
| * |
| * @param connectionFactory the connection factory to use |
| * @return a new component instance |
| */ |
| public static JmsComponent jmsComponentTransacted(ConnectionFactory connectionFactory) { |
| JmsTransactionManager txManager = new JmsTransactionManager(); |
| txManager.setConnectionFactory(connectionFactory); |
| JmsComponent component = jmsComponentTransacted(connectionFactory, txManager); |
| return component; |
| } |
| |
| /** |
| * Static builder method that creates a transacted component. The new |
| * configuration gets the given transaction manager and has both the |
| * transacted and transactedInOut flags switched on. |
| * |
| * @param connectionFactory the connection factory for the new configuration |
| * @param transactionManager the transaction manager to set on the configuration |
| * @return a new component instance |
| */ |
| @SuppressWarnings("deprecation") |
| public static JmsComponent jmsComponentTransacted(ConnectionFactory connectionFactory, |
| PlatformTransactionManager transactionManager) { |
| JmsConfiguration config = new JmsConfiguration(connectionFactory); |
| config.setTransactionManager(transactionManager); |
| config.setTransacted(true); |
| config.setTransactedInOut(true); |
| JmsComponent component = jmsComponent(config); |
| return component; |
| } |
| |
| // Properties |
| // ------------------------------------------------------------------------- |
| |
| /** |
| * Returns the JMS configuration, creating it on first access. |
| * When a configuration has to be created and a Spring application context is |
| * present, the first {@link ConnectionFactory} bean and the first |
| * {@link DestinationResolver} bean found in the context (if any) are set on the |
| * new configuration, unless the corresponding isAllowAutoWired... method returns false. |
| * |
| * @return the existing or newly created configuration |
| */ |
| public JmsConfiguration getConfiguration() { |
| // Already created or explicitly set: nothing to do |
| if (configuration != null) { |
| return configuration; |
| } |
| |
| configuration = createConfiguration(); |
| |
| // Auto-wiring only applies when we are being configured with spring |
| if (applicationContext == null) { |
| return configuration; |
| } |
| |
| if (isAllowAutoWiredConnectionFactory()) { |
| Map<String, ConnectionFactory> factories = applicationContext.getBeansOfType(ConnectionFactory.class); |
| if (!factories.isEmpty()) { |
| // take whichever bean the map iterates first |
| configuration.setConnectionFactory(factories.values().iterator().next()); |
| } |
| } |
| |
| if (isAllowAutoWiredDestinationResolver()) { |
| Map<String, DestinationResolver> resolvers = applicationContext.getBeansOfType(DestinationResolver.class); |
| if (!resolvers.isEmpty()) { |
| // take whichever bean the map iterates first |
| configuration.setDestinationResolver(resolvers.values().iterator().next()); |
| } |
| } |
| |
| return configuration; |
| } |
| |
| /** |
| * Tells whether the JMS configuration may be set up with a connection factory |
| * that is auto-wired from the spring application context. |
| * Subclasses can override this method to prevent that. |
| * |
| * @return true by default |
| */ |
| public boolean isAllowAutoWiredConnectionFactory() { |
| return true; |
| } |
| |
| /** |
| * Tells whether the JMS configuration may be set up with a destination resolver |
| * that is auto-wired from the spring application context. |
| * Subclasses can override this method to prevent that. |
| * |
| * @return true by default |
| */ |
| public boolean isAllowAutoWiredDestinationResolver() { |
| return true; |
| } |
| |
| /** |
| * Sets the shared JMS configuration used by this component, replacing any |
| * configuration currently held. |
| * |
| * @param configuration the configuration to use |
| */ |
| public void setConfiguration(JmsConfiguration configuration) { |
| this.configuration = configuration; |
| } |
| |
| /** |
| * Specifies whether the consumer accepts messages while it is stopping. |
| * Consider enabling this option if JMS routes are started and stopped at runtime |
| * while messages are still enqueued. When disabled, messages may be rejected while |
| * the route stops, the broker has to redeliver them, and they may eventually end up |
| * on a dead letter queue on the broker. |
| * The value is forwarded to the component configuration. |
| * |
| * @param acceptMessagesWhileStopping true to accept messages while stopping |
| */ |
| @Metadata(label = "consumer,advanced", |
| description = "Specifies whether the consumer accept messages while it is stopping." |
| + " You may consider enabling this option, if you start and stop JMS routes at runtime, while there are still messages" |
| + " enqueued on the queue. If this option is false, and you stop the JMS route, then messages may be rejected," |
| + " and the JMS broker would have to attempt redeliveries, which yet again may be rejected, and eventually the message" |
| + " may be moved at a dead letter queue on the JMS broker. To avoid this its recommended to enable this option.") |
| public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setAcceptMessagesWhileStopping(acceptMessagesWhileStopping); |
| } |
| |
| /** |
| * Whether the DefaultMessageListenerContainer used in the reply managers for request-reply messaging allows |
| * the DefaultMessageListenerContainer.runningAllowed flag to quick stop in case JmsConfiguration#isAcceptMessagesWhileStopping |
| * is enabled, and org.apache.camel.CamelContext is currently being stopped. This quick stop ability is enabled by |
| * default in the regular JMS consumers but to enable for reply managers you must enable this flag. |
| * The value is forwarded to the component configuration. |
| * |
| * @param allowReplyManagerQuickStop true to allow quick stop for reply managers |
| */ |
| @Metadata(label = "consumer,advanced", |
| description = "Whether the DefaultMessageListenerContainer used in the reply managers for request-reply messaging allow" |
| + " the DefaultMessageListenerContainer.runningAllowed flag to quick stop in case JmsConfiguration#isAcceptMessagesWhileStopping" |
| + " is enabled, and org.apache.camel.CamelContext is currently being stopped. This quick stop ability is enabled by" |
| + " default in the regular JMS consumers but to enable for reply managers you must enable this flag.") |
| public void setAllowReplyManagerQuickStop(boolean allowReplyManagerQuickStop) { |
| getConfiguration().setAllowReplyManagerQuickStop(allowReplyManagerQuickStop); |
| } |
| |
| /** |
| * Sets the JMS acknowledgement mode as an integer, which allows |
| * vendor-specific extensions to the acknowledgment mode. |
| * For the regular modes, it is preferable to use the acknowledgementModeName instead. |
| * The value is forwarded to the component configuration. |
| * |
| * @param consumerAcknowledgementMode the acknowledgement mode value |
| */ |
| @Metadata(label = "consumer", |
| description = "The JMS acknowledgement mode defined as an Integer. Allows you to set vendor-specific extensions to the acknowledgment mode." |
| + " For the regular modes, it is preferable to use the acknowledgementModeName instead.") |
| public void setAcknowledgementMode(int consumerAcknowledgementMode) { |
| getConfiguration().setAcknowledgementMode(consumerAcknowledgementMode); |
| } |
| |
| /** |
| * Enables eager loading of JMS properties as soon as a message is loaded. |
| * This is generally inefficient because the properties may not be required, |
| * but it can surface issues with the underlying JMS provider and the use of |
| * JMS properties early. |
| * The value is forwarded to the component configuration. |
| * |
| * @param eagerLoadingOfProperties true to load JMS properties eagerly |
| */ |
| @Metadata(label = "consumer,advanced", |
| description = "Enables eager loading of JMS properties as soon as a message is loaded" |
| + " which generally is inefficient as the JMS properties may not be required" |
| + " but sometimes can catch early any issues with the underlying JMS provider" |
| + " and the use of JMS properties") |
| public void setEagerLoadingOfProperties(boolean eagerLoadingOfProperties) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setEagerLoadingOfProperties(eagerLoadingOfProperties); |
| } |
| |
| /** |
| * Sets the JMS acknowledgement mode by name. The documented names are |
| * SESSION_TRANSACTED, CLIENT_ACKNOWLEDGE, AUTO_ACKNOWLEDGE and DUPS_OK_ACKNOWLEDGE. |
| * The value is forwarded to the component configuration. |
| * |
| * @param consumerAcknowledgementMode the acknowledgement mode name |
| */ |
| @Metadata(defaultValue = "AUTO_ACKNOWLEDGE", label = "consumer", enums = "SESSION_TRANSACTED,CLIENT_ACKNOWLEDGE,AUTO_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE", |
| description = "The JMS acknowledgement name, which is one of: SESSION_TRANSACTED, CLIENT_ACKNOWLEDGE, AUTO_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE") |
| public void setAcknowledgementModeName(String consumerAcknowledgementMode) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setAcknowledgementModeName(consumerAcknowledgementMode); |
| } |
| |
| /** |
| * Specifies whether the consumer container should start automatically. |
| * The value is forwarded to the component configuration. |
| * |
| * @param autoStartup true to auto-start the consumer container |
| */ |
| @Metadata(label = "consumer", defaultValue = "true", |
| description = "Specifies whether the consumer container should auto-startup.") |
| public void setAutoStartup(boolean autoStartup) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setAutoStartup(autoStartup); |
| } |
| |
| /** |
| * Sets the cache level, by ID, for the underlying JMS resources. |
| * See the cacheLevelName option for more details. |
| * The value is forwarded to the component configuration. |
| * |
| * @param cacheLevel the cache level ID |
| */ |
| @Metadata(label = "consumer", |
| description = "Sets the cache level by ID for the underlying JMS resources. See cacheLevelName option for more details.") |
| public void setCacheLevel(int cacheLevel) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setCacheLevel(cacheLevel); |
| } |
| |
| /** |
| * Sets the cache level, by name, for the underlying JMS resources. |
| * The documented values are CACHE_AUTO, CACHE_CONNECTION, CACHE_CONSUMER, |
| * CACHE_NONE and CACHE_SESSION, with CACHE_AUTO as the default. |
| * See the Spring documentation and Transactions Cache Levels for more information. |
| * The value is forwarded to the component configuration. |
| * |
| * @param cacheName the cache level name |
| */ |
| @Metadata(defaultValue = "CACHE_AUTO", label = "consumer", enums = "CACHE_AUTO,CACHE_CONNECTION,CACHE_CONSUMER,CACHE_NONE,CACHE_SESSION", |
| description = "Sets the cache level by name for the underlying JMS resources." |
| + " Possible values are: CACHE_AUTO, CACHE_CONNECTION, CACHE_CONSUMER, CACHE_NONE, and CACHE_SESSION." |
| + " The default setting is CACHE_AUTO. See the Spring documentation and Transactions Cache Levels for more information.") |
| public void setCacheLevelName(String cacheName) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setCacheLevelName(cacheName); |
| } |
| |
| /** |
| * Sets the cache level, by name, for the reply consumer when doing request/reply over JMS. |
| * The option only applies to fixed reply queues (not temporary ones). |
| * By default Camel uses CACHE_CONSUMER for exclusive queues or shared queues with a |
| * replyToSelectorName, and CACHE_SESSION for shared queues without one. |
| * Some JMS brokers such as IBM WebSphere may require replyToCacheLevelName=CACHE_NONE to work. |
| * Note: with temporary queues CACHE_NONE is not allowed, and a higher value such as |
| * CACHE_CONSUMER or CACHE_SESSION must be used. |
| * The value is forwarded to the component configuration. |
| * |
| * @param cacheName the cache level name for the reply consumer |
| */ |
| @Metadata(label = "producer,advanced", enums = "CACHE_AUTO,CACHE_CONNECTION,CACHE_CONSUMER,CACHE_NONE,CACHE_SESSION", |
| description = "Sets the cache level by name for the reply consumer when doing request/reply over JMS." |
| + " This option only applies when using fixed reply queues (not temporary)." |
| + " Camel will by default use: CACHE_CONSUMER for exclusive or shared w/ replyToSelectorName." |
| + " And CACHE_SESSION for shared without replyToSelectorName. Some JMS brokers such as IBM WebSphere" |
| + " may require to set the replyToCacheLevelName=CACHE_NONE to work." |
| + " Note: If using temporary queues then CACHE_NONE is not allowed," |
| + " and you must use a higher value such as CACHE_CONSUMER or CACHE_SESSION.") |
| public void setReplyToCacheLevelName(String cacheName) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setReplyToCacheLevelName(cacheName); |
| } |
| |
| /** |
| * Sets the JMS client ID to use. If specified, the value must be unique and can |
| * only be used by a single JMS connection instance. It is typically only required |
| * for durable topic subscriptions. |
| * If using Apache ActiveMQ you may prefer to use Virtual Topics instead. |
| * The value is forwarded to the component configuration. |
| * |
| * @param consumerClientId the JMS client ID |
| */ |
| @Metadata(description = "Sets the JMS client ID to use. Note that this value, if specified, must be unique and can only be used by a single JMS connection instance." |
| + " It is typically only required for durable topic subscriptions." |
| + " If using Apache ActiveMQ you may prefer to use Virtual Topics instead.") |
| public void setClientId(String consumerClientId) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setClientId(consumerClientId); |
| } |
| |
| /** |
| * Specifies the default number of concurrent consumers when consuming from JMS |
| * (not for request/reply over JMS). |
| * See also the maxMessagesPerTask option to control dynamic scaling up/down of threads. |
| * For request/reply over JMS, the replyToConcurrentConsumers option controls the number |
| * of concurrent consumers on the reply message listener. |
| * The value is forwarded to the component configuration. |
| * |
| * @param concurrentConsumers the default number of concurrent consumers |
| */ |
| @Metadata(defaultValue = "1", label = "consumer", |
| description = "Specifies the default number of concurrent consumers when consuming from JMS (not for request/reply over JMS)." |
| + " See also the maxMessagesPerTask option to control dynamic scaling up/down of threads." |
| + " When doing request/reply over JMS then the option replyToConcurrentConsumers is used to control number" |
| + " of concurrent consumers on the reply message listener.") |
| public void setConcurrentConsumers(int concurrentConsumers) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setConcurrentConsumers(concurrentConsumers); |
| } |
| |
| /** |
| * Specifies the default number of concurrent consumers when doing request/reply over JMS. |
| * See also the maxMessagesPerTask option to control dynamic scaling up/down of threads. |
| * The value is forwarded to the component configuration. |
| * |
| * @param concurrentConsumers the default number of concurrent reply consumers |
| */ |
| @Metadata(defaultValue = "1", label = "producer", |
| description = "Specifies the default number of concurrent consumers when doing request/reply over JMS." |
| + " See also the maxMessagesPerTask option to control dynamic scaling up/down of threads.") |
| public void setReplyToConcurrentConsumers(int concurrentConsumers) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setReplyToConcurrentConsumers(concurrentConsumers); |
| } |
| |
| /** |
| * Sets the connection factory to be used. A connection factory must be |
| * configured either on the component or on the endpoint. |
| * The value is forwarded to the component configuration. |
| * |
| * @param connectionFactory the JMS connection factory |
| */ |
| @Metadata(description = "The connection factory to be use. A connection factory must be configured either on the component or endpoint.") |
| public void setConnectionFactory(ConnectionFactory connectionFactory) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setConnectionFactory(connectionFactory); |
| } |
| |
| /** |
| * Sets the username to use with the ConnectionFactory. Username/password can |
| * also be configured directly on the ConnectionFactory. |
| * The value is forwarded to the component configuration. |
| * |
| * @param username the username |
| */ |
| @Metadata(label = "security", secret = true, description = "Username to use with the ConnectionFactory. You can also configure username/password directly on the ConnectionFactory.") |
| public void setUsername(String username) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setUsername(username); |
| } |
| |
| /** |
| * Sets the password to use with the ConnectionFactory. Username/password can |
| * also be configured directly on the ConnectionFactory. |
| * The value is forwarded to the component configuration. |
| * |
| * @param password the password |
| */ |
| @Metadata(label = "security", secret = true, description = "Password to use with the ConnectionFactory. You can also configure username/password directly on the ConnectionFactory.") |
| public void setPassword(String password) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setPassword(password); |
| } |
| |
| /** |
| * Specifies whether persistent delivery is used by default. |
| * The value is forwarded to the component configuration. |
| * |
| * @param deliveryPersistent true to use persistent delivery |
| */ |
| @Metadata(defaultValue = "true", label = "producer", |
| description = "Specifies whether persistent delivery is used by default.") |
| public void setDeliveryPersistent(boolean deliveryPersistent) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setDeliveryPersistent(deliveryPersistent); |
| } |
| |
| /** |
| * Specifies the delivery mode to be used. |
| * Possible values are those defined by javax.jms.DeliveryMode: |
| * NON_PERSISTENT = 1 and PERSISTENT = 2. |
| * The value is forwarded to the component configuration. |
| * |
| * @param deliveryMode the delivery mode value |
| */ |
| @Metadata(label = "producer", enums = "1,2", |
| description = "Specifies the delivery mode to be used." |
| + " Possibles values are those defined by javax.jms.DeliveryMode." |
| + " NON_PERSISTENT = 1 and PERSISTENT = 2.") |
| public void setDeliveryMode(Integer deliveryMode) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setDeliveryMode(deliveryMode); |
| } |
| |
| /** |
| * Sets the durable subscriber name for specifying durable topic subscriptions. |
| * The clientId option must be configured as well. |
| * The value is forwarded to the component configuration. |
| * |
| * @param durableSubscriptionName the durable subscriber name |
| */ |
| @Metadata(description = "The durable subscriber name for specifying durable topic subscriptions. The clientId option must be configured as well.") |
| public void setDurableSubscriptionName(String durableSubscriptionName) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setDurableSubscriptionName(durableSubscriptionName); |
| } |
| |
| /** |
| * Specifies the JMS Exception Listener that is to be notified of any |
| * underlying JMS exceptions. |
| * The value is forwarded to the component configuration. |
| * |
| * @param exceptionListener the exception listener |
| */ |
| @Metadata(label = "advanced", |
| description = "Specifies the JMS Exception Listener that is to be notified of any underlying JMS exceptions.") |
| public void setExceptionListener(ExceptionListener exceptionListener) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setExceptionListener(exceptionListener); |
| } |
| |
| /** |
| * Specifies an org.springframework.util.ErrorHandler to be invoked for any uncaught |
| * exceptions thrown while processing a Message. |
| * If no errorHandler has been configured, these exceptions are logged at the WARN level by default. |
| * The logging level and whether stack traces are logged can be configured with the |
| * errorHandlerLoggingLevel and errorHandlerLogStackTrace options, which is much easier |
| * than coding a custom errorHandler. |
| * The value is forwarded to the component configuration. |
| * |
| * @param errorHandler the error handler |
| */ |
| @Metadata(label = "advanced", |
| description = "Specifies a org.springframework.util.ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message." |
| + " By default these exceptions will be logged at the WARN level, if no errorHandler has been configured." |
| + " You can configure logging level and whether stack traces should be logged using errorHandlerLoggingLevel and errorHandlerLogStackTrace options." |
| + " This makes it much easier to configure, than having to code a custom errorHandler.") |
| public void setErrorHandler(ErrorHandler errorHandler) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setErrorHandler(errorHandler); |
| } |
| |
| /** |
| * Configures the logging level the default errorHandler uses for logging |
| * uncaught exceptions. |
| * The value is forwarded to the component configuration. |
| * |
| * @param errorHandlerLoggingLevel the logging level |
| */ |
| @Metadata(defaultValue = "WARN", label = "consumer,logging", |
| description = "Allows to configure the default errorHandler logging level for logging uncaught exceptions.") |
| public void setErrorHandlerLoggingLevel(LoggingLevel errorHandlerLoggingLevel) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setErrorHandlerLoggingLevel(errorHandlerLoggingLevel); |
| } |
| |
| /** |
| * Controls whether the default errorHandler logs stacktraces. |
| * The value is forwarded to the component configuration. |
| * |
| * @param errorHandlerLogStackTrace true to log stacktraces |
| */ |
| @Metadata(defaultValue = "true", label = "consumer,logging", |
| description = "Allows to control whether stacktraces should be logged or not, by the default errorHandler.") |
| public void setErrorHandlerLogStackTrace(boolean errorHandlerLogStackTrace) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setErrorHandlerLogStackTrace(errorHandlerLogStackTrace); |
| } |
| |
| /** |
| * Sets whether the deliveryMode, priority or timeToLive qualities of service should be |
| * used when sending messages. |
| * The option is based on Spring's JmsTemplate; the deliveryMode, priority and timeToLive |
| * options are applied to the current endpoint. In contrast, the preserveMessageQos option |
| * operates at message granularity and reads QoS properties exclusively from the Camel In |
| * message headers. |
| * The value is forwarded to the component configuration. |
| * |
| * @param explicitQosEnabled true to apply the explicit QoS settings |
| */ |
| @Metadata(label = "producer", defaultValue = "false", |
| description = "Set if the deliveryMode, priority or timeToLive qualities of service should be used when sending messages." |
| + " This option is based on Spring's JmsTemplate. The deliveryMode, priority and timeToLive options are applied to the current endpoint." |
| + " This contrasts with the preserveMessageQos option, which operates at message granularity," |
| + " reading QoS properties exclusively from the Camel In message headers.") |
| public void setExplicitQosEnabled(boolean explicitQosEnabled) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setExplicitQosEnabled(explicitQosEnabled); |
| } |
| |
| /** |
| * Specifies whether the listener session should be exposed when consuming messages. |
| * The value is forwarded to the component configuration. |
| * |
| * @param exposeListenerSession true to expose the listener session |
| */ |
| @Metadata(label = "consumer,advanced", |
| description = "Specifies whether the listener session should be exposed when consuming messages.") |
| public void setExposeListenerSession(boolean exposeListenerSession) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setExposeListenerSession(exposeListenerSession); |
| } |
| |
| /** |
| * Specifies the limit for idle executions of a receive task, i.e. executions that did |
| * not receive any message. |
| * When the limit is reached, the task shuts down and leaves receiving to other executing |
| * tasks (in the case of dynamic scheduling; see the maxConcurrentConsumers setting). |
| * There is additional doc available from Spring. |
| * The value is forwarded to the component configuration. |
| * |
| * @param idleTaskExecutionLimit the idle execution limit |
| */ |
| @Metadata(defaultValue = "1", label = "advanced", |
| description = "Specifies the limit for idle executions of a receive task, not having received any message within its execution." |
| + " If this limit is reached, the task will shut down and leave receiving to other executing tasks" |
| + " (in the case of dynamic scheduling; see the maxConcurrentConsumers setting)." |
| + " There is additional doc available from Spring.") |
| public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setIdleTaskExecutionLimit(idleTaskExecutionLimit); |
| } |
| |
| /** |
| * Specifies the limit for the number of consumers that are allowed to be idle |
| * at any given time. |
| * The value is forwarded to the component configuration. |
| * |
| * @param idleConsumerLimit the idle consumer limit |
| */ |
| @Metadata(defaultValue = "1", label = "advanced", |
| description = "Specify the limit for the number of consumers that are allowed to be idle at any given time.") |
| public void setIdleConsumerLimit(int idleConsumerLimit) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setIdleConsumerLimit(idleConsumerLimit); |
| } |
| |
| /** |
| * Specifies the maximum number of concurrent consumers when consuming from JMS |
| * (not for request/reply over JMS). |
| * See also the maxMessagesPerTask option to control dynamic scaling up/down of threads. |
| * For request/reply over JMS, the replyToMaxConcurrentConsumers option controls the number |
| * of concurrent consumers on the reply message listener. |
| * The value is forwarded to the component configuration. |
| * |
| * @param maxConcurrentConsumers the maximum number of concurrent consumers |
| */ |
| @Metadata(label = "consumer", |
| description = "Specifies the maximum number of concurrent consumers when consuming from JMS (not for request/reply over JMS)." |
| + " See also the maxMessagesPerTask option to control dynamic scaling up/down of threads." |
| + " When doing request/reply over JMS then the option replyToMaxConcurrentConsumers is used to control number" |
| + " of concurrent consumers on the reply message listener.") |
| public void setMaxConcurrentConsumers(int maxConcurrentConsumers) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setMaxConcurrentConsumers(maxConcurrentConsumers); |
| } |
| |
| /** |
| * Specifies the maximum number of concurrent consumers when using request/reply over JMS. |
| * See also the maxMessagesPerTask option to control dynamic scaling up/down of threads. |
| * The value is forwarded to the component configuration. |
| * |
| * @param maxConcurrentConsumers the maximum number of concurrent reply consumers |
| */ |
| @Metadata(label = "producer", |
| description = "Specifies the maximum number of concurrent consumers when using request/reply over JMS." |
| + " See also the maxMessagesPerTask option to control dynamic scaling up/down of threads.") |
| public void setReplyToMaxConcurrentConsumers(int maxConcurrentConsumers) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setReplyToMaxConcurrentConsumers(maxConcurrentConsumers); |
| } |
| |
| /** |
| * Specifies the maximum number of concurrent consumers for continue routing when a |
| * timeout occurred when using request/reply over JMS. |
| * The value is forwarded to the component configuration; note that the configuration |
| * method is named setReplyToOnTimeoutMaxConcurrentConsumers, which differs slightly |
| * from the name of this setter. |
| * |
| * @param maxConcurrentConsumers the maximum number of concurrent consumers on timeout |
| */ |
| @Metadata(label = "producer", defaultValue = "1", |
| description = "Specifies the maximum number of concurrent consumers for continue routing when timeout occurred when using request/reply over JMS.") |
| public void setReplyOnTimeoutToMaxConcurrentConsumers(int maxConcurrentConsumers) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setReplyToOnTimeoutMaxConcurrentConsumers(maxConcurrentConsumers); |
| } |
| |
| /** |
| * Sets the number of messages per task; -1 is unlimited. |
| * When a range is used for concurrent consumers (min lower than max), this option can be |
| * set to a value such as 100 to control how fast the consumers shrink when less work |
| * is required. |
| * The value is forwarded to the component configuration. |
| * |
| * @param maxMessagesPerTask the number of messages per task, or -1 for unlimited |
| */ |
| @Metadata(defaultValue = "-1", label = "advanced", |
| description = "The number of messages per task. -1 is unlimited." |
| + " If you use a range for concurrent consumers (eg min < max), then this option can be used to set" |
| + " a value to eg 100 to control how fast the consumers will shrink when less work is required.") |
| public void setMaxMessagesPerTask(int maxMessagesPerTask) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setMaxMessagesPerTask(maxMessagesPerTask); |
| } |
| |
| /** |
| * Sets a custom Spring org.springframework.jms.support.converter.MessageConverter, |
| * giving control over how to map to/from a javax.jms.Message. |
| * The value is forwarded to the component configuration. |
| * |
| * @param messageConverter the message converter |
| */ |
| @Metadata(label = "advanced", |
| description = "To use a custom Spring org.springframework.jms.support.converter.MessageConverter so you can be in control how to map to/from a javax.jms.Message.") |
| public void setMessageConverter(MessageConverter messageConverter) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setMessageConverter(messageConverter); |
| } |
| |
| /** |
| * Specifies whether Camel should auto map the received JMS message to a suited |
| * payload type, such as javax.jms.TextMessage to a String. |
| * The value is forwarded to the component configuration. |
| * |
| * @param mapJmsMessage true to auto map received JMS messages |
| */ |
| @Metadata(defaultValue = "true", label = "advanced", |
| description = "Specifies whether Camel should auto map the received JMS message to a suited payload type, such as javax.jms.TextMessage to a String etc.") |
| public void setMapJmsMessage(boolean mapJmsMessage) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setMapJmsMessage(mapJmsMessage); |
| } |
| |
| /** |
| * When sending, specifies whether message IDs should be added. This is just a hint to the JMS broker. |
| * If the JMS provider accepts this hint, these messages must have the message ID set to null; |
| * if the provider ignores the hint, the message ID must be set to its normal unique value. |
| * The value is forwarded to the component configuration. |
| * |
| * @param messageIdEnabled true to add message IDs when sending |
| */ |
| @Metadata(defaultValue = "true", label = "advanced", |
| description = "When sending, specifies whether message IDs should be added. This is just an hint to the JMS broker." |
| + " If the JMS provider accepts this hint, these messages must have the message ID set to null; if the provider ignores the hint, " |
| + "the message ID must be set to its normal unique value") |
| public void setMessageIdEnabled(boolean messageIdEnabled) { |
| getConfiguration().setMessageIdEnabled(messageIdEnabled); |
| } |
| |
| /** |
| * Specifies whether timestamps should be enabled by default on sending messages. |
| * This is just a hint to the JMS broker. If the JMS provider accepts this hint, these messages |
| * must have the timestamp set to zero; if the provider ignores the hint, the timestamp must be |
| * set to its normal value. |
| * The value is forwarded to the component configuration. |
| * |
| * @param messageTimestampEnabled true to enable timestamps on sent messages |
| */ |
| @Metadata(defaultValue = "true", label = "advanced", |
| description = "Specifies whether timestamps should be enabled by default on sending messages. This is just an hint to the JMS broker." |
| + " If the JMS provider accepts this hint, these messages must have the timestamp set to zero; if the provider ignores the hint " |
| + "the timestamp must be set to its normal value") |
| public void setMessageTimestampEnabled(boolean messageTimestampEnabled) { |
| getConfiguration().setMessageTimestampEnabled(messageTimestampEnabled); |
| } |
| |
| /** |
| * If true, Camel always makes a JMS message copy of the message when it is passed |
| * to the producer for sending. |
| * Copying is needed in some situations, such as when a replyToDestinationSelectorName is set |
| * (incidentally, Camel sets the alwaysCopyMessage option to true if a |
| * replyToDestinationSelectorName is set). |
| * The value is forwarded to the component configuration. |
| * |
| * @param alwaysCopyMessage true to always copy the message before sending |
| */ |
| @Metadata(label = "producer,advanced", |
| description = "If true, Camel will always make a JMS message copy of the message when it is passed to the producer for sending." |
| + " Copying the message is needed in some situations, such as when a replyToDestinationSelectorName is set" |
| + " (incidentally, Camel will set the alwaysCopyMessage option to true, if a replyToDestinationSelectorName is set)") |
| public void setAlwaysCopyMessage(boolean alwaysCopyMessage) { |
| final JmsConfiguration config = getConfiguration(); |
| config.setAlwaysCopyMessage(alwaysCopyMessage); |
| } |
| |
| /** |
| * Controls whether the JMSMessageID is always used as the JMSCorrelationID for InOut messages. |
| * |
| * @param useMessageIDAsCorrelationID true to always use JMSMessageID as JMSCorrelationID |
| */ |
| @Metadata(label = "advanced", |
| description = "Specifies whether JMSMessageID should always be used as JMSCorrelationID for InOut messages.") |
| public void setUseMessageIDAsCorrelationID(boolean useMessageIDAsCorrelationID) { |
| this.getConfiguration().setUseMessageIDAsCorrelationID(useMessageIDAsCorrelationID); |
| } |
| |
| /** |
| * Sets the message priority used when sending, where 0 is the lowest priority and 9 is the highest. |
| * The explicitQosEnabled option must also be enabled for this option to have any effect. |
| * |
| * @param priority the message priority to apply when sending |
| */ |
| @Metadata(defaultValue = "" + Message.DEFAULT_PRIORITY, enums = "1,2,3,4,5,6,7,8,9", label = "producer", |
| description = "Values greater than 1 specify the message priority when sending (where 0 is the lowest priority and 9 is the highest)." |
| + " The explicitQosEnabled option must also be enabled in order for this option to have any effect.") |
| public void setPriority(int priority) { |
| this.getConfiguration().setPriority(priority); |
| } |
| |
| /** |
| * Controls whether delivery of messages published by the component's own connection is inhibited. |
| * |
| * @param pubSubNoLocal true to inhibit delivery of locally published messages |
| */ |
| @Metadata(label = "advanced", |
| description = "Specifies whether to inhibit the delivery of messages published by its own connection.") |
| public void setPubSubNoLocal(boolean pubSubNoLocal) { |
| this.getConfiguration().setPubSubNoLocal(pubSubNoLocal); |
| } |
| |
| /** |
| * Sets the timeout for receiving messages. |
| * |
| * @param receiveTimeout the receive timeout in milliseconds |
| */ |
| @Metadata(defaultValue = "1000", label = "advanced", |
| description = "The timeout for receiving messages (in milliseconds).") |
| public void setReceiveTimeout(long receiveTimeout) { |
| this.getConfiguration().setReceiveTimeout(receiveTimeout); |
| } |
| |
| /** |
| * Sets the interval between recovery attempts, i.e. when a connection is being refreshed. |
| * The documented default is 5000 ms (5 seconds). |
| * |
| * @param recoveryInterval the recovery interval in milliseconds |
| */ |
| @Metadata(defaultValue = "5000", label = "advanced", |
| description = "Specifies the interval between recovery attempts, i.e. when a connection is being refreshed, in milliseconds." |
| + " The default is 5000 ms, that is, 5 seconds.") |
| public void setRecoveryInterval(long recoveryInterval) { |
| this.getConfiguration().setRecoveryInterval(recoveryInterval); |
| } |
| |
| /** |
| * Sets a custom task executor to be used for consuming messages. |
| * |
| * @param taskExecutor the task executor to hand to the configuration |
| */ |
| @Metadata(label = "consumer,advanced", |
| description = "Allows you to specify a custom task executor for consuming messages.") |
| public void setTaskExecutor(TaskExecutor taskExecutor) { |
| this.getConfiguration().setTaskExecutor(taskExecutor); |
| } |
| |
| /** |
| * Sets the time-to-live applied to messages when sending. |
| * |
| * @param timeToLive the message time-to-live in milliseconds |
| */ |
| @Metadata(defaultValue = "-1", label = "producer", |
| description = "When sending messages, specifies the time-to-live of the message (in milliseconds).") |
| public void setTimeToLive(long timeToLive) { |
| this.getConfiguration().setTimeToLive(timeToLive); |
| } |
| |
| /** |
| * Controls whether transacted mode is used. |
| * |
| * @param consumerTransacted true to use transacted mode |
| */ |
| @Metadata(label = "transaction", |
| description = "Specifies whether to use transacted mode") |
| public void setTransacted(boolean consumerTransacted) { |
| this.getConfiguration().setTransacted(consumerTransacted); |
| } |
| |
| /** |
| * Controls whether Camel creates a JmsTransactionManager when option transacted=true |
| * and no transactionManager has been injected. |
| * |
| * @param lazyCreating true to let Camel create the transaction manager in that case |
| */ |
| @Metadata(defaultValue = "true", label = "transaction,advanced", |
| description = "If true, Camel will create a JmsTransactionManager, if there is no transactionManager injected when option transacted=true.") |
| public void setLazyCreateTransactionManager(boolean lazyCreating) { |
| this.getConfiguration().setLazyCreateTransactionManager(lazyCreating); |
| } |
| |
| /** |
| * Sets the Spring transaction manager to use. |
| * |
| * @param transactionManager the transaction manager to hand to the configuration |
| */ |
| @Metadata(label = "transaction,advanced", |
| description = "The Spring transaction manager to use.") |
| public void setTransactionManager(PlatformTransactionManager transactionManager) { |
| this.getConfiguration().setTransactionManager(transactionManager); |
| } |
| |
| /** |
| * Sets the name of the transaction to use. |
| * |
| * @param transactionName the transaction name |
| */ |
| @Metadata(label = "transaction,advanced", |
| description = "The name of the transaction to use.") |
| public void setTransactionName(String transactionName) { |
| this.getConfiguration().setTransactionName(transactionName); |
| } |
| |
| /** |
| * Sets the timeout value of the transaction, which applies when using transacted mode. |
| * |
| * @param transactionTimeout the transaction timeout in seconds |
| */ |
| @Metadata(defaultValue = "-1", label = "transaction,advanced", |
| description = "The timeout value of the transaction (in seconds), if using transacted mode.") |
| public void setTransactionTimeout(int transactionTimeout) { |
| this.getConfiguration().setTransactionTimeout(transactionTimeout); |
| } |
| |
| /** |
| * Controls whether the connection is tested on startup. |
| * When enabled, all JMS consumers must have a valid connection to the JMS broker when Camel starts; |
| * if a connection cannot be granted, Camel throws an exception on startup so that it is never |
| * started with failed connections. The JMS producers are tested as well. |
| * |
| * @param testConnectionOnStartup true to test the connection on startup |
| */ |
| @Metadata(description = "Specifies whether to test the connection on startup." |
| + " This ensures that when Camel starts that all the JMS consumers have a valid connection to the JMS broker." |
| + " If a connection cannot be granted then Camel throws an exception on startup." |
| + " This ensures that Camel is not started with failed connections." |
| + " The JMS producers is tested as well.") |
| public void setTestConnectionOnStartup(boolean testConnectionOnStartup) { |
| this.getConfiguration().setTestConnectionOnStartup(testConnectionOnStartup); |
| } |
| |
| /** |
| * Controls whether the JmsConsumer message listener is started asynchronously when a route starts. |
| * A JmsConsumer that cannot get a connection to a remote JMS broker may block while retrying and/or |
| * failing over, which in turn blocks Camel while starting routes. With this option set to true the |
| * routes start while the JmsConsumer connects to the broker on a dedicated thread. |
| * Beware that if the connection cannot be established, an exception is logged at WARN level and the |
| * consumer will not receive messages; the route can then be restarted to retry. |
| * |
| * @param asyncStartListener true to start the message listener asynchronously |
| */ |
| @Metadata(label = "advanced", |
| description = "Whether to startup the JmsConsumer message listener asynchronously, when starting a route." |
| + " For example if a JmsConsumer cannot get a connection to a remote JMS broker, then it may block while retrying" |
| + " and/or failover. This will cause Camel to block while starting routes. By setting this option to true," |
| + " you will let routes startup, while the JmsConsumer connects to the JMS broker using a dedicated thread" |
| + " in asynchronous mode. If this option is used, then beware that if the connection could not be established," |
| + " then an exception is logged at WARN level, and the consumer will not be able to receive messages;" |
| + " You can then restart the route to retry.") |
| public void setAsyncStartListener(boolean asyncStartListener) { |
| this.getConfiguration().setAsyncStartListener(asyncStartListener); |
| } |
| |
| /** |
| * Controls whether the JmsConsumer message listener is stopped asynchronously when a route stops. |
| * |
| * @param asyncStopListener true to stop the message listener asynchronously |
| */ |
| @Metadata(label = "advanced", |
| description = "Whether to stop the JmsConsumer message listener asynchronously, when stopping a route.") |
| public void setAsyncStopListener(boolean asyncStopListener) { |
| this.getConfiguration().setAsyncStopListener(asyncStopListener); |
| } |
| |
| /** |
| * Forces Camel to send the original JMS message that was received. |
| * With mapJmsMessage=false, Camel otherwise creates a new JMS message to send to a new JMS destination |
| * as soon as the headers are touched (get or set) during the route. |
| * |
| * @param forceSendOriginalMessage true to always send the originally received JMS message |
| */ |
| @Metadata(label = "producer,advanced", |
| description = "When using mapJmsMessage=false Camel will create a new JMS message to send to a new JMS destination" |
| + " if you touch the headers (get or set) during the route. Set this option to true to force Camel to send" |
| + " the original JMS message that was received.") |
| public void setForceSendOriginalMessage(boolean forceSendOriginalMessage) { |
| this.getConfiguration().setForceSendOriginalMessage(forceSendOriginalMessage); |
| } |
| |
| /** |
| * Sets the timeout for waiting for a reply when using the InOut Exchange Pattern. |
| * The documented default is 20 seconds. A "CamelJmsRequestTimeout" header can override this |
| * endpoint-level value to give individual messages their own timeout. |
| * See also the requestTimeoutCheckerInterval option. |
| * |
| * @param requestTimeout the reply timeout in milliseconds |
| */ |
| @Metadata(defaultValue = "20000", label = "producer", |
| description = "The timeout for waiting for a reply when using the InOut Exchange Pattern (in milliseconds)." |
| + " The default is 20 seconds. You can include the header \"CamelJmsRequestTimeout\" to override this endpoint configured" |
| + " timeout value, and thus have per message individual timeout values." |
| + " See also the requestTimeoutCheckerInterval option.") |
| public void setRequestTimeout(long requestTimeout) { |
| this.getConfiguration().setRequestTimeout(requestTimeout); |
| } |
| |
| /** |
| * Sets how often Camel checks for timed out Exchanges when doing request/reply over JMS. |
| * Camel checks once per second by default; lower the interval to react faster when a timeout occurs. |
| * The timeout itself is determined by the requestTimeout option. |
| * |
| * @param requestTimeoutCheckerInterval the interval between timeout checks |
| */ |
| @Metadata(defaultValue = "1000", label = "advanced", |
| description = "Configures how often Camel should check for timed out Exchanges when doing request/reply over JMS." |
| + " By default Camel checks once per second. But if you must react faster when a timeout occurs," |
| + " then you can lower this interval, to check more frequently. The timeout is determined by the option requestTimeout.") |
| public void setRequestTimeoutCheckerInterval(long requestTimeoutCheckerInterval) { |
| this.getConfiguration().setRequestTimeoutCheckerInterval(requestTimeoutCheckerInterval); |
| } |
| |
| /** |
| * Controls whether the whole exchange is transferred over the wire instead of just the body and headers. |
| * The transferred fields are: In body, Out body, Fault body, In headers, Out headers, Fault headers, |
| * exchange properties and exchange exception. |
| * The objects must be serializable; Camel excludes any non-serializable objects and logs this at WARN level. |
| * The option must be enabled on both the producer and the consumer side, so Camel knows the payload |
| * is an Exchange and not a regular payload. |
| * |
| * @param transferExchange true to transfer the exchange rather than only body and headers |
| */ |
| @Metadata(label = "advanced", |
| description = "You can transfer the exchange over the wire instead of just the body and headers." |
| + " The following fields are transferred: In body, Out body, Fault body, In headers, Out headers, Fault headers," |
| + " exchange properties, exchange exception." |
| + " This requires that the objects are serializable. Camel will exclude any non-serializable objects and log it at WARN level." |
| + " You must enable this option on both the producer and consumer side, so Camel knows the payloads is an Exchange and not a regular payload.") |
| public void setTransferExchange(boolean transferExchange) { |
| this.getConfiguration().setTransferExchange(transferExchange); |
| } |
| |
| /** |
| * Controls whether an Exception caused on the consumer side is sent back to the caller. |
| * When enabled with Request Reply messaging (InOut) and an Exchange fails on the consumer side, |
| * the caused Exception is sent back in the response as a javax.jms.ObjectMessage. |
| * If the client is Camel, the returned Exception is rethrown, which allows Camel JMS to be used as a |
| * bridge in routing - for example, using persistent queues to enable robust routing. |
| * If transferExchange is enabled as well, this option takes precedence. |
| * The caught exception is required to be serializable. The original Exception on the consumer side |
| * can be wrapped in an outer exception such as org.apache.camel.RuntimeCamelException when returned |
| * to the producer. |
| * |
| * @param transferException true to send consumer-side exceptions back in the response |
| */ |
| @Metadata(label = "advanced", |
| description = "If enabled and you are using Request Reply messaging (InOut) and an Exchange failed on the consumer side," |
| + " then the caused Exception will be send back in response as a javax.jms.ObjectMessage." |
| + " If the client is Camel, the returned Exception is rethrown. This allows you to use Camel JMS as a bridge" |
| + " in your routing - for example, using persistent queues to enable robust routing." |
| + " Notice that if you also have transferExchange enabled, this option takes precedence." |
| + " The caught exception is required to be serializable." |
| + " The original Exception on the consumer side can be wrapped in an outer exception" |
| + " such as org.apache.camel.RuntimeCamelException when returned to the producer.") |
| public void setTransferException(boolean transferException) { |
| this.getConfiguration().setTransferException(transferException); |
| } |
| |
| /** |
| * Controls whether a fault flag is sent back to the caller. |
| * When enabled with Request Reply messaging (InOut) and an Exchange fails with a SOAP fault (not exception) |
| * on the consumer side, the fault flag on {@link org.apache.camel.Message#isFault()} is sent back in the |
| * response as a JMS header with the key {@link JmsConstants#JMS_TRANSFER_FAULT}. |
| * If the client is Camel, the returned fault flag is set on {@link org.apache.camel.Message#setFault(boolean)}. |
| * |
| * You may want to enable this when using Camel components that support faults, such as the SOAP based |
| * cxf or spring-ws. |
| * |
| * @param transferFault true to send the fault flag back in the response |
| */ |
| @Metadata(label = "advanced", |
| description = "If enabled and you are using Request Reply messaging (InOut) and an Exchange failed with a SOAP fault (not exception) on the consumer side," |
| + " then the fault flag on Message#isFault() will be send back in the response as a JMS header with the key" |
| + " org.apache.camel.component.jms.JmsConstants#JMS_TRANSFER_FAULT." |
| + " If the client is Camel, the returned fault flag will be set on the org.apache.camel.Message#setFault(boolean)." |
| + " You may want to enable this when using Camel components that support faults such as SOAP based such as cxf or spring-ws.") |
| public void setTransferFault(boolean transferFault) { |
| // The description is a runtime string, not Javadoc: it names the constant once and carries no {@link} markup. |
| getConfiguration().setTransferFault(transferFault); |
| } |
| |
| /** |
| * Sets a custom implementation of the org.springframework.jms.core.JmsOperations interface. |
| * Camel uses JmsTemplate as default. This can be used for testing purposes, but is not used much, |
| * as stated in the spring API docs. |
| * |
| * @param jmsOperations the JmsOperations implementation to hand to the configuration |
| */ |
| @Metadata(label = "advanced", |
| description = "Allows you to use your own implementation of the org.springframework.jms.core.JmsOperations interface." |
| + " Camel uses JmsTemplate as default. Can be used for testing purpose, but not used much as stated in the spring API docs.") |
| public void setJmsOperations(JmsOperations jmsOperations) { |
| this.getConfiguration().setJmsOperations(jmsOperations); |
| } |
| |
| /** |
| * Sets a pluggable org.springframework.jms.support.destination.DestinationResolver, which allows a custom |
| * resolver to be used (for example, to look up the real destination in a JNDI registry). |
| * |
| * @param destinationResolver the destination resolver to hand to the configuration |
| */ |
| @Metadata(label = "advanced", description = "A pluggable org.springframework.jms.support.destination.DestinationResolver that allows you to use your own resolver" |
| + " (for example, to lookup the real destination in a JNDI registry).") |
| public void setDestinationResolver(DestinationResolver destinationResolver) { |
| this.getConfiguration().setDestinationResolver(destinationResolver); |
| } |
| |
| /** |
| * Explicitly selects the strategy used for replyTo queues when doing request/reply over JMS. |
| * Possible values are Temporary, Shared, or Exclusive. |
| * Camel uses temporary queues by default; if replyTo has been configured, Shared is the default instead. |
| * This option makes it possible to use exclusive queues rather than shared ones. |
| * See the Camel JMS documentation for more details, especially the notes about the implications of running |
| * in a clustered environment and the fact that Shared reply queues perform worse than Temporary and Exclusive. |
| * |
| * @param replyToType the reply-to strategy to use |
| */ |
| @Metadata(label = "producer", |
| description = "Allows for explicitly specifying which kind of strategy to use for replyTo queues when doing request/reply over JMS." |
| + " Possible values are: Temporary, Shared, or Exclusive." |
| + " By default Camel will use temporary queues. However if replyTo has been configured, then Shared is used by default." |
| + " This option allows you to use exclusive queues instead of shared ones." |
| + " See Camel JMS documentation for more details, and especially the notes about the implications if running in a clustered environment," |
| + " and the fact that Shared reply queues has lower performance than its alternatives Temporary and Exclusive.") |
| public void setReplyToType(ReplyToType replyToType) { |
| this.getConfiguration().setReplyToType(replyToType); |
| } |
| |
| /** |
| * Controls whether messages are sent using the QoS settings specified on the message instead of |
| * the QoS settings on the JMS endpoint. |
| * The three headers considered are JMSPriority, JMSDeliveryMode and JMSExpiration; all or only some of |
| * them may be provided. For any that are missing, Camel falls back to the values from the endpoint, |
| * so with this option the headers override the endpoint values. |
| * The explicitQosEnabled option, by contrast, only uses options set on the endpoint and never values |
| * from the message header. |
| * |
| * @param preserveMessageQos true to prefer the QoS settings carried by the message |
| */ |
| @Metadata(label = "producer", |
| description = "Set to true, if you want to send message using the QoS settings specified on the message," |
| + " instead of the QoS settings on the JMS endpoint. The following three headers are considered JMSPriority, JMSDeliveryMode," |
| + " and JMSExpiration. You can provide all or only some of them. If not provided, Camel will fall back to use the" |
| + " values from the endpoint instead. So, when using this option, the headers override the values from the endpoint." |
| + " The explicitQosEnabled option, by contrast, will only use options set on the endpoint, and not values from the message header.") |
| public void setPreserveMessageQos(boolean preserveMessageQos) { |
| this.getConfiguration().setPreserveMessageQos(preserveMessageQos); |
| } |
| |
| /** |
| * Controls whether the JmsConsumer processes the Exchange asynchronously. |
| * When enabled, the JmsConsumer may pick up the next message from the JMS queue while the previous |
| * message is still being processed asynchronously (by the Asynchronous Routing Engine), so messages |
| * may not be processed 100% strictly in order. When disabled (the default), the Exchange is fully |
| * processed before the JmsConsumer picks up the next message from the JMS queue. |
| * Note that if transacted has been enabled, asyncConsumer=true does not run asynchronously, because |
| * transactions must be executed synchronously (Camel 3.0 may support async transactions). |
| * |
| * @param asyncConsumer true to process exchanges asynchronously |
| */ |
| @Metadata(label = "consumer", |
| description = "Whether the JmsConsumer processes the Exchange asynchronously." |
| + " If enabled then the JmsConsumer may pickup the next message from the JMS queue," |
| + " while the previous message is being processed asynchronously (by the Asynchronous Routing Engine)." |
| + " This means that messages may be processed not 100% strictly in order. If disabled (as default)" |
| + " then the Exchange is fully processed before the JmsConsumer will pickup the next message from the JMS queue." |
| + " Note if transacted has been enabled, then asyncConsumer=true does not run asynchronously, as transaction" |
| + " must be executed synchronously (Camel 3.0 may support async transactions).") |
| public void setAsyncConsumer(boolean asyncConsumer) { |
| this.getConfiguration().setAsyncConsumer(asyncConsumer); |
| } |
| |
| /** |
| * Controls whether messages with no body may be sent. |
| * If this option is false and the message body is null, a JMSException is thrown. |
| * |
| * @param allowNullBody true to allow sending messages without a body |
| */ |
| @Metadata(defaultValue = "true", label = "producer,advanced", |
| description = "Whether to allow sending messages with no body. If this option is false and the message body is null, then an JMSException is thrown.") |
| public void setAllowNullBody(boolean allowNullBody) { |
| this.getConfiguration().setAllowNullBody(allowNullBody); |
| } |
| |
| /** |
| * Controls whether the Camel Exchange is enriched with the actual JMSMessageID that the JMS client |
| * used when the message was sent to the JMS destination. |
| * Only applicable when sending to a JMS destination using InOnly (e.g. fire and forget). |
| * |
| * @param includeSentJMSMessageID true to enrich the exchange with the sent JMSMessageID |
| */ |
| @Metadata(label = "producer,advanced", |
| description = "Only applicable when sending to JMS destination using InOnly (eg fire and forget)." |
| + " Enabling this option will enrich the Camel Exchange with the actual JMSMessageID" |
| + " that was used by the JMS client when the message was sent to the JMS destination.") |
| public void setIncludeSentJMSMessageID(boolean includeSentJMSMessageID) { |
| this.getConfiguration().setIncludeSentJMSMessageID(includeSentJMSMessageID); |
| } |
| |
| /** |
| * Controls whether all JMSXxxx properties are included when mapping from JMS to Camel Message. |
| * Setting this to true includes properties such as JMSXAppID and JMSXUserID. |
| * Note: this option does not apply when a custom headerFilterStrategy is used. |
| * |
| * @param includeAllJMSXProperties true to include all JMSXxxx properties |
| */ |
| @Metadata(label = "advanced", |
| description = "Whether to include all JMSXxxx properties when mapping from JMS to Camel Message." |
| + " Setting this to true will include properties such as JMSXAppID, and JMSXUserID etc." |
| + " Note: If you are using a custom headerFilterStrategy then this option does not apply.") |
| public void setIncludeAllJMSXProperties(boolean includeAllJMSXProperties) { |
| this.getConfiguration().setIncludeAllJMSXProperties(includeAllJMSXProperties); |
| } |
| |
| /** |
| * Selects the default TaskExecutor type used in the DefaultMessageListenerContainer, for both consumer |
| * endpoints and the ReplyTo consumer of producer endpoints. |
| * Possible values are SimpleAsync (uses Spring's SimpleAsyncTaskExecutor) or ThreadPool |
| * (uses Spring's ThreadPoolTaskExecutor with optimal values - cached threadpool-like). |
| * If not set, the previous behaviour applies: a cached thread pool for consumer endpoints and |
| * SimpleAsync for reply consumers. |
| * ThreadPool is recommended to reduce "thread trash" in elastic configurations with dynamically |
| * increasing and decreasing concurrent consumers. |
| * |
| * @param type the default task executor type |
| */ |
| @Metadata(label = "consumer,advanced", |
| description = "Specifies what default TaskExecutor type to use in the DefaultMessageListenerContainer," |
| + " for both consumer endpoints and the ReplyTo consumer of producer endpoints." |
| + " Possible values: SimpleAsync (uses Spring's SimpleAsyncTaskExecutor) or ThreadPool" |
| + " (uses Spring's ThreadPoolTaskExecutor with optimal values - cached threadpool-like)." |
| + " If not set, it defaults to the previous behaviour, which uses a cached thread pool" |
| + " for consumer endpoints and SimpleAsync for reply consumers." |
| + " The use of ThreadPool is recommended to reduce thread trash in elastic configurations" |
| + " with dynamically increasing and decreasing concurrent consumers.") |
| public void setDefaultTaskExecutorType(DefaultTaskExecutorType type) { |
| this.getConfiguration().setDefaultTaskExecutorType(type); |
| } |
| |
| /** |
| * Sets the pluggable strategy for encoding and decoding JMS keys so they can be compliant with the |
| * JMS specification. |
| * Camel provides two implementations out of the box: default and passthrough. The default strategy |
| * safely marshals dots and hyphens (. and -); the passthrough strategy leaves the key as is and can be |
| * used for JMS brokers which do not care whether JMS header keys contain illegal characters. |
| * A custom implementation of org.apache.camel.component.jms.JmsKeyFormatStrategy can be provided |
| * and referred to using the # notation. |
| * |
| * @param jmsKeyFormatStrategy the key format strategy to hand to the configuration |
| */ |
| @Metadata(label = "advanced", |
| description = "Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification." |
| + " Camel provides two implementations out of the box: default and passthrough." |
| + " The default strategy will safely marshal dots and hyphens (. and -). The passthrough strategy leaves the key as is." |
| + " Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters." |
| + " You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy" |
| + " and refer to it using the # notation.") |
| public void setJmsKeyFormatStrategy(JmsKeyFormatStrategy jmsKeyFormatStrategy) { |
| this.getConfiguration().setJmsKeyFormatStrategy(jmsKeyFormatStrategy); |
| } |
| |
| /** |
| * Sets the JMS key format strategy by the name of one of the standard strategies, which is simpler |
| * than supplying an instance. |
| * The name is resolved through resolveStandardJmsKeyFormatStrategy and the resulting strategy is |
| * handed to the configuration. |
| * |
| * @param jmsKeyFormatStrategyName the name of a standard key format strategy |
| * @throws IllegalArgumentException if the name does not resolve to a standard supported strategy |
| */ |
| public void setJmsKeyFormatStrategy(String jmsKeyFormatStrategyName) { |
| JmsKeyFormatStrategy resolved = resolveStandardJmsKeyFormatStrategy(jmsKeyFormatStrategyName); |
| // reject unknown names up front, so only a resolved strategy reaches the configuration |
| if (resolved == null) { |
| throw new IllegalArgumentException("JmsKeyFormatStrategy with name " + jmsKeyFormatStrategyName + " is not a standard supported name"); |
| } |
| getConfiguration().setJmsKeyFormatStrategy(resolved); |
| } |
| |
| /** |
| * Allows additional headers which may have values that are invalid according to the JMS specification. |
| * For example, some message systems such as WMQ do this with header names using the prefix JMS_IBM_MQMD_, |
| * containing values with byte array or other invalid types. |
| * Multiple header names can be given separated by comma, and * can be used as suffix for wildcard matching. |
| * |
| * @param allowAdditionalHeaders the header names (or wildcard patterns) to allow, separated by comma |
| */ |
| @Metadata(label = "producer,advanced", |
| description = "This option is used to allow additional headers which may have values that are invalid according to JMS specification." |
| + " For example some message systems such as WMQ do this with header names using prefix JMS_IBM_MQMD_ containing values with byte array or other invalid types." |
| + " You can specify multiple header names separated by comma, and use * as suffix for wildcard matching.") |
| public void setAllowAdditionalHeaders(String allowAdditionalHeaders) { |
| this.getConfiguration().setAllowAdditionalHeaders(allowAdditionalHeaders); |
| } |
| |
| /** |
| * Sets the Spring ApplicationContext to use. |
| * The given context is stored on this component as-is. |
| * |
| * @param applicationContext the Spring application context to keep a reference to |
| * @throws BeansException declared by the signature; the assignment itself does not throw it |
| */ |
| public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { |
| this.applicationContext = applicationContext; |
| } |
| |
| /** |
| * Returns the QueueBrowseStrategy of this component. |
| * If none has been set yet, a DefaultQueueBrowseStrategy is created, stored and returned. |
| * |
| * @return the configured strategy, or a newly created default one |
| */ |
| public QueueBrowseStrategy getQueueBrowseStrategy() { |
| if (queueBrowseStrategy != null) { |
| return queueBrowseStrategy; |
| } |
| // nothing configured yet: fall back to the default strategy and remember it |
| queueBrowseStrategy = new DefaultQueueBrowseStrategy(); |
| return queueBrowseStrategy; |
| } |
| |
| /** |
| * To use a custom QueueBrowseStrategy when browsing queues. |
| * The given strategy is stored on this component as-is. |
| * |
| * @param queueBrowseStrategy the strategy to use when browsing queues |
| */ |
| public void setQueueBrowseStrategy(QueueBrowseStrategy queueBrowseStrategy) { |
| this.queueBrowseStrategy = queueBrowseStrategy; |
| } |
| |
| /** |
| * Returns the MessageCreatedStrategy held by this component. |
| * |
| * @return the stored strategy, exactly as it was set |
| */ |
| public MessageCreatedStrategy getMessageCreatedStrategy() { |
| return this.messageCreatedStrategy; |
| } |
| |
| /** |
| * To use the given MessageCreatedStrategy, which is invoked when Camel creates new instances of |
| * {@code javax.jms.Message} objects when Camel is sending a JMS message. |
| * The given strategy is stored on this component as-is. |
| * |
| * @param messageCreatedStrategy the strategy to invoke on message creation |
| */ |
| public void setMessageCreatedStrategy(MessageCreatedStrategy messageCreatedStrategy) { |
| this.messageCreatedStrategy = messageCreatedStrategy; |
| } |
| |
| /** |
| * Returns the waitForProvisionCorrelationToBeUpdatedCounter value held by the configuration. |
| * |
| * @return the counter value reported by the configuration |
| */ |
| public int getWaitForProvisionCorrelationToBeUpdatedCounter() { |
| return this.getConfiguration().getWaitForProvisionCorrelationToBeUpdatedCounter(); |
| } |
| |
| /** |
| * Sets the number of times to wait for the provisional correlation id to be updated to the actual |
| * correlation id when doing request/reply over JMS with the useMessageIDAsCorrelationID option enabled. |
| * |
| * @param counter the number of times to wait |
| */ |
| @Metadata(defaultValue = "50", label = "advanced", |
| description = "Number of times to wait for provisional correlation id to be updated to the actual correlation id when doing request/reply over JMS" |
| + " and when the option useMessageIDAsCorrelationID is enabled.") |
| public void setWaitForProvisionCorrelationToBeUpdatedCounter(int counter) { |
| this.getConfiguration().setWaitForProvisionCorrelationToBeUpdatedCounter(counter); |
| } |
| |
| /** |
| * Returns the waitForProvisionCorrelationToBeUpdatedThreadSleepingTime value held by the configuration. |
| * |
| * @return the sleeping time reported by the configuration |
| */ |
| public long getWaitForProvisionCorrelationToBeUpdatedThreadSleepingTime() { |
| return this.getConfiguration().getWaitForProvisionCorrelationToBeUpdatedThreadSleepingTime(); |
| } |
| |
| /** |
| * Sets the interval to sleep each time while waiting for the provisional correlation id to be updated. |
| * |
| * @param sleepingTime the sleep interval in milliseconds |
| */ |
| @Metadata(defaultValue = "100", label = "advanced", |
| description = "Interval in millis to sleep each time while waiting for provisional correlation id to be updated.") |
| public void setWaitForProvisionCorrelationToBeUpdatedThreadSleepingTime(long sleepingTime) { |
| this.getConfiguration().setWaitForProvisionCorrelationToBeUpdatedThreadSleepingTime(sleepingTime); |
| } |
| |
| /** |
| * Names the JMS property used to correlate messages in the InOut exchange pattern (request-reply) |
| * instead of the JMSCorrelationID property. |
| * This makes it possible to exchange messages with systems that do not correlate messages using the |
| * JMSCorrelationID JMS property. When used, JMSCorrelationID is neither used nor set by Camel. |
| * The value of the named property is generated if it is not supplied in the header of the message |
| * under the same name. |
| * |
| * @param correlationProperty the name of the JMS property to correlate on |
| */ |
| @Metadata(label = "producer,advanced", |
| description = "Use this JMS property to correlate messages in InOut exchange pattern (request-reply)" |
| + " instead of JMSCorrelationID property. This allows you to exchange messages with" |
| + " systems that do not correlate messages using JMSCorrelationID JMS property. If used" |
| + " JMSCorrelationID will not be used or set by Camel. The value of here named property" |
| + " will be generated if not supplied in the header of the message under the same name.") |
| public void setCorrelationProperty(final String correlationProperty) { |
| this.getConfiguration().setCorrelationProperty(correlationProperty); |
| } |
| |
| // JMS 2.0 API |
| // ------------------------------------------------------------------------- |
| |
| /** |
| * Returns the subscriptionDurable flag held by the configuration. |
| * |
| * @return the value reported by the configuration's isSubscriptionDurable() |
| */ |
| public boolean isSubscriptionDurable() { |
| return this.getConfiguration().isSubscriptionDurable(); |
| } |
| |
| /** |
| * Sets whether to make the subscription durable. The durable subscription name to be used can be |
| * specified through the "subscriptionName" property. |
| * The documented default is "false". Set this to "true" to register a durable subscription, typically |
| * in combination with a "subscriptionName" value (unless the message listener class name is good |
| * enough as subscription name). |
| * Only makes sense when listening to a topic (pub-sub domain); per the option description, this |
| * switches the "pubSubDomain" flag as well. |
| * |
| * @param subscriptionDurable true to register a durable subscription |
| */ |
| @Metadata(label = "consumer", description = "Set whether to make the subscription durable. The durable subscription name" |
| + " to be used can be specified through the subscriptionName property." |
| + " Default is false. Set this to true to register a durable subscription," |
| + " typically in combination with a subscriptionName value (unless" |
| + " your message listener class name is good enough as subscription name)." |
| + " Only makes sense when listening to a topic (pub-sub domain)," |
| + " therefore this method switches the pubSubDomain flag as well.") |
| public void setSubscriptionDurable(boolean subscriptionDurable) { |
| this.getConfiguration().setSubscriptionDurable(subscriptionDurable); |
| } |
| |
| public boolean isSubscriptionShared() { |
| return getConfiguration().isSubscriptionShared(); |
| } |
| |
| /** |
| * Set whether to make the subscription shared. The shared subscription name |
| * to be used can be specified through the "subscriptionName" property. |
| * <p>Default is "false". Set this to "true" to register a shared subscription, |
| * typically in combination with a "subscriptionName" value (unless |
| * your message listener class name is good enough as subscription name). |
| * Note that shared subscriptions may also be durable, so this flag can |
| * (and often will) be combined with "subscriptionDurable" as well. |
| * <p>Only makes sense when listening to a topic (pub-sub domain), |
| * therefore this method switches the "pubSubDomain" flag as well. |
| * <p><b>Requires a JMS 2.0 compatible message broker.</b> |
| */ |
| @Metadata(label = "consumer", description = "Set whether to make the subscription shared. The shared subscription name" |
| + " to be used can be specified through the subscriptionName property." |
| + " Default is false. Set this to true to register a shared subscription," |
| + " typically in combination with a subscriptionName value (unless" |
| + " your message listener class name is good enough as subscription name)." |
| + " Note that shared subscriptions may also be durable, so this flag can" |
| + " (and often will) be combined with subscriptionDurable as well." |
| + " Only makes sense when listening to a topic (pub-sub domain)," |
| + " therefore this method switches the pubSubDomain flag as well." |
| + " Requires a JMS 2.0 compatible message broker.") |
| public void setSubscriptionShared(boolean subscriptionShared) { |
| getConfiguration().setSubscriptionShared(subscriptionShared); |
| } |
| |
| public String getSubscriptionName() { |
| return getConfiguration().getSubscriptionName(); |
| } |
| |
| /** |
| * Set the name of a subscription to create. To be applied in case |
| * of a topic (pub-sub domain) with a shared or durable subscription. |
| * <p>The subscription name needs to be unique within this client's |
| * JMS client id. Default is the class name of the specified message listener. |
| * <p>Note: Only 1 concurrent consumer (which is the default of this |
| * message listener container) is allowed for each subscription, |
| * except for a shared subscription (which requires JMS 2.0). |
| */ |
| @Metadata(label = "consumer", description = "Set the name of a subscription to create. To be applied in case" |
| + " of a topic (pub-sub domain) with a shared or durable subscription." |
| + " The subscription name needs to be unique within this client's" |
| + " JMS client id. Default is the class name of the specified message listener." |
| + " Note: Only 1 concurrent consumer (which is the default of this" |
| + " message listener container) is allowed for each subscription," |
| + " except for a shared subscription (which requires JMS 2.0).") |
| public void setSubscriptionName(String subscriptionName) { |
| getConfiguration().setSubscriptionName(subscriptionName); |
| } |
| |
| |
| public boolean isStreamMessageTypeEnabled() { |
| return getConfiguration().isStreamMessageTypeEnabled(); |
| } |
| |
| /** |
| * Sets whether StreamMessage type is enabled or not. |
| * Message payloads of streaming kind such as files, InputStream, etc will either by sent as BytesMessage or StreamMessage. |
| * This option controls which kind will be used. By default BytesMessage is used which enforces the entire message payload to be read into memory. |
| * By enabling this option the message payload is read into memory in chunks and each chunk is then written to the StreamMessage until no more data. |
| */ |
| @Metadata(label = "producer,advanced", description = "Sets whether StreamMessage type is enabled or not." |
| + " Message payloads of streaming kind such as files, InputStream, etc will either by sent as BytesMessage or StreamMessage." |
| + " This option controls which kind will be used. By default BytesMessage is used which enforces the entire message payload to be read into memory." |
| + " By enabling this option the message payload is read into memory in chunks and each chunk is then written to the StreamMessage until no more data.") |
| public void setStreamMessageTypeEnabled(boolean streamMessageTypeEnabled) { |
| getConfiguration().setStreamMessageTypeEnabled(streamMessageTypeEnabled); |
| } |
| |
| /** |
| * Gets whether date headers should be formatted according to the ISO 8601 |
| * standard. |
| */ |
| public boolean isFormatDateHeadersToIso8601() { |
| return getConfiguration().isFormatDateHeadersToIso8601(); |
| } |
| |
| /** |
| * Sets whether date headers should be formatted according to the ISO 8601 |
| * standard. |
| */ |
| @Metadata(label = "producer", description = "Sets whether date headers should be formatted according to the ISO 8601 standard.") |
| public void setFormatDateHeadersToIso8601(boolean formatDateHeadersToIso8601) { |
| getConfiguration().setFormatDateHeadersToIso8601(formatDateHeadersToIso8601); |
| } |
| |
| // Implementation methods |
| // ------------------------------------------------------------------------- |
| |
| @Override |
| protected void doStart() throws Exception { |
| if (getHeaderFilterStrategy() == null) { |
| setHeaderFilterStrategy(new JmsHeaderFilterStrategy(getConfiguration().isIncludeAllJMSXProperties())); |
| } |
| } |
| |
| @Override |
| protected void doShutdown() throws Exception { |
| if (asyncStartStopExecutorService != null) { |
| getCamelContext().getExecutorServiceManager().shutdownNow(asyncStartStopExecutorService); |
| asyncStartStopExecutorService = null; |
| } |
| super.doShutdown(); |
| } |
| |
| protected synchronized ExecutorService getAsyncStartStopExecutorService() { |
| if (asyncStartStopExecutorService == null) { |
| // use a cached thread pool for async start tasks as they can run for a while, and we need a dedicated thread |
| // for each task, and the thread pool will shrink when no more tasks running |
| asyncStartStopExecutorService = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "AsyncStartStopListener"); |
| } |
| return asyncStartStopExecutorService; |
| } |
| |
| @Override |
| protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) |
| throws Exception { |
| |
| boolean pubSubDomain = false; |
| boolean tempDestination = false; |
| |
| if (ObjectHelper.isNotEmpty(remaining)) { |
| if (remaining.startsWith(JmsConfiguration.QUEUE_PREFIX)) { |
| pubSubDomain = false; |
| remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.QUEUE_PREFIX.length()), '/'); |
| } else if (remaining.startsWith(JmsConfiguration.TOPIC_PREFIX)) { |
| pubSubDomain = true; |
| remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TOPIC_PREFIX.length()), '/'); |
| } else if (remaining.startsWith(JmsConfiguration.TEMP_QUEUE_PREFIX)) { |
| pubSubDomain = false; |
| tempDestination = true; |
| remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TEMP_QUEUE_PREFIX.length()), '/'); |
| } else if (remaining.startsWith(JmsConfiguration.TEMP_TOPIC_PREFIX)) { |
| pubSubDomain = true; |
| tempDestination = true; |
| remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TEMP_TOPIC_PREFIX.length()), '/'); |
| } |
| } |
| |
| final String subject = convertPathToActualDestination(remaining, parameters); |
| |
| // lets make sure we copy the configuration as each endpoint can |
| // customize its own version |
| JmsConfiguration newConfiguration = getConfiguration().copy(); |
| JmsEndpoint endpoint; |
| if (pubSubDomain) { |
| if (tempDestination) { |
| endpoint = createTemporaryTopicEndpoint(uri, this, subject, newConfiguration); |
| } else { |
| endpoint = createTopicEndpoint(uri, this, subject, newConfiguration); |
| } |
| } else { |
| QueueBrowseStrategy strategy = getQueueBrowseStrategy(); |
| if (tempDestination) { |
| endpoint = createTemporaryQueueEndpoint(uri, this, subject, newConfiguration, strategy); |
| } else { |
| endpoint = createQueueEndpoint(uri, this, subject, newConfiguration, strategy); |
| } |
| } |
| |
| // resolve any custom connection factory first |
| ConnectionFactory cf = resolveAndRemoveReferenceParameter(parameters, "connectionFactory", ConnectionFactory.class); |
| if (cf != null) { |
| endpoint.getConfiguration().setConnectionFactory(cf); |
| } |
| |
| // if username or password provided then wrap the connection factory |
| String cfUsername = getAndRemoveParameter(parameters, "username", String.class, getConfiguration().getUsername()); |
| String cfPassword = getAndRemoveParameter(parameters, "password", String.class, getConfiguration().getPassword()); |
| if (cfUsername != null && cfPassword != null) { |
| cf = endpoint.getConfiguration().getConnectionFactory(); |
| ObjectHelper.notNull(cf, "ConnectionFactory"); |
| LOG.debug("Wrapping existing ConnectionFactory with UserCredentialsConnectionFactoryAdapter using username: {} and password: ******", cfUsername); |
| UserCredentialsConnectionFactoryAdapter ucfa = new UserCredentialsConnectionFactoryAdapter(); |
| ucfa.setTargetConnectionFactory(cf); |
| ucfa.setPassword(cfPassword); |
| ucfa.setUsername(cfUsername); |
| endpoint.getConfiguration().setConnectionFactory(ucfa); |
| } else { |
| // if only username or password was provided then fail |
| if (cfUsername != null || cfPassword != null) { |
| if (cfUsername == null) { |
| throw new IllegalArgumentException("Username must also be provided when using username/password as credentials."); |
| } else { |
| throw new IllegalArgumentException("Password must also be provided when using username/password as credentials."); |
| } |
| } |
| } |
| |
| // jms header strategy |
| String strategyVal = getAndRemoveParameter(parameters, KEY_FORMAT_STRATEGY_PARAM, String.class); |
| JmsKeyFormatStrategy strategy = resolveStandardJmsKeyFormatStrategy(strategyVal); |
| if (strategy != null) { |
| endpoint.setJmsKeyFormatStrategy(strategy); |
| } else { |
| // its not a standard, but a reference |
| parameters.put(KEY_FORMAT_STRATEGY_PARAM, strategyVal); |
| endpoint.setJmsKeyFormatStrategy(resolveAndRemoveReferenceParameter( |
| parameters, KEY_FORMAT_STRATEGY_PARAM, JmsKeyFormatStrategy.class)); |
| } |
| |
| MessageListenerContainerFactory messageListenerContainerFactory = resolveAndRemoveReferenceParameter(parameters, |
| "messageListenerContainerFactoryRef", MessageListenerContainerFactory.class); |
| if (messageListenerContainerFactory == null) { |
| messageListenerContainerFactory = resolveAndRemoveReferenceParameter(parameters, |
| "messageListenerContainerFactory", MessageListenerContainerFactory.class); |
| } |
| if (messageListenerContainerFactory != null) { |
| endpoint.setMessageListenerContainerFactory(messageListenerContainerFactory); |
| } |
| |
| endpoint.setHeaderFilterStrategy(getHeaderFilterStrategy()); |
| setProperties(endpoint.getConfiguration(), parameters); |
| |
| return endpoint; |
| } |
| |
| protected JmsEndpoint createTemporaryTopicEndpoint(String uri, JmsComponent component, String subject, JmsConfiguration configuration) { |
| return new JmsTemporaryTopicEndpoint(uri, component, subject, configuration); |
| } |
| |
| protected JmsEndpoint createTopicEndpoint(String uri, JmsComponent component, String subject, JmsConfiguration configuration) { |
| return new JmsEndpoint(uri, component, subject, true, configuration); |
| } |
| |
| protected JmsEndpoint createTemporaryQueueEndpoint(String uri, JmsComponent component, String subject, JmsConfiguration configuration, QueueBrowseStrategy queueBrowseStrategy) { |
| return new JmsTemporaryQueueEndpoint(uri, component, subject, configuration, queueBrowseStrategy); |
| } |
| |
| protected JmsEndpoint createQueueEndpoint(String uri, JmsComponent component, String subject, JmsConfiguration configuration, QueueBrowseStrategy queueBrowseStrategy) { |
| return new JmsQueueEndpoint(uri, component, subject, configuration, queueBrowseStrategy); |
| } |
| |
| /** |
| * Resolves the standard supported {@link JmsKeyFormatStrategy} by a name which can be: |
| * <ul> |
| * <li>default - to use the default strategy</li> |
| * <li>passthrough - to use the passthrough strategy</li> |
| * </ul> |
| * |
| * @param name the name |
| * @return the strategy, or <tt>null</tt> if not a standard name. |
| */ |
| private static JmsKeyFormatStrategy resolveStandardJmsKeyFormatStrategy(String name) { |
| if ("default".equalsIgnoreCase(name)) { |
| return new DefaultJmsKeyFormatStrategy(); |
| } else if ("passthrough".equalsIgnoreCase(name)) { |
| return new PassThroughJmsKeyFormatStrategy(); |
| } else { |
| return null; |
| } |
| } |
| |
| /** |
| * A strategy method allowing the URI destination to be translated into the |
| * actual JMS destination name (say by looking up in JNDI or something) |
| */ |
| protected String convertPathToActualDestination(String path, Map<String, Object> parameters) { |
| return path; |
| } |
| |
| /** |
| * Factory method to create the default configuration instance |
| * |
| * @return a newly created configuration object which can then be further |
| * customized |
| */ |
| protected JmsConfiguration createConfiguration() { |
| return new JmsConfiguration(); |
| } |
| |
| } |