/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.jms;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.Message;
import javax.jms.Session;

import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.LoggingLevel;
import org.apache.camel.impl.HeaderFilterStrategyComponent;
import org.apache.camel.spi.Metadata;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.ErrorHandler;

import static org.apache.camel.util.StringHelper.removeStartingCharacters;

/**
 * A <a href="http://activemq.apache.org/jms.html">JMS Component</a>
 *
 * @version 
 */
public class JmsComponent extends HeaderFilterStrategyComponent implements ApplicationContextAware {

    private static final Logger LOG = LoggerFactory.getLogger(JmsComponent.class);

    private static final String KEY_FORMAT_STRATEGY_PARAM = "jmsKeyFormatStrategy";

    private ExecutorService asyncStartStopExecutorService;
    private ApplicationContext applicationContext;

    @Metadata(label = "advanced", description = "To use a shared JMS configuration")
    private JmsConfiguration configuration;
    @Metadata(label = "advanced", description = "To use a custom QueueBrowseStrategy when browsing queues")
    private QueueBrowseStrategy queueBrowseStrategy;
    @Metadata(label = "advanced", description = "To use the given MessageCreatedStrategy which are invoked when Camel creates new instances"
            + " of javax.jms.Message objects when Camel is sending a JMS message.")
    private MessageCreatedStrategy messageCreatedStrategy;

    public JmsComponent() {
        super(JmsEndpoint.class);
    }

    public JmsComponent(Class<? extends Endpoint> endpointClass) {
        super(endpointClass);
    }

    public JmsComponent(CamelContext context) {
        super(context, JmsEndpoint.class);
    }

    public JmsComponent(CamelContext context, Class<? extends Endpoint> endpointClass) {
        super(context, endpointClass);
    }

    public JmsComponent(JmsConfiguration configuration) {
        this();
        this.configuration = configuration;
    }

    /**
     * Static builder method
     */
    public static JmsComponent jmsComponent() {
        return new JmsComponent();
    }

    /**
     * Static builder method
     */
    public static JmsComponent jmsComponent(JmsConfiguration configuration) {
        return new JmsComponent(configuration);
    }

    /**
     * Static builder method
     */
    public static JmsComponent jmsComponent(ConnectionFactory connectionFactory) {
        return jmsComponent(new JmsConfiguration(connectionFactory));
    }

    /**
     * Static builder method
     */
    public static JmsComponent jmsComponentClientAcknowledge(ConnectionFactory connectionFactory) {
        JmsConfiguration template = new JmsConfiguration(connectionFactory);
        template.setAcknowledgementMode(Session.CLIENT_ACKNOWLEDGE);
        return jmsComponent(template);
    }

    /**
     * Static builder method
     */
    public static JmsComponent jmsComponentAutoAcknowledge(ConnectionFactory connectionFactory) {
        JmsConfiguration template = new JmsConfiguration(connectionFactory);
        template.setAcknowledgementMode(Session.AUTO_ACKNOWLEDGE);
        return jmsComponent(template);
    }

    public static JmsComponent jmsComponentTransacted(ConnectionFactory connectionFactory) {
        JmsTransactionManager transactionManager = new JmsTransactionManager();
        transactionManager.setConnectionFactory(connectionFactory);
        return jmsComponentTransacted(connectionFactory, transactionManager);
    }

    @SuppressWarnings("deprecation")
    public static JmsComponent jmsComponentTransacted(ConnectionFactory connectionFactory,
                                                      PlatformTransactionManager transactionManager) {
        JmsConfiguration template = new JmsConfiguration(connectionFactory);
        template.setTransactionManager(transactionManager);
        template.setTransacted(true);
        template.setTransactedInOut(true);
        return jmsComponent(template);
    }

    // Properties
    // -------------------------------------------------------------------------

    public JmsConfiguration getConfiguration() {
        if (configuration == null) {
            configuration = createConfiguration();

            // If we are being configured with spring...
            if (applicationContext != null) {

                if (isAllowAutoWiredConnectionFactory()) {
                    Map<String, ConnectionFactory> beansOfTypeConnectionFactory = applicationContext.getBeansOfType(ConnectionFactory.class);
                    if (!beansOfTypeConnectionFactory.isEmpty()) {
                        ConnectionFactory cf = beansOfTypeConnectionFactory.values().iterator().next();
                        configuration.setConnectionFactory(cf);
                    }
                }

                if (isAllowAutoWiredDestinationResolver()) {
                    Map<String, DestinationResolver> beansOfTypeDestinationResolver = applicationContext.getBeansOfType(DestinationResolver.class);
                    if (!beansOfTypeDestinationResolver.isEmpty()) {
                        DestinationResolver destinationResolver = beansOfTypeDestinationResolver.values().iterator().next();
                        configuration.setDestinationResolver(destinationResolver);
                    }
                }
            }
        }
        return configuration;
    }

    /**
     * Subclasses can override to prevent the jms configuration from being
     * setup to use an auto-wired the connection factory that's found in the spring
     * application context.
     *
     * @return true by default
     */
    public boolean isAllowAutoWiredConnectionFactory() {
        return true;
    }

    /**
     * Subclasses can override to prevent the jms configuration from being
     * setup to use an auto-wired the destination resolved that's found in the spring
     * application context.
     *
     * @return true by default
     */
    public boolean isAllowAutoWiredDestinationResolver() {
        return true;
    }

    /**
     * To use a shared JMS configuration
     */
    public void setConfiguration(JmsConfiguration configuration) {
        this.configuration = configuration;
    }

    /**
     * Specifies whether the consumer accept messages while it is stopping.
     * You may consider enabling this option, if you start and stop JMS routes at runtime, while there are still messages
     * enqueued on the queue. If this option is false, and you stop the JMS route, then messages may be rejected,
     * and the JMS broker would have to attempt redeliveries, which yet again may be rejected, and eventually the message
     * may be moved at a dead letter queue on the JMS broker. To avoid this its recommended to enable this option.
     */
    @Metadata(label = "consumer,advanced",
            description = "Specifies whether the consumer accept messages while it is stopping."
                    + " You may consider enabling this option, if you start and stop JMS routes at runtime, while there are still messages"
                    + " enqueued on the queue. If this option is false, and you stop the JMS route, then messages may be rejected,"
                    + " and the JMS broker would have to attempt redeliveries, which yet again may be rejected, and eventually the message"
                    + " may be moved at a dead letter queue on the JMS broker. To avoid this its recommended to enable this option.")
    public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) {
        getConfiguration().setAcceptMessagesWhileStopping(acceptMessagesWhileStopping);   
    }
    
    /**
     * Whether the DefaultMessageListenerContainer used in the reply managers for request-reply messaging allow 
     * the DefaultMessageListenerContainer.runningAllowed flag to quick stop in case JmsConfiguration#isAcceptMessagesWhileStopping
     * is enabled, and org.apache.camel.CamelContext is currently being stopped. This quick stop ability is enabled by
     * default in the regular JMS consumers but to enable for reply managers you must enable this flag.
      */
    @Metadata(label = "consumer,advanced",
            description = "Whether the DefaultMessageListenerContainer used in the reply managers for request-reply messaging allow "
                    + " the DefaultMessageListenerContainer.runningAllowed flag to quick stop in case JmsConfiguration#isAcceptMessagesWhileStopping"
                    + " is enabled, and org.apache.camel.CamelContext is currently being stopped. This quick stop ability is enabled by"
                    + " default in the regular JMS consumers but to enable for reply managers you must enable this flag.")
    public void setAllowReplyManagerQuickStop(boolean allowReplyManagerQuickStop) {
        getConfiguration().setAllowReplyManagerQuickStop(allowReplyManagerQuickStop);
    }

    /**
     * The JMS acknowledgement mode defined as an Integer.
     * Allows you to set vendor-specific extensions to the acknowledgment mode.
     * For the regular modes, it is preferable to use the acknowledgementModeName instead.
     */
    @Metadata(label = "consumer",
            description = "The JMS acknowledgement mode defined as an Integer. Allows you to set vendor-specific extensions to the acknowledgment mode."
                    + "For the regular modes, it is preferable to use the acknowledgementModeName instead.")
    public void setAcknowledgementMode(int consumerAcknowledgementMode) {
        getConfiguration().setAcknowledgementMode(consumerAcknowledgementMode);
    }

    /**
     * Enables eager loading of JMS properties as soon as a message is loaded
     * which generally is inefficient as the JMS properties may not be required
     * but sometimes can catch early any issues with the underlying JMS provider
     * and the use of JMS properties
     */
    @Metadata(label = "consumer,advanced",
            description = "Enables eager loading of JMS properties as soon as a message is loaded"
                    + " which generally is inefficient as the JMS properties may not be required"
                    + " but sometimes can catch early any issues with the underlying JMS provider"
                    + " and the use of JMS properties")
    public void setEagerLoadingOfProperties(boolean eagerLoadingOfProperties) {
        getConfiguration().setEagerLoadingOfProperties(eagerLoadingOfProperties);
    }

    /**
     * The JMS acknowledgement name, which is one of: SESSION_TRANSACTED, CLIENT_ACKNOWLEDGE, AUTO_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE
     */
    @Metadata(defaultValue = "AUTO_ACKNOWLEDGE", label = "consumer", enums = "SESSION_TRANSACTED,CLIENT_ACKNOWLEDGE,AUTO_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE",
            description = "The JMS acknowledgement name, which is one of: SESSION_TRANSACTED, CLIENT_ACKNOWLEDGE, AUTO_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE")
    public void setAcknowledgementModeName(String consumerAcknowledgementMode) {
        getConfiguration().setAcknowledgementModeName(consumerAcknowledgementMode);
    }

    /**
     * Specifies whether the consumer container should auto-startup.
     */
    @Metadata(label = "consumer", defaultValue = "true",
            description = "Specifies whether the consumer container should auto-startup.")
    public void setAutoStartup(boolean autoStartup) {
        getConfiguration().setAutoStartup(autoStartup);
    }

    /**
     * Sets the cache level by ID for the underlying JMS resources. See cacheLevelName option for more details.
     */
    @Metadata(label = "consumer",
            description = "Sets the cache level by ID for the underlying JMS resources. See cacheLevelName option for more details.")
    public void setCacheLevel(int cacheLevel) {
        getConfiguration().setCacheLevel(cacheLevel);
    }

    /**
     * Sets the cache level by name for the underlying JMS resources.
     * Possible values are: CACHE_AUTO, CACHE_CONNECTION, CACHE_CONSUMER, CACHE_NONE, and CACHE_SESSION.
     * The default setting is CACHE_AUTO. See the Spring documentation and Transactions Cache Levels for more information.
     */
    @Metadata(defaultValue = "CACHE_AUTO", label = "consumer", enums = "CACHE_AUTO,CACHE_CONNECTION,CACHE_CONSUMER,CACHE_NONE,CACHE_SESSION",
            description = "Sets the cache level by name for the underlying JMS resources."
                    + " Possible values are: CACHE_AUTO, CACHE_CONNECTION, CACHE_CONSUMER, CACHE_NONE, and CACHE_SESSION."
                    + " The default setting is CACHE_AUTO. See the Spring documentation and Transactions Cache Levels for more information.")
    public void setCacheLevelName(String cacheName) {
        getConfiguration().setCacheLevelName(cacheName);
    }

    /**
     * Sets the cache level by name for the reply consumer when doing request/reply over JMS.
     * This option only applies when using fixed reply queues (not temporary).
     * Camel will by default use: CACHE_CONSUMER for exclusive or shared w/ replyToSelectorName.
     * And CACHE_SESSION for shared without replyToSelectorName. Some JMS brokers such as IBM WebSphere
     * may require to set the replyToCacheLevelName=CACHE_NONE to work.
     * Note: If using temporary queues then CACHE_NONE is not allowed,
     * and you must use a higher value such as CACHE_CONSUMER or CACHE_SESSION.
     */
    @Metadata(label = "producer,advanced", enums = "CACHE_AUTO,CACHE_CONNECTION,CACHE_CONSUMER,CACHE_NONE,CACHE_SESSION",
            description = "Sets the cache level by name for the reply consumer when doing request/reply over JMS."
                    + " This option only applies when using fixed reply queues (not temporary)."
                    + " Camel will by default use: CACHE_CONSUMER for exclusive or shared w/ replyToSelectorName."
                    + " And CACHE_SESSION for shared without replyToSelectorName. Some JMS brokers such as IBM WebSphere"
                    + " may require to set the replyToCacheLevelName=CACHE_NONE to work."
                    + " Note: If using temporary queues then CACHE_NONE is not allowed,"
                    + " and you must use a higher value such as CACHE_CONSUMER or CACHE_SESSION.")
    public void setReplyToCacheLevelName(String cacheName) {
        getConfiguration().setReplyToCacheLevelName(cacheName);
    }

    /**
     * Sets the JMS client ID to use. Note that this value, if specified, must be unique and can only be used by a single JMS connection instance.
     * It is typically only required for durable topic subscriptions.
     * <p/>
     * If using Apache ActiveMQ you may prefer to use Virtual Topics instead.
     */
    @Metadata(description = "Sets the JMS client ID to use. Note that this value, if specified, must be unique and can only be used by a single JMS connection instance."
            + " It is typically only required for durable topic subscriptions."
            + " If using Apache ActiveMQ you may prefer to use Virtual Topics instead.")
    public void setClientId(String consumerClientId) {
        getConfiguration().setClientId(consumerClientId);
    }

    /**
     * Specifies the default number of concurrent consumers when consuming from JMS (not for request/reply over JMS).
     * See also the maxMessagesPerTask option to control dynamic scaling up/down of threads.
     * <p/>
     * When doing request/reply over JMS then the option replyToConcurrentConsumers is used to control number
     * of concurrent consumers on the reply message listener.
     */
    @Metadata(defaultValue = "1", label = "consumer",
            description = "Specifies the default number of concurrent consumers when consuming from JMS (not for request/reply over JMS)."
                    + " See also the maxMessagesPerTask option to control dynamic scaling up/down of threads."
                    + " When doing request/reply over JMS then the option replyToConcurrentConsumers is used to control number"
                    + " of concurrent consumers on the reply message listener.")
    public void setConcurrentConsumers(int concurrentConsumers) {
        getConfiguration().setConcurrentConsumers(concurrentConsumers);
    }

    /**
     * Specifies the default number of concurrent consumers when doing request/reply over JMS.
     * See also the maxMessagesPerTask option to control dynamic scaling up/down of threads.
     */
    @Metadata(defaultValue = "1", label = "producer",
            description = "Specifies the default number of concurrent consumers when doing request/reply over JMS."
                    + " See also the maxMessagesPerTask option to control dynamic scaling up/down of threads.")
    public void setReplyToConcurrentConsumers(int concurrentConsumers) {
        getConfiguration().setReplyToConcurrentConsumers(concurrentConsumers);
    }

    /**
     * The connection factory to be use. A connection factory must be configured either on the component or endpoint.
     */
    @Metadata(description = "The connection factory to be use. A connection factory must be configured either on the component or endpoint.")
    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        getConfiguration().setConnectionFactory(connectionFactory);
    }

    /**
     * Username to use with the ConnectionFactory. You can also configure username/password directly on the ConnectionFactory.
     */
    @Metadata(label = "security", secret = true, description = "Username to use with the ConnectionFactory. You can also configure username/password directly on the ConnectionFactory.")
    public void setUsername(String username) {
        getConfiguration().setUsername(username);
    }

    /**
     * Password to use with the ConnectionFactory. You can also configure username/password directly on the ConnectionFactory.
     */
    @Metadata(label = "security", secret = true, description = "Password to use with the ConnectionFactory. You can also configure username/password directly on the ConnectionFactory.")
    public void setPassword(String password) {
        getConfiguration().setPassword(password);
    }

    /**
     * Specifies whether persistent delivery is used by default.
     */
    @Metadata(defaultValue = "true", label = "producer",
            description = "Specifies whether persistent delivery is used by default.")
    public void setDeliveryPersistent(boolean deliveryPersistent) {
        getConfiguration().setDeliveryPersistent(deliveryPersistent);
    }

    /**
     * Specifies the delivery mode to be used. Possible values are
     * Possibles values are those defined by javax.jms.DeliveryMode.
     * NON_PERSISTENT = 1 and PERSISTENT = 2.
     */
    @Metadata(label = "producer", enums = "1,2",
            description = "Specifies the delivery mode to be used."
                    + " Possibles values are those defined by javax.jms.DeliveryMode."
                    + " NON_PERSISTENT = 1 and PERSISTENT = 2.")
    public void setDeliveryMode(Integer deliveryMode) {
        getConfiguration().setDeliveryMode(deliveryMode);
    }

    /**
     * The durable subscriber name for specifying durable topic subscriptions. The clientId option must be configured as well.
     */
    @Metadata(description = "The durable subscriber name for specifying durable topic subscriptions. The clientId option must be configured as well.")
    public void setDurableSubscriptionName(String durableSubscriptionName) {
        getConfiguration().setDurableSubscriptionName(durableSubscriptionName);
    }

    /**
     * Specifies the JMS Exception Listener that is to be notified of any underlying JMS exceptions.
     */
    @Metadata(label = "advanced",
            description = "Specifies the JMS Exception Listener that is to be notified of any underlying JMS exceptions.")
    public void setExceptionListener(ExceptionListener exceptionListener) {
        getConfiguration().setExceptionListener(exceptionListener);
    }

    /**
     * Specifies a org.springframework.util.ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message.
     * By default these exceptions will be logged at the WARN level, if no errorHandler has been configured.
     * You can configure logging level and whether stack traces should be logged using errorHandlerLoggingLevel and errorHandlerLogStackTrace options.
     * This makes it much easier to configure, than having to code a custom errorHandler.
     */
    @Metadata(label = "advanced",
            description = "Specifies a org.springframework.util.ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message."
                    + " By default these exceptions will be logged at the WARN level, if no errorHandler has been configured."
                    + " You can configure logging level and whether stack traces should be logged using errorHandlerLoggingLevel and errorHandlerLogStackTrace options."
                    + " This makes it much easier to configure, than having to code a custom errorHandler.")
    public void setErrorHandler(ErrorHandler errorHandler) {
        getConfiguration().setErrorHandler(errorHandler);
    }

    /**
     * Allows to configure the default errorHandler logging level for logging uncaught exceptions.
     */
    @Metadata(defaultValue = "WARN", label = "consumer,logging",
            description = "Allows to configure the default errorHandler logging level for logging uncaught exceptions.")
    public void setErrorHandlerLoggingLevel(LoggingLevel errorHandlerLoggingLevel) {
        getConfiguration().setErrorHandlerLoggingLevel(errorHandlerLoggingLevel);
    }

    /**
     * Allows to control whether stacktraces should be logged or not, by the default errorHandler.
     */
    @Metadata(defaultValue = "true", label = "consumer,logging",
            description = "Allows to control whether stacktraces should be logged or not, by the default errorHandler.")
    public void setErrorHandlerLogStackTrace(boolean errorHandlerLogStackTrace) {
        getConfiguration().setErrorHandlerLogStackTrace(errorHandlerLogStackTrace);
    }

    /**
     * Set if the deliveryMode, priority or timeToLive qualities of service should be used when sending messages.
     * This option is based on Spring's JmsTemplate. The deliveryMode, priority and timeToLive options are applied to the current endpoint.
     * This contrasts with the preserveMessageQos option, which operates at message granularity,
     * reading QoS properties exclusively from the Camel In message headers.
     */
    @Metadata(label = "producer", defaultValue = "false",
            description = "Set if the deliveryMode, priority or timeToLive qualities of service should be used when sending messages."
                    + " This option is based on Spring's JmsTemplate. The deliveryMode, priority and timeToLive options are applied to the current endpoint."
                    + " This contrasts with the preserveMessageQos option, which operates at message granularity,"
                    + " reading QoS properties exclusively from the Camel In message headers.")
    public void setExplicitQosEnabled(boolean explicitQosEnabled) {
        getConfiguration().setExplicitQosEnabled(explicitQosEnabled);
    }

    /**
     * Specifies whether the listener session should be exposed when consuming messages.
     */
    @Metadata(label = "consumer,advanced",
            description = "Specifies whether the listener session should be exposed when consuming messages.")
    public void setExposeListenerSession(boolean exposeListenerSession) {
        getConfiguration().setExposeListenerSession(exposeListenerSession);
    }

    /**
     * Specifies the limit for idle executions of a receive task, not having received any message within its execution.
     * If this limit is reached, the task will shut down and leave receiving to other executing tasks
     * (in the case of dynamic scheduling; see the maxConcurrentConsumers setting).
     * There is additional doc available from Spring.
     */
    @Metadata(defaultValue = "1", label = "advanced",
            description = "Specifies the limit for idle executions of a receive task, not having received any message within its execution."
                    + " If this limit is reached, the task will shut down and leave receiving to other executing tasks"
                    + " (in the case of dynamic scheduling; see the maxConcurrentConsumers setting)."
                    + " There is additional doc available from Spring.")
    public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
        getConfiguration().setIdleTaskExecutionLimit(idleTaskExecutionLimit);
    }

    /**
     * Specify the limit for the number of consumers that are allowed to be idle at any given time.
     */
    @Metadata(defaultValue = "1", label = "advanced",
            description = "Specify the limit for the number of consumers that are allowed to be idle at any given time.")
    public void setIdleConsumerLimit(int idleConsumerLimit) {
        getConfiguration().setIdleConsumerLimit(idleConsumerLimit);
    }

    /**
     * Specifies the maximum number of concurrent consumers when consuming from JMS (not for request/reply over JMS).
     * See also the maxMessagesPerTask option to control dynamic scaling up/down of threads.
     * <p/>
     * When doing request/reply over JMS then the option replyToMaxConcurrentConsumers is used to control number
     * of concurrent consumers on the reply message listener.
     */
    @Metadata(label = "consumer",
            description = "Specifies the maximum number of concurrent consumers when consuming from JMS (not for request/reply over JMS)."
                    + " See also the maxMessagesPerTask option to control dynamic scaling up/down of threads."
                    + " When doing request/reply over JMS then the option replyToMaxConcurrentConsumers is used to control number"
                    + " of concurrent consumers on the reply message listener.")
    public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
        getConfiguration().setMaxConcurrentConsumers(maxConcurrentConsumers);
    }

    /**
     * Specifies the maximum number of concurrent consumers when using request/reply over JMS.
     * See also the maxMessagesPerTask option to control dynamic scaling up/down of threads.
     */
    @Metadata(label = "producer",
            description = "Specifies the maximum number of concurrent consumers when using request/reply over JMS."
                    + " See also the maxMessagesPerTask option to control dynamic scaling up/down of threads.")
    public void setReplyToMaxConcurrentConsumers(int maxConcurrentConsumers) {
        getConfiguration().setReplyToMaxConcurrentConsumers(maxConcurrentConsumers);
    }

    /**
     * Specifies the maximum number of concurrent consumers for continue routing when timeout occurred when using request/reply over JMS.
     */
    @Metadata(label = "producer", defaultValue = "1",
            description = "Specifies the maximum number of concurrent consumers for continue routing when timeout occurred when using request/reply over JMS.")
    public void setReplyOnTimeoutToMaxConcurrentConsumers(int maxConcurrentConsumers) {
        getConfiguration().setReplyToOnTimeoutMaxConcurrentConsumers(maxConcurrentConsumers);
    }

    /**
     * The number of messages per task. -1 is unlimited.
     * If you use a range for concurrent consumers (eg min < max), then this option can be used to set
     * a value to eg 100 to control how fast the consumers will shrink when less work is required.
     */
    @Metadata(defaultValue = "-1", label = "advanced",
            description = "The number of messages per task. -1 is unlimited."
                    + " If you use a range for concurrent consumers (eg min < max), then this option can be used to set"
                    + " a value to eg 100 to control how fast the consumers will shrink when less work is required.")
    public void setMaxMessagesPerTask(int maxMessagesPerTask) {
        getConfiguration().setMaxMessagesPerTask(maxMessagesPerTask);
    }

    /**
     * To use a custom Spring org.springframework.jms.support.converter.MessageConverter so you can be in control
     * how to map to/from a javax.jms.Message.
     */
    @Metadata(label = "advanced",
            description = "To use a custom Spring org.springframework.jms.support.converter.MessageConverter so you can be in control how to map to/from a javax.jms.Message.")
    public void setMessageConverter(MessageConverter messageConverter) {
        getConfiguration().setMessageConverter(messageConverter);
    }

    /**
     * Specifies whether Camel should auto map the received JMS message to a suited payload type, such as javax.jms.TextMessage to a String etc.
     * See section about how mapping works below for more details.
     */
    @Metadata(defaultValue = "true", label = "advanced",
            description = "Specifies whether Camel should auto map the received JMS message to a suited payload type, such as javax.jms.TextMessage to a String etc.")
    public void setMapJmsMessage(boolean mapJmsMessage) {
        getConfiguration().setMapJmsMessage(mapJmsMessage);
    }

    /**
     * When sending, specifies whether message IDs should be added. This is just an hint to the JMS Broker.
     * If the JMS provider accepts this hint, these messages must have the message ID set to null; if the provider ignores the hint, the message ID must be set to its normal unique value
     */
    @Metadata(defaultValue = "true", label = "advanced",
            description = "When sending, specifies whether message IDs should be added. This is just an hint to the JMS broker."
                    + "If the JMS provider accepts this hint, these messages must have the message ID set to null; if the provider ignores the hint, "
                    + "the message ID must be set to its normal unique value")
    public void setMessageIdEnabled(boolean messageIdEnabled) {
        getConfiguration().setMessageIdEnabled(messageIdEnabled);
    }

    /**
     * Specifies whether timestamps should be enabled by default on sending messages.
     */
    @Metadata(defaultValue = "true", label = "advanced",
            description = "Specifies whether timestamps should be enabled by default on sending messages. This is just an hint to the JMS broker."
                    + "If the JMS provider accepts this hint, these messages must have the timestamp set to zero; if the provider ignores the hint "
                    + "the timestamp must be set to its normal value")
    public void setMessageTimestampEnabled(boolean messageTimestampEnabled) {
        getConfiguration().setMessageTimestampEnabled(messageTimestampEnabled);
    }

    /**
     * If true, Camel will always make a JMS message copy of the message when it is passed to the producer for sending.
     * Copying the message is needed in some situations, such as when a replyToDestinationSelectorName is set
     * (incidentally, Camel will set the alwaysCopyMessage option to true, if a replyToDestinationSelectorName is set)
     */
    @Metadata(label = "producer,advanced",
            description = "If true, Camel will always make a JMS message copy of the message when it is passed to the producer for sending."
                    + " Copying the message is needed in some situations, such as when a replyToDestinationSelectorName is set"
                    + " (incidentally, Camel will set the alwaysCopyMessage option to true, if a replyToDestinationSelectorName is set)")
    public void setAlwaysCopyMessage(boolean alwaysCopyMessage) {
        getConfiguration().setAlwaysCopyMessage(alwaysCopyMessage);
    }

    /**
     * Specifies whether JMSMessageID should always be used as JMSCorrelationID for InOut messages.
     */
    @Metadata(label = "advanced",
            description = "Specifies whether JMSMessageID should always be used as JMSCorrelationID for InOut messages.")
    public void setUseMessageIDAsCorrelationID(boolean useMessageIDAsCorrelationID) {
        getConfiguration().setUseMessageIDAsCorrelationID(useMessageIDAsCorrelationID);
    }

    /**
     * Values greater than 1 specify the message priority when sending (where 0 is the lowest priority and 9 is the highest).
     * The explicitQosEnabled option must also be enabled in order for this option to have any effect.
     */
    @Metadata(defaultValue = "" + Message.DEFAULT_PRIORITY, enums = "1,2,3,4,5,6,7,8,9", label = "producer",
            description = "Values greater than 1 specify the message priority when sending (where 0 is the lowest priority and 9 is the highest)."
                    + " The explicitQosEnabled option must also be enabled in order for this option to have any effect.")
    public void setPriority(int priority) {
        getConfiguration().setPriority(priority);
    }

    /**
     * Specifies whether to inhibit the delivery of messages published by its own connection.
     */
    @Metadata(label = "advanced",
            description = "Specifies whether to inhibit the delivery of messages published by its own connection.")
    public void setPubSubNoLocal(boolean pubSubNoLocal) {
        getConfiguration().setPubSubNoLocal(pubSubNoLocal);
    }

    /**
     * The timeout for receiving messages (in milliseconds).
     */
    @Metadata(defaultValue = "1000", label = "advanced",
            description = "The timeout for receiving messages (in milliseconds).")
    public void setReceiveTimeout(long receiveTimeout) {
        getConfiguration().setReceiveTimeout(receiveTimeout);
    }

    /**
     * Specifies the interval between recovery attempts, i.e. when a connection is being refreshed, in milliseconds.
     * The default is 5000 ms, that is, 5 seconds.
     */
    @Metadata(defaultValue = "5000", label = "advanced",
            description = "Specifies the interval between recovery attempts, i.e. when a connection is being refreshed, in milliseconds."
                    + " The default is 5000 ms, that is, 5 seconds.")
    public void setRecoveryInterval(long recoveryInterval) {
        getConfiguration().setRecoveryInterval(recoveryInterval);
    }

    /**
     * Allows you to specify a custom task executor for consuming messages.
     */
    @Metadata(label = "consumer,advanced",
            description = "Allows you to specify a custom task executor for consuming messages.")
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        getConfiguration().setTaskExecutor(taskExecutor);
    }

    /**
     * When sending messages, specifies the time-to-live of the message (in milliseconds).
     */
    @Metadata(defaultValue = "-1", label = "producer",
            description = "When sending messages, specifies the time-to-live of the message (in milliseconds).")
    public void setTimeToLive(long timeToLive) {
        getConfiguration().setTimeToLive(timeToLive);
    }

    /**
     * Specifies whether to use transacted mode
     */
    @Metadata(label = "transaction",
            description = "Specifies whether to use transacted mode")
    public void setTransacted(boolean consumerTransacted) {
        getConfiguration().setTransacted(consumerTransacted);
    }

    /**
     * If true, Camel will create a JmsTransactionManager, if there is no transactionManager injected when option transacted=true.
     */
    @Metadata(defaultValue = "true", label = "transaction,advanced",
            description = "If true, Camel will create a JmsTransactionManager, if there is no transactionManager injected when option transacted=true.")
    public void setLazyCreateTransactionManager(boolean lazyCreating) {
        getConfiguration().setLazyCreateTransactionManager(lazyCreating);
    }

    /**
     * The Spring transaction manager to use.
     */
    @Metadata(label = "transaction,advanced",
            description = "The Spring transaction manager to use.")
    public void setTransactionManager(PlatformTransactionManager transactionManager) {
        getConfiguration().setTransactionManager(transactionManager);
    }

    /**
     * The name of the transaction to use.
     */
    @Metadata(label = "transaction,advanced",
            description = "The name of the transaction to use.")
    public void setTransactionName(String transactionName) {
        getConfiguration().setTransactionName(transactionName);
    }

    /**
     * The timeout value of the transaction (in seconds), if using transacted mode.
     */
    @Metadata(defaultValue = "-1", label = "transaction,advanced",
            description = "The timeout value of the transaction (in seconds), if using transacted mode.")
    public void setTransactionTimeout(int transactionTimeout) {
        getConfiguration().setTransactionTimeout(transactionTimeout);
    }

    /**
     * Specifies whether to test the connection on startup.
     * This ensures that when Camel starts that all the JMS consumers have a valid connection to the JMS broker.
     * If a connection cannot be granted then Camel throws an exception on startup.
     * This ensures that Camel is not started with failed connections.
     * The JMS producers is tested as well.
     */
    @Metadata(description = "Specifies whether to test the connection on startup."
            + " This ensures that when Camel starts that all the JMS consumers have a valid connection to the JMS broker."
            + " If a connection cannot be granted then Camel throws an exception on startup."
            + " This ensures that Camel is not started with failed connections."
            + " The JMS producers is tested as well.")
    public void setTestConnectionOnStartup(boolean testConnectionOnStartup) {
        getConfiguration().setTestConnectionOnStartup(testConnectionOnStartup);
    }

    /**
     * Whether to startup the JmsConsumer message listener asynchronously, when starting a route.
     * For example if a JmsConsumer cannot get a connection to a remote JMS broker, then it may block while retrying
     * and/or failover. This will cause Camel to block while starting routes. By setting this option to true,
     * you will let routes startup, while the JmsConsumer connects to the JMS broker using a dedicated thread
     * in asynchronous mode. If this option is used, then beware that if the connection could not be established,
     * then an exception is logged at WARN level, and the consumer will not be able to receive messages;
     * You can then restart the route to retry.
     */
    @Metadata(label = "advanced",
            description = "Whether to startup the JmsConsumer message listener asynchronously, when starting a route."
                    + " For example if a JmsConsumer cannot get a connection to a remote JMS broker, then it may block while retrying"
                    + " and/or failover. This will cause Camel to block while starting routes. By setting this option to true,"
                    + " you will let routes startup, while the JmsConsumer connects to the JMS broker using a dedicated thread"
                    + " in asynchronous mode. If this option is used, then beware that if the connection could not be established,"
                    + " then an exception is logged at WARN level, and the consumer will not be able to receive messages;"
                    + " You can then restart the route to retry.")
    public void setAsyncStartListener(boolean asyncStartListener) {
        getConfiguration().setAsyncStartListener(asyncStartListener);
    }

    /**
     * Whether to stop the JmsConsumer message listener asynchronously, when stopping a route.
     */
    @Metadata(label = "advanced",
            description = "Whether to stop the JmsConsumer message listener asynchronously, when stopping a route.")
    public void setAsyncStopListener(boolean asyncStopListener) {
        getConfiguration().setAsyncStopListener(asyncStopListener);
    }

    /**
     * When using mapJmsMessage=false Camel will create a new JMS message to send to a new JMS destination
     * if you touch the headers (get or set) during the route. Set this option to true to force Camel to send
     * the original JMS message that was received.
     */
    @Metadata(label = "producer,advanced",
            description = "When using mapJmsMessage=false Camel will create a new JMS message to send to a new JMS destination"
                    + " if you touch the headers (get or set) during the route. Set this option to true to force Camel to send"
                    + " the original JMS message that was received.")
    public void setForceSendOriginalMessage(boolean forceSendOriginalMessage) {
        getConfiguration().setForceSendOriginalMessage(forceSendOriginalMessage);
    }

    /**
     * The timeout for waiting for a reply when using the InOut Exchange Pattern (in milliseconds).
     * The default is 20 seconds. You can include the header "CamelJmsRequestTimeout" to override this endpoint configured
     * timeout value, and thus have per message individual timeout values.
     * See also the requestTimeoutCheckerInterval option.
     */
    @Metadata(defaultValue = "20000", label = "producer",
            description = "The timeout for waiting for a reply when using the InOut Exchange Pattern (in milliseconds)."
                    + " The default is 20 seconds. You can include the header \"CamelJmsRequestTimeout\" to override this endpoint configured"
                    + " timeout value, and thus have per message individual timeout values."
                    + " See also the requestTimeoutCheckerInterval option.")
    public void setRequestTimeout(long requestTimeout) {
        getConfiguration().setRequestTimeout(requestTimeout);
    }

    /**
     * Configures how often Camel should check for timed out Exchanges when doing request/reply over JMS.
     * By default Camel checks once per second. But if you must react faster when a timeout occurs,
     * then you can lower this interval, to check more frequently. The timeout is determined by the option requestTimeout.
     */
    @Metadata(defaultValue = "1000", label = "advanced",
            description = "Configures how often Camel should check for timed out Exchanges when doing request/reply over JMS."
                    + " By default Camel checks once per second. But if you must react faster when a timeout occurs,"
                    + " then you can lower this interval, to check more frequently. The timeout is determined by the option requestTimeout.")
    public void setRequestTimeoutCheckerInterval(long requestTimeoutCheckerInterval) {
        getConfiguration().setRequestTimeoutCheckerInterval(requestTimeoutCheckerInterval);
    }

    /**
     * You can transfer the exchange over the wire instead of just the body and headers.
     * The following fields are transferred: In body, Out body, Fault body, In headers, Out headers, Fault headers,
     * exchange properties, exchange exception.
     * This requires that the objects are serializable. Camel will exclude any non-serializable objects and log it at WARN level.
     * You must enable this option on both the producer and consumer side, so Camel knows the payloads is an Exchange and not a regular payload.
     */
    @Metadata(label = "advanced",
            description = "You can transfer the exchange over the wire instead of just the body and headers."
                    + " The following fields are transferred: In body, Out body, Fault body, In headers, Out headers, Fault headers,"
                    + " exchange properties, exchange exception."
                    + " This requires that the objects are serializable. Camel will exclude any non-serializable objects and log it at WARN level."
                    + " You must enable this option on both the producer and consumer side, so Camel knows the payloads is an Exchange and not a regular payload.")
    public void setTransferExchange(boolean transferExchange) {
        getConfiguration().setTransferExchange(transferExchange);
    }

    /**
     * If enabled and you are using Request Reply messaging (InOut) and an Exchange failed on the consumer side,
     * then the caused Exception will be send back in response as a javax.jms.ObjectMessage.
     * If the client is Camel, the returned Exception is rethrown. This allows you to use Camel JMS as a bridge
     * in your routing - for example, using persistent queues to enable robust routing.
     * Notice that if you also have transferExchange enabled, this option takes precedence.
     * The caught exception is required to be serializable.
     * The original Exception on the consumer side can be wrapped in an outer exception
     * such as org.apache.camel.RuntimeCamelException when returned to the producer.
     */
    @Metadata(label = "advanced",
            description = "If enabled and you are using Request Reply messaging (InOut) and an Exchange failed on the consumer side,"
                    + " then the caused Exception will be send back in response as a javax.jms.ObjectMessage."
                    + " If the client is Camel, the returned Exception is rethrown. This allows you to use Camel JMS as a bridge"
                    + " in your routing - for example, using persistent queues to enable robust routing."
                    + " Notice that if you also have transferExchange enabled, this option takes precedence."
                    + " The caught exception is required to be serializable."
                    + " The original Exception on the consumer side can be wrapped in an outer exception"
                    + " such as org.apache.camel.RuntimeCamelException when returned to the producer.")
    public void setTransferException(boolean transferException) {
        getConfiguration().setTransferException(transferException);
    }

    /**
     * If enabled and you are using Request Reply messaging (InOut) and an Exchange failed with a SOAP fault (not exception) on the consumer side,
     * then the fault flag on {@link org.apache.camel.Message#isFault()} will be send back in the response as a JMS header with the key
     * {@link JmsConstants#JMS_TRANSFER_FAULT}.
     * If the client is Camel, the returned fault flag will be set on the {@link org.apache.camel.Message#setFault(boolean)}.
     * <p/>
     * You may want to enable this when using Camel components that support faults such as SOAP based such as cxf or spring-ws.
     */
    @Metadata(label = "advanced",
            description = "If enabled and you are using Request Reply messaging (InOut) and an Exchange failed with a SOAP fault (not exception) on the consumer side,"
                    + " then the fault flag on Message#isFault() will be send back in the response as a JMS header with the key"
                    + " org.apache.camel.component.jms.JmsConstants#JMS_TRANSFER_FAULT#JMS_TRANSFER_FAULT."
                    + " If the client is Camel, the returned fault flag will be set on the {@link org.apache.camel.Message#setFault(boolean)}."
                    + " You may want to enable this when using Camel components that support faults such as SOAP based such as cxf or spring-ws.")
    public void setTransferFault(boolean transferFault) {
        getConfiguration().setTransferFault(transferFault);
    }

    /**
     * Allows you to use your own implementation of the org.springframework.jms.core.JmsOperations interface.
     * Camel uses JmsTemplate as default. Can be used for testing purpose, but not used much as stated in the spring API docs.
     */
    @Metadata(label = "advanced",
            description = "Allows you to use your own implementation of the org.springframework.jms.core.JmsOperations interface."
                    + " Camel uses JmsTemplate as default. Can be used for testing purpose, but not used much as stated in the spring API docs.")
    public void setJmsOperations(JmsOperations jmsOperations) {
        getConfiguration().setJmsOperations(jmsOperations);
    }

    /**
     * A pluggable org.springframework.jms.support.destination.DestinationResolver that allows you to use your own resolver
     * (for example, to lookup the real destination in a JNDI registry).
     */
    @Metadata(label = "advanced", description = "A pluggable org.springframework.jms.support.destination.DestinationResolver that allows you to use your own resolver"
            + " (for example, to lookup the real destination in a JNDI registry).")
    public void setDestinationResolver(DestinationResolver destinationResolver) {
        getConfiguration().setDestinationResolver(destinationResolver);
    }

    /**
     * Allows for explicitly specifying which kind of strategy to use for replyTo queues when doing request/reply over JMS.
     * Possible values are: Temporary, Shared, or Exclusive.
     * By default Camel will use temporary queues. However if replyTo has been configured, then Shared is used by default.
     * This option allows you to use exclusive queues instead of shared ones.
     * See Camel JMS documentation for more details, and especially the notes about the implications if running in a clustered environment,
     * and the fact that Shared reply queues has lower performance than its alternatives Temporary and Exclusive.
     */
    @Metadata(label = "producer",
            description = "Allows for explicitly specifying which kind of strategy to use for replyTo queues when doing request/reply over JMS."
                    + " Possible values are: Temporary, Shared, or Exclusive."
                    + " By default Camel will use temporary queues. However if replyTo has been configured, then Shared is used by default."
                    + " This option allows you to use exclusive queues instead of shared ones."
                    + " See Camel JMS documentation for more details, and especially the notes about the implications if running in a clustered environment,"
                    + " and the fact that Shared reply queues has lower performance than its alternatives Temporary and Exclusive.")
    public void setReplyToType(ReplyToType replyToType) {
        getConfiguration().setReplyToType(replyToType);
    }

    /**
     * Set to true, if you want to send message using the QoS settings specified on the message,
     * instead of the QoS settings on the JMS endpoint. The following three headers are considered JMSPriority, JMSDeliveryMode,
     * and JMSExpiration. You can provide all or only some of them. If not provided, Camel will fall back to use the
     * values from the endpoint instead. So, when using this option, the headers override the values from the endpoint.
     * The explicitQosEnabled option, by contrast, will only use options set on the endpoint, and not values from the message header.
     */
    @Metadata(label = "producer",
            description = "Set to true, if you want to send message using the QoS settings specified on the message,"
                    + " instead of the QoS settings on the JMS endpoint. The following three headers are considered JMSPriority, JMSDeliveryMode,"
                    + " and JMSExpiration. You can provide all or only some of them. If not provided, Camel will fall back to use the"
                    + " values from the endpoint instead. So, when using this option, the headers override the values from the endpoint."
                    + " The explicitQosEnabled option, by contrast, will only use options set on the endpoint, and not values from the message header.")
    public void setPreserveMessageQos(boolean preserveMessageQos) {
        getConfiguration().setPreserveMessageQos(preserveMessageQos);
    }

    /**
     * Whether the JmsConsumer processes the Exchange asynchronously.
     * If enabled then the JmsConsumer may pickup the next message from the JMS queue,
     * while the previous message is being processed asynchronously (by the Asynchronous Routing Engine).
     * This means that messages may be processed not 100% strictly in order. If disabled (as default)
     * then the Exchange is fully processed before the JmsConsumer will pickup the next message from the JMS queue.
     * Note if transacted has been enabled, then asyncConsumer=true does not run asynchronously, as transaction
     * must be executed synchronously (Camel 3.0 may support async transactions).
     */
    @Metadata(label = "consumer",
            description = "Whether the JmsConsumer processes the Exchange asynchronously."
                    + " If enabled then the JmsConsumer may pickup the next message from the JMS queue,"
                    + " while the previous message is being processed asynchronously (by the Asynchronous Routing Engine)."
                    + " This means that messages may be processed not 100% strictly in order. If disabled (as default)"
                    + " then the Exchange is fully processed before the JmsConsumer will pickup the next message from the JMS queue."
                    + " Note if transacted has been enabled, then asyncConsumer=true does not run asynchronously, as transaction"
                    + "  must be executed synchronously (Camel 3.0 may support async transactions).")
    public void setAsyncConsumer(boolean asyncConsumer) {
        getConfiguration().setAsyncConsumer(asyncConsumer);
    }

    /**
     * Whether to allow sending messages with no body. If this option is false and the message body is null, then an JMSException is thrown.
     */
    @Metadata(defaultValue = "true", label = "producer,advanced",
            description = "Whether to allow sending messages with no body. If this option is false and the message body is null, then an JMSException is thrown.")
    public void setAllowNullBody(boolean allowNullBody) {
        getConfiguration().setAllowNullBody(allowNullBody);
    }

    /**
     * Only applicable when sending to JMS destination using InOnly (eg fire and forget).
     * Enabling this option will enrich the Camel Exchange with the actual JMSMessageID
     * that was used by the JMS client when the message was sent to the JMS destination.
     */
    @Metadata(label = "producer,advanced",
            description = "Only applicable when sending to JMS destination using InOnly (eg fire and forget)."
                    + " Enabling this option will enrich the Camel Exchange with the actual JMSMessageID"
                    + " that was used by the JMS client when the message was sent to the JMS destination.")
    public void setIncludeSentJMSMessageID(boolean includeSentJMSMessageID) {
        getConfiguration().setIncludeSentJMSMessageID(includeSentJMSMessageID);
    }

    /**
     * Whether to include all JMSXxxx properties when mapping from JMS to Camel Message.
     * Setting this to true will include properties such as JMSXAppID, and JMSXUserID etc.
     * Note: If you are using a custom headerFilterStrategy then this option does not apply.
     */
    @Metadata(label = "advanced",
            description = "Whether to include all JMSXxxx properties when mapping from JMS to Camel Message."
                    + " Setting this to true will include properties such as JMSXAppID, and JMSXUserID etc."
                    + " Note: If you are using a custom headerFilterStrategy then this option does not apply.")
    public void setIncludeAllJMSXProperties(boolean includeAllJMSXProperties) {
        getConfiguration().setIncludeAllJMSXProperties(includeAllJMSXProperties);
    }

    /**
     * Specifies what default TaskExecutor type to use in the DefaultMessageListenerContainer,
     * for both consumer endpoints and the ReplyTo consumer of producer endpoints.
     * Possible values: SimpleAsync (uses Spring's SimpleAsyncTaskExecutor) or ThreadPool
     * (uses Spring's ThreadPoolTaskExecutor with optimal values - cached threadpool-like).
     * If not set, it defaults to the previous behaviour, which uses a cached thread pool
     * for consumer endpoints and SimpleAsync for reply consumers.
     * The use of ThreadPool is recommended to reduce "thread trash" in elastic configurations
     * with dynamically increasing and decreasing concurrent consumers.
     */
    @Metadata(label = "consumer,advanced",
            description = "Specifies what default TaskExecutor type to use in the DefaultMessageListenerContainer,"
                    + " for both consumer endpoints and the ReplyTo consumer of producer endpoints."
                    + " Possible values: SimpleAsync (uses Spring's SimpleAsyncTaskExecutor) or ThreadPool"
                    + " (uses Spring's ThreadPoolTaskExecutor with optimal values - cached threadpool-like)."
                    + " If not set, it defaults to the previous behaviour, which uses a cached thread pool"
                    + " for consumer endpoints and SimpleAsync for reply consumers."
                    + " The use of ThreadPool is recommended to reduce thread trash in elastic configurations"
                    + " with dynamically increasing and decreasing concurrent consumers.")
    public void setDefaultTaskExecutorType(DefaultTaskExecutorType type) {
        getConfiguration().setDefaultTaskExecutorType(type);
    }

    /**
     * Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification.
     * Camel provides two implementations out of the box: default and passthrough.
     * The default strategy will safely marshal dots and hyphens (. and -). The passthrough strategy leaves the key as is.
     * Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters.
     * You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy
     * and refer to it using the # notation.
     */
    @Metadata(label = "advanced",
            description = "Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification."
                    + " Camel provides two implementations out of the box: default and passthrough."
                    + " The default strategy will safely marshal dots and hyphens (. and -). The passthrough strategy leaves the key as is."
                    + " Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters."
                    + " You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy"
                    + " and refer to it using the # notation.")
    public void setJmsKeyFormatStrategy(JmsKeyFormatStrategy jmsKeyFormatStrategy) {
        getConfiguration().setJmsKeyFormatStrategy(jmsKeyFormatStrategy);
    }

    /**
     * Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification.
     * Camel provides two implementations out of the box: default and passthrough.
     * The default strategy will safely marshal dots and hyphens (. and -). The passthrough strategy leaves the key as is.
     * Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters.
     * You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy
     * and refer to it using the # notation.
     */
    public void setJmsKeyFormatStrategy(String jmsKeyFormatStrategyName) {
        // allow to configure a standard by its name, which is simpler
        JmsKeyFormatStrategy strategy = resolveStandardJmsKeyFormatStrategy(jmsKeyFormatStrategyName);
        if (strategy == null) {
            throw new IllegalArgumentException("JmsKeyFormatStrategy with name " + jmsKeyFormatStrategyName + " is not a standard supported name");
        } else {
            getConfiguration().setJmsKeyFormatStrategy(strategy);
        }
    }

    /**
     * This option is used to allow additional headers which may have values that are invalid according to JMS specification.
     + For example some message systems such as WMQ do this with header names using prefix JMS_IBM_MQMD_ containing values with byte array or other invalid types.
     + You can specify multiple header names separated by comma, and use * as suffix for wildcard matching.
     */
    @Metadata(label = "producer,advanced",
        description = "This option is used to allow additional headers which may have values that are invalid according to JMS specification."
            + " For example some message systems such as WMQ do this with header names using prefix JMS_IBM_MQMD_ containing values with byte array or other invalid types."
            + " You can specify multiple header names separated by comma, and use * as suffix for wildcard matching.")
    public void setAllowAdditionalHeaders(String allowAdditionalHeaders) {
        getConfiguration().setAllowAdditionalHeaders(allowAdditionalHeaders);
    }

    /**
     * Sets the Spring ApplicationContext to use
     */
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    public QueueBrowseStrategy getQueueBrowseStrategy() {
        if (queueBrowseStrategy == null) {
            queueBrowseStrategy = new DefaultQueueBrowseStrategy();
        }
        return queueBrowseStrategy;
    }

    /**
     * To use a custom QueueBrowseStrategy when browsing queues
     */
    public void setQueueBrowseStrategy(QueueBrowseStrategy queueBrowseStrategy) {
        this.queueBrowseStrategy = queueBrowseStrategy;
    }

    public MessageCreatedStrategy getMessageCreatedStrategy() {
        return messageCreatedStrategy;
    }

    /**
     * To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of <tt>javax.jms.Message</tt>
     * objects when Camel is sending a JMS message.
     */
    public void setMessageCreatedStrategy(MessageCreatedStrategy messageCreatedStrategy) {
        this.messageCreatedStrategy = messageCreatedStrategy;
    }

    public int getWaitForProvisionCorrelationToBeUpdatedCounter() {
        return getConfiguration().getWaitForProvisionCorrelationToBeUpdatedCounter();
    }

    /**
     * Number of times to wait for provisional correlation id to be updated to the actual correlation id when doing request/reply over JMS
     * and when the option useMessageIDAsCorrelationID is enabled.
     */
    @Metadata(defaultValue = "50", label = "advanced",
            description = "Number of times to wait for provisional correlation id to be updated to the actual correlation id when doing request/reply over JMS"
                    + " and when the option useMessageIDAsCorrelationID is enabled.")
    public void setWaitForProvisionCorrelationToBeUpdatedCounter(int counter) {
        getConfiguration().setWaitForProvisionCorrelationToBeUpdatedCounter(counter);
    }

    public long getWaitForProvisionCorrelationToBeUpdatedThreadSleepingTime() {
        return getConfiguration().getWaitForProvisionCorrelationToBeUpdatedThreadSleepingTime();
    }

    /**
     * Interval in millis to sleep each time while waiting for provisional correlation id to be updated.
     */
    @Metadata(defaultValue = "100", label = "advanced",
            description = "Interval in millis to sleep each time while waiting for provisional correlation id to be updated.")
    public void setWaitForProvisionCorrelationToBeUpdatedThreadSleepingTime(long sleepingTime) {
        getConfiguration().setWaitForProvisionCorrelationToBeUpdatedThreadSleepingTime(sleepingTime);
    }

    /**
     * Use this JMS property to correlate messages in InOut exchange pattern (request-reply)
     * instead of JMSCorrelationID property. This allows you to exchange messages with 
     * systems that do not correlate messages using JMSCorrelationID JMS property. If used
     * JMSCorrelationID will not be used or set by Camel. The value of here named property
     * will be generated if not supplied in the header of the message under the same name.
     */
    @Metadata(label = "producer,advanced",
            description = "Use this JMS property to correlate messages in InOut exchange pattern (request-reply)"
                    + " instead of JMSCorrelationID property. This allows you to exchange messages with"
                    + " systems that do not correlate messages using JMSCorrelationID JMS property. If used"
                    + " JMSCorrelationID will not be used or set by Camel. The value of here named property"
                    + " will be generated if not supplied in the header of the message under the same name.")
    public void setCorrelationProperty(final String correlationProperty) {
        getConfiguration().setCorrelationProperty(correlationProperty);
    }

    // JMS 2.0 API
    // -------------------------------------------------------------------------

    public boolean isSubscriptionDurable() {
        return getConfiguration().isSubscriptionDurable();
    }

    /**
     * Set whether to make the subscription durable. The durable subscription name
     * to be used can be specified through the "subscriptionName" property.
     * <p>Default is "false". Set this to "true" to register a durable subscription,
     * typically in combination with a "subscriptionName" value (unless
     * your message listener class name is good enough as subscription name).
     * <p>Only makes sense when listening to a topic (pub-sub domain),
     * therefore this method switches the "pubSubDomain" flag as well.
     */
    @Metadata(label = "consumer", description = "Set whether to make the subscription durable. The durable subscription name"
        + " to be used can be specified through the subscriptionName property."
        + " Default is false. Set this to true to register a durable subscription,"
        + " typically in combination with a subscriptionName value (unless"
        + " your message listener class name is good enough as subscription name)."
        + " Only makes sense when listening to a topic (pub-sub domain),"
        + " therefore this method switches the pubSubDomain flag as well.")
    public void setSubscriptionDurable(boolean subscriptionDurable) {
        getConfiguration().setSubscriptionDurable(subscriptionDurable);
    }

    public boolean isSubscriptionShared() {
        return getConfiguration().isSubscriptionShared();
    }

    /**
     * Set whether to make the subscription shared. The shared subscription name
     * to be used can be specified through the "subscriptionName" property.
     * <p>Default is "false". Set this to "true" to register a shared subscription,
     * typically in combination with a "subscriptionName" value (unless
     * your message listener class name is good enough as subscription name).
     * Note that shared subscriptions may also be durable, so this flag can
     * (and often will) be combined with "subscriptionDurable" as well.
     * <p>Only makes sense when listening to a topic (pub-sub domain),
     * therefore this method switches the "pubSubDomain" flag as well.
     * <p><b>Requires a JMS 2.0 compatible message broker.</b>
     */
    @Metadata(label = "consumer", description = "Set whether to make the subscription shared. The shared subscription name"
        + " to be used can be specified through the subscriptionName property."
        + " Default is false. Set this to true to register a shared subscription,"
        + " typically in combination with a subscriptionName value (unless"
        + " your message listener class name is good enough as subscription name)."
        + " Note that shared subscriptions may also be durable, so this flag can"
        + " (and often will) be combined with subscriptionDurable as well."
        + " Only makes sense when listening to a topic (pub-sub domain),"
        + " therefore this method switches the pubSubDomain flag as well."
        + " Requires a JMS 2.0 compatible message broker.")
    public void setSubscriptionShared(boolean subscriptionShared) {
        getConfiguration().setSubscriptionShared(subscriptionShared);
    }

    public String getSubscriptionName() {
        return getConfiguration().getSubscriptionName();
    }

    /**
     * Set the name of a subscription to create. To be applied in case
     * of a topic (pub-sub domain) with a shared or durable subscription.
     * <p>The subscription name needs to be unique within this client's
     * JMS client id. Default is the class name of the specified message listener.
     * <p>Note: Only 1 concurrent consumer (which is the default of this
     * message listener container) is allowed for each subscription,
     * except for a shared subscription (which requires JMS 2.0).
     */
    @Metadata(label = "consumer", description = "Set the name of a subscription to create. To be applied in case"
        + " of a topic (pub-sub domain) with a shared or durable subscription."
        + " The subscription name needs to be unique within this client's"
        + " JMS client id. Default is the class name of the specified message listener."
        + " Note: Only 1 concurrent consumer (which is the default of this"
        + " message listener container) is allowed for each subscription,"
        + " except for a shared subscription (which requires JMS 2.0).")
    public void setSubscriptionName(String subscriptionName) {
        getConfiguration().setSubscriptionName(subscriptionName);
    }


    public boolean isStreamMessageTypeEnabled() {
        return getConfiguration().isStreamMessageTypeEnabled();
    }

    /**
     * Sets whether StreamMessage type is enabled or not.
     * Message payloads of streaming kind such as files, InputStream, etc will either by sent as BytesMessage or StreamMessage.
     * This option controls which kind will be used. By default BytesMessage is used which enforces the entire message payload to be read into memory.
     * By enabling this option the message payload is read into memory in chunks and each chunk is then written to the StreamMessage until no more data.
     */
    @Metadata(label = "producer,advanced", description = "Sets whether StreamMessage type is enabled or not."
        + " Message payloads of streaming kind such as files, InputStream, etc will either by sent as BytesMessage or StreamMessage."
        + " This option controls which kind will be used. By default BytesMessage is used which enforces the entire message payload to be read into memory."
        + " By enabling this option the message payload is read into memory in chunks and each chunk is then written to the StreamMessage until no more data.")
    public void setStreamMessageTypeEnabled(boolean streamMessageTypeEnabled) {
        getConfiguration().setStreamMessageTypeEnabled(streamMessageTypeEnabled);
    }

    /**
     * Gets whether date headers should be formatted according to the ISO 8601
     * standard.
     */
    public boolean isFormatDateHeadersToIso8601() {
        return getConfiguration().isFormatDateHeadersToIso8601();
    }

    /**
     * Sets whether date headers should be formatted according to the ISO 8601
     * standard.
     */
    @Metadata(label = "producer", description = "Sets whether date headers should be formatted according to the ISO 8601 standard.")
    public void setFormatDateHeadersToIso8601(boolean formatDateHeadersToIso8601) {
        getConfiguration().setFormatDateHeadersToIso8601(formatDateHeadersToIso8601);
    }

    // Implementation methods
    // -------------------------------------------------------------------------

    @Override
    protected void doStart() throws Exception {
        if (getHeaderFilterStrategy() == null) {
            setHeaderFilterStrategy(new JmsHeaderFilterStrategy(getConfiguration().isIncludeAllJMSXProperties()));
        }
    }

    @Override
    protected void doShutdown() throws Exception {
        if (asyncStartStopExecutorService != null) {
            getCamelContext().getExecutorServiceManager().shutdownNow(asyncStartStopExecutorService);
            asyncStartStopExecutorService = null;
        }
        super.doShutdown();
    }

    protected synchronized ExecutorService getAsyncStartStopExecutorService() {
        if (asyncStartStopExecutorService == null) {
            // use a cached thread pool for async start tasks as they can run for a while, and we need a dedicated thread
            // for each task, and the thread pool will shrink when no more tasks running
            asyncStartStopExecutorService = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "AsyncStartStopListener");
        }
        return asyncStartStopExecutorService;
    }

    @Override
    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters)
        throws Exception {

        boolean pubSubDomain = false;
        boolean tempDestination = false;

        if (ObjectHelper.isNotEmpty(remaining)) {
            if (remaining.startsWith(JmsConfiguration.QUEUE_PREFIX)) {
                pubSubDomain = false;
                remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.QUEUE_PREFIX.length()), '/');
            } else if (remaining.startsWith(JmsConfiguration.TOPIC_PREFIX)) {
                pubSubDomain = true;
                remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TOPIC_PREFIX.length()), '/');
            } else if (remaining.startsWith(JmsConfiguration.TEMP_QUEUE_PREFIX)) {
                pubSubDomain = false;
                tempDestination = true;
                remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TEMP_QUEUE_PREFIX.length()), '/');
            } else if (remaining.startsWith(JmsConfiguration.TEMP_TOPIC_PREFIX)) {
                pubSubDomain = true;
                tempDestination = true;
                remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TEMP_TOPIC_PREFIX.length()), '/');
            }
        }

        final String subject = convertPathToActualDestination(remaining, parameters);

        // lets make sure we copy the configuration as each endpoint can
        // customize its own version
        JmsConfiguration newConfiguration = getConfiguration().copy();
        JmsEndpoint endpoint;
        if (pubSubDomain) {
            if (tempDestination) {
                endpoint = createTemporaryTopicEndpoint(uri, this, subject, newConfiguration);
            } else {
                endpoint = createTopicEndpoint(uri, this, subject, newConfiguration);
            }
        } else {
            QueueBrowseStrategy strategy = getQueueBrowseStrategy();
            if (tempDestination) {
                endpoint = createTemporaryQueueEndpoint(uri, this, subject, newConfiguration, strategy);
            } else {
                endpoint = createQueueEndpoint(uri, this, subject, newConfiguration, strategy);
            }
        }

        // resolve any custom connection factory first
        ConnectionFactory cf = resolveAndRemoveReferenceParameter(parameters, "connectionFactory", ConnectionFactory.class);
        if (cf != null) {
            endpoint.getConfiguration().setConnectionFactory(cf);
        }

        // if username or password provided then wrap the connection factory
        String cfUsername = getAndRemoveParameter(parameters, "username", String.class, getConfiguration().getUsername());
        String cfPassword = getAndRemoveParameter(parameters, "password", String.class, getConfiguration().getPassword());
        if (cfUsername != null && cfPassword != null) {
            cf = endpoint.getConfiguration().getConnectionFactory();
            ObjectHelper.notNull(cf, "ConnectionFactory");
            LOG.debug("Wrapping existing ConnectionFactory with UserCredentialsConnectionFactoryAdapter using username: {} and password: ******", cfUsername);
            UserCredentialsConnectionFactoryAdapter ucfa = new UserCredentialsConnectionFactoryAdapter();
            ucfa.setTargetConnectionFactory(cf);
            ucfa.setPassword(cfPassword);
            ucfa.setUsername(cfUsername);
            endpoint.getConfiguration().setConnectionFactory(ucfa);
        } else {
            // if only username or password was provided then fail
            if (cfUsername != null || cfPassword != null) {
                if (cfUsername == null) {
                    throw new IllegalArgumentException("Username must also be provided when using username/password as credentials.");
                } else {
                    throw new IllegalArgumentException("Password must also be provided when using username/password as credentials.");
                }
            }
        }

        // jms header strategy
        String strategyVal = getAndRemoveParameter(parameters, KEY_FORMAT_STRATEGY_PARAM, String.class);
        JmsKeyFormatStrategy strategy = resolveStandardJmsKeyFormatStrategy(strategyVal);
        if (strategy != null) {
            endpoint.setJmsKeyFormatStrategy(strategy);
        } else {
            // its not a standard, but a reference
            parameters.put(KEY_FORMAT_STRATEGY_PARAM, strategyVal);
            endpoint.setJmsKeyFormatStrategy(resolveAndRemoveReferenceParameter(
                    parameters, KEY_FORMAT_STRATEGY_PARAM, JmsKeyFormatStrategy.class));
        }

        MessageListenerContainerFactory messageListenerContainerFactory = resolveAndRemoveReferenceParameter(parameters,
                "messageListenerContainerFactoryRef", MessageListenerContainerFactory.class);
        if (messageListenerContainerFactory == null) {
            messageListenerContainerFactory = resolveAndRemoveReferenceParameter(parameters,
                    "messageListenerContainerFactory", MessageListenerContainerFactory.class);
        }
        if (messageListenerContainerFactory != null) {
            endpoint.setMessageListenerContainerFactory(messageListenerContainerFactory);
        }

        endpoint.setHeaderFilterStrategy(getHeaderFilterStrategy());
        setProperties(endpoint.getConfiguration(), parameters);

        return endpoint;
    }

    protected JmsEndpoint createTemporaryTopicEndpoint(String uri, JmsComponent component, String subject, JmsConfiguration configuration) {
        return new JmsTemporaryTopicEndpoint(uri, component, subject, configuration);
    }

    protected JmsEndpoint createTopicEndpoint(String uri, JmsComponent component, String subject, JmsConfiguration configuration) {
        return new JmsEndpoint(uri, component, subject, true, configuration);
    }

    protected JmsEndpoint createTemporaryQueueEndpoint(String uri, JmsComponent component, String subject, JmsConfiguration configuration, QueueBrowseStrategy queueBrowseStrategy) {
        return new JmsTemporaryQueueEndpoint(uri, component, subject, configuration, queueBrowseStrategy);
    }

    protected JmsEndpoint createQueueEndpoint(String uri, JmsComponent component, String subject, JmsConfiguration configuration, QueueBrowseStrategy queueBrowseStrategy) {
        return new JmsQueueEndpoint(uri, component, subject, configuration, queueBrowseStrategy);
    }

    /**
     * Resolves the standard supported {@link JmsKeyFormatStrategy} by a name which can be:
     * <ul>
     *     <li>default - to use the default strategy</li>
     *     <li>passthrough - to use the passthrough strategy</li>
     * </ul>
     *
     * @param name  the name
     * @return the strategy, or <tt>null</tt> if not a standard name.
     */
    private static JmsKeyFormatStrategy resolveStandardJmsKeyFormatStrategy(String name) {
        if ("default".equalsIgnoreCase(name)) {
            return new DefaultJmsKeyFormatStrategy();
        } else if ("passthrough".equalsIgnoreCase(name)) {
            return new PassThroughJmsKeyFormatStrategy();
        } else {
            return null;
        }
    }

    /**
     * A strategy method allowing the URI destination to be translated into the
     * actual JMS destination name (say by looking up in JNDI or something)
     */
    protected String convertPathToActualDestination(String path, Map<String, Object> parameters) {
        return path;
    }

    /**
     * Factory method to create the default configuration instance
     *
     * @return a newly created configuration object which can then be further
     *         customized
     */
    protected JmsConfiguration createConfiguration() {
        return new JmsConfiguration();
    }

}
