blob: 054744005b153fb602bc6f2336d3fbd26f1567d8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.jms;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.meta.JmsConsumerId;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
import org.apache.qpid.jms.provider.Provider;
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
import org.apache.qpid.jms.provider.ProviderFuture;
import org.apache.qpid.jms.util.FifoMessageQueue;
import org.apache.qpid.jms.util.MessageQueue;
import org.apache.qpid.jms.util.PriorityMessageQueue;
/**
* implementation of a JMS Message Consumer
*/
public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableConsumer, JmsMessageDispatcher {
protected final JmsSession session;
protected final JmsConnection connection;
protected JmsConsumerInfo consumerInfo;
protected final int acknowledgementMode;
protected final AtomicBoolean closed = new AtomicBoolean();
protected boolean started;
protected MessageListener messageListener;
protected JmsMessageAvailableListener availableListener;
protected final MessageQueue messageQueue;
protected final Lock lock = new ReentrantLock();
protected final AtomicBoolean suspendedConnection = new AtomicBoolean();
protected final AtomicBoolean delivered = new AtomicBoolean();
/**
* Create a non-durable MessageConsumer
*
* @param consumerId
* @param session
* @param destination
* @param selector
* @param noLocal
* @throws JMSException
*/
protected JmsMessageConsumer(JmsConsumerId consumerId, JmsSession session, JmsDestination destination,
String selector, boolean noLocal) throws JMSException {
this(consumerId, session, destination, null, selector, noLocal);
}
/**
* Create a MessageConsumer which could be durable.
*
* @param consumerId
* @param session
* @param destination
* @param name
* @param selector
* @param noLocal
* @throws JMSException
*/
protected JmsMessageConsumer(JmsConsumerId consumerId, JmsSession session, JmsDestination destination,
String name, String selector, boolean noLocal) throws JMSException {
this.session = session;
this.connection = session.getConnection();
this.acknowledgementMode = session.acknowledgementMode();
if(destination.isTemporary()) {
connection.checkConsumeFromTemporaryDestination((JmsTemporaryDestination) destination);
}
if (connection.isLocalMessagePriority()) {
this.messageQueue = new PriorityMessageQueue();
} else {
this.messageQueue = new FifoMessageQueue();
}
JmsPrefetchPolicy policy = this.connection.getPrefetchPolicy();
this.consumerInfo = new JmsConsumerInfo(consumerId);
this.consumerInfo.setClientId(connection.getClientID());
this.consumerInfo.setSelector(selector);
this.consumerInfo.setSubscriptionName(name);
this.consumerInfo.setDestination(destination);
this.consumerInfo.setAcknowledgementMode(acknowledgementMode);
this.consumerInfo.setNoLocal(noLocal);
this.consumerInfo.setBrowser(isBrowser());
this.consumerInfo.setPrefetchSize(getConfiguredPrefetch(destination, policy));
try {
this.consumerInfo = session.getConnection().createResource(consumerInfo);
} catch (JMSException ex) {
throw ex;
}
}
public void init() throws JMSException {
session.add(this);
startConsumerResource();
}
private void startConsumerResource() throws JMSException {
try {
session.getConnection().startResource(consumerInfo);
} catch (JMSException ex) {
session.remove(this);
throw ex;
}
}
/**
* @throws JMSException
* @see javax.jms.MessageConsumer#close()
*/
@Override
public void close() throws JMSException {
if (!closed.get()) {
if (delivered.get() && session.getTransactionContext().isInTransaction()) {
session.getTransactionContext().addSynchronization(new JmsTxSynchronization() {
@Override
public void afterCommit() throws Exception {
doClose();
}
@Override
public void afterRollback() throws Exception {
doClose();
}
});
} else {
doClose();
}
}
}
/**
* Called to initiate shutdown of Producer resources and request that the remote
* peer remove the registered producer.
*
* @throws JMSException
*/
protected void doClose() throws JMSException {
shutdown();
this.connection.destroyResource(consumerInfo);
}
/**
* Called to release all producer resources without requiring a destroy request
* to be sent to the remote peer. This is most commonly needed when the parent
* Session is closing.
*
* @throws JMSException
*/
protected void shutdown() throws JMSException {
if (closed.compareAndSet(false, true)) {
this.session.remove(this);
stop(true);
}
}
/**
* @return a Message or null if closed during the operation
* @throws JMSException
* @see javax.jms.MessageConsumer#receive()
*/
@Override
public Message receive() throws JMSException {
checkClosed();
checkMessageListener();
sendPullCommand(0);
try {
return copy(ackFromReceive(this.messageQueue.dequeue(-1)));
} catch (Exception e) {
throw JmsExceptionSupport.create(e);
}
}
/**
* @param timeout
* @return a Message or null
* @throws JMSException
* @see javax.jms.MessageConsumer#receive(long)
*/
@Override
public Message receive(long timeout) throws JMSException {
checkClosed();
checkMessageListener();
sendPullCommand(timeout);
if (timeout > 0) {
try {
return copy(ackFromReceive(this.messageQueue.dequeue(timeout)));
} catch (InterruptedException e) {
throw JmsExceptionSupport.create(e);
}
}
return null;
}
/**
* @return a Message or null
* @throws JMSException
* @see javax.jms.MessageConsumer#receiveNoWait()
*/
@Override
public Message receiveNoWait() throws JMSException {
checkClosed();
checkMessageListener();
sendPullCommand(-1);
return copy(ackFromReceive(this.messageQueue.dequeueNoWait()));
}
protected void checkClosed() throws IllegalStateException {
if (this.closed.get()) {
throw new IllegalStateException("The MessageConsumer is closed");
}
}
JmsMessage copy(final JmsInboundMessageDispatch envelope) throws JMSException {
if (envelope == null || envelope.getMessage() == null) {
return null;
}
return envelope.getMessage().copy();
}
JmsInboundMessageDispatch ackFromReceive(final JmsInboundMessageDispatch envelope) throws JMSException {
if (envelope != null && envelope.getMessage() != null) {
JmsMessage message = envelope.getMessage();
if (message.getAcknowledgeCallback() != null) {
// Message has been received by the app.. expand the credit
// window so that we receive more messages.
doAckDelivered(envelope);
} else {
doAckConsumed(envelope);
}
// Tags that we have delivered and can't close if in a TX Session.
delivered.set(true);
}
return envelope;
}
private JmsInboundMessageDispatch doAckConsumed(final JmsInboundMessageDispatch envelope) throws JMSException {
checkClosed();
try {
session.acknowledge(envelope, ACK_TYPE.CONSUMED);
} catch (JMSException ex) {
session.onException(ex);
throw ex;
}
return envelope;
}
private JmsInboundMessageDispatch doAckDelivered(final JmsInboundMessageDispatch envelope) throws JMSException {
try {
session.acknowledge(envelope, ACK_TYPE.DELIVERED);
} catch (JMSException ex) {
session.onException(ex);
throw ex;
}
return envelope;
}
private void doAckReleased(final JmsInboundMessageDispatch envelope) throws JMSException {
try {
session.acknowledge(envelope, ACK_TYPE.RELEASED);
} catch (JMSException ex) {
session.onException(ex);
throw ex;
}
}
/**
* Called from the session when a new Message has been dispatched to this Consumer
* from the connection.
*
* @param envelope
* the newly arrived message.
*/
@Override
public void onInboundMessage(final JmsInboundMessageDispatch envelope) {
lock.lock();
try {
if (acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) {
envelope.getMessage().setAcknowledgeCallback(new Callable<Void>() {
@Override
public Void call() throws Exception {
if (session.isClosed()) {
throw new javax.jms.IllegalStateException("Session closed.");
}
session.acknowledge();
envelope.getMessage().setAcknowledgeCallback(null);
return null;
}
});
}
if(envelope.isEnqueueFirst()) {
this.messageQueue.enqueueFirst(envelope);
} else {
this.messageQueue.enqueue(envelope);
}
} finally {
lock.unlock();
}
if (this.messageListener != null && this.started) {
session.getExecutor().execute(new MessageDeliverTask());
} else {
if (availableListener != null) {
availableListener.onMessageAvailable(this);
}
}
}
public void start() {
lock.lock();
try {
this.started = true;
this.messageQueue.start();
drainMessageQueueToListener();
} finally {
lock.unlock();
}
}
public void stop() {
stop(false);
}
private void stop(boolean closeMessageQueue) {
lock.lock();
try {
this.started = false;
if (closeMessageQueue) {
this.messageQueue.close();
} else {
this.messageQueue.stop();
}
} finally {
lock.unlock();
}
}
void suspendForRollback() throws JMSException {
stop();
session.getConnection().stopResource(consumerInfo);
}
void resumeAfterRollback() throws JMSException {
if (!this.messageQueue.isEmpty()) {
List<JmsInboundMessageDispatch> drain = this.messageQueue.removeAll();
for (JmsInboundMessageDispatch envelope : drain) {
doAckReleased(envelope);
}
drain.clear();
}
start();
startConsumerResource();
}
void drainMessageQueueToListener() {
if (this.messageListener != null && this.started) {
session.getExecutor().execute(new MessageDeliverTask());
}
}
/**
* @return the id
*/
public JmsConsumerId getConsumerId() {
return this.consumerInfo.getConsumerId();
}
/**
* @return the Destination
*/
public JmsDestination getDestination() {
return this.consumerInfo.getDestination();
}
@Override
public MessageListener getMessageListener() throws JMSException {
checkClosed();
return this.messageListener;
}
/**
* @param listener
* @throws JMSException
* @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
*/
@Override
public void setMessageListener(MessageListener listener) throws JMSException {
checkClosed();
if (consumerInfo.getPrefetchSize() == 0) {
throw new JMSException("Illegal prefetch size of zero. This setting is not supported" +
"for asynchronous consumers please set a value of at least 1");
}
this.messageListener = listener;
drainMessageQueueToListener();
}
/**
* @return the Message Selector
* @throws JMSException
* @see javax.jms.MessageConsumer#getMessageSelector()
*/
@Override
public String getMessageSelector() throws JMSException {
checkClosed();
return this.consumerInfo.getSelector();
}
/**
* Gets the configured prefetch size for this consumer.
* @return the prefetch size configuration for this consumer.
*/
public int getPrefetchSize() {
return this.consumerInfo.getPrefetchSize();
}
protected void checkMessageListener() throws JMSException {
session.checkMessageListener();
}
boolean hasMessageListener() {
return this.messageListener != null;
}
boolean isUsingDestination(JmsDestination destination) {
return this.consumerInfo.getDestination().equals(destination);
}
protected int getMessageQueueSize() {
return this.messageQueue.size();
}
protected boolean isNoLocal() {
return this.consumerInfo.isNoLocal();
}
public boolean isDurableSubscription() {
return false;
}
public boolean isBrowser() {
return false;
}
@Override
public void setAvailableListener(JmsMessageAvailableListener availableListener) {
this.availableListener = availableListener;
}
@Override
public JmsMessageAvailableListener getAvailableListener() {
return availableListener;
}
protected void onConnectionInterrupted() {
messageQueue.clear();
}
protected void onConnectionRecovery(Provider provider) throws Exception {
ProviderFuture request = new ProviderFuture();
provider.create(consumerInfo, request);
request.sync();
}
protected void onConnectionRecovered(Provider provider) throws Exception {
ProviderFuture request = new ProviderFuture();
provider.start(consumerInfo, request);
request.sync();
}
protected void onConnectionRestored() {
}
/**
* Triggers a pull request from the connected Provider. An attempt is made to set
* a timeout on the pull request however some providers will not honor this value
* and the pull will remain active until a message is dispatched.
*
* The timeout value can be one of:
*
* < 0 to indicate that the request should expire immediately if no message.
* = 0 to indicate that the request should never time out.
* > 1 to indicate that the request should expire after the given time in milliseconds.
*
* @param timeout
* The amount of time the pull request should remain valid.
*/
protected void sendPullCommand(long timeout) throws JMSException {
if (messageQueue.isEmpty() && (getPrefetchSize() == 0 || isBrowser())) {
connection.pull(getConsumerId(), timeout);
}
}
private int getConfiguredPrefetch(JmsDestination destination, JmsPrefetchPolicy policy) {
int prefetch = 0;
if (destination.isTopic()) {
if (isDurableSubscription()) {
prefetch = policy.getDurableTopicPrefetch();
} else {
prefetch = policy.getTopicPrefetch();
}
} else {
if (isBrowser()) {
prefetch = policy.getQueueBrowserPrefetch();
} else {
prefetch = policy.getQueuePrefetch();
}
}
return prefetch;
}
private final class MessageDeliverTask implements Runnable {
@Override
public void run() {
JmsInboundMessageDispatch envelope;
while (session.isStarted() && (envelope = messageQueue.dequeueNoWait()) != null) {
try {
JmsMessage copy = null;
boolean autoAckOrDupsOk = acknowledgementMode == Session.AUTO_ACKNOWLEDGE ||
acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
if (autoAckOrDupsOk) {
copy = copy(doAckDelivered(envelope));
} else {
copy = copy(ackFromReceive(envelope));
}
session.clearSessionRecovered();
messageListener.onMessage(copy);
if (autoAckOrDupsOk && !session.isSessionRecovered()) {
doAckConsumed(envelope);
}
} catch (Exception e) {
// TODO - We need to handle exception of on message with some other
// ack such as rejected and consider adding a redlivery policy
// to control when we might just poison the message with an ack
// of modified set to not deliverable here.
session.getConnection().onException(e);
}
}
}
}
}