| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.qpid.jms; |
| |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CopyOnWriteArraySet; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import jakarta.jms.Connection; |
| import jakarta.jms.ConnectionConsumer; |
| import jakarta.jms.ConnectionMetaData; |
| import jakarta.jms.Destination; |
| import jakarta.jms.ExceptionListener; |
| import jakarta.jms.IllegalStateException; |
| import jakarta.jms.InvalidClientIDException; |
| import jakarta.jms.InvalidDestinationException; |
| import jakarta.jms.JMSException; |
| import jakarta.jms.JMSRuntimeException; |
| import jakarta.jms.Queue; |
| import jakarta.jms.QueueConnection; |
| import jakarta.jms.QueueSession; |
| import jakarta.jms.ServerSessionPool; |
| import jakarta.jms.Session; |
| import jakarta.jms.TemporaryQueue; |
| import jakarta.jms.TemporaryTopic; |
| import jakarta.jms.Topic; |
| import jakarta.jms.TopicConnection; |
| import jakarta.jms.TopicSession; |
| |
| import org.apache.qpid.jms.exceptions.JmsConnectionFailedException; |
| import org.apache.qpid.jms.exceptions.JmsExceptionSupport; |
| import org.apache.qpid.jms.message.JmsInboundMessageDispatch; |
| import org.apache.qpid.jms.message.JmsMessage; |
| import org.apache.qpid.jms.message.JmsMessageFactory; |
| import org.apache.qpid.jms.message.JmsMessageTransformation; |
| import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; |
| import org.apache.qpid.jms.meta.JmsConnectionId; |
| import org.apache.qpid.jms.meta.JmsConnectionInfo; |
| import org.apache.qpid.jms.meta.JmsConsumerId; |
| import org.apache.qpid.jms.meta.JmsConsumerInfo; |
| import org.apache.qpid.jms.meta.JmsProducerId; |
| import org.apache.qpid.jms.meta.JmsProducerInfo; |
| import org.apache.qpid.jms.meta.JmsResource; |
| import org.apache.qpid.jms.meta.JmsResource.ResourceState; |
| import org.apache.qpid.jms.meta.JmsSessionId; |
| import org.apache.qpid.jms.meta.JmsSessionInfo; |
| import org.apache.qpid.jms.meta.JmsTransactionId; |
| import org.apache.qpid.jms.meta.JmsTransactionInfo; |
| import org.apache.qpid.jms.policy.JmsDeserializationPolicy; |
| import org.apache.qpid.jms.policy.JmsMessageIDPolicy; |
| import org.apache.qpid.jms.policy.JmsPrefetchPolicy; |
| import org.apache.qpid.jms.policy.JmsPresettlePolicy; |
| import org.apache.qpid.jms.policy.JmsRedeliveryPolicy; |
| import org.apache.qpid.jms.provider.AsyncResult; |
| import org.apache.qpid.jms.provider.Provider; |
| import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; |
| import org.apache.qpid.jms.provider.ProviderException; |
| import org.apache.qpid.jms.provider.ProviderFuture; |
| import org.apache.qpid.jms.provider.ProviderListener; |
| import org.apache.qpid.jms.provider.ProviderSynchronization; |
| import org.apache.qpid.jms.tracing.JmsTracer; |
| import org.apache.qpid.jms.util.FifoMessageQueue; |
| import org.apache.qpid.jms.util.MessageQueue; |
| import org.apache.qpid.jms.util.PriorityMessageQueue; |
| import org.apache.qpid.jms.util.QpidJMSThreadFactory; |
| import org.apache.qpid.jms.util.ThreadPoolUtils; |
| import org.apache.qpid.jms.util.URISupport; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Implementation of a JMS Connection |
| */ |
| public class JmsConnection implements AutoCloseable, Connection, TopicConnection, QueueConnection, ProviderListener { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(JmsConnection.class); |
| |
| private final Map<JmsSessionId, JmsSession> sessions = new ConcurrentHashMap<>(); |
| private final Map<JmsConsumerId, JmsConnectionConsumer> connectionConsumers = new ConcurrentHashMap<>(); |
| private final AtomicBoolean connected = new AtomicBoolean(); |
| private final AtomicBoolean closed = new AtomicBoolean(); |
| private final AtomicBoolean closing = new AtomicBoolean(); |
| private final AtomicBoolean started = new AtomicBoolean(); |
| private final AtomicReference<Exception> failureCause = new AtomicReference<>(); |
| private final JmsConnectionInfo connectionInfo; |
| private final ThreadPoolExecutor executor; |
| |
| private ExceptionListener exceptionListener; |
| private JmsMessageFactory messageFactory; |
| private Provider provider; |
| |
| private final Set<JmsConnectionListener> connectionListeners = new CopyOnWriteArraySet<>(); |
| private final Map<JmsTemporaryDestination, JmsTemporaryDestination> tempDestinations = new ConcurrentHashMap<>(); |
| private final AtomicLong sessionIdGenerator = new AtomicLong(); |
| private final AtomicLong tempDestIdGenerator = new AtomicLong(); |
| private final AtomicLong transactionIdGenerator = new AtomicLong(); |
| private final AtomicLong connectionConsumerIdGenerator = new AtomicLong(); |
| private final Map<AsyncResult, AsyncResult> requests = new ConcurrentHashMap<>(); |
| |
| protected JmsConnection(final JmsConnectionInfo connectionInfo, Provider provider) throws JMSException { |
| |
| // This executor can be used for dispatching asynchronous tasks that might block or result |
| // in reentrant calls to this Connection that could block. The thread in this executor |
| // will also serve as a means of preventing JVM shutdown should a client application |
| // not have it's own mechanism for doing so if the configuration specifies that the |
| // Connection create this thread as a non-daemon thread. |
| executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), |
| new QpidJMSThreadFactory("QpidJMS Connection Executor: " + connectionInfo.getId(), connectionInfo.isUseDaemonThread())); |
| |
| executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); |
| |
| // We need to start the core thread in order for it to prevent JVM shutdown as our |
| // single non-daemon thread. |
| executor.execute(new Runnable() { |
| @Override |
| public void run() { |
| } |
| }); |
| |
| this.provider = provider; |
| this.provider.setProviderListener(this); |
| try { |
| this.provider.start(); |
| } catch (Exception e) { |
| executor.shutdown(); |
| throw JmsExceptionSupport.create(e); |
| } |
| |
| this.connectionInfo = connectionInfo; |
| this.connectionInfo.setConnection(this); |
| } |
| |
| JmsConnection connect() throws JMSException { |
| if (provider == null) { |
| throw new IllegalStateException("Remote provider instance not set."); |
| } |
| |
| try { |
| provider.connect(connectionInfo); |
| } catch (Exception ex) { |
| URI remoteURI = connectionInfo.getConfiguredURI(); |
| LOG.error("Failed to connect to remote at: {}", |
| remoteURI.getScheme() + "://" + remoteURI.getHost() + ":" + remoteURI.getPort()); |
| LOG.trace("Error: ", ex); |
| try { |
| provider.close(); |
| } catch (Throwable ignored) {} |
| |
| try { |
| executor.shutdown(); |
| } catch (Throwable ignored) {} |
| |
| throw JmsExceptionSupport.create(ex); |
| } |
| |
| if (connectionInfo.isExplicitClientID() || !connectionInfo.isAwaitClientID()) { |
| createJmsConnection(); |
| } |
| |
| return this; |
| } |
| |
| @Override |
| public void close() throws JMSException { |
| boolean interrupted = Thread.interrupted(); |
| |
| for (JmsSession session : sessions.values()) { |
| session.checkIsDeliveryThread(); |
| session.checkIsCompletionThread(); |
| } |
| |
| try { |
| |
| if (!closed.get() && !isFailed()) { |
| // do not fail if already closed as specified by the JMS specification. |
| doStop(false); |
| } |
| |
| synchronized (this) { |
| |
| if (closed.get()) { |
| return; |
| } |
| |
| closing.set(true); |
| |
| for (JmsSession session : sessions.values()) { |
| session.shutdown(); |
| } |
| |
| for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) { |
| connectionConsumer.shutdown(); |
| } |
| |
| if (isConnected() && !isFailed()) { |
| ProviderFuture request = provider.newProviderFuture(); |
| requests.put(request, request); |
| try { |
| provider.destroy(connectionInfo, request); |
| try { |
| request.sync(); |
| } catch (Exception ex) { |
| // NOTE - Spec is a bit vague here, we don't fail if already closed but |
| // in this case we really aren't closed yet so there could be an |
| // argument that at this point an exception is still valid. |
| if (ex.getCause() instanceof InterruptedException) { |
| throw (InterruptedException) ex.getCause(); |
| } |
| LOG.debug("Failed destroying Connection resource: {}", ex.getMessage()); |
| } |
| } catch(ProviderException prodiverError) { |
| LOG.debug("Ignoring provider exception during connection close"); |
| } finally { |
| requests.remove(request); |
| } |
| } |
| |
| sessions.clear(); |
| tempDestinations.clear(); |
| connected.set(false); |
| started.set(false); |
| closing.set(false); |
| closed.set(true); |
| connectionInfo.getTracer().close(); |
| } |
| } catch (Exception e) { |
| throw JmsExceptionSupport.create(e); |
| } finally { |
| try { |
| ThreadPoolUtils.shutdown(executor); |
| } catch (Throwable e) { |
| LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e); |
| } |
| |
| if (provider != null) { |
| provider.close(); |
| provider = null; |
| } |
| |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
| /** |
| * Called to free all Connection resources. |
| */ |
| protected void shutdown() throws JMSException { |
| shutdown(null); |
| } |
| |
| /** |
| * Called to free all Connection resources. |
| */ |
| protected void shutdown(Exception cause) throws JMSException { |
| // NOTE - Once ConnectionConsumer is added we must shutdown those as well. |
| |
| connectionInfo.setState(ResourceState.CLOSED); |
| |
| for (JmsSession session : sessions.values()) { |
| session.shutdown(cause); |
| } |
| |
| for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) { |
| connectionConsumer.shutdown(); |
| } |
| |
| if (isConnected() && !isFailed() && !closing.get()) { |
| destroyResource(connectionInfo); |
| } |
| |
| tempDestinations.clear(); |
| started.set(false); |
| connected.set(false); |
| } |
| |
| @Override |
| public Session createSession() throws JMSException { |
| return createSession(false, Session.AUTO_ACKNOWLEDGE); |
| } |
| |
| @Override |
| public Session createSession(int acknowledgeMode) throws JMSException { |
| return createSession(acknowledgeMode == Session.SESSION_TRANSACTED ? true : false, acknowledgeMode); |
| } |
| |
| @Override |
| public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { |
| checkClosedOrFailed(); |
| createJmsConnection(); |
| int ackMode = getSessionAcknowledgeMode(transacted, acknowledgeMode); |
| JmsSession result = new JmsSession(this, getNextSessionId(), ackMode); |
| if (started.get()) { |
| result.start(); |
| } |
| return result; |
| } |
| |
| @Override |
| public synchronized String getClientID() throws JMSException { |
| checkClosedOrFailed(); |
| return connected.get() ? connectionInfo.getClientId() : null; |
| } |
| |
| @Override |
| public ConnectionMetaData getMetaData() throws JMSException { |
| checkClosedOrFailed(); |
| return JmsConnectionMetaData.INSTANCE; |
| } |
| |
| @Override |
| public synchronized void setClientID(String clientID) throws JMSException { |
| checkClosedOrFailed(); |
| |
| if (connectionInfo.isExplicitClientID()) { |
| throw new IllegalStateException("The clientID has already been set"); |
| } |
| if (clientID == null || clientID.isEmpty()) { |
| throw new InvalidClientIDException("Cannot have a null or empty clientID"); |
| } |
| if (connected.get()) { |
| throw new IllegalStateException("Cannot set the client id once connected."); |
| } |
| |
| this.connectionInfo.setClientId(clientID, true); |
| |
| // We weren't connected if we got this far, we should now connect to ensure the |
| // configured clientID is valid. |
| createJmsConnection(); |
| } |
| |
| @Override |
| public void start() throws JMSException { |
| checkClosedOrFailed(); |
| createJmsConnection(); |
| if (started.compareAndSet(false, true)) { |
| try { |
| for (JmsSession session : sessions.values()) { |
| session.start(); |
| } |
| |
| for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) { |
| connectionConsumer.start(); |
| } |
| } catch (Exception e) { |
| throw JmsExceptionSupport.create(e); |
| } |
| } |
| } |
| |
| @Override |
| public void stop() throws JMSException { |
| doStop(true); |
| } |
| |
| /** |
| * Performs the actual connection stop with the option to disable the close check in cases |
| * where we don't want an exception. |
| * |
| * @see #stop() |
| * @param checkClosed <tt>true</tt> to check for already closed and throw |
| * {@link java.lang.IllegalStateException} if already closed, |
| * <tt>false</tt> to skip this check |
| * @throws JMSException if the JMS provider fails to stop message delivery due to some internal error. |
| */ |
| void doStop(boolean checkClosed) throws JMSException { |
| if (checkClosed) { |
| checkClosedOrFailed(); |
| } |
| |
| for (JmsSession session : sessions.values()) { |
| session.checkIsDeliveryThread(); |
| } |
| |
| if (started.compareAndSet(true, false)) { |
| synchronized (sessions) { |
| for (JmsSession session : sessions.values()) { |
| session.stop(); |
| } |
| } |
| |
| synchronized (connectionConsumers) { |
| for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) { |
| connectionConsumer.stop(); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public ConnectionConsumer createSharedConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { |
| checkClosedOrFailed(); |
| createJmsConnection(); |
| |
| return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, subscriptionName, false, true); |
| } |
| |
| @Override |
| public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { |
| checkClosedOrFailed(); |
| createJmsConnection(); |
| |
| return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, subscriptionName, true, true); |
| } |
| |
| @Override |
| public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { |
| checkClosedOrFailed(); |
| createJmsConnection(); |
| |
| return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, subscriptionName, true, false); |
| } |
| |
| @Override |
| public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { |
| checkClosedOrFailed(); |
| createJmsConnection(); |
| |
| return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, null, false, false); |
| } |
| |
| @Override |
| public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { |
| checkClosedOrFailed(); |
| createJmsConnection(); |
| |
| return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, null, false, false); |
| } |
| |
| @Override |
| public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { |
| checkClosedOrFailed(); |
| createJmsConnection(); |
| |
| return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, null, false, false); |
| } |
| |
| private ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, String subscriptionName, boolean durable, boolean shared) throws JMSException { |
| JmsDestination jmsDestination = JmsMessageTransformation.transformDestination(this, destination); |
| |
| int configuredPrefetch = getPrefetchPolicy().getConfiguredPrefetch((JmsSession) null, jmsDestination, durable, false); |
| |
| final MessageQueue messageQueue; |
| |
| if (isLocalMessagePriority()) { |
| messageQueue = new PriorityMessageQueue(); |
| } else { |
| messageQueue = new FifoMessageQueue(configuredPrefetch); |
| } |
| |
| JmsConsumerInfo consumerInfo = new JmsConsumerInfo(getNextConnectionConsumerId(), null); |
| consumerInfo.setExplicitClientID(isExplicitClientID()); |
| consumerInfo.setSelector(messageSelector); |
| consumerInfo.setDurable(durable); |
| consumerInfo.setSubscriptionName(subscriptionName); |
| consumerInfo.setShared(shared); |
| consumerInfo.setDestination(jmsDestination); |
| consumerInfo.setAcknowledgementMode(Session.AUTO_ACKNOWLEDGE); |
| consumerInfo.setNoLocal(false); |
| consumerInfo.setBrowser(false); |
| consumerInfo.setPrefetchSize(configuredPrefetch); |
| consumerInfo.setRedeliveryPolicy(getRedeliveryPolicy().copy()); |
| consumerInfo.setLocalMessageExpiry(isLocalMessageExpiry()); |
| consumerInfo.setPresettle(false); |
| consumerInfo.setDeserializationPolicy(getDeserializationPolicy().copy()); |
| consumerInfo.setMaxMessages(maxMessages); |
| consumerInfo.setConnectionConsumer(true); |
| |
| JmsConnectionConsumer consumer = new JmsConnectionConsumer(this, consumerInfo, messageQueue, sessionPool); |
| |
| try { |
| consumer.init(); |
| if (started.get()) { |
| consumer.start(); |
| } |
| return consumer; |
| } catch (JMSException jmsEx) { |
| consumer.close(); |
| throw jmsEx; |
| } |
| } |
| |
| @Override |
| public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { |
| checkClosedOrFailed(); |
| createJmsConnection(); |
| int ackMode = getSessionAcknowledgeMode(transacted, acknowledgeMode); |
| JmsTopicSession result = new JmsTopicSession(this, getNextSessionId(), ackMode); |
| addSession(result.getSessionInfo(), result); |
| if (started.get()) { |
| result.start(); |
| } |
| return result; |
| } |
| |
| @Override |
| public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { |
| checkClosedOrFailed(); |
| createJmsConnection(); |
| int ackMode = getSessionAcknowledgeMode(transacted, acknowledgeMode); |
| JmsQueueSession result = new JmsQueueSession(this, getNextSessionId(), ackMode); |
| addSession(result.getSessionInfo(), result); |
| if (started.get()) { |
| result.start(); |
| } |
| return result; |
| } |
| |
| public void onException(Exception ex) { |
| onException(JmsExceptionSupport.create(ex)); |
| } |
| |
| public void onException(JMSException ex) { |
| ExceptionListener listener = this.exceptionListener; |
| if (listener != null) { |
| listener.onException(JmsExceptionSupport.create(ex)); |
| } |
| } |
| |
| protected int getSessionAcknowledgeMode(boolean transacted, int acknowledgeMode) throws JMSException { |
| int result = acknowledgeMode; |
| if (!transacted && acknowledgeMode == Session.SESSION_TRANSACTED) { |
| throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session"); |
| } |
| |
| if (transacted) { |
| result = Session.SESSION_TRANSACTED; |
| } else { |
| try { |
| JmsSession.validateSessionMode(acknowledgeMode); |
| } catch (JMSRuntimeException jmsre) { |
| throw new JMSException("acknowledgeMode " + acknowledgeMode + " cannot be used for an non-transacted Session"); |
| } |
| } |
| |
| return result; |
| } |
| |
| protected void removeSession(JmsSessionInfo sessionInfo) { |
| sessions.remove(sessionInfo.getId()); |
| } |
| |
| protected void addSession(JmsSessionInfo sessionInfo, JmsSession session) { |
| sessions.put(sessionInfo.getId(), session); |
| } |
| |
| protected void removeConnectionConsumer(JmsConsumerInfo consumerInfo) { |
| connectionConsumers.remove(consumerInfo.getId()); |
| } |
| |
| protected void addConnectionConsumer(JmsConsumerInfo consumerInfo, JmsConnectionConsumer consumer) { |
| connectionConsumers.put(consumerInfo.getId(), consumer); |
| } |
| |
| private void createJmsConnection() throws JMSException { |
| if (isConnected() || closed.get()) { |
| return; |
| } |
| |
| synchronized(this.connectionInfo) { |
| if (isConnected() || closed.get()) { |
| return; |
| } |
| |
| if (connectionInfo.getClientId() == null || connectionInfo.getClientId().trim().isEmpty()) { |
| throw new IllegalArgumentException("Client ID cannot be null or empty string"); |
| } |
| |
| createResource(connectionInfo); |
| connected.set(true); |
| } |
| } |
| |
| /** |
| * @return a newly initialized TemporaryQueue instance. |
| */ |
| protected TemporaryQueue createTemporaryQueue() throws JMSException { |
| String destinationName = connectionInfo.getId() + ":" + tempDestIdGenerator.incrementAndGet(); |
| JmsTemporaryQueue queue = new JmsTemporaryQueue(destinationName); |
| createResource(queue); |
| tempDestinations.put(queue, queue); |
| queue.setConnection(this); |
| return queue; |
| } |
| |
| /** |
| * @return a newly initialized TemporaryTopic instance. |
| */ |
| protected TemporaryTopic createTemporaryTopic() throws JMSException { |
| String destinationName = connectionInfo.getId() + ":" + tempDestIdGenerator.incrementAndGet(); |
| JmsTemporaryTopic topic = new JmsTemporaryTopic(destinationName); |
| createResource(topic); |
| tempDestinations.put(topic, topic); |
| topic.setConnection(this); |
| return topic; |
| } |
| |
| protected void deleteTemporaryDestination(JmsTemporaryDestination destination) throws JMSException { |
| checkClosedOrFailed(); |
| |
| try { |
| for (JmsSession session : sessions.values()) { |
| if (session.isDestinationInUse(destination)) { |
| throw new IllegalStateException("A consumer is consuming from the temporary destination"); |
| } |
| } |
| |
| tempDestinations.remove(destination); |
| |
| destroyResource(destination); |
| } catch (Exception e) { |
| throw JmsExceptionSupport.create(e); |
| } |
| } |
| |
| protected void checkClosedOrFailed() throws JMSException { |
| checkClosed(); |
| if (failureCause.get() != null) { |
| throw new JmsConnectionFailedException(failureCause.get()); |
| } |
| } |
| |
| protected void checkConsumeFromTemporaryDestination(JmsTemporaryDestination destination) throws JMSException { |
| if (!this.equals(destination.getConnection())) { |
| throw new InvalidDestinationException("Can't consume from a temporary destination created using another connection"); |
| } |
| } |
| |
| protected boolean isTemporaryDestinationDeleted(JmsDestination destination) { |
| return !tempDestinations.containsKey(destination); |
| } |
| |
| protected void checkClosed() throws IllegalStateException { |
| if (closed.get()) { |
| throw new IllegalStateException("The Connection is closed"); |
| } |
| } |
| |
| protected JmsSessionId getNextSessionId() { |
| return new JmsSessionId(connectionInfo.getId(), sessionIdGenerator.incrementAndGet()); |
| } |
| |
| protected JmsTransactionId getNextTransactionId() { |
| return new JmsTransactionId(connectionInfo.getId(), transactionIdGenerator.incrementAndGet()); |
| } |
| |
| protected JmsConsumerId getNextConnectionConsumerId() { |
| return new JmsConsumerId(connectionInfo.getId().toString(), -1, connectionConsumerIdGenerator.incrementAndGet()); |
| } |
| |
| protected synchronized boolean isExplicitClientID() { |
| return connectionInfo.isExplicitClientID(); |
| } |
| |
| //----- Provider interface methods ---------------------------------------// |
| |
| void createResource(JmsResource resource) throws JMSException { |
| createResource(resource, null); |
| } |
| |
| void createResource(JmsResource resource, ProviderSynchronization synchronization) throws JMSException { |
| checkClosedOrFailed(); |
| |
| try { |
| ProviderFuture request = provider.newProviderFuture(synchronization); |
| requests.put(request, request); |
| try { |
| provider.create(resource, request); |
| request.sync(); |
| } finally { |
| requests.remove(request); |
| } |
| } catch (Exception ex) { |
| throw JmsExceptionSupport.create(ex); |
| } |
| } |
| |
| void startResource(JmsResource resource) throws JMSException { |
| startResource(resource, null); |
| } |
| |
| void startResource(JmsResource resource, ProviderSynchronization synchronization) throws JMSException { |
| checkClosedOrFailed(); |
| |
| try { |
| ProviderFuture request = provider.newProviderFuture(synchronization); |
| requests.put(request, request); |
| try { |
| provider.start(resource, request); |
| request.sync(); |
| } finally { |
| requests.remove(request); |
| } |
| } catch (Exception ioe) { |
| throw JmsExceptionSupport.create(ioe); |
| } |
| } |
| |
| void stopResource(JmsResource resource) throws JMSException { |
| stopResource(resource, null); |
| } |
| |
| void stopResource(JmsResource resource, ProviderSynchronization synchronization) throws JMSException { |
| checkClosedOrFailed(); |
| |
| try { |
| ProviderFuture request = provider.newProviderFuture(synchronization); |
| requests.put(request, request); |
| try { |
| provider.stop(resource, request); |
| request.sync(); |
| } finally { |
| requests.remove(request); |
| } |
| } catch (Exception ioe) { |
| throw JmsExceptionSupport.create(ioe); |
| } |
| } |
| |
| void destroyResource(JmsResource resource) throws JMSException { |
| destroyResource(resource, null); |
| } |
| |
| void destroyResource(JmsResource resource, ProviderSynchronization synchronization) throws JMSException { |
| checkClosedOrFailed(); |
| |
| try { |
| ProviderFuture request = provider.newProviderFuture(synchronization); |
| requests.put(request, request); |
| try { |
| provider.destroy(resource, request); |
| request.sync(); |
| } finally { |
| requests.remove(request); |
| } |
| } catch (Exception ioe) { |
| throw JmsExceptionSupport.create(ioe); |
| } |
| } |
| |
| void send(JmsOutboundMessageDispatch envelope, ProviderSynchronization synchronization) throws JMSException { |
| checkClosedOrFailed(); |
| |
| try { |
| ProviderFuture request = provider.newProviderFuture(synchronization); |
| requests.put(request, request); |
| try { |
| provider.send(envelope, request); |
| request.sync(); |
| } finally { |
| requests.remove(request); |
| } |
| } catch (Exception ioe) { |
| throw JmsExceptionSupport.create(ioe); |
| } |
| } |
| |
| void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException { |
| acknowledge(envelope, ackType, null); |
| } |
| |
| void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType, ProviderSynchronization synchronization) throws JMSException { |
| checkClosedOrFailed(); |
| |
| try { |
| ProviderFuture request = provider.newProviderFuture(synchronization); |
| provider.acknowledge(envelope, ackType, request); |
| request.sync(); |
| } catch (Exception ioe) { |
| throw JmsExceptionSupport.create(ioe); |
| } |
| } |
| |
| void acknowledge(JmsSessionId sessionId, ACK_TYPE ackType) throws JMSException { |
| acknowledge(sessionId, ackType, null); |
| } |
| |
| void acknowledge(JmsSessionId sessionId, ACK_TYPE ackType, ProviderSynchronization synchronization) throws JMSException { |
| checkClosedOrFailed(); |
| |
| try { |
| ProviderFuture request = provider.newProviderFuture(synchronization); |
| provider.acknowledge(sessionId, ackType, request); |
| request.sync(); |
| } catch (Exception ioe) { |
| throw JmsExceptionSupport.create(ioe); |
| } |
| } |
| |
| void unsubscribe(String name) throws JMSException { |
| unsubscribe(name, null); |
| } |
| |
| void unsubscribe(String name, ProviderSynchronization synchronization) throws JMSException { |
| checkClosedOrFailed(); |
| |
| try { |
| ProviderFuture request = provider.newProviderFuture(synchronization); |
| requests.put(request, request); |
| try { |
| provider.unsubscribe(name, request); |
| request.sync(); |
| } finally { |
| requests.remove(request); |
| } |
| } catch (Exception ioe) { |
| throw JmsExceptionSupport.create(ioe); |
| } |
| } |
| |
| void commit(JmsTransactionInfo transactionInfo, JmsTransactionInfo nextTransactionId) throws JMSException { |
| commit(transactionInfo, nextTransactionId, null); |
| } |
| |
| void commit(JmsTransactionInfo transactionInfo, JmsTransactionInfo nextTransactionId, ProviderSynchronization synchronization) throws JMSException { |
| checkClosedOrFailed(); |
| |
| try { |
| ProviderFuture request = provider.newProviderFuture(synchronization); |
| requests.put(request, request); |
| try { |
| provider.commit(transactionInfo, nextTransactionId, request); |
| request.sync(); |
| } finally { |
| requests.remove(request); |
| } |
| } catch (Exception ioe) { |
| throw JmsExceptionSupport.create(ioe); |
| } |
| } |
| |
| void rollback(JmsTransactionInfo transactionInfo, JmsTransactionInfo nextTransactionId) throws JMSException { |
| rollback(transactionInfo, nextTransactionId, null); |
| } |
| |
| void rollback(JmsTransactionInfo transactionInfo, JmsTransactionInfo nextTransactionId, ProviderSynchronization synchronization) throws JMSException { |
| checkClosedOrFailed(); |
| |
| try { |
| ProviderFuture request = provider.newProviderFuture(synchronization); |
| requests.put(request, request); |
| try { |
| provider.rollback(transactionInfo, nextTransactionId, request); |
| request.sync(); |
| } finally { |
| requests.remove(request); |
| } |
| } catch (Exception ioe) { |
| throw JmsExceptionSupport.create(ioe); |
| } |
| } |
| |
| void recover(JmsSessionId sessionId) throws JMSException { |
| recover(sessionId, null); |
| } |
| |
| void recover(JmsSessionId sessionId, ProviderSynchronization synchronization) throws JMSException { |
| checkClosedOrFailed(); |
| |
| try { |
| ProviderFuture request = provider.newProviderFuture(synchronization); |
| requests.put(request, request); |
| try { |
| provider.recover(sessionId, request); |
| request.sync(); |
| } finally { |
| requests.remove(request); |
| } |
| } catch (Exception ioe) { |
| throw JmsExceptionSupport.create(ioe); |
| } |
| } |
| |
| void pull(JmsConsumerId consumerId, long timeout) throws JMSException { |
| pull(consumerId, timeout, null); |
| } |
| |
| void pull(JmsConsumerId consumerId, long timeout, ProviderSynchronization synchronization) throws JMSException { |
| checkClosedOrFailed(); |
| |
| try { |
| ProviderFuture request = provider.newProviderFuture(synchronization); |
| requests.put(request, request); |
| try { |
| provider.pull(consumerId, timeout, request); |
| request.sync(); |
| } finally { |
| requests.remove(request); |
| } |
| } catch (Exception ioe) { |
| throw JmsExceptionSupport.create(ioe); |
| } |
| } |
| |
| //----- Property setters and getters -------------------------------------// |
| |
| @Override |
| public ExceptionListener getExceptionListener() throws JMSException { |
| checkClosedOrFailed(); |
| return this.exceptionListener; |
| } |
| |
| @Override |
| public void setExceptionListener(ExceptionListener listener) throws JMSException { |
| checkClosedOrFailed(); |
| this.exceptionListener = listener; |
| } |
| |
| /** |
| * Adds a JmsConnectionListener so that a client can be notified of events in |
| * the underlying connection and its state. |
| * |
| * @param listener |
| * the new listener to add to the collection. |
| */ |
| public void addConnectionListener(JmsConnectionListener listener) { |
| connectionListeners.add(listener); |
| } |
| |
| /** |
| * Removes a JmsConnectionListener that was previously registered. |
| * |
| * @param listener |
| * the listener to remove from the collection. |
| * |
| * @return true if the given listener was removed from the current set. |
| */ |
| public boolean removeConnectionListener(JmsConnectionListener listener) { |
| return connectionListeners.remove(listener); |
| } |
| |
| public boolean isForceAsyncSend() { |
| return connectionInfo.isForceAsyncSend(); |
| } |
| |
| public void setForceAsyncSend(boolean forceAsyncSend) { |
| connectionInfo.setForceAsyncSend(forceAsyncSend); |
| } |
| |
| public boolean isForceSyncSend() { |
| return connectionInfo.isForceSyncSend(); |
| } |
| |
| public void setForceSyncSend(boolean alwaysSyncSend) { |
| connectionInfo.setForceSyncSend(alwaysSyncSend); |
| } |
| |
| public String getTopicPrefix() { |
| return connectionInfo.getTopicPrefix(); |
| } |
| |
| public void setTopicPrefix(String topicPrefix) { |
| connectionInfo.setTopicPrefix(topicPrefix); |
| } |
| |
| public String getQueuePrefix() { |
| return connectionInfo.getQueuePrefix(); |
| } |
| |
| public void setQueuePrefix(String queuePrefix) { |
| connectionInfo.setQueuePrefix(queuePrefix); |
| } |
| |
| public boolean isValidatePropertyNames() { |
| return connectionInfo.isValidatePropertyNames(); |
| } |
| |
| public void setValidatePropertyNames(boolean validatePropertyNames) { |
| connectionInfo.setValidatePropertyNames(validatePropertyNames); |
| } |
| |
| public boolean isValidateSelector() { |
| return connectionInfo.isValidateSelector(); |
| } |
| |
| public void setValidateSelector(boolean validateSelector) { |
| connectionInfo.setValidateSelector(validateSelector); |
| } |
| |
| public JmsPrefetchPolicy getPrefetchPolicy() { |
| return connectionInfo.getPrefetchPolicy(); |
| } |
| |
| public void setPrefetchPolicy(JmsPrefetchPolicy prefetchPolicy) { |
| connectionInfo.setPrefetchPolicy(prefetchPolicy); |
| } |
| |
| public JmsRedeliveryPolicy getRedeliveryPolicy() { |
| return connectionInfo.getRedeliveryPolicy(); |
| } |
| |
| public void setRedeliveryPolicy(JmsRedeliveryPolicy redeliveryPolicy) { |
| connectionInfo.setRedeliveryPolicy(redeliveryPolicy); |
| } |
| |
| public JmsPresettlePolicy getPresettlePolicy() { |
| return connectionInfo.getPresettlePolicy(); |
| } |
| |
| public void setPresettlePolicy(JmsPresettlePolicy presettlePolicy) { |
| connectionInfo.setPresettlePolicy(presettlePolicy); |
| } |
| |
| public JmsDeserializationPolicy getDeserializationPolicy() { |
| return connectionInfo.getDeserializationPolicy(); |
| } |
| |
| public void setDeserializationPolicy(JmsDeserializationPolicy deserializationPolicy) { |
| connectionInfo.setDeserializationPolicy(deserializationPolicy); |
| } |
| |
| public boolean isReceiveLocalOnly() { |
| return connectionInfo.isReceiveLocalOnly(); |
| } |
| |
| public void setReceiveLocalOnly(boolean receiveLocalOnly) { |
| this.connectionInfo.setReceiveLocalOnly(receiveLocalOnly); |
| } |
| |
| public boolean isReceiveNoWaitLocalOnly() { |
| return connectionInfo.isReceiveNoWaitLocalOnly(); |
| } |
| |
| public void setReceiveNoWaitLocalOnly(boolean receiveNoWaitLocalOnly) { |
| this.connectionInfo.setReceiveNoWaitLocalOnly(receiveNoWaitLocalOnly); |
| } |
| |
| public boolean isLocalMessagePriority() { |
| return connectionInfo.isLocalMessagePriority(); |
| } |
| |
| public void setLocalMessagePriority(boolean localMessagePriority) { |
| this.connectionInfo.setLocalMessagePriority(localMessagePriority); |
| } |
| |
| public long getCloseTimeout() { |
| return connectionInfo.getCloseTimeout(); |
| } |
| |
| public void setCloseTimeout(long closeTimeout) { |
| connectionInfo.setCloseTimeout(closeTimeout); |
| } |
| |
| public long getConnectTimeout() { |
| return connectionInfo.getConnectTimeout(); |
| } |
| |
| public void setConnectTimeout(long connectTimeout) { |
| this.connectionInfo.setConnectTimeout(connectTimeout); |
| } |
| |
| public long getSendTimeout() { |
| return connectionInfo.getSendTimeout(); |
| } |
| |
| public void setSendTimeout(long sendTimeout) { |
| connectionInfo.setSendTimeout(sendTimeout); |
| } |
| |
| public long getRequestTimeout() { |
| return connectionInfo.getRequestTimeout(); |
| } |
| |
| public void setRequestTimeout(long requestTimeout) { |
| connectionInfo.setRequestTimeout(requestTimeout); |
| } |
| |
| public URI getConfiguredURI() { |
| return connectionInfo.getConfiguredURI(); |
| } |
| |
| public URI getConnectedURI() { |
| return connectionInfo.getConnectedURI(); |
| } |
| |
| public String getUsername() { |
| return connectionInfo.getUsername(); |
| } |
| |
| byte[] getEncodedUsername() { |
| return connectionInfo.getEncodedUsername(); |
| } |
| |
| public String getPassword() { |
| return connectionInfo.getPassword(); |
| } |
| |
| public boolean isConnected() { |
| return connected.get(); |
| } |
| |
| public boolean isStarted() { |
| return started.get(); |
| } |
| |
| public boolean isClosed() { |
| return closed.get(); |
| } |
| |
| public boolean isFailed() { |
| return failureCause.get() != null; |
| } |
| |
| public JmsConnectionId getId() { |
| return connectionInfo.getId(); |
| } |
| |
| public JmsMessageFactory getMessageFactory() { |
| if (messageFactory == null) { |
| throw new RuntimeException("Message factory should never be null"); |
| } |
| return messageFactory; |
| } |
| |
| void setMessageFactory(JmsMessageFactory factory) { |
| messageFactory = factory; |
| } |
| |
| public boolean isForceAsyncAcks() { |
| return connectionInfo.isForceAsyncAcks(); |
| } |
| |
| public void setForceAsyncAcks(boolean forceAsyncAcks) { |
| connectionInfo.setForceAsyncAcks(forceAsyncAcks); |
| } |
| |
| public boolean isLocalMessageExpiry() { |
| return connectionInfo.isLocalMessageExpiry(); |
| } |
| |
| public void setLocalMessageExpiry(boolean localMessageExpiry) { |
| connectionInfo.setLocalMessageExpiry(localMessageExpiry); |
| } |
| |
| public JmsMessageIDPolicy getMessageIDPolicy() { |
| return connectionInfo.getMessageIDPolicy(); |
| } |
| |
| public void setMessageIDPolicy(JmsMessageIDPolicy messageIDPolicy) { |
| connectionInfo.setMessageIDPolicy(messageIDPolicy); |
| } |
| |
| public boolean isPopulateJMSXUserID() { |
| return connectionInfo.isPopulateJMSXUserID(); |
| } |
| |
| public void setPopulateJMSXUserID(boolean populateJMSXUserID) { |
| connectionInfo.setPopulateJMSXUserID(populateJMSXUserID); |
| } |
| |
| public boolean isUseDaemonThread() { |
| return connectionInfo.isUseDaemonThread(); |
| } |
| |
| public boolean isCloseLinksThatFailOnReconnect() { |
| return connectionInfo.isCloseLinksThatFailOnReconnect(); |
| } |
| |
| public void setCloseLinksThatFailOnReconnect(boolean closeLinksThatFailOnReconnect) { |
| connectionInfo.setCloseLinksThatFailOnReconnect(closeLinksThatFailOnReconnect); |
| } |
| |
| JmsTracer getTracer() { |
| return connectionInfo.getTracer(); |
| } |
| |
| //----- Async event handlers ---------------------------------------------// |
| |
| @Override |
| public void onInboundMessage(final JmsInboundMessageDispatch envelope) { |
| JmsMessage incoming = envelope.getMessage(); |
| if (incoming != null) { |
| // Ensure incoming Messages are in read-only mode and configured properly |
| incoming.setReadOnlyBody(true); |
| incoming.setReadOnlyProperties(true); |
| incoming.setValidatePropertyNames(isValidatePropertyNames()); |
| } |
| |
| JmsMessageDispatcher dispatcher = null; |
| |
| if (envelope.getConsumerInfo() != null && envelope.getConsumerInfo().getDispatcher() != null) { |
| dispatcher = envelope.getConsumerInfo().getDispatcher(); |
| } else { |
| dispatcher = sessions.get(envelope.getConsumerId().getParentId()); |
| if (dispatcher == null) { |
| dispatcher = connectionConsumers.get(envelope.getConsumerId()); |
| } |
| } |
| |
| if (dispatcher != null) { |
| dispatcher.onInboundMessage(envelope); |
| } else { |
| LOG.debug("Message inbound with no dispatcher registered for its consumer: {}", envelope.getConsumerId()); |
| } |
| |
| // Run the application callbacks on the connection executor to allow the provider to |
| // return to its normal processing without waiting for client level processing to finish. |
| if (!connectionListeners.isEmpty()) { |
| for (final JmsConnectionListener listener : connectionListeners) { |
| executor.submit(() -> { |
| listener.onInboundMessage(envelope); |
| }); |
| } |
| } |
| } |
| |
| @Override |
| public void onCompletedMessageSend(JmsOutboundMessageDispatch envelope) { |
| JmsSession session = sessions.get(envelope.getProducerId().getParentId()); |
| if (session != null) { |
| session.onCompletedMessageSend(envelope); |
| } else { |
| LOG.debug("No matching Session found for async send result"); |
| } |
| } |
| |
| @Override |
| public void onFailedMessageSend(JmsOutboundMessageDispatch envelope, ProviderException cause) { |
| JmsSession session = sessions.get(envelope.getProducerId().getParentId()); |
| if (session != null) { |
| session.onFailedMessageSend(envelope, cause); |
| } else { |
| LOG.debug("No matching Session found for failed async send result"); |
| } |
| } |
| |
| @Override |
| public void onConnectionInterrupted(final URI remoteURI) { |
| try { |
| LOG.info("Connection {} interrupted to server: {}", connectionInfo.getId(), URISupport.removeQuery(remoteURI)); |
| } catch (URISyntaxException e) { |
| LOG.info("Connection {} interrupted to server: {}:{}", connectionInfo.getId(), remoteURI.getHost(), remoteURI.getPort()); |
| } |
| |
| for (JmsSession session : sessions.values()) { |
| session.onConnectionInterrupted(); |
| } |
| |
| // Run the application callbacks on the connection executor to allow the provider to |
| // return to its normal processing without waiting for client level processing to finish. |
| for (final JmsConnectionListener listener : connectionListeners) { |
| executor.submit(new Runnable() { |
| |
| @Override |
| public void run() { |
| listener.onConnectionInterrupted(remoteURI); |
| } |
| }); |
| } |
| } |
| |
| @Override |
| public void onConnectionRecovery(Provider provider) throws Exception { |
| LOG.debug("Connection {} is starting recovery.", connectionInfo.getId()); |
| |
| ProviderFuture request = provider.newProviderFuture(); |
| provider.create(connectionInfo, request); |
| request.sync(); |
| |
| for (JmsTemporaryDestination tempDestination : tempDestinations.values()) { |
| request = provider.newProviderFuture(); |
| provider.create(tempDestination, request); |
| request.sync(); |
| } |
| |
| for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) { |
| JmsConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo(); |
| if (!consumerInfo.isClosed()) { |
| request = provider.newProviderFuture(); |
| provider.create(consumerInfo, request); |
| request.sync(); |
| } |
| } |
| |
| for (JmsSession session : sessions.values()) { |
| session.onConnectionRecovery(provider); |
| } |
| } |
| |
| @Override |
| public void onConnectionRecovered(Provider provider) throws Exception { |
| LOG.debug("Connection {} is finalizing recovery.", connectionInfo.getId()); |
| |
| setMessageFactory(provider.getMessageFactory()); |
| connectionInfo.setConnectedURI(provider.getRemoteURI()); |
| |
| for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) { |
| JmsConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo(); |
| if (!consumerInfo.isClosed()) { |
| ProviderFuture request = provider.newProviderFuture(); |
| provider.start(consumerInfo, request); |
| request.sync(); |
| } |
| } |
| |
| for (JmsSession session : sessions.values()) { |
| session.onConnectionRecovered(provider); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(final URI remoteURI) { |
| for (JmsSession session : sessions.values()) { |
| session.onConnectionRestored(); |
| } |
| |
| try { |
| LOG.info("Connection {} restored to server: {}", connectionInfo.getId(), URISupport.removeQuery(remoteURI)); |
| } catch (URISyntaxException e) { |
| LOG.info("Connection {} restored to server: {}:{}", connectionInfo.getId(), remoteURI.getHost(), remoteURI.getPort()); |
| } |
| |
| // Run the application callbacks on the connection executor to allow the provider to |
| // return to its normal processing without waiting for client level processing to finish. |
| for (final JmsConnectionListener listener : connectionListeners) { |
| executor.submit(new Runnable() { |
| |
| @Override |
| public void run() { |
| listener.onConnectionRestored(remoteURI); |
| } |
| }); |
| } |
| } |
| |
| @Override |
| public void onConnectionEstablished(final URI remoteURI) { |
| try { |
| LOG.info("Connection {} connected to server: {}", connectionInfo.getId(), URISupport.removeQuery(remoteURI)); |
| } catch (URISyntaxException e) { |
| LOG.info("Connection {} connected to server: {}:{}", connectionInfo.getId(), remoteURI.getHost(), remoteURI.getPort()); |
| } |
| |
| setMessageFactory(provider.getMessageFactory()); |
| connectionInfo.setConnectedURI(provider.getRemoteURI()); |
| |
| // Run the application callbacks on the connection executor to allow the provider to |
| // return to its normal processing without waiting for client level processing to finish. |
| for (final JmsConnectionListener listener : connectionListeners) { |
| executor.submit(new Runnable() { |
| |
| @Override |
| public void run() { |
| listener.onConnectionEstablished(remoteURI); |
| } |
| }); |
| } |
| } |
| |
| @Override |
| public void onConnectionFailure(final ProviderException ex) { |
| providerFailed(ex); |
| |
| if (!closing.get() && !closed.get()) { |
| LOG.warn("Connection {} has failed due to: {}", connectionInfo.getId(), ex != null ? ex.getMessage() : "No error details provided."); |
| } |
| |
| // Signal that connection dropped we need to mark transactions as |
| // failed, deliver failure events to asynchronous send completions etc. |
| for (JmsSession session : sessions.values()) { |
| try { |
| session.onConnectionInterrupted(); |
| } catch (Throwable t) { |
| LOG.warn("Exception while marking session interrupted", t); |
| } |
| } |
| |
| onProviderException(ex); |
| |
| for (AsyncResult request : requests.keySet()) { |
| try { |
| request.onFailure(ex); |
| } catch (Exception e) { |
| LOG.debug("Exception during request cleanup", e); |
| } |
| } |
| |
| if (!closing.get() && !closed.get()) { |
| executor.execute(new Runnable() { |
| @Override |
| public void run() { |
| if (provider != null) { |
| try { |
| provider.close(); |
| } catch (Throwable error) { |
| LOG.debug("Error while closing failed Provider: {}", error.getMessage()); |
| } |
| } |
| |
| for (AsyncResult request : requests.keySet()) { |
| try { |
| request.onFailure(ex); |
| } catch (Exception e) { |
| LOG.debug("Exception during request cleanup", e); |
| } |
| } |
| |
| try { |
| shutdown(ex); |
| } catch (JMSException e) { |
| LOG.warn("Exception during connection cleanup, " + e, e); |
| } |
| |
| for (JmsConnectionListener listener : connectionListeners) { |
| listener.onConnectionFailure(ex); |
| } |
| |
| // Don't accept any more connection work but allow all pending work |
| // to complete in order to ensure notifications are sent to any blocked |
| // resources. |
| executor.shutdown(); |
| } |
| }); |
| } |
| } |
| |
| @Override |
| public void onResourceClosed(final JmsResource resource, final ProviderException cause) { |
| // Closure of the Connection itself is notified via onConnectionFailure |
| |
| // Run on the connection executor to free the provider to go do more work and avoid |
| // any chance of a deadlock if the code ever looped back to the provider. |
| |
| if (!closing.get() && !closed.get()) { |
| |
| // Set the failure cause indicator now to more quickly reflect the correct |
| // state in the resource. The actual shutdown and clean will be done on the |
| // connection executor thread to avoid looping or stalling the provider thread. |
| if (resource instanceof JmsSessionInfo) { |
| JmsSession session = sessions.get(resource.getId()); |
| if (session != null) { |
| session.setFailureCause(cause); |
| } |
| } else if (resource instanceof JmsProducerInfo) { |
| JmsSessionId parentId = ((JmsProducerInfo) resource).getParentId(); |
| JmsSession session = sessions.get(parentId); |
| if (session != null) { |
| JmsMessageProducer producer = session.lookup((JmsProducerId) resource.getId()); |
| if (producer != null) { |
| producer.setFailureCause(cause); |
| } |
| } |
| } else if (resource instanceof JmsConsumerInfo) { |
| JmsConsumerInfo consumerInfo = (JmsConsumerInfo) resource; |
| if (consumerInfo.isConnectionConsumer()) { |
| JmsConnectionConsumer consumer = connectionConsumers.get(consumerInfo.getId()); |
| if (consumer != null) { |
| consumer.setFailureCause(cause); |
| } |
| } else { |
| JmsSessionId parentId = consumerInfo.getParentId(); |
| JmsSession session = sessions.get(parentId); |
| if (session != null) { |
| JmsMessageConsumer consumer = session.lookup((JmsConsumerId) resource.getId()); |
| if (consumer != null) { |
| consumer.setFailureCause(cause); |
| } |
| } |
| } |
| } |
| |
| executor.execute(new Runnable() { |
| |
| @Override |
| public void run() { |
| if (resource instanceof JmsSessionInfo) { |
| JmsSession session = sessions.get(resource.getId()); |
| if (session != null) { |
| session.sessionClosed(cause); |
| for (JmsConnectionListener listener : connectionListeners) { |
| listener.onSessionClosed(session, cause); |
| } |
| } |
| } else if (resource instanceof JmsProducerInfo) { |
| JmsSessionId parentId = ((JmsProducerInfo) resource).getParentId(); |
| JmsSession session = sessions.get(parentId); |
| if (session != null) { |
| JmsMessageProducer producer = session.producerClosed((JmsProducerInfo) resource, cause); |
| if (producer != null) { |
| for (JmsConnectionListener listener : connectionListeners) { |
| listener.onProducerClosed(producer, cause); |
| } |
| } |
| } |
| } else if (resource instanceof JmsConsumerInfo) { |
| JmsConsumerInfo consumerInfo = (JmsConsumerInfo) resource; |
| if (consumerInfo.isConnectionConsumer()) { |
| JmsConnectionConsumer consumer = connectionConsumers.get(consumerInfo.getId()); |
| if (consumer != null) { |
| try { |
| if (consumer != null) { |
| consumer.shutdown(cause); |
| } |
| } catch (Throwable error) { |
| LOG.trace("Ignoring exception thrown during cleanup of closed connection consumer", error); |
| } |
| |
| onAsyncException(new JMSException("Connection Consumer remotely closed").initCause(cause)); |
| } |
| } else { |
| JmsSessionId parentId = consumerInfo.getParentId(); |
| JmsSession session = sessions.get(parentId); |
| if (session != null) { |
| JmsMessageConsumer consumer = session.consumerClosed((JmsConsumerInfo) resource, cause); |
| if (consumer != null) { |
| for (JmsConnectionListener listener : connectionListeners) { |
| listener.onConsumerClosed(consumer, cause); |
| } |
| } |
| } |
| } |
| } else { |
| LOG.info("A JMS resource has been remotely closed: {}", resource); |
| } |
| } |
| }); |
| } |
| } |
| |
| @Override |
| public void onProviderException(final ProviderException cause) { |
| // Report this to any registered exception listener, let the receiver |
| // decide if it should be fatal. |
| onAsyncException(cause); |
| } |
| |
| /** |
| * Handles any asynchronous errors that occur from the JMS framework classes. |
| * |
| * If any listeners are registered they will be notified of the error from a thread |
| * in the Connection's Executor service. |
| * |
| * @param error |
| * The exception that triggered this error. |
| */ |
| public void onAsyncException(Throwable error) { |
| if (!closed.get() && !closing.get()) { |
| if (this.exceptionListener != null) { |
| |
| final JMSException jmsError = JmsExceptionSupport.create(error); |
| |
| executor.execute(new Runnable() { |
| @Override |
| public void run() { |
| JmsConnection.this.exceptionListener.onException(jmsError); |
| } |
| }); |
| } else { |
| LOG.debug("Async exception with no exception listener: {}", error, error); |
| } |
| } |
| } |
| |
| protected void providerFailed(ProviderException cause) { |
| failureCause.compareAndSet(null, cause); |
| } |
| } |