blob: d9c2c42f8b37b0403d35647fd0968b6942530e40 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.jms;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import javax.jms.BytesMessage;
import javax.jms.CompletionListener;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.JMSRuntimeException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageFormatException;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.qpid.jms.exceptions.JmsConnectionFailedException;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.message.JmsMessageTransformation;
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
import org.apache.qpid.jms.meta.JmsConsumerId;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
import org.apache.qpid.jms.meta.JmsProducerId;
import org.apache.qpid.jms.meta.JmsProducerInfo;
import org.apache.qpid.jms.meta.JmsResource.ResourceState;
import org.apache.qpid.jms.meta.JmsSessionId;
import org.apache.qpid.jms.meta.JmsSessionInfo;
import org.apache.qpid.jms.policy.JmsDeserializationPolicy;
import org.apache.qpid.jms.policy.JmsMessageIDPolicy;
import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
import org.apache.qpid.jms.policy.JmsPresettlePolicy;
import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
import org.apache.qpid.jms.provider.Provider;
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
import org.apache.qpid.jms.provider.ProviderException;
import org.apache.qpid.jms.provider.ProviderFuture;
import org.apache.qpid.jms.provider.ProviderSynchronization;
import org.apache.qpid.jms.selector.SelectorParser;
import org.apache.qpid.jms.selector.filter.FilterException;
import org.apache.qpid.jms.util.NoOpExecutor;
import org.apache.qpid.jms.util.QpidJMSThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* JMS Session implementation
*/
public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSession, JmsMessageDispatcher {
// Class-wide logger.
private static final Logger LOG = LoggerFactory.getLogger(JmsSession.class);
// Acknowledgement mode values beyond the constants this class reads from javax.jms.Session.
// NOTE(review): INDIVIDUAL_ACKNOWLEDGE is not referenced in the visible part of this file.
private static final int INDIVIDUAL_ACKNOWLEDGE = 101;
// Checked together with NO_ACKNOWLEDGE by isNoAcknowledge().
private static final int ARTEMIS_PRE_ACKNOWLEDGE = 100;
private static final int NO_ACKNOWLEDGE = 257;
// Connection that created this session; all provider requests are routed through it.
private final JmsConnection connection;
// Acknowledgement mode supplied at construction time.
private final int acknowledgementMode;
// Producers and consumers registered through add(...) / remove(...), keyed by their ids.
private final Map<JmsProducerId, JmsMessageProducer> producers = new ConcurrentHashMap<JmsProducerId, JmsMessageProducer>();
private final Map<JmsConsumerId, JmsMessageConsumer> consumers = new ConcurrentHashMap<JmsConsumerId, JmsMessageConsumer>();
// Session level listener exposed through getMessageListener() / setMessageListener().
private MessageListener messageListener;
// Pending dispatch callbacks, drained by run().
private final java.util.Queue<Consumer<JmsSession>> sessionQueue = new ArrayDeque<>();
// Flipped to true exactly once by shutdown(Throwable).
private final AtomicBoolean closed = new AtomicBoolean();
// NOTE(review): presumably tracks the started state reported by isStarted() - confirm.
private final AtomicBoolean started = new AtomicBoolean();
// Provider-facing description of this session (id, ack mode, policies).
private final JmsSessionInfo sessionInfo;
// Held for the whole duration of the private send(...) method.
private final ReentrantLock sendLock = new ReentrantLock();
private volatile ThreadPoolExecutor deliveryExecutor;
// Note: the field name is misspelled ("Excecutor"); left unchanged here.
private volatile ThreadPoolExecutor completionExcecutor;
private AtomicReference<Thread> deliveryThread = new AtomicReference<Thread>();
private boolean deliveryThreadCheckEnabled = true;
private AtomicReference<Thread> completionThread = new AtomicReference<Thread>();
// NOTE(review): presumably the sources for getNextConsumerId() / getNextProducerId() - confirm.
private final AtomicLong consumerIdGenerator = new AtomicLong();
private final AtomicLong producerIdGenerator = new AtomicLong();
// Transaction context installed by the constructor through setTransactionContext(...).
private JmsTransactionContext transactionContext;
// Set to true by recover().
private boolean sessionRecovered;
private final AtomicReference<Throwable> failureCause = new AtomicReference<>();
// Markers for sends that were accepted by the provider and still need their
// CompletionListener notified; appended in the private send(...) method.
private final Deque<SendCompletion> asyncSendQueue = new ConcurrentLinkedDeque<SendCompletion>();
/**
 * Creates a new session on the given connection, registers it with the provider and
 * begins the initial transaction on the selected transaction context.
 *
 * @param connection
 *        the connection that owns this session.
 * @param sessionId
 *        the id assigned to this session.
 * @param acknowledgementMode
 *        the acknowledgement mode; SESSION_TRANSACTED selects a local transaction context.
 *
 * @throws JMSException if the provider resource cannot be created or the transaction
 *         context fails to begin.
 */
protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException {
    this.connection = connection;
    this.acknowledgementMode = acknowledgementMode;

    if (acknowledgementMode != SESSION_TRANSACTED) {
        setTransactionContext(new JmsNoTxTransactionContext());
    } else {
        setTransactionContext(new JmsLocalTransactionContext(this));
    }

    // Each session carries its own copy of the connection level policies, except the
    // deserialization policy which is shared by reference.
    this.sessionInfo = new JmsSessionInfo(sessionId);
    this.sessionInfo.setAcknowledgementMode(acknowledgementMode);
    this.sessionInfo.setSendAcksAsync(connection.isForceAsyncAcks());
    this.sessionInfo.setMessageIDPolicy(connection.getMessageIDPolicy().copy());
    this.sessionInfo.setPrefetchPolicy(connection.getPrefetchPolicy().copy());
    this.sessionInfo.setPresettlePolicy(connection.getPresettlePolicy().copy());
    this.sessionInfo.setRedeliveryPolicy(connection.getRedeliveryPolicy().copy());
    this.sessionInfo.setDeserializationPolicy(connection.getDeserializationPolicy());

    connection.createResource(sessionInfo, new ProviderSynchronization() {

        @Override
        public void onPendingSuccess() {
            // Only track the session once the provider has accepted the create request.
            connection.addSession(sessionInfo, JmsSession.this);
        }

        @Override
        public void onPendingFailure(ProviderException cause) {
            // Nothing was registered, so nothing to undo.
        }
    });

    // We always keep an open TX if transacted so start now.
    try {
        getTransactionContext().begin();
    } catch (Exception beginFailure) {
        // failed, close the AMQP session before we throw
        try {
            connection.destroyResource(sessionInfo, new ProviderSynchronization() {

                @Override
                public void onPendingSuccess() {
                    connection.removeSession(sessionInfo);
                }

                @Override
                public void onPendingFailure(ProviderException cause) {
                    connection.removeSession(sessionInfo);
                }
            });
        } catch (Exception ignored) {
            // Ignore, throw original error
        }

        throw beginFailure;
    }
}
/**
 * @return the acknowledgement mode given at construction, without any closed check.
 */
int acknowledgementMode() {
    return this.acknowledgementMode;
}
//////////////////////////////////////////////////////////////////////////
// Session methods
//////////////////////////////////////////////////////////////////////////
/**
 * @return the acknowledgement mode this session was created with.
 *
 * @throws JMSException if the closed check fails.
 */
@Override
public int getAcknowledgeMode() throws JMSException {
    checkClosed();
    return this.acknowledgementMode;
}
/**
 * @return the value of {@link #isTransacted()} once the closed check has passed.
 *
 * @throws JMSException if the closed check fails.
 */
@Override
public boolean getTransacted() throws JMSException {
    checkClosed();
    final boolean transacted = isTransacted();
    return transacted;
}
/**
 * @return the session level listener previously set, or null if none was set.
 *
 * @throws JMSException if the closed check fails.
 */
@Override
public MessageListener getMessageListener() throws JMSException {
    checkClosed();
    return this.messageListener;
}
/**
 * Sets the session level listener. Clearing the listener (passing null) skips the
 * closed check; installing a non-null listener performs it first.
 *
 * @param listener
 *        the listener to install, or null to clear the current one.
 *
 * @throws JMSException if a non-null listener is given and the closed check fails.
 */
@Override
public void setMessageListener(MessageListener listener) throws JMSException {
    if (listener == null) {
        this.messageListener = null;
        return;
    }

    checkClosed();
    this.messageListener = listener;
}
/**
 * Stops the session, asks the connection to recover it, flags the session as recovered
 * and restarts it if it was started on entry.
 *
 * @throws JMSException if the closed check fails, the session is transacted, or the
 *         recover operation fails.
 */
@Override
public void recover() throws JMSException {
    checkClosed();
    if (getTransacted()) {
        throw new IllegalStateException("Cannot call recover() on a transacted session");
    }

    // Remember the running state before stopping so it can be restored afterwards.
    final boolean restartAfterRecover = isStarted();
    stop();

    connection.recover(getSessionId());
    sessionRecovered = true;

    if (restartAfterRecover) {
        start();
    }
}
/**
 * Commits the current transaction through the transaction context.
 *
 * @throws JMSException if the closed or completion-thread checks fail, the session is
 *         not transacted, or the commit itself fails.
 */
@Override
public void commit() throws JMSException {
    checkClosed();
    checkIsCompletionThread();

    if (getTransacted()) {
        transactionContext.commit();
        return;
    }

    throw new IllegalStateException("Not a transacted session");
}
/**
 * Rolls back the current transaction. Consumers are suspended first, the transaction
 * context rollback always runs (even if suspending fails), and consumers are resumed
 * only when the rollback completed without throwing.
 *
 * @throws JMSException if the closed or completion-thread checks fail, the session is
 *         not transacted, or the rollback fails.
 */
@Override
public void rollback() throws JMSException {
    checkClosed();
    checkIsCompletionThread();

    final boolean transacted = getTransacted();
    if (!transacted) {
        throw new IllegalStateException("Not a transacted session");
    }

    try {
        // Stop processing any new messages that arrive
        for (JmsMessageConsumer consumer : consumers.values()) {
            consumer.suspendForRollback();
        }
    } finally {
        transactionContext.rollback();
    }

    // Currently some consumers won't get suspended and some won't restart
    // after a failed rollback.
    for (JmsMessageConsumer consumer : consumers.values()) {
        consumer.resumeAfterRollback();
    }
}
/**
 * Closes the session if it is not already closed. The delivery-thread and
 * completion-thread checks run before the closed flag is consulted.
 *
 * @throws JMSException if one of the thread checks fails or the close fails.
 */
@Override
public void close() throws JMSException {
    checkIsDeliveryThread();
    checkIsCompletionThread();

    if (closed.get()) {
        return;
    }

    doClose();
}
/**
 * Shutdown the Session and release all resources. Once completed the Session
 * requests that the Provider destroy the Session and its child resources.
 *
 * The calling thread's interrupt status is cleared for the duration of the close and
 * is restored before this method returns, including when the close fails with an
 * exception.
 *
 * @throws JMSException if an internal error occurs during the close operation.
 */
protected void doClose() throws JMSException {
    // Thread.interrupted() clears the flag; presumably so that the close work below is
    // not cut short by a pending interrupt. The flag is put back in the finally block.
    final boolean interrupted = Thread.interrupted();
    try {
        shutdown();
        try {
            connection.destroyResource(sessionInfo);
        } catch (JmsConnectionFailedException ignored) {
            // Deliberately ignored: the local shutdown has already completed, and a
            // failed connection leaves nothing further to do for this session.
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * This method should terminate all Session resources and prepare for disposal of the
 * Session. It is called either from the Session close method or from the Connection
 * when a close request is made and the Connection wants to cleanup all Session resources.
 *
 * This method should not attempt to send any requests to the Provider as the resources
 * that were associated with this session are either cleaned up by another method in the
 * session or are already gone due to remote close or some other error.
 *
 * Equivalent to calling {@link #shutdown(Throwable)} with no cause.
 *
 * @throws JMSException if an error occurs while shutting down the session resources.
 */
protected void shutdown() throws JMSException {
    final Throwable noCause = null;
    shutdown(noCause);
}
/**
* Marks the session closed and tears down its local resources. Only the first call does
* any work; later calls find the closed flag already set and return immediately.
*
* @param cause
*        the error that triggered the shutdown, or null when no error is involved.
*
* @throws JMSException the first error recorded while stopping the session, shutting down
*         its consumers and producers or shutting down a transaction that was not in
*         doubt; any other failure is wrapped via JmsExceptionSupport.
*/
protected void shutdown(Throwable cause) throws JMSException {
if (closed.compareAndSet(false, true)) {
// First error seen during cleanup; rethrown at the end so later steps still run.
JMSException shutdownError = null;
try {
sessionInfo.setState(ResourceState.CLOSED);
setFailureCause(cause);
try {
stop();
// Iterate over snapshots of the maps rather than the live views.
// NOTE(review): presumably because shutdown(...) removes entries - confirm.
for (JmsMessageConsumer consumer : new ArrayList<JmsMessageConsumer>(this.consumers.values())) {
consumer.shutdown(cause);
}
for (JmsMessageProducer producer : new ArrayList<JmsMessageProducer>(this.producers.values())) {
producer.shutdown(cause);
}
} catch (JMSException jmsEx) {
shutdownError = jmsEx;
}
// Sample the in-doubt state before shutting the context down; a failure while shutting
// down an in-doubt transaction is only traced and never reported to the caller.
boolean inDoubt = transactionContext.isInDoubt();
try {
transactionContext.shutdown();
} catch (JMSException jmsEx) {
if (!inDoubt) {
LOG.warn("Rollback of active transaction failed due to error: ", jmsEx);
// Keep the earliest error if one was already recorded.
shutdownError = shutdownError == null ? jmsEx : shutdownError;
} else {
LOG.trace("Rollback of indoubt transaction failed due to error: ", jmsEx);
}
}
// Ensure that no asynchronous completion sends remain blocked after close.
synchronized (sessionInfo) {
if (cause == null) {
cause = new JMSException("Session closed remotely before message transfer result was notified");
}
// Queue one final task for outstanding completions, then stop accepting new tasks.
getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
getCompletionExecutor().shutdown();
}
// Wait (bounded by the connection close timeout) for queued completion work to finish.
try {
getCompletionExecutor().awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.trace("Session close awaiting send completions was interrupted");
}
// Best-effort session-shutdown acknowledgement for client-ack sessions; failures are only traced.
try {
if (getSessionMode() == Session.CLIENT_ACKNOWLEDGE) {
acknowledge(ACK_TYPE.SESSION_SHUTDOWN);
}
} catch (Exception e) {
LOG.trace("Exception during session shutdown cleanup acknowledgement", e);
}
if (shutdownError != null) {
throw shutdownError;
}
} catch (Throwable e) {
// Prefer the recorded cleanup error over whatever was caught here.
if (shutdownError != null) {
throw shutdownError;
} else {
throw JmsExceptionSupport.create(e);
}
} finally {
// Always detach the session from the connection, whatever happened above.
connection.removeSession(sessionInfo);
}
}
}
//----- Events fired when resource remotely closed due to some error -----//
/**
 * Shuts the session down with the given cause. Any error raised by the shutdown is
 * logged at trace level and not propagated.
 *
 * @param cause
 *        the error passed on to {@link #shutdown(Throwable)}.
 */
void sessionClosed(Throwable cause) {
    try {
        shutdown(cause);
    } catch (Throwable cleanupError) {
        LOG.trace("Ignoring exception thrown during cleanup of closed session", cleanupError);
    }
}
/**
 * Handles notification that a consumer of this session has been closed.
 *
 * If the consumer is still registered and has a message listener, the connection is
 * told about the failure through its asynchronous exception path. The consumer is then
 * shut down locally; errors raised by that shutdown are logged at trace level and
 * otherwise ignored.
 *
 * @param resource
 *        the info object identifying the closed consumer.
 * @param cause
 *        the error reported for the closure.
 *
 * @return the consumer that was shut down, or null if no consumer with the given id is
 *         registered with this session.
 */
JmsMessageConsumer consumerClosed(JmsConsumerInfo resource, Throwable cause) {
    LOG.info("A JMS MessageConsumer has been closed: {}", resource);

    final JmsMessageConsumer consumer = consumers.get(resource.getId());
    if (consumer == null) {
        // The id is not (or no longer) registered here. Returning early avoids
        // dereferencing the missing consumer when checking for a listener.
        return null;
    }

    if (consumer.hasMessageListener()) {
        connection.onAsyncException(JmsExceptionSupport.create(cause));
    }

    try {
        consumer.shutdown(cause);
    } catch (Throwable error) {
        LOG.trace("Ignoring exception thrown during cleanup of closed consumer", error);
    }

    return consumer;
}
/**
 * Handles notification that a producer of this session has been closed. For a
 * registered producer, a task covering its outstanding asynchronous completions is
 * handed to the completion executor and the producer is shut down; errors from either
 * step are logged at trace level and ignored.
 *
 * @param resource
 *        the info object identifying the closed producer.
 * @param cause
 *        the error reported for the closure.
 *
 * @return the affected producer, or null if none is registered under the given id.
 */
JmsMessageProducer producerClosed(JmsProducerInfo resource, Throwable cause) {
    LOG.info("A JMS MessageProducer has been closed: {}", resource);

    final JmsMessageProducer closedProducer = producers.get(resource.getId());
    if (closedProducer == null) {
        return null;
    }

    try {
        getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(
            closedProducer.getProducerId(), JmsExceptionSupport.create(cause)));
        closedProducer.shutdown(cause);
    } catch (Throwable cleanupError) {
        LOG.trace("Ignoring exception thrown during cleanup of closed producer", cleanupError);
    }

    return closedProducer;
}
//////////////////////////////////////////////////////////////////////////
// Consumer creation
//////////////////////////////////////////////////////////////////////////
/**
 * Creates a consumer on the destination with no message selector.
 *
 * @see javax.jms.Session#createConsumer(javax.jms.Destination)
 */
@Override
public MessageConsumer createConsumer(Destination destination) throws JMSException {
    final String noSelector = null;
    return createConsumer(destination, noSelector);
}
/**
 * Creates a consumer on the destination with the given selector and noLocal disabled.
 *
 * @see javax.jms.Session#createConsumer(javax.jms.Destination, java.lang.String)
 */
@Override
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
    final boolean noLocal = false;
    return createConsumer(destination, messageSelector, noLocal);
}
/**
 * Validates the session state, destination and selector, then creates and initializes
 * a new consumer for the transformed destination.
 *
 * @see javax.jms.Session#createConsumer(javax.jms.Destination, java.lang.String, boolean)
 */
@Override
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
    checkClosed();
    checkDestination(destination);
    final String selector = checkSelector(messageSelector);

    final JmsDestination source = JmsMessageTransformation.transformDestination(connection, destination);
    final JmsMessageConsumer consumer = new JmsMessageConsumer(getNextConsumerId(), this, source, selector, noLocal);
    consumer.init();

    return consumer;
}
/**
 * Creates and initializes a receiver for the queue without a message selector.
 *
 * @see javax.jms.QueueSession#createReceiver(javax.jms.Queue)
 */
@Override
public QueueReceiver createReceiver(Queue queue) throws JMSException {
    checkClosed();
    checkDestination(queue);

    final JmsDestination source = JmsMessageTransformation.transformDestination(connection, queue);
    final JmsQueueReceiver receiver = new JmsQueueReceiver(getNextConsumerId(), this, source, null);
    receiver.init();

    return receiver;
}
/**
 * Creates and initializes a receiver for the queue using the validated selector.
 *
 * @see javax.jms.QueueSession#createReceiver(javax.jms.Queue, java.lang.String)
 */
@Override
public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
    checkClosed();
    checkDestination(queue);
    final String selector = checkSelector(messageSelector);

    final JmsDestination source = JmsMessageTransformation.transformDestination(connection, queue);
    final JmsQueueReceiver receiver = new JmsQueueReceiver(getNextConsumerId(), this, source, selector);
    receiver.init();

    return receiver;
}
/**
 * Creates a browser for the queue with no message selector.
 *
 * @see javax.jms.Session#createBrowser(javax.jms.Queue)
 */
@Override
public QueueBrowser createBrowser(Queue destination) throws JMSException {
    final String noSelector = null;
    return createBrowser(destination, noSelector);
}
/**
 * Validates the session state, queue and selector, then creates a browser for the
 * transformed destination.
 *
 * @see javax.jms.Session#createBrowser(javax.jms.Queue, java.lang.String)
 */
@Override
public QueueBrowser createBrowser(Queue destination, String messageSelector) throws JMSException {
    checkClosed();
    checkDestination(destination);
    final String selector = checkSelector(messageSelector);

    final JmsDestination source = JmsMessageTransformation.transformDestination(connection, destination);
    return new JmsQueueBrowser(this, source, selector);
}
/**
 * Creates a subscriber on the topic with no selector and noLocal disabled.
 *
 * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic)
 */
@Override
public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
    final String noSelector = null;
    final boolean noLocal = false;
    return createSubscriber(topic, noSelector, noLocal);
}
/**
 * Validates the session state, topic and selector, then creates and initializes a
 * subscriber for the transformed destination.
 *
 * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, java.lang.String, boolean)
 */
@Override
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
    checkClosed();
    checkDestination(topic);
    final String selector = checkSelector(messageSelector);

    final JmsDestination source = JmsMessageTransformation.transformDestination(connection, topic);
    final JmsTopicSubscriber subscriber = new JmsTopicSubscriber(getNextConsumerId(), this, source, noLocal, selector);
    subscriber.init();

    return subscriber;
}
/**
 * Creates a durable subscriber with no selector and noLocal disabled.
 *
 * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic, java.lang.String)
 */
@Override
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
    final String noSelector = null;
    final boolean noLocal = false;
    return createDurableSubscriber(topic, name, noSelector, noLocal);
}
/**
 * Validates the session state, topic, explicit client ID and selector, then creates
 * and initializes a durable subscriber for the transformed destination.
 *
 * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic, java.lang.String, java.lang.String, boolean)
 */
@Override
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
    checkClosed();
    checkDestination(topic);
    checkClientIDWasSetExplicitly();
    final String selector = checkSelector(messageSelector);

    final JmsDestination source = JmsMessageTransformation.transformDestination(connection, topic);
    final JmsTopicSubscriber subscriber = new JmsDurableTopicSubscriber(getNextConsumerId(), this, source, name, noLocal, selector);
    subscriber.init();

    return subscriber;
}
/**
 * Creates a durable consumer by delegating to the durable subscriber factory with no
 * selector and noLocal disabled.
 *
 * @see javax.jms.Session#createDurableConsumer(javax.jms.Topic, java.lang.String)
 */
@Override
public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException {
    final TopicSubscriber durable = createDurableSubscriber(topic, name, null, false);
    return durable;
}
/**
 * Creates a durable consumer by delegating to the durable subscriber factory.
 *
 * @see javax.jms.Session#createDurableConsumer(javax.jms.Topic, java.lang.String, java.lang.String, boolean)
 */
@Override
public MessageConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
    final TopicSubscriber durable = createDurableSubscriber(topic, name, messageSelector, noLocal);
    return durable;
}
/**
 * Verifies that the connection reports an explicitly set client ID.
 *
 * @throws IllegalStateException if the connection has no explicit client ID.
 */
protected void checkClientIDWasSetExplicitly() throws IllegalStateException {
    if (connection.isExplicitClientID()) {
        return;
    }

    throw new IllegalStateException("You must specify a unique clientID for the Connection to use a DurableSubscriber");
}
/**
* Performs the closed check and then forwards the unsubscribe request for the named
* subscription to the connection.
*
* @see javax.jms.Session#unsubscribe(java.lang.String)
*/
@Override
public void unsubscribe(String name) throws JMSException {
checkClosed();
connection.unsubscribe(name);
}
/**
 * Creates a shared consumer with no message selector.
 *
 * @see javax.jms.Session#createSharedConsumer(javax.jms.Topic, java.lang.String)
 */
@Override
public MessageConsumer createSharedConsumer(Topic topic, String name) throws JMSException {
    checkClosed();
    final String noSelector = null;
    return createSharedConsumer(topic, name, noSelector);
}
/**
 * Validates the session state, topic and selector, then creates and initializes a
 * shared consumer for the transformed destination.
 *
 * @see javax.jms.Session#createSharedConsumer(javax.jms.Topic, java.lang.String, java.lang.String)
 */
@Override
public MessageConsumer createSharedConsumer(Topic topic, String name, String selector) throws JMSException {
    checkClosed();
    checkDestination(topic);
    final String validatedSelector = checkSelector(selector);

    final JmsDestination source = JmsMessageTransformation.transformDestination(connection, topic);
    final JmsMessageConsumer consumer = new JmsSharedMessageConsumer(getNextConsumerId(), this, source, name, validatedSelector);
    consumer.init();

    return consumer;
}
/**
 * Creates a shared durable consumer with no message selector.
 *
 * @see javax.jms.Session#createSharedDurableConsumer(javax.jms.Topic, java.lang.String)
 */
@Override
public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException {
    checkClosed();
    final String noSelector = null;
    return createSharedDurableConsumer(topic, name, noSelector);
}
/**
 * Validates the session state, topic and selector, then creates and initializes a
 * shared durable consumer for the transformed destination.
 *
 * @see javax.jms.Session#createSharedDurableConsumer(javax.jms.Topic, java.lang.String, java.lang.String)
 */
@Override
public MessageConsumer createSharedDurableConsumer(Topic topic, String name, String selector) throws JMSException {
    checkClosed();
    checkDestination(topic);
    final String validatedSelector = checkSelector(selector);

    final JmsDestination source = JmsMessageTransformation.transformDestination(connection, topic);
    final JmsMessageConsumer consumer = new JmsSharedDurableMessageConsumer(getNextConsumerId(), this, source, name, validatedSelector);
    consumer.init();

    return consumer;
}
//////////////////////////////////////////////////////////////////////////
// Producer creation
//////////////////////////////////////////////////////////////////////////
/**
 * Performs the closed check and creates a producer for the transformed destination.
 *
 * @see javax.jms.Session#createProducer(javax.jms.Destination)
 */
@Override
public MessageProducer createProducer(Destination destination) throws JMSException {
    checkClosed();
    final JmsDestination target = JmsMessageTransformation.transformDestination(connection, destination);
    return new JmsMessageProducer(getNextProducerId(), this, target);
}
/**
 * Performs the closed check and creates a queue sender for the transformed destination.
 *
 * @see javax.jms.QueueSession#createSender(javax.jms.Queue)
 */
@Override
public QueueSender createSender(Queue queue) throws JMSException {
    checkClosed();
    final JmsDestination target = JmsMessageTransformation.transformDestination(connection, queue);
    return new JmsQueueSender(getNextProducerId(), this, target);
}
/**
 * Performs the closed check and creates a topic publisher for the transformed destination.
 *
 * @see javax.jms.TopicSession#createPublisher(javax.jms.Topic)
 */
@Override
public TopicPublisher createPublisher(Topic topic) throws JMSException {
    checkClosed();
    final JmsDestination target = JmsMessageTransformation.transformDestination(connection, topic);
    return new JmsTopicPublisher(getNextProducerId(), this, target);
}
//////////////////////////////////////////////////////////////////////////
// Message creation
//////////////////////////////////////////////////////////////////////////
/**
* Creates a bytes message from the connection's message factory and passes it through
* init(...) before returning it.
*
* @throws JMSException if the closed check fails or message creation fails.
*/
@Override
public BytesMessage createBytesMessage() throws JMSException {
checkClosed();
return init(connection.getMessageFactory().createBytesMessage());
}
/**
* Creates a map message from the connection's message factory and passes it through
* init(...) before returning it.
*
* @throws JMSException if the closed check fails or message creation fails.
*/
@Override
public MapMessage createMapMessage() throws JMSException {
checkClosed();
return init(connection.getMessageFactory().createMapMessage());
}
/**
* Creates a plain message from the connection's message factory and passes it through
* init(...) before returning it.
*
* @throws JMSException if the closed check fails or message creation fails.
*/
@Override
public Message createMessage() throws JMSException {
checkClosed();
return init(connection.getMessageFactory().createMessage());
}
/**
* Creates an object message with a null body from the connection's message factory and
* passes it through init(...) before returning it.
*
* @throws JMSException if the closed check fails or message creation fails.
*/
@Override
public ObjectMessage createObjectMessage() throws JMSException {
checkClosed();
return init(connection.getMessageFactory().createObjectMessage(null));
}
/**
* Creates an object message for the given object from the connection's message factory
* and passes it through init(...) before returning it.
*
* @param object
*        the object handed to the message factory.
*
* @throws JMSException if the closed check fails or message creation fails.
*/
@Override
public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
checkClosed();
return init(connection.getMessageFactory().createObjectMessage(object));
}
/**
* Creates a stream message from the connection's message factory and passes it through
* init(...) before returning it.
*
* @throws JMSException if the closed check fails or message creation fails.
*/
@Override
public StreamMessage createStreamMessage() throws JMSException {
checkClosed();
return init(connection.getMessageFactory().createStreamMessage());
}
/**
* Creates a text message with null text from the connection's message factory and
* passes it through init(...) before returning it.
*
* @throws JMSException if the closed check fails or message creation fails.
*/
@Override
public TextMessage createTextMessage() throws JMSException {
checkClosed();
return init(connection.getMessageFactory().createTextMessage(null));
}
/**
* Creates a text message for the given text from the connection's message factory and
* passes it through init(...) before returning it.
*
* @param text
*        the text handed to the message factory.
*
* @throws JMSException if the closed check fails or message creation fails.
*/
@Override
public TextMessage createTextMessage(String text) throws JMSException {
checkClosed();
return init(connection.getMessageFactory().createTextMessage(text));
}
//////////////////////////////////////////////////////////////////////////
// Destination creation
//////////////////////////////////////////////////////////////////////////
/**
 * Performs the closed check and returns a new queue object for the given name.
 *
 * @see javax.jms.Session#createQueue(java.lang.String)
 */
@Override
public Queue createQueue(String queueName) throws JMSException {
    checkClosed();
    final Queue queue = new JmsQueue(queueName);
    return queue;
}
/**
 * Performs the closed check and returns a new topic object for the given name.
 *
 * @see javax.jms.Session#createTopic(java.lang.String)
 */
@Override
public Topic createTopic(String topicName) throws JMSException {
    checkClosed();
    final Topic topic = new JmsTopic(topicName);
    return topic;
}
/**
 * Performs the closed check and asks the connection to create a temporary queue.
 *
 * @see javax.jms.Session#createTemporaryQueue()
 */
@Override
public TemporaryQueue createTemporaryQueue() throws JMSException {
    checkClosed();
    final TemporaryQueue temporaryQueue = connection.createTemporaryQueue();
    return temporaryQueue;
}
/**
 * Performs the closed check and asks the connection to create a temporary topic.
 *
 * @see javax.jms.Session#createTemporaryTopic()
 */
@Override
public TemporaryTopic createTemporaryTopic() throws JMSException {
    checkClosed();
    final TemporaryTopic temporaryTopic = connection.createTemporaryTopic();
    return temporaryTopic;
}
//----- Session dispatch support -----------------------------------------//
/**
 * Drains the session queue, invoking each queued callback with this session until the
 * queue is empty. A failed closed check is rethrown wrapped in a RuntimeException.
 */
@Override
public void run() {
    try {
        checkClosed();
    } catch (IllegalStateException closedError) {
        throw new RuntimeException(closedError);
    }

    for (Consumer<JmsSession> task = sessionQueue.poll(); task != null; task = sessionQueue.poll()) {
        task.accept(this);
    }
}
//----- Session Implementation methods -----------------------------------//
/**
 * Registers the consumer with this session under its consumer id.
 *
 * @param consumer
 *        the consumer to track.
 */
protected void add(JmsMessageConsumer consumer) {
    final JmsConsumerId id = consumer.getConsumerId();
    consumers.put(id, consumer);
}
/**
 * Removes the entry stored under the consumer's id, if any.
 *
 * @param consumer
 *        the consumer to stop tracking.
 */
protected void remove(JmsMessageConsumer consumer) {
    final JmsConsumerId id = consumer.getConsumerId();
    consumers.remove(id);
}
/**
 * @param consumerId
 *        the id to look up.
 *
 * @return the consumer registered under the id, or null if there is none.
 */
protected JmsMessageConsumer lookup(JmsConsumerId consumerId) {
    final JmsMessageConsumer registered = consumers.get(consumerId);
    return registered;
}
/**
 * Registers the producer with this session under its producer id.
 *
 * @param producer
 *        the producer to track.
 */
protected void add(JmsMessageProducer producer) {
    final JmsProducerId id = producer.getProducerId();
    producers.put(id, producer);
}
/**
 * Removes the entry stored under the producer's id, if any.
 *
 * @param producer
 *        the producer to stop tracking.
 */
protected void remove(JmsMessageProducer producer) {
    final JmsProducerId id = producer.getProducerId();
    producers.remove(id);
}
/**
 * @param producerId
 *        the id to look up.
 *
 * @return the producer registered under the id, or null if there is none.
 */
protected JmsMessageProducer lookup(JmsProducerId producerId) {
    final JmsMessageProducer registered = producers.get(producerId);
    return registered;
}
/**
* Forwards the exception to the owning connection's onException handler.
*
* @param ex
*        the exception to report.
*/
protected void onException(Exception ex) {
connection.onException(ex);
}
/**
 * Validates the destination and message, transforms the destination into this client's
 * own destination type and hands the send over to the internal send implementation.
 *
 * @throws InvalidDestinationException if the destination is null.
 * @throws MessageFormatException if the message is null.
 * @throws IllegalStateException if the destination is a temporary destination that has
 *         been deleted.
 * @throws JMSException if the send itself fails.
 */
protected void send(JmsMessageProducer producer, Destination dest, Message msg, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp, long deliveryDelay, CompletionListener listener) throws JMSException {
    if (dest == null) {
        throw new InvalidDestinationException("Destination must not be null");
    }

    if (msg == null) {
        throw new MessageFormatException("Message must not be null");
    }

    final JmsDestination target = JmsMessageTransformation.transformDestination(connection, dest);

    if (target.isTemporary()) {
        final JmsTemporaryDestination temporary = (JmsTemporaryDestination) target;
        if (temporary.isDeleted()) {
            throw new IllegalStateException("Temporary destination has been deleted");
        }
    }

    send(producer, target, msg, deliveryMode, priority, timeToLive, disableMsgId, disableTimestamp, deliveryDelay, listener);
}
/**
* Performs the actual send: stamps the JMS headers on the original message, converts a
* foreign message into this client's message type when needed, wraps the result in an
* outbound dispatch envelope and passes it to the transaction context.
*
* The whole operation runs while holding the send lock.
*
* @param producer
*        the producer performing the send; supplies the message sequence, id builder and presettle state.
* @param destination
*        the already transformed destination.
* @param original
*        the message supplied by the caller; its headers are updated in place.
* @param listener
*        optional completion listener; when non-null the send is always asynchronous and a
*        completion marker is queued once the provider accepts the send.
*
* @throws JMSException if any step fails; when an outbound message was already selected,
*         onSendComplete() is called on it before the exception is rethrown.
*/
private void send(JmsMessageProducer producer, JmsDestination destination, Message original, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp, long deliveryDelay, CompletionListener listener) throws JMSException {
// Released in the finally block at the end of the method.
sendLock.lock();
JmsMessage outbound = null;
try {
original.setJMSDeliveryMode(deliveryMode);
original.setJMSPriority(priority);
original.setJMSRedelivered(false);
original.setJMSDestination(destination);
// A single timestamp is used for the JMSTimestamp, expiration and delivery time values.
long timeStamp = System.currentTimeMillis();
boolean hasTTL = timeToLive > Message.DEFAULT_TIME_TO_LIVE;
boolean hasDelay = deliveryDelay > Message.DEFAULT_DELIVERY_DELAY;
boolean isJmsMessage = original instanceof JmsMessage;
if (!disableTimestamp) {
original.setJMSTimestamp(timeStamp);
} else {
original.setJMSTimestamp(0);
}
if (hasTTL) {
original.setJMSExpiration(timeStamp + timeToLive);
} else {
original.setJMSExpiration(0);
}
// The sequence number is consumed even when message ids are disabled; it is also used
// as the dispatch id of the envelope below.
long messageSequence = producer.getNextMessageSequence();
Object messageId = null;
if (!disableMsgId) {
messageId = producer.getMessageIDBuilder().createMessageID(producer.getProducerId().toString(), messageSequence);
}
if (isJmsMessage) {
outbound = (JmsMessage) original;
} else {
// Transform and assign the Destination as one of our own destination objects.
outbound = JmsMessageTransformation.transformMessage(connection, original);
outbound.setJMSDestination(destination);
}
// Set the delivery time. Purposefully avoided doing this earlier so
// that we use the 'outbound' JmsMessage object reference when
// updating our own message instances, avoids using the interface
// in case the JMS 1.1 Message API is actually being used due to
// being on the classpath too.
long deliveryTime = timeStamp;
if (hasDelay) {
deliveryTime = timeStamp + deliveryDelay;
}
outbound.getFacade().setDeliveryTime(deliveryTime, hasDelay);
if(!isJmsMessage) {
// If the original was a foreign message, we still need to update it too.
setForeignMessageDeliveryTime(original, deliveryTime);
}
// Set the message ID
outbound.getFacade().setProviderMessageIdObject(messageId);
if (!isJmsMessage) {
// If the original was a foreign message, we still need to update it
// with the properly encoded Message ID String, get it from the one
// we transformed from now that it is set.
original.setJMSMessageID(outbound.getJMSMessageID());
}
// If configured set the User ID using the value we have encoded and cached,
// otherwise clear to prevent caller from spoofing the user ID value.
if (connection.isPopulateJMSXUserID()) {
outbound.getFacade().setUserIdBytes(connection.getEncodedUsername());
} else {
outbound.getFacade().setUserId(null);
}
// Synchronous when forced on the connection, or when the send is persistent and
// non-transacted and async sends are not forced.
boolean sync = connection.isForceSyncSend() ||
(!connection.isForceAsyncSend() && deliveryMode == DeliveryMode.PERSISTENT && !getTransacted());
outbound.onSend(timeToLive);
JmsOutboundMessageDispatch envelope = new JmsOutboundMessageDispatch();
envelope.setMessage(outbound);
envelope.setPayload(outbound.getFacade().encodeMessage());
envelope.setProducerId(producer.getProducerId());
envelope.setDestination(destination);
// A completion listener always makes the send asynchronous.
envelope.setSendAsync(listener == null ? !sync : true);
envelope.setDispatchId(messageSequence);
envelope.setCompletionRequired(listener != null);
// Anonymous producers consult the presettle policy per destination; others use the
// producer's own presettle setting.
if (producer.isAnonymous()) {
envelope.setPresettle(getPresettlePolicy().isProducerPresttled(this, destination));
} else {
envelope.setPresettle(producer.isPresettled());
}
// Async, no completion listener, not presettled: the envelope gets a copy of the message
// and the caller's instance is marked send-complete right away.
if (envelope.isSendAsync() && !envelope.isCompletionRequired() && !envelope.isPresettle()) {
envelope.setMessage(outbound.copy());
outbound.onSendComplete();
}
if (envelope.isCompletionRequired()) {
transactionContext.send(connection, envelope, new ProviderSynchronization() {
@Override
public void onPendingSuccess() {
// Provider accepted the send request so new we place the marker in
// the queue so that it can be completed asynchronously.
asyncSendQueue.addLast(new SendCompletion(envelope, listener));
}
@Override
public void onPendingFailure(ProviderException cause) {
// Provider has rejected the send request so we will throw the
// exception that is to follow so no completion will be needed.
}
});
} else {
transactionContext.send(connection, envelope, null);
}
} catch (JMSException jmsEx) {
// Ensure that on failure case the message is returned to usable state for another send attempt.
if(outbound != null) {
outbound.onSendComplete();
}
throw jmsEx;
} finally {
sendLock.unlock();
}
}
/**
 * Sets the delivery time on a foreign message, but only when its class exposes a
 * concrete public {@code setJMSDeliveryTime(long)} method. Otherwise this is a no-op.
 *
 * @param foreignMessage
 *        the foreign provider's message to update.
 * @param deliveryTime
 *        the delivery time value to set.
 *
 * @throws JMSException if the message rejects the value.
 */
private void setForeignMessageDeliveryTime(Message foreignMessage, long deliveryTime) throws JMSException {
    // Verify if the setJMSDeliveryTime method exists, i.e the foreign provider isn't only JMS 1.1.
    Method setter;
    try {
        setter = foreignMessage.getClass().getMethod("setJMSDeliveryTime", long.class);
    } catch (NoSuchMethodException notAvailable) {
        // Assume its a JMS 1.1 Message, we will no-op.
        return;
    }

    if (Modifier.isAbstract(setter.getModifiers())) {
        return;
    }

    // Method exists, isn't abstract, so use it.
    foreignMessage.setJMSDeliveryTime(deliveryTime);
}
/**
 * Passes the acknowledgement of the given envelope to the transaction context.
 *
 * @param envelope
 *        the delivered message envelope being acknowledged.
 * @param ackType
 *        the type of acknowledgement being done.
 *
 * @return the same envelope instance that was passed in.
 *
 * @throws JMSException if the transaction context fails to process the acknowledge.
 */
JmsInboundMessageDispatch acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
    this.transactionContext.acknowledge(this.connection, envelope, ackType);
    return envelope;
}
/**
 * Acknowledge all previously delivered messages in this Session as consumed. This
 * method is usually only called when the Session is in the CLIENT_ACKNOWLEDGE mode.
 *
 * @param ackType
 *        The type of acknowledgement being done.
 *
 * @throws JMSException if the session is transacted or an error occurs while the
 *         acknowledge is processed.
 */
void acknowledge(ACK_TYPE ackType) throws JMSException {
    if (!isTransacted()) {
        this.connection.acknowledge(sessionInfo.getId(), ackType);
        return;
    }

    throw new IllegalStateException("Session acknowledge called inside a transacted Session");
}
/**
 * Acknowledges one specific delivered message as consumed. Normally only used when the
 * Session operates in the individual acknowledge mode; a transacted Session rejects
 * the call.
 *
 * @param ackType
 *        The type of acknowledgement being done.
 * @param envelope
 *        The message envelope.
 *
 * @throws JMSException if the Session is transacted or the acknowledge fails.
 */
void acknowledgeIndividual(ACK_TYPE ackType, JmsInboundMessageDispatch envelope) throws JMSException {
    final boolean transacted = isTransacted();
    if (transacted) {
        throw new IllegalStateException("Message acknowledge called inside a transacted Session");
    }

    connection.acknowledge(envelope, ackType);
}
/**
* Reports whether this session has been marked closed.
*
* @return the current value of the session's closed flag.
*/
public boolean isClosed() {
return closed.get();
}
/**
* Checks whether the session uses transactions, i.e. whether its acknowledgement
* mode equals {@link Session#SESSION_TRANSACTED}.
*
* @return true if the session uses transactions.
*/
public boolean isTransacted() {
return acknowledgementMode == Session.SESSION_TRANSACTED;
}
/**
* Checks whether the session uses client acknowledgment, i.e. whether its
* acknowledgement mode equals {@link Session#CLIENT_ACKNOWLEDGE}.
*
* @return true if the session uses client acknowledgment.
*/
public boolean isClientAcknowledge() {
return acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
}
/**
* Checks whether the session uses auto acknowledgment, i.e. whether its
* acknowledgement mode equals {@link Session#AUTO_ACKNOWLEDGE}.
*
* @return true if the session uses auto acknowledgment.
*/
public boolean isAutoAcknowledge() {
return acknowledgementMode == Session.AUTO_ACKNOWLEDGE;
}
/**
* Checks whether the session uses dups-ok acknowledgment, i.e. whether its
* acknowledgement mode equals {@link Session#DUPS_OK_ACKNOWLEDGE}.
*
* @return true if the session uses dups-ok acknowledgment.
*/
public boolean isDupsOkAcknowledge() {
return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
}
/**
 * Checks whether the session uses presettlement for all consumers, which is the case
 * for both the NO_ACKNOWLEDGE and the ARTEMIS_PRE_ACKNOWLEDGE modes.
 *
 * @return true if the session is using a presettlement for consumers.
 */
public boolean isNoAcknowledge() {
    switch (acknowledgementMode) {
        case NO_ACKNOWLEDGE:
        case ARTEMIS_PRE_ACKNOWLEDGE:
            return true;
        default:
            return false;
    }
}
/**
* Checks whether the session uses the individual acknowledgment mode, i.e. whether
* its acknowledgement mode equals INDIVIDUAL_ACKNOWLEDGE.
*
* @return true if the session uses individual acknowledgment.
*/
public boolean isIndividualAcknowledge() {
return acknowledgementMode == INDIVIDUAL_ACKNOWLEDGE;
}
/**
 * Verifies that the session is still open.
 *
 * @throws IllegalStateException if the session has been closed; when a failure cause
 *         has been recorded it is attached as the cause of the thrown exception.
 */
protected void checkClosed() throws IllegalStateException {
    if (!closed.get()) {
        return;
    }

    // Ordinary close: no failure was recorded for this session.
    if (failureCause.get() == null) {
        throw new IllegalStateException("The Session is closed");
    }

    IllegalStateException closedByError =
        new IllegalStateException("The Session was closed due to an unrecoverable error.");
    closedByError.initCause(failureCause.get());
    throw closedByError;
}
/**
 * Validates a message selector expression.
 *
 * A {@code null} selector, or one containing only whitespace, is treated as "no selector"
 * and yields {@code null}. Any other value is run through the selector parser and
 * returned unchanged when it parses successfully.
 *
 * @param selector
 *        the selector expression to validate, may be null.
 *
 * @return the original selector, or null if no effective selector was supplied.
 *
 * @throws InvalidSelectorException if the selector cannot be parsed; the parser's
 *         exception is attached as the cause.
 */
static String checkSelector(String selector) throws InvalidSelectorException {
    if (selector == null || selector.trim().isEmpty()) {
        return null;
    }

    try {
        SelectorParser.parse(selector);
    } catch (FilterException e) {
        // Keep the parser failure as the cause so its stack trace is not lost.
        InvalidSelectorException invalid = new InvalidSelectorException(e.getMessage());
        invalid.initCause(e);
        throw invalid;
    }

    return selector;
}
/**
* Verifies that a destination reference was supplied.
*
* @param dest
* the destination to check.
*
* @throws InvalidDestinationException if the given destination is null.
*/
public static void checkDestination(Destination dest) throws InvalidDestinationException {
if (dest == null) {
throw new InvalidDestinationException("Destination cannot be null");
}
}
/**
 * Moves the session into the started state and starts each of its consumers. Does
 * nothing when the session was already started.
 *
 * @throws JMSException if a consumer fails to start.
 */
protected void start() throws JMSException {
    // Only the caller that flips the flag from false to true starts the consumers.
    if (!started.compareAndSet(false, true)) {
        return;
    }

    for (JmsMessageConsumer each : consumers.values()) {
        each.start();
    }
}
/**
 * Clears the started flag, stops each consumer of this session and then shuts down and
 * discards the delivery executor if one exists.
 *
 * @throws JMSException if a consumer fails to stop.
 */
protected void stop() throws JMSException {
    started.set(false);

    for (JmsMessageConsumer each : consumers.values()) {
        each.stop();
    }

    // The executor field is only touched while holding the sessionInfo monitor here.
    synchronized (sessionInfo) {
        if (deliveryExecutor == null) {
            return;
        }

        deliveryExecutor.shutdown();
        deliveryExecutor = null;
    }
}
/**
* Reports whether this session is currently marked as started.
*
* @return the current value of the session's started flag.
*/
protected boolean isStarted() {
return started.get();
}
/**
* Returns the connection reference held by this session.
*
* @return the {@link JmsConnection} stored in this session.
*/
public JmsConnection getConnection() {
return connection;
}
/**
* Returns the "delivery dispatcher" executor of this session, creating it on first use.
*
* A new executor is only created while the session's closed flag is not set. If the
* session is closed and no executor exists, the shared {@code NoOpExecutor.INSTANCE}
* is returned instead.
*
* @return the session's delivery executor, or {@code NoOpExecutor.INSTANCE} when the
* session is closed and no executor exists.
*/
Executor getDispatcherExecutor() {
// Read the field once without the lock; a non-null value is returned directly.
ThreadPoolExecutor exec = deliveryExecutor;
if (exec == null) {
// Creation happens under the sessionInfo monitor, the same monitor stop() holds
// when it shuts the executor down and clears the field.
synchronized (sessionInfo) {
// Re-check under the lock: another thread may have created it in the meantime.
if (deliveryExecutor == null) {
if (!closed.get()) {
deliveryExecutor = exec = createExecutor("delivery dispatcher", deliveryThread);
} else {
// Closed session: do not spin up a new thread pool.
return NoOpExecutor.INSTANCE;
}
} else {
exec = deliveryExecutor;
}
}
}
return exec;
}
/**
 * Returns the "completion dispatcher" executor of this session, creating it on first use.
 *
 * On creation a no-op task is submitted and awaited so that the worker thread is running
 * before the executor is published to other threads. If the calling thread is interrupted
 * while waiting, the interrupt status is restored before the executor is published.
 *
 * @return the executor on which send completions are run.
 */
private ExecutorService getCompletionExecutor() {
    ThreadPoolExecutor exec = completionExcecutor;
    if (exec == null) {
        synchronized (sessionInfo) {
            exec = completionExcecutor;
            if (exec == null) {
                exec = createExecutor("completion dispatcher", completionThread);

                // Ensure work thread is fully up before allowing other threads
                // to attempt to execute on this instance.
                Future<?> starter = exec.submit(() -> {});
                try {
                    starter.get();
                } catch (InterruptedException e) {
                    // Future.get() cleared the interrupt flag when it threw; put it back
                    // so the caller can still observe the interruption.
                    Thread.currentThread().interrupt();
                    LOG.trace("Completion Executor starter task failed: {}", e.getMessage());
                } catch (ExecutionException e) {
                    LOG.trace("Completion Executor starter task failed: {}", e.getMessage());
                }

                completionExcecutor = exec;
            }
        }
    }

    return exec;
}
/**
 * Builds a single-threaded pool backed by an unbounded queue for this session.
 *
 * Rejected tasks are dropped silently once the session is closed; otherwise the
 * rejection is traced and handed to the discard-oldest policy.
 *
 * @param threadNameSuffix
 *        text appended to the session-specific thread name.
 * @param threadTracker
 *        reference handed to the thread factory.
 *
 * @return the newly created executor.
 */
private ThreadPoolExecutor createExecutor(final String threadNameSuffix, AtomicReference<Thread> threadTracker) {
    final String threadName = "JmsSession ["+ sessionInfo.getId() + "] " + threadNameSuffix;
    final QpidJMSThreadFactory threadFactory = new QpidJMSThreadFactory(threadName, true, threadTracker);

    final ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);

    pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy() {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // Completely ignore the task if the session has closed.
            if (closed.get()) {
                return;
            }

            LOG.trace("Task {} rejected from executor: {}", r, e);
            super.rejectedExecution(r, e);
        }
    });

    return pool;
}
/**
* Returns the session info object held by this session.
*
* @return the {@link JmsSessionInfo} stored in this session.
*/
protected JmsSessionInfo getSessionInfo() {
return sessionInfo;
}
/**
* Returns the id of this session as reported by its session info.
*
* @return the {@link JmsSessionId} obtained from the session info.
*/
protected JmsSessionId getSessionId() {
return sessionInfo.getId();
}
/**
* Returns the acknowledgement mode value stored in this session.
*
* @return the session's acknowledgement mode.
*/
protected int getSessionMode() {
return acknowledgementMode;
}
/**
 * Creates the id for the next consumer of this session from the session id and the
 * incremented consumer id counter.
 *
 * @return a new consumer id scoped to this session.
 */
protected JmsConsumerId getNextConsumerId() {
    final JmsSessionId owner = sessionInfo.getId();
    final long sequence = consumerIdGenerator.incrementAndGet();
    return new JmsConsumerId(owner, sequence);
}
/**
 * Creates the id for the next producer of this session from the session id and the
 * incremented producer id counter.
 *
 * @return a new producer id scoped to this session.
 */
protected JmsProducerId getNextProducerId() {
    final JmsSessionId owner = sessionInfo.getId();
    final long sequence = producerIdGenerator.incrementAndGet();
    return new JmsProducerId(owner, sequence);
}
/**
* Records the error associated with this session; checkClosed() attaches the recorded
* value as the cause of the exception it throws for a closed session.
*
* @param failureCause
* the error to record.
*/
void setFailureCause(Throwable failureCause) {
this.failureCause.set(failureCause);
}
/**
* Returns the error currently recorded for this session.
*
* @return the recorded failure cause, or null if none has been set.
*/
Throwable getFailureCause() {
return failureCause.get();
}
/**
* Prepares a message for use with this session: assigns the session's connection to it
* and applies the connection's property-name validation setting.
*
* @param message
* the message to initialize.
*
* @return the same message instance that was passed in.
*/
private <T extends JmsMessage> T init(T message) {
message.setConnection(connection);
message.setValidatePropertyNames(connection.isValidatePropertyNames());
return message;
}
/**
 * Checks whether any consumer of this session reports that it is using the given
 * destination.
 *
 * @param destination
 *        the destination to look for.
 *
 * @return true if at least one consumer is using the destination.
 */
boolean isDestinationInUse(JmsDestination destination) {
    return consumers.values().stream()
        .anyMatch(candidate -> candidate.isUsingDestination(destination));
}
/**
 * Rejects a synchronous receive when a MessageListener is set on the session itself or
 * on any of its consumers.
 *
 * @throws JMSException if a MessageListener is set.
 */
void checkMessageListener() throws JMSException {
    boolean listenerPresent = messageListener != null;

    // Only scan the consumers when the session itself has no listener.
    if (!listenerPresent) {
        for (JmsMessageConsumer each : consumers.values()) {
            if (each.hasMessageListener()) {
                listenerPresent = true;
                break;
            }
        }
    }

    if (listenerPresent) {
        throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
    }
}
/**
* Turns the delivery thread check performed by checkIsDeliveryThread() on or off.
*
* @param enabled
* true to enable the check, false to disable it.
*/
void setDeliveryThreadCheckEnabled(boolean enabled) {
deliveryThreadCheckEnabled = enabled;
}
/**
 * Rejects the call when it is made from the tracked delivery thread, provided the
 * delivery thread check is enabled.
 *
 * @throws JMSException if the check is enabled and the caller is the delivery thread.
 */
void checkIsDeliveryThread() throws JMSException {
    if (!deliveryThreadCheckEnabled) {
        return;
    }

    final Thread caller = Thread.currentThread();
    if (caller.equals(deliveryThread.get())) {
        throw new IllegalStateException("Illegal invocation from MessageListener callback");
    }
}
/**
 * Rejects the call when it is made from the tracked completion thread.
 *
 * @throws JMSException if the caller is the completion thread.
 */
void checkIsCompletionThread() throws JMSException {
    final Thread caller = Thread.currentThread();
    final Thread completionWorker = completionThread.get();

    if (caller.equals(completionWorker)) {
        throw new IllegalStateException("Illegal invocation from CompletionListener callback");
    }
}
/**
* Returns the message ID policy held by this session's session info.
*
* @return the {@link JmsMessageIDPolicy} obtained from the session info.
*/
public JmsMessageIDPolicy getMessageIDPolicy() {
return sessionInfo.getMessageIDPolicy();
}
/**
* Returns the prefetch policy held by this session's session info.
*
* @return the {@link JmsPrefetchPolicy} obtained from the session info.
*/
public JmsPrefetchPolicy getPrefetchPolicy() {
return sessionInfo.getPrefetchPolicy();
}
/**
* Returns the presettle policy held by this session's session info.
*
* @return the {@link JmsPresettlePolicy} obtained from the session info.
*/
public JmsPresettlePolicy getPresettlePolicy() {
return sessionInfo.getPresettlePolicy();
}
/**
* Returns the redelivery policy held by this session's session info.
*
* @return the {@link JmsRedeliveryPolicy} obtained from the session info.
*/
public JmsRedeliveryPolicy getRedeliveryPolicy() {
return sessionInfo.getRedeliveryPolicy();
}
/**
* Returns the deserialization policy held by this session's session info.
*
* @return the {@link JmsDeserializationPolicy} obtained from the session info.
*/
public JmsDeserializationPolicy getDeserializationPolicy() {
return sessionInfo.getDeserializationPolicy();
}
/**
* Sets the transaction context of the session, replacing the one currently held.
*
* @param transactionContext
* provides the means to control a JMS transaction.
*/
public void setTransactionContext(JmsTransactionContext transactionContext) {
this.transactionContext = transactionContext;
}
/**
* Returns the transaction context of the session.
*
* @return the transaction context currently held by this session.
*/
public JmsTransactionContext getTransactionContext() {
return transactionContext;
}
/**
* Reports the current value of the session-recovered flag.
*
* @return true if the session-recovered flag is set.
*/
boolean isSessionRecovered() {
return sessionRecovered;
}
/**
* Resets the session-recovered flag to false.
*/
void clearSessionRecovered() {
sessionRecovered = false;
}
/**
 * Verifies that the given value is one of the session modes this client accepts: the
 * four standard JMS modes plus the INDIVIDUAL_ACKNOWLEDGE, ARTEMIS_PRE_ACKNOWLEDGE and
 * NO_ACKNOWLEDGE extensions.
 *
 * @param mode
 *        the session mode value to validate.
 *
 * @throws JMSRuntimeException if the mode is not one of the accepted values.
 */
static void validateSessionMode(int mode) {
    final boolean supported;

    switch (mode) {
        case JMSContext.AUTO_ACKNOWLEDGE:
        case JMSContext.CLIENT_ACKNOWLEDGE:
        case JMSContext.DUPS_OK_ACKNOWLEDGE:
        case JMSContext.SESSION_TRANSACTED:
        case INDIVIDUAL_ACKNOWLEDGE:
        case ARTEMIS_PRE_ACKNOWLEDGE:
        case NO_ACKNOWLEDGE:
            supported = true;
            break;
        default:
            supported = false;
            break;
    }

    if (!supported) {
        throw new JMSRuntimeException("Invalid Session Mode: " + mode);
    }
}
/**
 * Determines whether the given delivery has been redelivered more often than the
 * consumer's redelivery policy allows for its destination.
 *
 * A missing policy, or a negative maximum, means no limit is enforced. The maximum is
 * looked up once so both comparisons are made against the same value.
 *
 * @param envelope
 *        the inbound delivery to check.
 *
 * @return true if a non-negative maximum is configured and the envelope's redelivery
 *         count is greater than it.
 */
boolean redeliveryExceeded(JmsInboundMessageDispatch envelope) {
    LOG.trace("checking envelope with {} redeliveries", envelope.getRedeliveryCount());

    JmsConsumerInfo consumerInfo = envelope.getConsumerInfo();
    JmsRedeliveryPolicy redeliveryPolicy = consumerInfo.getRedeliveryPolicy();
    if (redeliveryPolicy == null) {
        return false;
    }

    final int maxRedeliveries = redeliveryPolicy.getMaxRedeliveries(consumerInfo.getDestination());
    return maxRedeliveries >= 0 && maxRedeliveries < envelope.getRedeliveryCount();
}
//----- Event handlers ---------------------------------------------------//
/**
* Receives an inbound message envelope and forwards it to deliver(), which routes it
* to the consumer registered under the envelope's consumer id.
*
* @param envelope
* the inbound message envelope.
*/
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {
deliver(envelope);
}
/**
 * Schedules the successful completion of an asynchronous send on the completion executor.
 *
 * @param envelope
 *        the outbound envelope whose send has completed.
 */
protected void onCompletedMessageSend(final JmsOutboundMessageDispatch envelope) {
    final ExecutorService completions = getCompletionExecutor();
    completions.execute(new AsyncCompletionTask(envelope));
}
/**
 * Schedules the failed completion of an asynchronous send on the completion executor.
 *
 * @param envelope
 *        the outbound envelope whose send has failed.
 * @param cause
 *        the error that caused the send to fail.
 */
protected void onFailedMessageSend(final JmsOutboundMessageDispatch envelope, final Throwable cause) {
    final ExecutorService completions = getCompletionExecutor();
    completions.execute(new AsyncCompletionTask(envelope, cause));
}
/**
 * Reacts to a connection interruption: notifies the transaction context, schedules a
 * task that fails or completes every pending asynchronous send, and then notifies each
 * producer and consumer of this session.
 */
protected void onConnectionInterrupted() {
    transactionContext.onConnectionInterrupted();

    // TODO - Synthesize a better exception
    final JMSException connectionLost = new JMSException("Send failed due to connection loss");
    final ExecutorService completions = getCompletionExecutor();
    completions.execute(new FailOrCompleteAsyncCompletionsTask(connectionLost));

    producers.values().forEach(producer -> producer.onConnectionInterrupted());
    consumers.values().forEach(consumer -> consumer.onConnectionInterrupted());
}
/**
 * Re-creates this session on the given provider and then lets the transaction context,
 * producers and consumers run their own recovery. Nothing is done when the session info
 * reports the session as closed.
 *
 * @param provider
 *        the provider on which the session is being recovered.
 *
 * @throws Exception if any step of the recovery fails.
 */
protected void onConnectionRecovery(Provider provider) throws Exception {
    if (sessionInfo.isClosed()) {
        return;
    }

    // Wait for the session to exist on the provider before recovering its children.
    ProviderFuture createRequest = provider.newProviderFuture();
    provider.create(sessionInfo, createRequest);
    createRequest.sync();

    transactionContext.onConnectionRecovery(provider);

    for (JmsMessageProducer each : producers.values()) {
        each.onConnectionRecovery(provider);
    }

    for (JmsMessageConsumer each : consumers.values()) {
        each.onConnectionRecovery(provider);
    }
}
/**
 * Passes the recovered notification on to every producer, then every consumer, of this
 * session.
 *
 * @param provider
 *        the provider on which the connection was recovered.
 *
 * @throws Exception if a producer or consumer fails while handling the notification.
 */
protected void onConnectionRecovered(Provider provider) throws Exception {
    for (JmsMessageProducer each : producers.values()) {
        each.onConnectionRecovered(provider);
    }

    for (JmsMessageConsumer each : consumers.values()) {
        each.onConnectionRecovered(provider);
    }
}
/**
 * Passes the restored notification on to every producer, then every consumer, of this
 * session.
 */
protected void onConnectionRestored() {
    producers.values().forEach(producer -> producer.onConnectionRestored());
    consumers.values().forEach(consumer -> consumer.onConnectionRestored());
}
/**
 * Routes an inbound envelope to the consumer registered under the envelope's consumer id.
 *
 * An envelope without a consumer id is reported to the connection as an error and then
 * dropped. An envelope whose consumer is not registered is dropped silently.
 *
 * @param envelope
 *        the inbound message envelope to route.
 */
private void deliver(JmsInboundMessageDispatch envelope) {
    JmsConsumerId id = envelope.getConsumerId();
    if (id == null) {
        this.connection.onException(new JMSException("No ConsumerId set for " + envelope.getMessage()));
        // A null id can never match a registered consumer, and a null-hostile map such as
        // ConcurrentHashMap would throw NullPointerException on the lookup below.
        return;
    }

    JmsMessageConsumer consumer = consumers.get(id);
    if (consumer != null) {
        consumer.onInboundMessage(envelope);
    }
}
/**
* Adds the given dispatcher to this session's queue of pending dispatchers.
*
* @param dispatcher
* the callback, accepting a JmsSession, to enqueue.
*/
void enqueueInSession(Consumer<JmsSession> dispatcher) {
sessionQueue.add(dispatcher);
}
//----- Asynchronous Send Helpers ----------------------------------------//
/**
 * Task that walks the pending asynchronous send completions and signals each one that
 * matches: completions not yet marked complete are first marked as failed with the
 * supplied cause, already marked ones are signaled with the state they carry.
 *
 * Without a producer id every pending completion matches and the queue is cleared
 * afterwards; with a producer id only that producer's completions are signaled and the
 * queue is left untouched.
 */
private final class FailOrCompleteAsyncCompletionsTask implements Runnable {

    private final JMSException failureCause;
    private final JmsProducerId producerId;

    /**
     * Creates a task that applies to the completions of every producer.
     *
     * @param failureCause
     *        the error assigned to completions that have not completed yet.
     */
    public FailOrCompleteAsyncCompletionsTask(JMSException failureCause) {
        this(null, failureCause);
    }

    /**
     * Creates a task that applies to the completions of a single producer.
     *
     * @param producerId
     *        the producer whose completions are processed, or null for all producers.
     * @param failureCause
     *        the error assigned to completions that have not completed yet.
     */
    public FailOrCompleteAsyncCompletionsTask(JmsProducerId producerId, JMSException failureCause) {
        this.failureCause = failureCause;
        this.producerId = producerId;
    }

    @Override
    public void run() {
        final boolean allProducers = producerId == null;

        for (SendCompletion completion : asyncSendQueue) {
            if (!allProducers && !producerId.equals(completion.envelope.getProducerId())) {
                continue;
            }

            // Anything still in flight is failed; the rest keeps its recorded outcome.
            if (!completion.hasCompleted()) {
                completion.markAsFailed(failureCause);
            }

            try {
                completion.signalCompletion();
            } catch (Throwable error) {
                LOG.error("Failure while performing completion for send: {}", completion.envelope, error);
            } finally {
                LOG.trace("Signaled completion of send: {}", completion.envelope);
            }
        }

        // Only clear on non-discriminating variant to avoid losing track of completions.
        if (allProducers) {
            asyncSendQueue.clear();
        }
    }
}
private final class AsyncCompletionTask implements Runnable {
private final JmsOutboundMessageDispatch envelope;
private final Throwable cause;
public AsyncCompletionTask(JmsOutboundMessageDispatch envelope) {
this(envelope, null);
}
public AsyncCompletionTask(JmsOutboundMessageDispatch envelope, Throwable cause) {
this.envelope = envelope;
this.cause = cause;
}
@Override
public void run() {
try {
SendCompletion completion = asyncSendQueue.peek();
if (completion.getEnvelope().getDispatchId() == envelope.getDispatchId()) {
try {
completion = asyncSendQueue.remove();
if (cause == null) {
completion.markAsComplete();
} else {
completion.markAsFailed(JmsExceptionSupport.create(cause));
}
completion.signalCompletion();
} catch (Throwable error) {
LOG.error("Failure while performing completion for send: {}", envelope, error);
}
// Signal any trailing completions that have been marked complete
// before this one was that they have now that the one in front has
Iterator<SendCompletion> pending = asyncSendQueue.iterator();
while (pending.hasNext()) {
completion = pending.next();
if (completion.hasCompleted()) {
try {
completion.signalCompletion();
} catch (Throwable error) {
LOG.error("Failure while performing completion for send: {}", envelope, error);
} finally {
pending.remove();
}
} else {
break;
}
}
} else {
// Not head so mark as complete and wait for the one in front to send
// the notification of completion.
Iterator<SendCompletion> pending = asyncSendQueue.iterator();
while (pending.hasNext()) {
completion = pending.next();
if (completion.getEnvelope().getDispatchId() == envelope.getDispatchId()) {
if (cause == null) {
completion.markAsComplete();
} else {
completion.markAsFailed(JmsExceptionSupport.create(cause));
}
}
}
}
} catch (Exception ex) {
LOG.error("Async completion task encountered unexpected failure", ex);
}
}
}
/**
 * Tracks one asynchronous send: the outbound envelope, the listener to notify and the
 * outcome recorded for the send so far.
 */
private final class SendCompletion {

    private final JmsOutboundMessageDispatch envelope;
    private final CompletionListener listener;

    private Exception failureCause;
    private boolean completed;

    /**
     * @param envelope
     *        the outbound envelope of the send being tracked.
     * @param listener
     *        the listener notified when the completion is signaled.
     */
    public SendCompletion(JmsOutboundMessageDispatch envelope, CompletionListener listener) {
        this.envelope = envelope;
        this.listener = listener;
    }

    /**
     * @return the outbound envelope of the tracked send.
     */
    public JmsOutboundMessageDispatch getEnvelope() {
        return envelope;
    }

    /**
     * @return true once the send has been marked as complete or as failed.
     */
    public boolean hasCompleted() {
        return completed;
    }

    /**
     * Records that the send finished without a failure.
     */
    public void markAsComplete() {
        completed = true;
    }

    /**
     * Records that the send finished with the given failure.
     *
     * @param cause
     *        the error reported to the listener when the completion is signaled.
     */
    public void markAsFailed(Exception cause) {
        completed = true;
        failureCause = cause;
    }

    /**
     * Notifies the listener of the recorded outcome. Exceptions thrown by the listener
     * are traced and not propagated.
     */
    public void signalCompletion() {
        final JmsMessage sent = envelope.getMessage();
        sent.onSendComplete(); // Ensure message is returned as readable.

        final boolean failed = failureCause != null;
        try {
            if (failed) {
                listener.onException(sent, failureCause);
            } else {
                listener.onCompletion(sent);
            }
        } catch (Exception ex) {
            if (failed) {
                LOG.trace("CompletionListener threw exception from onException for send {}", envelope, ex);
            } else {
                LOG.trace("CompletionListener threw exception from onCompletion for send {}", envelope, ex);
            }
        }
    }
}
}