| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.camel.component.jms; |
| |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import javax.jms.ConnectionFactory; |
| import javax.jms.Destination; |
| import javax.jms.ExceptionListener; |
| import javax.jms.JMSException; |
| import javax.jms.Message; |
| import javax.jms.Queue; |
| import javax.jms.Session; |
| import javax.jms.TemporaryQueue; |
| import javax.jms.TemporaryTopic; |
| import javax.jms.Topic; |
| |
| import org.apache.camel.AsyncEndpoint; |
| import org.apache.camel.Exchange; |
| import org.apache.camel.ExchangePattern; |
| import org.apache.camel.LoggingLevel; |
| import org.apache.camel.MultipleConsumersSupport; |
| import org.apache.camel.PollingConsumer; |
| import org.apache.camel.Processor; |
| import org.apache.camel.Producer; |
| import org.apache.camel.Service; |
| import org.apache.camel.api.management.ManagedAttribute; |
| import org.apache.camel.api.management.ManagedResource; |
| import org.apache.camel.impl.DefaultEndpoint; |
| import org.apache.camel.impl.SynchronousDelegateProducer; |
| import org.apache.camel.spi.HeaderFilterStrategy; |
| import org.apache.camel.spi.HeaderFilterStrategyAware; |
| import org.apache.camel.spi.Metadata; |
| import org.apache.camel.spi.UriEndpoint; |
| import org.apache.camel.spi.UriParam; |
| import org.apache.camel.spi.UriPath; |
| import org.apache.camel.util.StringHelper; |
| import org.apache.camel.util.UnsafeUriCharactersEncoder; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.core.task.TaskExecutor; |
| import org.springframework.jms.core.JmsOperations; |
| import org.springframework.jms.listener.AbstractMessageListenerContainer; |
| import org.springframework.jms.listener.DefaultMessageListenerContainer; |
| import org.springframework.jms.listener.SimpleMessageListenerContainer; |
| import org.springframework.jms.support.converter.MessageConverter; |
| import org.springframework.jms.support.destination.DestinationResolver; |
| import org.springframework.transaction.PlatformTransactionManager; |
| import org.springframework.util.ErrorHandler; |
| |
| /** |
| * The jms component allows messages to be sent to (or consumed from) a JMS Queue or Topic. |
| * |
| * This component uses Spring JMS and supports JMS 1.1 and 2.0 API. |
| */ |
| @ManagedResource(description = "Managed JMS Endpoint") |
| @UriEndpoint(firstVersion = "1.0.0", scheme = "jms", title = "JMS", syntax = "jms:destinationType:destinationName", consumerClass = JmsConsumer.class, label = "messaging") |
| public class JmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, HeaderFilterStrategyAware, MultipleConsumersSupport, Service { |
| protected final Logger log = LoggerFactory.getLogger(getClass()); |
| private final AtomicInteger runningMessageListeners = new AtomicInteger(); |
| private boolean pubSubDomain; |
| private JmsBinding binding; |
| @UriPath(defaultValue = "queue", enums = "queue,topic,temp-queue,temp-topic", description = "The kind of destination to use") |
| private String destinationType; |
| @UriPath(description = "Name of the queue or topic to use as destination") |
| @Metadata(required = "true") |
| private String destinationName; |
| private Destination destination; |
| @UriParam(label = "advanced", description = "To use a custom HeaderFilterStrategy to filter header to and from Camel message.") |
| private HeaderFilterStrategy headerFilterStrategy; |
| @UriParam |
| private JmsConfiguration configuration; |
| |
| public JmsEndpoint() { |
| this(null, null); |
| } |
| |
| public JmsEndpoint(Topic destination) throws JMSException { |
| this("jms:topic:" + destination.getTopicName(), null); |
| this.destination = destination; |
| this.destinationType = "topic"; |
| } |
| |
| public JmsEndpoint(String uri, JmsComponent component, String destinationName, boolean pubSubDomain, JmsConfiguration configuration) { |
| super(UnsafeUriCharactersEncoder.encode(uri), component); |
| this.configuration = configuration; |
| this.destinationName = destinationName; |
| this.pubSubDomain = pubSubDomain; |
| if (pubSubDomain) { |
| this.destinationType = "topic"; |
| } else { |
| this.destinationType = "queue"; |
| } |
| } |
| |
| @SuppressWarnings("deprecation") |
| public JmsEndpoint(String endpointUri, JmsBinding binding, JmsConfiguration configuration, String destinationName, boolean pubSubDomain) { |
| super(UnsafeUriCharactersEncoder.encode(endpointUri)); |
| this.binding = binding; |
| this.configuration = configuration; |
| this.destinationName = destinationName; |
| this.pubSubDomain = pubSubDomain; |
| if (pubSubDomain) { |
| this.destinationType = "topic"; |
| } else { |
| this.destinationType = "queue"; |
| } |
| } |
| |
| public JmsEndpoint(String endpointUri, String destinationName, boolean pubSubDomain) { |
| this(UnsafeUriCharactersEncoder.encode(endpointUri), null, new JmsConfiguration(), destinationName, pubSubDomain); |
| this.binding = new JmsBinding(this); |
| if (pubSubDomain) { |
| this.destinationType = "topic"; |
| } else { |
| this.destinationType = "queue"; |
| } |
| } |
| |
| /** |
| * Creates a pub-sub endpoint with the given destination |
| */ |
| public JmsEndpoint(String endpointUri, String destinationName) { |
| this(UnsafeUriCharactersEncoder.encode(endpointUri), destinationName, true); |
| } |
| |
| /** |
| * Returns a new JMS endpoint for the given JMS destination using the configuration from the given JMS component |
| */ |
| public static JmsEndpoint newInstance(Destination destination, JmsComponent component) throws JMSException { |
| JmsEndpoint answer = newInstance(destination); |
| JmsConfiguration newConfiguration = component.getConfiguration().copy(); |
| answer.setConfiguration(newConfiguration); |
| answer.setCamelContext(component.getCamelContext()); |
| return answer; |
| } |
| |
| /** |
| * Returns a new JMS endpoint for the given JMS destination |
| */ |
| public static JmsEndpoint newInstance(Destination destination) throws JMSException { |
| if (destination instanceof TemporaryQueue) { |
| return new JmsTemporaryQueueEndpoint((TemporaryQueue) destination); |
| } else if (destination instanceof TemporaryTopic) { |
| return new JmsTemporaryTopicEndpoint((TemporaryTopic) destination); |
| } else if (destination instanceof Queue) { |
| return new JmsQueueEndpoint((Queue) destination); |
| } else { |
| return new JmsEndpoint((Topic) destination); |
| } |
| } |
| |
| public Producer createProducer() throws Exception { |
| Producer answer = new JmsProducer(this); |
| if (isSynchronous()) { |
| return new SynchronousDelegateProducer(answer); |
| } else { |
| return answer; |
| } |
| } |
| |
| public JmsConsumer createConsumer(Processor processor) throws Exception { |
| AbstractMessageListenerContainer listenerContainer = createMessageListenerContainer(); |
| return createConsumer(processor, listenerContainer); |
| } |
| |
| public AbstractMessageListenerContainer createMessageListenerContainer() throws Exception { |
| return configuration.createMessageListenerContainer(this); |
| } |
| |
| public void configureListenerContainer(AbstractMessageListenerContainer listenerContainer, JmsConsumer consumer) { |
| if (destinationName != null) { |
| listenerContainer.setDestinationName(destinationName); |
| log.debug("Using destinationName: {} on listenerContainer: {}", destinationName, listenerContainer); |
| } else if (destination != null) { |
| listenerContainer.setDestination(destination); |
| log.debug("Using destination: {} on listenerContainer: {}", destinationName, listenerContainer); |
| } else { |
| DestinationResolver resolver = getDestinationResolver(); |
| if (resolver != null) { |
| listenerContainer.setDestinationResolver(resolver); |
| } else { |
| throw new IllegalArgumentException("Neither destination, destinationName or destinationResolver are specified on this endpoint!"); |
| } |
| log.debug("Using destinationResolver: {} on listenerContainer: {}", resolver, listenerContainer); |
| } |
| listenerContainer.setPubSubDomain(pubSubDomain); |
| |
| // include destination name as part of thread and transaction name |
| String consumerName = getThreadName(); |
| |
| if (configuration.getTaskExecutor() != null) { |
| if (log.isDebugEnabled()) { |
| log.debug("Using custom TaskExecutor: {} on listener container: {}", configuration.getTaskExecutor(), listenerContainer); |
| } |
| setContainerTaskExecutor(listenerContainer, configuration.getTaskExecutor()); |
| // we are using a shared thread pool that this listener container is using. |
| // store a reference to the consumer, but we should not shutdown the thread pool when the consumer stops |
| // as the lifecycle of the shared thread pool is handled elsewhere |
| if (configuration.getTaskExecutor() instanceof ExecutorService) { |
| consumer.setListenerContainerExecutorService((ExecutorService) configuration.getTaskExecutor(), false); |
| } |
| } else if ((listenerContainer instanceof DefaultJmsMessageListenerContainer && configuration.getDefaultTaskExecutorType() == null) |
| || !(listenerContainer instanceof DefaultJmsMessageListenerContainer)) { |
| // preserve backwards compatibility if an explicit Default TaskExecutor Type was not set; |
| // otherwise, defer the creation of the TaskExecutor |
| // use a cached pool as DefaultMessageListenerContainer will throttle pool sizing |
| ExecutorService executor = getCamelContext().getExecutorServiceManager().newCachedThreadPool(consumer, consumerName); |
| setContainerTaskExecutor(listenerContainer, executor); |
| // we created a new private thread pool that this listener container is using, now store a reference on the consumer |
| // so when the consumer is stopped we can shutdown the thread pool also, to ensure all resources is shutdown |
| consumer.setListenerContainerExecutorService(executor, true); |
| } else { |
| // do nothing, as we're working with a DefaultJmsMessageListenerContainer with an explicit DefaultTaskExecutorType, |
| // so DefaultJmsMessageListenerContainer#createDefaultTaskExecutor will handle the creation |
| log.debug("Deferring creation of TaskExecutor for listener container: {} as per policy: {}", |
| listenerContainer, getDefaultTaskExecutorType()); |
| } |
| |
| // set a default transaction name if none provided |
| if (configuration.getTransactionName() == null) { |
| if (listenerContainer instanceof DefaultMessageListenerContainer) { |
| ((DefaultMessageListenerContainer) listenerContainer).setTransactionName(consumerName); |
| } |
| } |
| |
| // now configure the JMS 2.0 API |
| if (configuration.getDurableSubscriptionName() != null) { |
| listenerContainer.setDurableSubscriptionName(configuration.getDurableSubscriptionName()); |
| } else if (configuration.isSubscriptionDurable()) { |
| listenerContainer.setSubscriptionDurable(true); |
| if (configuration.getSubscriptionName() != null) { |
| listenerContainer.setSubscriptionName(configuration.getSubscriptionName()); |
| } |
| } |
| listenerContainer.setSubscriptionShared(configuration.isSubscriptionShared()); |
| } |
| |
| private void setContainerTaskExecutor(AbstractMessageListenerContainer listenerContainer, Executor executor) { |
| if (listenerContainer instanceof SimpleMessageListenerContainer) { |
| ((SimpleMessageListenerContainer) listenerContainer).setTaskExecutor(executor); |
| } else if (listenerContainer instanceof DefaultMessageListenerContainer) { |
| ((DefaultMessageListenerContainer) listenerContainer).setTaskExecutor(executor); |
| } |
| } |
| |
| /** |
| * Gets the destination name which was configured from the endpoint uri. |
| * |
| * @return the destination name resolved from the endpoint uri |
| */ |
| public String getEndpointConfiguredDestinationName() { |
| String remainder = StringHelper.after(getEndpointKey(), "//"); |
| if (remainder != null && remainder.contains("?")) { |
| // remove parameters |
| remainder = StringHelper.before(remainder, "?"); |
| } |
| return JmsMessageHelper.normalizeDestinationName(remainder); |
| } |
| |
| /** |
| * Creates a consumer using the given processor and listener container |
| * |
| * @param processor the processor to use to process the messages |
| * @param listenerContainer the listener container |
| * @return a newly created consumer |
| * @throws Exception if the consumer cannot be created |
| */ |
| public JmsConsumer createConsumer(Processor processor, AbstractMessageListenerContainer listenerContainer) throws Exception { |
| JmsConsumer consumer = new JmsConsumer(this, processor, listenerContainer); |
| configureListenerContainer(listenerContainer, consumer); |
| configureConsumer(consumer); |
| return consumer; |
| } |
| |
| @Override |
| public PollingConsumer createPollingConsumer() throws Exception { |
| JmsPollingConsumer answer = new JmsPollingConsumer(this); |
| configurePollingConsumer(answer); |
| return answer; |
| } |
| |
| @Override |
| public Exchange createExchange(ExchangePattern pattern) { |
| Exchange exchange = super.createExchange(pattern); |
| exchange.setProperty(Exchange.BINDING, getBinding()); |
| return exchange; |
| } |
| |
| public Exchange createExchange(Message message, Session session) { |
| Exchange exchange = createExchange(getExchangePattern()); |
| exchange.setIn(new JmsMessage(message, session, getBinding())); |
| return exchange; |
| } |
| |
| /** |
| * Factory method for creating a new template for InOnly message exchanges |
| */ |
| public JmsOperations createInOnlyTemplate() { |
| return configuration.createInOnlyTemplate(this, pubSubDomain, destinationName); |
| } |
| |
| /** |
| * Factory method for creating a new template for InOut message exchanges |
| */ |
| public JmsOperations createInOutTemplate() { |
| return configuration.createInOutTemplate(this, pubSubDomain, destinationName, configuration.getRequestTimeout()); |
| } |
| |
| public boolean isMultipleConsumersSupported() { |
| // JMS allows multiple consumers on both queues and topics |
| return true; |
| } |
| |
| public String getThreadName() { |
| return "JmsConsumer[" + getEndpointConfiguredDestinationName() + "]"; |
| } |
| |
| // Properties |
| // ------------------------------------------------------------------------- |
| |
| @Override |
| public JmsComponent getComponent() { |
| return (JmsComponent) super.getComponent(); |
| } |
| |
| public HeaderFilterStrategy getHeaderFilterStrategy() { |
| if (headerFilterStrategy == null) { |
| headerFilterStrategy = new JmsHeaderFilterStrategy(isIncludeAllJMSXProperties()); |
| } |
| return headerFilterStrategy; |
| } |
| |
| /** |
| * To use a custom HeaderFilterStrategy to filter header to and from Camel message. |
| */ |
| public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) { |
| this.headerFilterStrategy = strategy; |
| } |
| |
| public JmsBinding getBinding() { |
| if (binding == null) { |
| binding = createBinding(); |
| } |
| return binding; |
| } |
| |
| /** |
| * Creates the {@link org.apache.camel.component.jms.JmsBinding} to use. |
| */ |
| protected JmsBinding createBinding() { |
| return new JmsBinding(this); |
| } |
| |
| /** |
| * Sets the binding used to convert from a Camel message to and from a JMS |
| * message |
| */ |
| public void setBinding(JmsBinding binding) { |
| this.binding = binding; |
| } |
| |
| public String getDestinationType() { |
| return destinationType; |
| } |
| |
| /** |
| * The kind of destination to use |
| */ |
| public void setDestinationType(String destinationType) { |
| this.destinationType = destinationType; |
| } |
| |
| public String getDestinationName() { |
| return destinationName; |
| } |
| |
| /** |
| * Name of the queue or topic to use as destination |
| */ |
| public void setDestinationName(String destinationName) { |
| this.destinationName = destinationName; |
| } |
| |
| public Destination getDestination() { |
| return destination; |
| } |
| |
| /** |
| * Allows a specific JMS Destination object to be used as the destination |
| */ |
| public void setDestination(Destination destination) { |
| this.destination = destination; |
| } |
| |
| public JmsConfiguration getConfiguration() { |
| return configuration; |
| } |
| |
| public void setConfiguration(JmsConfiguration configuration) { |
| this.configuration = configuration; |
| } |
| |
| public boolean isSingleton() { |
| return true; |
| } |
| |
| @ManagedAttribute |
| public boolean isPubSubDomain() { |
| return pubSubDomain; |
| } |
| |
| /** |
| * Lazily loads the temporary queue type if one has not been explicitly configured |
| * via calling the {@link JmsProviderMetadata#setTemporaryQueueType(Class)} |
| * on the {@link #getConfiguration()} instance |
| */ |
| public Class<? extends TemporaryQueue> getTemporaryQueueType() { |
| JmsProviderMetadata metadata = getProviderMetadata(); |
| JmsOperations template = getMetadataJmsOperations(); |
| return metadata.getTemporaryQueueType(template); |
| } |
| |
| /** |
| * Lazily loads the temporary topic type if one has not been explicitly configured |
| * via calling the {@link JmsProviderMetadata#setTemporaryTopicType(Class)} |
| * on the {@link #getConfiguration()} instance |
| */ |
| public Class<? extends TemporaryTopic> getTemporaryTopicType() { |
| JmsOperations template = getMetadataJmsOperations(); |
| JmsProviderMetadata metadata = getProviderMetadata(); |
| return metadata.getTemporaryTopicType(template); |
| } |
| |
| /** |
| * Returns the provider metadata |
| */ |
| protected JmsProviderMetadata getProviderMetadata() { |
| JmsConfiguration conf = getConfiguration(); |
| JmsProviderMetadata metadata = conf.getProviderMetadata(); |
| return metadata; |
| } |
| |
| /** |
| * Returns the {@link JmsOperations} used for metadata operations such as creating temporary destinations |
| */ |
| protected JmsOperations getMetadataJmsOperations() { |
| JmsOperations template = getConfiguration().getMetadataJmsOperations(this); |
| if (template == null) { |
| throw new IllegalArgumentException("No Metadata JmsTemplate supplied!"); |
| } |
| return template; |
| } |
| |
| protected ExecutorService getAsyncStartStopExecutorService() { |
| if (getComponent() == null) { |
| throw new IllegalStateException("AsyncStartStopListener requires JmsComponent to be configured on this endpoint: " + this); |
| } |
| // use shared thread pool from component |
| return getComponent().getAsyncStartStopExecutorService(); |
| } |
| |
| public void onListenerContainerStarting(AbstractMessageListenerContainer container) { |
| runningMessageListeners.incrementAndGet(); |
| } |
| |
| public void onListenerContainerStopped(AbstractMessageListenerContainer container) { |
| runningMessageListeners.decrementAndGet(); |
| } |
| |
| /** |
| * State whether this endpoint is running (eg started) |
| */ |
| protected boolean isRunning() { |
| return isStarted(); |
| } |
| |
| @Override |
| public void stop() throws Exception { |
| int running = runningMessageListeners.get(); |
| if (running <= 0) { |
| super.stop(); |
| } else { |
| log.trace("There are still {} running message listeners. Cannot stop endpoint {}", running, this); |
| } |
| } |
| |
| @Override |
| public void shutdown() throws Exception { |
| int running = runningMessageListeners.get(); |
| if (running <= 0) { |
| super.shutdown(); |
| } else { |
| log.trace("There are still {} running message listeners. Cannot shutdown endpoint {}", running, this); |
| } |
| } |
| |
| // Delegated properties from the configuration |
| //------------------------------------------------------------------------- |
| @ManagedAttribute |
| public int getAcknowledgementMode() { |
| return getConfiguration().getAcknowledgementMode(); |
| } |
| |
| @ManagedAttribute |
| public String getAcknowledgementModeName() { |
| return getConfiguration().getAcknowledgementModeName(); |
| } |
| |
| @ManagedAttribute |
| public int getCacheLevel() { |
| return getConfiguration().getCacheLevel(); |
| } |
| |
| @ManagedAttribute |
| public String getCacheLevelName() { |
| return getConfiguration().getCacheLevelName(); |
| } |
| |
| @ManagedAttribute |
| public String getReplyToCacheLevelName() { |
| return getConfiguration().getReplyToCacheLevelName(); |
| } |
| |
| @ManagedAttribute |
| public String getClientId() { |
| return getConfiguration().getClientId(); |
| } |
| |
| @ManagedAttribute |
| public int getConcurrentConsumers() { |
| return getConfiguration().getConcurrentConsumers(); |
| } |
| |
| @ManagedAttribute |
| public int getReplyToConcurrentConsumers() { |
| return getConfiguration().getReplyToConcurrentConsumers(); |
| } |
| |
| public ConnectionFactory getConnectionFactory() { |
| return getConfiguration().getConnectionFactory(); |
| } |
| |
| public DestinationResolver getDestinationResolver() { |
| return getConfiguration().getDestinationResolver(); |
| } |
| |
| @ManagedAttribute |
| public String getDurableSubscriptionName() { |
| return getConfiguration().getDurableSubscriptionName(); |
| } |
| |
| public ExceptionListener getExceptionListener() { |
| return getConfiguration().getExceptionListener(); |
| } |
| |
| public ErrorHandler getErrorHandler() { |
| return getConfiguration().getErrorHandler(); |
| } |
| |
| public LoggingLevel getErrorHandlerLoggingLevel() { |
| return getConfiguration().getErrorHandlerLoggingLevel(); |
| } |
| |
| @ManagedAttribute |
| public boolean isErrorHandlerLogStackTrace() { |
| return getConfiguration().isErrorHandlerLogStackTrace(); |
| } |
| |
| @ManagedAttribute |
| public void setErrorHandlerLogStackTrace(boolean errorHandlerLogStackTrace) { |
| getConfiguration().setErrorHandlerLogStackTrace(errorHandlerLogStackTrace); |
| } |
| |
| @ManagedAttribute |
| public int getIdleTaskExecutionLimit() { |
| return getConfiguration().getIdleTaskExecutionLimit(); |
| } |
| |
| @ManagedAttribute |
| public int getIdleConsumerLimit() { |
| return getConfiguration().getIdleConsumerLimit(); |
| } |
| |
| public JmsOperations getJmsOperations() { |
| return getConfiguration().getJmsOperations(); |
| } |
| |
| public ConnectionFactory getListenerConnectionFactory() { |
| return getConfiguration().getListenerConnectionFactory(); |
| } |
| |
| @ManagedAttribute |
| public int getMaxConcurrentConsumers() { |
| return getConfiguration().getMaxConcurrentConsumers(); |
| } |
| |
| @ManagedAttribute |
| public int getReplyToMaxConcurrentConsumers() { |
| return getConfiguration().getReplyToMaxConcurrentConsumers(); |
| } |
| |
| @ManagedAttribute |
| public int getReplyToOnTimeoutMaxConcurrentConsumers() { |
| return getConfiguration().getReplyToOnTimeoutMaxConcurrentConsumers(); |
| } |
| |
| @ManagedAttribute |
| public int getMaxMessagesPerTask() { |
| return getConfiguration().getMaxMessagesPerTask(); |
| } |
| |
| public MessageConverter getMessageConverter() { |
| return getConfiguration().getMessageConverter(); |
| } |
| |
| public JmsOperations getMetadataJmsOperations(JmsEndpoint endpoint) { |
| return getConfiguration().getMetadataJmsOperations(endpoint); |
| } |
| |
| @ManagedAttribute |
| public int getPriority() { |
| return getConfiguration().getPriority(); |
| } |
| |
| @ManagedAttribute |
| public long getReceiveTimeout() { |
| return getConfiguration().getReceiveTimeout(); |
| } |
| |
| @ManagedAttribute |
| public long getRecoveryInterval() { |
| return getConfiguration().getRecoveryInterval(); |
| } |
| |
| @ManagedAttribute |
| public String getReplyTo() { |
| return getConfiguration().getReplyTo(); |
| } |
| |
| @ManagedAttribute |
| public String getReplyToOverride() { |
| return getConfiguration().getReplyToOverride(); |
| } |
| |
| @ManagedAttribute |
| public boolean isReplyToSameDestinationAllowed() { |
| return getConfiguration().isReplyToSameDestinationAllowed(); |
| } |
| |
| @ManagedAttribute |
| public String getReplyToDestinationSelectorName() { |
| return getConfiguration().getReplyToDestinationSelectorName(); |
| } |
| |
| @ManagedAttribute |
| public long getRequestTimeout() { |
| return getConfiguration().getRequestTimeout(); |
| } |
| |
| @ManagedAttribute |
| public long getRequestTimeoutCheckerInterval() { |
| return getConfiguration().getRequestTimeoutCheckerInterval(); |
| } |
| |
| public TaskExecutor getTaskExecutor() { |
| return getConfiguration().getTaskExecutor(); |
| } |
| |
| public ConnectionFactory getTemplateConnectionFactory() { |
| return getConfiguration().getTemplateConnectionFactory(); |
| } |
| |
| @ManagedAttribute |
| public long getTimeToLive() { |
| return getConfiguration().getTimeToLive(); |
| } |
| |
| public PlatformTransactionManager getTransactionManager() { |
| return getConfiguration().getTransactionManager(); |
| } |
| |
| @ManagedAttribute |
| public String getTransactionName() { |
| return getConfiguration().getTransactionName(); |
| } |
| |
| @ManagedAttribute |
| public int getTransactionTimeout() { |
| return getConfiguration().getTransactionTimeout(); |
| } |
| |
| @ManagedAttribute |
| public boolean isAcceptMessagesWhileStopping() { |
| return getConfiguration().isAcceptMessagesWhileStopping(); |
| } |
| |
| @ManagedAttribute |
| public boolean isAllowReplyManagerQuickStop() { |
| return getConfiguration().isAllowReplyManagerQuickStop(); |
| } |
| |
| @ManagedAttribute |
| public boolean isAlwaysCopyMessage() { |
| return getConfiguration().isAlwaysCopyMessage(); |
| } |
| |
| @ManagedAttribute |
| public boolean isAutoStartup() { |
| return getConfiguration().isAutoStartup(); |
| } |
| |
| @ManagedAttribute |
| public boolean isDeliveryPersistent() { |
| return getConfiguration().isDeliveryPersistent(); |
| } |
| |
| @ManagedAttribute |
| public Integer getDeliveryMode() { |
| return getConfiguration().getDeliveryMode(); |
| } |
| |
| @ManagedAttribute |
| public boolean isDisableReplyTo() { |
| return getConfiguration().isDisableReplyTo(); |
| } |
| |
| @ManagedAttribute |
| public boolean isEagerLoadingOfProperties() { |
| return getConfiguration().isEagerLoadingOfProperties(); |
| } |
| |
| @ManagedAttribute |
| public boolean isExplicitQosEnabled() { |
| return getConfiguration().isExplicitQosEnabled(); |
| } |
| |
| @ManagedAttribute |
| public boolean isExposeListenerSession() { |
| return getConfiguration().isExposeListenerSession(); |
| } |
| |
| @ManagedAttribute |
| public boolean isMessageIdEnabled() { |
| return getConfiguration().isMessageIdEnabled(); |
| } |
| |
| @ManagedAttribute |
| public boolean isMessageTimestampEnabled() { |
| return getConfiguration().isMessageTimestampEnabled(); |
| } |
| |
| @ManagedAttribute |
| public boolean isPreserveMessageQos() { |
| return getConfiguration().isPreserveMessageQos(); |
| } |
| |
| @ManagedAttribute |
| public boolean isPubSubNoLocal() { |
| return getConfiguration().isPubSubNoLocal(); |
| } |
| |
| @ManagedAttribute |
| public boolean isReplyToDeliveryPersistent() { |
| return getConfiguration().isReplyToDeliveryPersistent(); |
| } |
| |
| @ManagedAttribute |
| public boolean isTransacted() { |
| return getConfiguration().isTransacted(); |
| } |
| |
| @ManagedAttribute |
| public boolean isLazyCreateTransactionManager() { |
| return getConfiguration().isLazyCreateTransactionManager(); |
| } |
| |
| @ManagedAttribute |
| @Deprecated |
| public boolean isTransactedInOut() { |
| return getConfiguration().isTransactedInOut(); |
| } |
| |
| @ManagedAttribute |
| public boolean isUseMessageIDAsCorrelationID() { |
| return getConfiguration().isUseMessageIDAsCorrelationID(); |
| } |
| |
| @ManagedAttribute |
| public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) { |
| getConfiguration().setAcceptMessagesWhileStopping(acceptMessagesWhileStopping); |
| } |
| |
| @ManagedAttribute |
| public void setAllowReplyManagerQuickStop(boolean allowReplyManagerQuickStop) { |
| getConfiguration().setAllowReplyManagerQuickStop(allowReplyManagerQuickStop); |
| } |
| |
| @ManagedAttribute |
| public void setAcknowledgementMode(int consumerAcknowledgementMode) { |
| getConfiguration().setAcknowledgementMode(consumerAcknowledgementMode); |
| } |
| |
| @ManagedAttribute |
| public void setAcknowledgementModeName(String consumerAcknowledgementMode) { |
| getConfiguration().setAcknowledgementModeName(consumerAcknowledgementMode); |
| } |
| |
| @ManagedAttribute |
| public void setAlwaysCopyMessage(boolean alwaysCopyMessage) { |
| getConfiguration().setAlwaysCopyMessage(alwaysCopyMessage); |
| } |
| |
| @ManagedAttribute |
| public void setAutoStartup(boolean autoStartup) { |
| getConfiguration().setAutoStartup(autoStartup); |
| } |
| |
| @ManagedAttribute |
| public void setCacheLevel(int cacheLevel) { |
| getConfiguration().setCacheLevel(cacheLevel); |
| } |
| |
| @ManagedAttribute |
| public void setCacheLevelName(String cacheName) { |
| getConfiguration().setCacheLevelName(cacheName); |
| } |
| |
| @ManagedAttribute |
| public void setReplyToCacheLevelName(String cacheName) { |
| getConfiguration().setReplyToCacheLevelName(cacheName); |
| } |
| |
| @ManagedAttribute |
| public void setClientId(String consumerClientId) { |
| getConfiguration().setClientId(consumerClientId); |
| } |
| |
| @ManagedAttribute |
| public void setConcurrentConsumers(int concurrentConsumers) { |
| getConfiguration().setConcurrentConsumers(concurrentConsumers); |
| } |
| |
| @ManagedAttribute |
| public void setReplyToConcurrentConsumers(int concurrentConsumers) { |
| getConfiguration().setReplyToConcurrentConsumers(concurrentConsumers); |
| } |
| |
| public void setConnectionFactory(ConnectionFactory connectionFactory) { |
| getConfiguration().setConnectionFactory(connectionFactory); |
| } |
| |
| @ManagedAttribute |
| public void setDeliveryPersistent(boolean deliveryPersistent) { |
| getConfiguration().setDeliveryPersistent(deliveryPersistent); |
| } |
| |
| @ManagedAttribute |
| public void setDeliveryMode(Integer deliveryMode) { |
| getConfiguration().setDeliveryMode(deliveryMode); |
| } |
| |
| public void setDestinationResolver(DestinationResolver destinationResolver) { |
| getConfiguration().setDestinationResolver(destinationResolver); |
| } |
| |
| @ManagedAttribute |
| public void setDisableReplyTo(boolean disableReplyTo) { |
| getConfiguration().setDisableReplyTo(disableReplyTo); |
| } |
| |
| @ManagedAttribute |
| public void setDurableSubscriptionName(String durableSubscriptionName) { |
| getConfiguration().setDurableSubscriptionName(durableSubscriptionName); |
| } |
| |
| @ManagedAttribute |
| public void setEagerLoadingOfProperties(boolean eagerLoadingOfProperties) { |
| getConfiguration().setEagerLoadingOfProperties(eagerLoadingOfProperties); |
| } |
| |
| public void setExceptionListener(ExceptionListener exceptionListener) { |
| getConfiguration().setExceptionListener(exceptionListener); |
| } |
| |
| public void setErrorHandler(ErrorHandler errorHandler) { |
| getConfiguration().setErrorHandler(errorHandler); |
| } |
| |
| @ManagedAttribute |
| public void setExplicitQosEnabled(boolean explicitQosEnabled) { |
| getConfiguration().setExplicitQosEnabled(explicitQosEnabled); |
| } |
| |
| @ManagedAttribute |
| public void setExposeListenerSession(boolean exposeListenerSession) { |
| getConfiguration().setExposeListenerSession(exposeListenerSession); |
| } |
| |
| @ManagedAttribute |
| public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) { |
| getConfiguration().setIdleTaskExecutionLimit(idleTaskExecutionLimit); |
| } |
| |
| @ManagedAttribute |
| public void setIdleConsumerLimit(int idleConsumerLimit) { |
| getConfiguration().setIdleConsumerLimit(idleConsumerLimit); |
| } |
| |
| public void setJmsOperations(JmsOperations jmsOperations) { |
| getConfiguration().setJmsOperations(jmsOperations); |
| } |
| |
| public void setListenerConnectionFactory(ConnectionFactory listenerConnectionFactory) { |
| getConfiguration().setListenerConnectionFactory(listenerConnectionFactory); |
| } |
| |
| @ManagedAttribute |
| public void setMaxConcurrentConsumers(int maxConcurrentConsumers) { |
| getConfiguration().setMaxConcurrentConsumers(maxConcurrentConsumers); |
| } |
| |
| @ManagedAttribute |
| public void setReplyToMaxConcurrentConsumers(int maxConcurrentConsumers) { |
| getConfiguration().setReplyToMaxConcurrentConsumers(maxConcurrentConsumers); |
| } |
| |
| @ManagedAttribute |
| public void setMaxMessagesPerTask(int maxMessagesPerTask) { |
| getConfiguration().setMaxMessagesPerTask(maxMessagesPerTask); |
| } |
| |
| public void setMessageConverter(MessageConverter messageConverter) { |
| getConfiguration().setMessageConverter(messageConverter); |
| } |
| |
| @ManagedAttribute |
| public void setMessageIdEnabled(boolean messageIdEnabled) { |
| getConfiguration().setMessageIdEnabled(messageIdEnabled); |
| } |
| |
| @ManagedAttribute |
| public void setMessageTimestampEnabled(boolean messageTimestampEnabled) { |
| getConfiguration().setMessageTimestampEnabled(messageTimestampEnabled); |
| } |
| |
| public void setMetadataJmsOperations(JmsOperations metadataJmsOperations) { |
| getConfiguration().setMetadataJmsOperations(metadataJmsOperations); |
| } |
| |
| @ManagedAttribute |
| public void setPreserveMessageQos(boolean preserveMessageQos) { |
| getConfiguration().setPreserveMessageQos(preserveMessageQos); |
| } |
| |
| @ManagedAttribute |
| public void setPriority(int priority) { |
| getConfiguration().setPriority(priority); |
| } |
| |
| public void setProviderMetadata(JmsProviderMetadata providerMetadata) { |
| getConfiguration().setProviderMetadata(providerMetadata); |
| } |
| |
| @ManagedAttribute |
| public void setPubSubNoLocal(boolean pubSubNoLocal) { |
| getConfiguration().setPubSubNoLocal(pubSubNoLocal); |
| } |
| |
| @ManagedAttribute |
| public void setReceiveTimeout(long receiveTimeout) { |
| getConfiguration().setReceiveTimeout(receiveTimeout); |
| } |
| |
| @ManagedAttribute |
| public void setRecoveryInterval(long recoveryInterval) { |
| getConfiguration().setRecoveryInterval(recoveryInterval); |
| } |
| |
| @ManagedAttribute |
| public void setReplyTo(String replyToDestination) { |
| getConfiguration().setReplyTo(replyToDestination); |
| } |
| |
| @ManagedAttribute |
| public void setReplyToOverride(String replyToDestination) { |
| getConfiguration().setReplyToOverride(replyToDestination); |
| } |
| |
| @ManagedAttribute |
| public void setReplyToSameDestinationAllowed(boolean replyToSameDestinationAllowed) { |
| getConfiguration().setReplyToSameDestinationAllowed(replyToSameDestinationAllowed); |
| } |
| |
| @ManagedAttribute |
| public void setReplyToDeliveryPersistent(boolean replyToDeliveryPersistent) { |
| getConfiguration().setReplyToDeliveryPersistent(replyToDeliveryPersistent); |
| } |
| |
| @ManagedAttribute |
| public void setReplyToDestinationSelectorName(String replyToDestinationSelectorName) { |
| getConfiguration().setReplyToDestinationSelectorName(replyToDestinationSelectorName); |
| } |
| |
| @ManagedAttribute |
| public void setRequestTimeout(long requestTimeout) { |
| getConfiguration().setRequestTimeout(requestTimeout); |
| } |
| |
| public void setTaskExecutor(TaskExecutor taskExecutor) { |
| getConfiguration().setTaskExecutor(taskExecutor); |
| } |
| |
| public void setTemplateConnectionFactory(ConnectionFactory templateConnectionFactory) { |
| getConfiguration().setTemplateConnectionFactory(templateConnectionFactory); |
| } |
| |
| @ManagedAttribute |
| public void setTimeToLive(long timeToLive) { |
| getConfiguration().setTimeToLive(timeToLive); |
| } |
| |
| @ManagedAttribute |
| public void setTransacted(boolean consumerTransacted) { |
| getConfiguration().setTransacted(consumerTransacted); |
| } |
| |
| @ManagedAttribute |
| @Deprecated |
| public void setTransactedInOut(boolean transactedInOut) { |
| getConfiguration().setTransactedInOut(transactedInOut); |
| } |
| |
| @ManagedAttribute |
| public void setLazyCreateTransactionManager(boolean lazyCreating) { |
| getConfiguration().setLazyCreateTransactionManager(lazyCreating); |
| } |
| |
| public void setTransactionManager(PlatformTransactionManager transactionManager) { |
| getConfiguration().setTransactionManager(transactionManager); |
| } |
| |
| @ManagedAttribute |
| public void setTransactionName(String transactionName) { |
| getConfiguration().setTransactionName(transactionName); |
| } |
| |
| @ManagedAttribute |
| public void setTransactionTimeout(int transactionTimeout) { |
| getConfiguration().setTransactionTimeout(transactionTimeout); |
| } |
| |
| @ManagedAttribute |
| public void setUseMessageIDAsCorrelationID(boolean useMessageIDAsCorrelationID) { |
| getConfiguration().setUseMessageIDAsCorrelationID(useMessageIDAsCorrelationID); |
| } |
| |
| public JmsMessageType getJmsMessageType() { |
| return getConfiguration().getJmsMessageType(); |
| } |
| |
| public void setJmsMessageType(JmsMessageType jmsMessageType) { |
| getConfiguration().setJmsMessageType(jmsMessageType); |
| } |
| |
| public JmsKeyFormatStrategy getJmsKeyFormatStrategy() { |
| return getConfiguration().getJmsKeyFormatStrategy(); |
| } |
| |
| public void setJmsKeyFormatStrategy(JmsKeyFormatStrategy jmsHeaderStrategy) { |
| getConfiguration().setJmsKeyFormatStrategy(jmsHeaderStrategy); |
| } |
| |
| public MessageCreatedStrategy getMessageCreatedStrategy() { |
| return getConfiguration().getMessageCreatedStrategy(); |
| } |
| |
| public void setMessageCreatedStrategy(MessageCreatedStrategy messageCreatedStrategy) { |
| getConfiguration().setMessageCreatedStrategy(messageCreatedStrategy); |
| } |
| |
| @ManagedAttribute |
| public boolean isTransferExchange() { |
| return getConfiguration().isTransferExchange(); |
| } |
| |
| @ManagedAttribute |
| public void setTransferExchange(boolean transferExchange) { |
| getConfiguration().setTransferExchange(transferExchange); |
| } |
| |
| @ManagedAttribute |
| public boolean isAllowSerializedHeaders() { |
| return getConfiguration().isAllowSerializedHeaders(); |
| } |
| |
| @ManagedAttribute |
| public void setAllowSerializedHeaders(boolean allowSerializedHeaders) { |
| getConfiguration().setAllowSerializedHeaders(allowSerializedHeaders); |
| } |
| |
| @ManagedAttribute |
| public boolean isTransferException() { |
| return getConfiguration().isTransferException(); |
| } |
| |
| @ManagedAttribute |
| public void setTransferException(boolean transferException) { |
| getConfiguration().setTransferException(transferException); |
| } |
| |
| @ManagedAttribute |
| public void setTransferFault(boolean transferFault) { |
| getConfiguration().setTransferFault(transferFault); |
| } |
| |
| @ManagedAttribute |
| public boolean isTransferFault() { |
| return getConfiguration().isTransferFault(); |
| } |
| |
| @ManagedAttribute |
| public boolean isTestConnectionOnStartup() { |
| return configuration.isTestConnectionOnStartup(); |
| } |
| |
| @ManagedAttribute |
| public void setTestConnectionOnStartup(boolean testConnectionOnStartup) { |
| configuration.setTestConnectionOnStartup(testConnectionOnStartup); |
| } |
| |
| @ManagedAttribute |
| public boolean isForceSendOriginalMessage() { |
| return configuration.isForceSendOriginalMessage(); |
| } |
| |
| @ManagedAttribute |
| public void setForceSendOriginalMessage(boolean forceSendOriginalMessage) { |
| configuration.setForceSendOriginalMessage(forceSendOriginalMessage); |
| } |
| |
| @ManagedAttribute |
| public boolean isDisableTimeToLive() { |
| return configuration.isDisableTimeToLive(); |
| } |
| |
| @ManagedAttribute |
| public void setDisableTimeToLive(boolean disableTimeToLive) { |
| configuration.setDisableTimeToLive(disableTimeToLive); |
| } |
| |
| @ManagedAttribute |
| public void setAsyncConsumer(boolean asyncConsumer) { |
| configuration.setAsyncConsumer(asyncConsumer); |
| } |
| |
| @ManagedAttribute |
| public boolean isAsyncConsumer() { |
| return configuration.isAsyncConsumer(); |
| } |
| |
| @ManagedAttribute |
| public void setAsyncStartListener(boolean asyncStartListener) { |
| configuration.setAsyncStartListener(asyncStartListener); |
| } |
| |
| @ManagedAttribute |
| public boolean isAsyncStartListener() { |
| return configuration.isAsyncStartListener(); |
| } |
| |
| @ManagedAttribute |
| public void setAsyncStopListener(boolean asyncStopListener) { |
| configuration.setAsyncStopListener(asyncStopListener); |
| } |
| |
| @ManagedAttribute |
| public boolean isAsyncStopListener() { |
| return configuration.isAsyncStopListener(); |
| } |
| |
| @ManagedAttribute |
| public boolean isAllowNullBody() { |
| return configuration.isAllowNullBody(); |
| } |
| |
| @ManagedAttribute |
| public void setAllowNullBody(boolean allowNullBody) { |
| configuration.setAllowNullBody(allowNullBody); |
| } |
| |
| @ManagedAttribute |
| public boolean isIncludeSentJMSMessageID() { |
| return configuration.isIncludeSentJMSMessageID(); |
| } |
| |
| @ManagedAttribute |
| public void setIncludeSentJMSMessageID(boolean includeSentJMSMessageID) { |
| configuration.setIncludeSentJMSMessageID(includeSentJMSMessageID); |
| } |
| |
| @ManagedAttribute |
| public boolean isIncludeAllJMSXProperties() { |
| return configuration.isIncludeAllJMSXProperties(); |
| } |
| |
| @ManagedAttribute |
| public void setIncludeAllJMSXProperties(boolean includeAllJMSXProperties) { |
| configuration.setIncludeAllJMSXProperties(includeAllJMSXProperties); |
| } |
| |
| @ManagedAttribute |
| public DefaultTaskExecutorType getDefaultTaskExecutorType() { |
| return configuration.getDefaultTaskExecutorType(); |
| } |
| |
| public void setDefaultTaskExecutorType(DefaultTaskExecutorType type) { |
| configuration.setDefaultTaskExecutorType(type); |
| } |
| |
| @ManagedAttribute |
| public String getAllowAdditionalHeaders() { |
| return configuration.getAllowAdditionalHeaders(); |
| } |
| |
| @ManagedAttribute |
| public void setAllowAdditionalHeaders(String allowAdditionalHeaders) { |
| configuration.setAllowAdditionalHeaders(allowAdditionalHeaders); |
| } |
| |
| public MessageListenerContainerFactory getMessageListenerContainerFactory() { |
| return configuration.getMessageListenerContainerFactory(); |
| } |
| |
| public void setMessageListenerContainerFactory(MessageListenerContainerFactory messageListenerContainerFactory) { |
| configuration.setMessageListenerContainerFactory(messageListenerContainerFactory); |
| configuration.setConsumerType(ConsumerType.Custom); |
| } |
| |
| @ManagedAttribute |
| public boolean isSubscriptionDurable() { |
| return getConfiguration().isSubscriptionDurable(); |
| } |
| |
| @ManagedAttribute |
| public void setSubscriptionDurable(boolean subscriptionDurable) { |
| getConfiguration().setSubscriptionDurable(subscriptionDurable); |
| } |
| |
| @ManagedAttribute |
| public boolean isSubscriptionShared() { |
| return getConfiguration().isSubscriptionShared(); |
| } |
| |
| @ManagedAttribute |
| public void setSubscriptionShared(boolean subscriptionShared) { |
| getConfiguration().setSubscriptionShared(subscriptionShared); |
| } |
| |
| @ManagedAttribute |
| public String getSubscriptionName() { |
| return getConfiguration().getSubscriptionName(); |
| } |
| |
| @ManagedAttribute |
| public void setSubscriptionName(String subscriptionName) { |
| getConfiguration().setSubscriptionName(subscriptionName); |
| } |
| |
| |
| @ManagedAttribute |
| public String getReplyToType() { |
| if (configuration.getReplyToType() != null) { |
| return configuration.getReplyToType().name(); |
| } else { |
| return null; |
| } |
| } |
| |
| @ManagedAttribute |
| public void setReplyToType(String replyToType) { |
| ReplyToType type = ReplyToType.valueOf(replyToType); |
| configuration.setReplyToType(type); |
| } |
| |
| @ManagedAttribute(description = "Number of running message listeners") |
| public int getRunningMessageListeners() { |
| return runningMessageListeners.get(); |
| } |
| |
| @ManagedAttribute |
| public String getSelector() { |
| return configuration.getSelector(); |
| } |
| |
| public void setSelector(String selector) { |
| configuration.setSelector(selector); |
| } |
| |
| @ManagedAttribute |
| public int getWaitForProvisionCorrelationToBeUpdatedCounter() { |
| return configuration.getWaitForProvisionCorrelationToBeUpdatedCounter(); |
| } |
| |
| @ManagedAttribute |
| public void setWaitForProvisionCorrelationToBeUpdatedCounter(int counter) { |
| configuration.setWaitForProvisionCorrelationToBeUpdatedCounter(counter); |
| } |
| |
| @ManagedAttribute |
| public long getWaitForProvisionCorrelationToBeUpdatedThreadSleepingTime() { |
| return configuration.getWaitForProvisionCorrelationToBeUpdatedThreadSleepingTime(); |
| } |
| |
| @ManagedAttribute |
| public void setWaitForProvisionCorrelationToBeUpdatedThreadSleepingTime(long sleepingTime) { |
| configuration.setWaitForProvisionCorrelationToBeUpdatedThreadSleepingTime(sleepingTime); |
| } |
| |
| @ManagedAttribute |
| public boolean isFormatDateHeadersToIso8601() { |
| return configuration.isFormatDateHeadersToIso8601(); |
| } |
| |
| @ManagedAttribute |
| public void setFormatDateHeadersToIso8601(boolean formatDateHeadersToIso8601) { |
| configuration.setFormatDateHeadersToIso8601(formatDateHeadersToIso8601); |
| } |
| |
| // Implementation methods |
| //------------------------------------------------------------------------- |
| |
| @Override |
| protected String createEndpointUri() { |
| String scheme = "jms"; |
| if (destination != null) { |
| return scheme + ":" + destination; |
| } else if (destinationName != null) { |
| return scheme + ":" + destinationName; |
| } |
| DestinationResolver resolver = getDestinationResolver(); |
| if (resolver != null) { |
| return scheme + ":" + resolver; |
| } |
| return super.createEndpointUri(); |
| } |
| |
| } |