blob: d30f51608fc1fac746c11b30414cb589d5100cef [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.filter.JMSSelectorFilter;
import org.apache.qpid.client.filter.MessageFilter;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.CloseConsumerMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.util.JMSExceptionHelper;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
import org.apache.qpid.transport.TransportException;
public abstract class BasicMessageConsumer<U> extends Closeable implements MessageConsumer
{
// Shared SLF4J logger for every consumer instance.
private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
/** The connection being used by this consumer. */
private final AMQConnection _connection;
/** Filter built from the JMS selector string; null when no selector (or a blank one) was supplied. */
private final MessageFilter _messageSelectorFilter;
/** The no-local flag requested at construction; when true it is also added to the consumer arguments. */
private final boolean _noLocal;
/** The destination consumed from; can be replaced through setDestination(). */
private AMQDestination _destination;
/**
* When true indicates that a blocking receive call is in progress
*/
private final AtomicBoolean _receiving = new AtomicBoolean(false);
/**
* Holds an atomic reference to the listener installed.
*/
private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
/** The consumer tag allows us to close the consumer by sending a cancel to the broker. */
private String _consumerTag;
/** The channel id, needed when constructing frames. */
private final int _channelId;
/**
* Buffer between the delivery path and synchronous receivers. Besides messages it can also hold a
* CloseConsumerMessage (see notifyCloseMessage) or a Throwable (see notifyError), which is why it is untyped.
*/
private final BlockingQueue _synchronousQueue;
/** Message factory registry supplied at construction and exposed through getMessageFactory(). */
private final MessageFactoryRegistry _messageFactory;
/** The session that owns this consumer. */
private final AMQSession _session;
/**
* We need to store the "raw" field table so that we can resubscribe in the event of failover being required
*/
private final Map<String,Object> _arguments;
/**
* We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of
* failover
*/
private final int _prefetchHigh;
/**
* We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of
* failover
*/
private final int _prefetchLow;
/** We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover. */
private boolean _exclusive;
/** Acknowledge mode in force for this consumer; forced to NO_ACKNOWLEDGE for browse-only consumers. */
private final int _acknowledgeMode;
/**
* List of tags delivered, the last of which should be acknowledged on commit in transaction mode.
*/
private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
/**
* The thread that was used to call receive(). This is important for being able to interrupt that thread if a
* receive() is in progress.
*/
private volatile Thread _receivingThread;
/**
* Used to store the name of this consumer's queue.
* Useful when more than one binding key should be used.
*/
private String _queuename;
/**
* autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
* on the queue. This is used for queue browsing.
*/
private final boolean _autoClose;
/** True when this consumer only browses the queue; browsers never acknowledge (see the constructor). */
private final boolean _browseOnly;
/** Set once through markAsDurableSubscriber(); false by default. */
private boolean _isDurableSubscriber = false;
/** Address type of the destination; initialised from the destination at the end of the constructor. */
private int _addressType = AMQDestination.UNKNOWN_TYPE;
/**
 * Creates a consumer bound to the given session and destination.
 * <p>
 * The constructor parses the JMS selector (if any), decides the effective acknowledge mode and builds the
 * argument map that is kept for (re)subscription.
 *
 * @param messageSelector JMS selector; null or blank means no client-side filter is created
 * @param rawSelector     extra arguments used by the headers exchange; this is not a JMS selector
 * @param browseOnly      when true the acknowledge mode is forced to {@code Session.NO_ACKNOWLEDGE}
 * @throws JMSException an {@link InvalidSelectorException} (with the cause chained) when the selector
 *                      cannot be parsed
 */
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
                               String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
                               AMQSession session, Map<String,Object> rawSelector,
                               int prefetchHigh, int prefetchLow, boolean exclusive,
                               int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
{
    // Plain state captured from the caller.
    _channelId = channelId;
    _connection = connection;
    _session = session;
    _destination = destination;
    _messageFactory = messageFactory;
    _noLocal = noLocal;
    _exclusive = exclusive;
    _prefetchHigh = prefetchHigh;
    _prefetchLow = prefetchLow;
    _autoClose = autoClose;
    _browseOnly = browseOnly;
    _synchronousQueue = new LinkedBlockingQueue();

    // A null or whitespace-only selector means "no filter".
    final boolean selectorPresent = messageSelector != null && !"".equals(messageSelector.trim());
    try
    {
        _messageSelectorFilter = selectorPresent ? new JMSSelectorFilter(messageSelector) : null;
    }
    catch (final AMQInternalException ie)
    {
        throw JMSExceptionHelper.chainJMSException(new InvalidSelectorException(
                "cannot create consumer because of selector issue"), ie);
    }

    // Force queue browsers not to use acknowledge modes.
    _acknowledgeMode = _browseOnly ? Session.NO_ACKNOWLEDGE : acknowledgeMode;

    // Assemble the consumer arguments: destination defaults first, then the raw (headers exchange) selector.
    final Map<String,Object> consumerArguments = new HashMap<>();
    if (destination.getConsumerArguments() != null)
    {
        consumerArguments.putAll(destination.getConsumerArguments());
    }
    if (rawSelector != null)
    {
        consumerArguments.putAll(rawSelector);
    }
    if (destination.getLocalAddress() != null)
    {
        consumerArguments.put("local-address", destination.getLocalAddress());
    }

    // The selector argument is always sent, even when empty. Otherwise, when querying the broker for a durable
    // topic subscription, "no arguments" could not be told apart from "a selector that no longer matches",
    // because passing null arguments to the query disables the check altogether.
    final String selectorArgument = (messageSelector == null) ? "" : messageSelector;
    consumerArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), selectorArgument);

    if (noLocal)
    {
        consumerArguments.put(AMQPFilterTypes.NO_LOCAL.getValue(), noLocal);
    }

    _arguments = consumerArguments;
    _addressType = _destination.getAddressType();
}
/** @return the destination currently associated with this consumer */
public AMQDestination getDestination()
{
    return this._destination;
}
/**
 * @return the selector string held by the selector filter, or null when no filter was created
 * @throws JMSException if the consumer or its session is closed
 */
public String getMessageSelector() throws JMSException
{
    checkPreConditions();
    if (_messageSelectorFilter == null)
    {
        return null;
    }
    return _messageSelectorFilter.getSelector();
}
/**
 * @return the listener currently installed, or null if none
 * @throws JMSException if the consumer or its session is closed
 */
public MessageListener getMessageListener() throws JMSException
{
    checkPreConditions();
    final MessageListener installed = _messageListener.get();
    return installed;
}
/**
 * Returns the acknowledge mode in force for this consumer.
 * <p>
 * AMQP permits a different acknowledge mode per consumer, whereas JMS defines it per session; that is why
 * the value is held on the consumer in this implementation.
 */
public int getAcknowledgeMode()
{
    return this._acknowledgeMode;
}
/** @return true when a message listener is currently installed */
protected boolean isMessageListenerSet()
{
    final MessageListener installed = _messageListener.get();
    return installed != null;
}
/**
* Installs (or, while the connection is stopped, replaces or clears) the message listener.
* <p>
* While the connection is not started the listener is set unconditionally. Once it is started, a listener
* can only be installed when none is present and no thread is in a synchronous receive; any messages already
* buffered for synchronous receive are then handed to the new listener.
*
* @param messageListener the listener to install; may be null
* @throws JMSException if the consumer or session is closed, if another thread is receiving synchronously,
* or if a listener is already installed while the connection is started
*/
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
checkPreConditions();
// if the current listener is non-null and the session is not stopped, then
// it is an error to call this method.
// i.e. it is only valid to call this method if
//
// (a) the connection is stopped, in which case the dispatcher is not running
// OR
// (b) the listener is null AND we are not receiving synchronously at present
//
if (!_session.getAMQConnection().started())
{
// Connection stopped: overwrite whatever was there. Note this branch also runs for a null listener,
// and setHasMessageListeners() is still called in that case.
_messageListener.set(messageListener);
_session.setHasMessageListeners();
if (_logger.isDebugEnabled())
{
_logger.debug(
"Session stopped : Message listener(" + messageListener + ") set for destination " + _destination);
}
}
else
{
if (_receiving.get())
{
throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
}
// Only succeeds when no listener is installed yet (a null -> null swap also succeeds).
if (!_messageListener.compareAndSet(null, messageListener))
{
throw new javax.jms.IllegalStateException("Attempt to alter listener while session is started.");
}
if (_logger.isDebugEnabled())
{
_logger.debug("Message listener set for destination " + _destination);
}
if (messageListener != null)
{
//todo: handle case where connection has already been started, and the dispatcher has already started
// putting values on the _synchronousQueue
synchronized (_session)
{
// NOTE(review): the compareAndSet above has already stored the listener; this second set looks
// redundant unless another thread changed the reference in between - confirm before removing.
_messageListener.set(messageListener);
_session.setHasMessageListeners();
_session.startDispatcherIfNecessary();
// If we already have messages on the queue, deliver them to the listener
Object o = _synchronousQueue.poll();
while (o != null)
{
// NOTE(review): notifyCloseMessage() and notifyError() can place a CloseConsumerMessage or a
// Throwable on this queue; if one is present here this cast presumably fails with a
// ClassCastException - verify and handle explicitly.
notifyMessage((AbstractJMSMessage) o);
o = _synchronousQueue.poll();
}
}
}
}
}
/**
 * Claims the right to perform a synchronous receive for the calling thread.
 *
 * @param immediate if true then return immediately if the connection is failing over; otherwise block
 *                  until failover has finished
 *
 * @return true if the claim succeeded, false only when {@code immediate} is set and the connection is
 *         failing over
 *
 * @throws JMSException if a listener has already been set or another thread is receiving
 * @throws InterruptedException if interrupted
 */
private boolean acquireReceiving(boolean immediate) throws JMSException, InterruptedException
{
    if (_connection.isFailingOver())
    {
        if (immediate)
        {
            return false;
        }
        _connection.blockUntilNotFailingOver();
    }

    final boolean listenerInstalled = isMessageListenerSet();
    if (listenerInstalled)
    {
        throw new javax.jms.IllegalStateException("A listener has already been set.");
    }

    final boolean claimed = _receiving.compareAndSet(false, true);
    if (!claimed)
    {
        throw new javax.jms.IllegalStateException("Another thread is already receiving.");
    }

    // Remember the caller so that close() can interrupt it.
    _receivingThread = Thread.currentThread();
    return true;
}
/** Gives up the synchronous-receive claim and forgets the receiving thread. */
private void releaseReceiving()
{
    this._receiving.set(false);
    this._receivingThread = null;
}
/** @return the consumer argument map built in the constructor (the live map, not a copy) */
public Map<String,Object> getArguments()
{
    return this._arguments;
}
/** @return the prefetch limit, which is the high water prefetch value */
public int getPrefetch()
{
    return this._prefetchHigh;
}
/** @return the high water prefetch value */
public int getPrefetchHigh()
{
    return this._prefetchHigh;
}
/** @return the low water prefetch value */
public int getPrefetchLow()
{
    return this._prefetchLow;
}
/** @return the no-local flag this consumer was created with */
public boolean isNoLocal()
{
    return this._noLocal;
}
/**
 * Reports whether this consumer is exclusive.
 * <p>
 * For destinations that do not use the ADDR syntax the flag given at construction is returned. For ADDR
 * destinations a topic is always reported as exclusive; any other address type defers to the exclusivity
 * configured on the destination's link subscription.
 */
public boolean isExclusive()
{
    final AMQDestination destination = this.getDestination();

    if (destination.getDestSyntax() != AMQDestination.DestSyntax.ADDR)
    {
        return _exclusive;
    }

    if (destination.getAddressType() == AMQDestination.TOPIC_TYPE)
    {
        return true;
    }

    return destination.getLink().getSubscription().isExclusive();
}
/** @return true while a synchronous receive call holds the receiving flag */
public boolean isReceiving()
{
    return this._receiving.get();
}
/** @return the selector filter, or null when the consumer was created without a (non-blank) selector */
public MessageFilter getMessageSelectorFilter()
{
    return this._messageSelectorFilter;
}
/**
 * Receives the next message, waiting without a time limit (delegates to {@code receive(0)}).
 *
 * @throws JMSException as thrown by {@link #receive(long)}
 */
public Message receive() throws JMSException
{
    final long noTimeout = 0;
    return receive(noTimeout);
}
/**
* Synchronously receives the next message.
*
* @param l timeout in milliseconds: a positive value waits at most that long, zero waits indefinitely and a
* negative value does not wait at all (see getMessageFromQueue)
* @return the message, or null if none arrived in time, the wait was interrupted, or a close marker was
* taken from the queue
* @throws JMSException if the consumer or session is closed, a listener is installed, another thread is
* already receiving, an error was queued for this consumer, or a transport failure occurs
*/
public Message receive(long l) throws JMSException
{
checkPreConditions();
try
{
// false: wait for any failover in progress to finish instead of giving up.
acquireReceiving(false);
}
catch (InterruptedException e)
{
_logger.warn("Interrupted acquire: " + e);
if (isClosed())
{
return null;
}
// NOTE(review): when interrupted but not closed, execution continues without the receiving flag having
// been acquired, and the finally block below still calls releaseReceiving() - which could clear a flag
// owned by another thread. Confirm whether this is intended.
}
_session.startDispatcherIfNecessary();
try
{
Object o = getMessageFromQueue(l);
// Waiting is over, so there is nothing left for close() to interrupt.
_receivingThread = null;
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
preDeliver(m);
postDeliver(m);
}
return m;
}
catch (InterruptedException e)
{
_logger.warn("Interrupted: " + e);
return null;
}
catch(TransportException e)
{
throw _session.toJMSException("Exception while receiving:" + e.getMessage(), e);
}
finally
{
releaseReceiving();
// clear the interrupted flag - prevents spurious interrupts caused by the consumer being closed from
// another thread racing
Thread.interrupted();
}
}
/**
 * Takes the next entry from the synchronous queue.
 *
 * @param l timeout in milliseconds: zero blocks until an entry is available, a negative value returns
 *          immediately, a positive value waits at most that long
 * @return the entry, or null if none became available (never null when {@code l} is zero)
 * @throws InterruptedException if interrupted while waiting
 */
public Object getMessageFromQueue(long l) throws InterruptedException
{
    if (l == 0)
    {
        return _synchronousQueue.take();
    }
    if (l < 0)
    {
        return _synchronousQueue.poll();
    }
    return _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
}
/**
* Receive operation for browsing; the behaviour is defined entirely by the protocol-specific subclass.
* NOTE(review): presumably used by queue browsers - confirm against the callers.
*
* @throws JMSException if the subclass fails to obtain a message
*/
abstract Message receiveBrowse() throws JMSException;
/**
 * Returns the next message if one is already buffered, without waiting.
 *
 * @return the message, or null when nothing is buffered, the connection is failing over, a close marker
 *         was taken from the queue, or the thread was interrupted
 * @throws JMSException if the consumer or session is closed, a listener is installed, another thread is
 *                      already receiving, an error was queued for this consumer, or a transport failure
 *                      occurs
 */
public Message receiveNoWait() throws JMSException
{
    checkPreConditions();

    boolean acquired;
    try
    {
        acquired = acquireReceiving(true);
    }
    catch (InterruptedException e)
    {
        // Not expected in practice: acquireReceiving was asked to return immediately, so it should not
        // block on anything.
        return null;
    }
    if (!acquired)
    {
        // The receiving flag could not be taken; this happens while failing over.
        return null;
    }

    _session.startDispatcherIfNecessary();

    try
    {
        final AbstractJMSMessage delivered = returnMessageOrThrow(getMessageFromQueue(-1));
        if (delivered == null)
        {
            return null;
        }
        preDeliver(delivered);
        postDeliver(delivered);
        return delivered;
    }
    catch (InterruptedException e)
    {
        _logger.warn("Interrupted: " + e);
        return null;
    }
    catch (TransportException e)
    {
        throw _session.toJMSException("Exception while receiving:" + e.getMessage(), e);
    }
    finally
    {
        releaseReceiving();
    }
}
/**
 * Interprets an entry taken from the synchronous queue, which may be a message, an error or a close marker.
 * Errors travel through the queue because the API offers no other way to interrupt a poll().
 *
 * @param o the object to return or throw
 * @return the message when {@code o} is a message; null when {@code o} is null or a close marker (in the
 *         latter case the consumer is also marked closed and deregistered)
 * @throws JMSException if {@code o} is a throwable; it is chained to a new JMSException
 */
private AbstractJMSMessage returnMessageOrThrow(Object o) throws JMSException
{
    if (o instanceof Throwable)
    {
        final JMSException failure = new JMSException(
                "Message consumer forcibly closed due to error: " + o);
        throw JMSExceptionHelper.chainJMSException(failure, (Throwable) o);
    }

    if (o instanceof CloseConsumerMessage)
    {
        setClosed();
        deregisterConsumer();
        return null;
    }

    return (AbstractJMSMessage) o;
}
/**
 * Closes this consumer and cancels it on the broker (delegates to {@code close(true)}).
 *
 * @throws JMSException if the cancel fails
 */
public void close() throws JMSException
{
    final boolean sendClose = true;
    close(sendClose);
}
/**
 * Closes this consumer.
 * <p>
 * Nothing happens beyond the debug log if the consumer was already closed. Otherwise the consumer is marked
 * as closing, is either cancelled on the broker or simply deregistered from the session, any thread blocked
 * in a synchronous receive is interrupted, and - unless this is a browser or the session itself is closing -
 * messages still buffered locally are released.
 *
 * @param sendClose true to send a cancel to the broker; false to only deregister from the session
 * @throws JMSException if the cancel fails, is interrupted by failover, or hits a transport error
 */
public void close(boolean sendClose) throws JMSException
{
    if (_logger.isDebugEnabled())
    {
        _logger.debug("Closing consumer:" + debugIdentity());
    }

    // NOTE(review): setClosed() presumably returns the previous closed state, so a true result means
    // another caller got here first - confirm against Closeable.
    if (setClosed())
    {
        return;
    }

    setClosing(true);

    if (sendClose)
    {
        // Only the network traffic needs to be protected by the failover mutex.
        try
        {
            // Send a cancel while the session is open or in the process of closing; there is no point
            // otherwise as the connection will be gone. The loop retries until the delivery lock is won.
            while (!_session.isClosed() || _session.isClosing())
            {
                if (_session.tryLockMessageDelivery())
                {
                    try
                    {
                        synchronized (_connection.getFailoverMutex())
                        {
                            sendCancel();
                        }
                        break;
                    }
                    finally
                    {
                        _session.unlockMessageDelivery();
                    }
                }
            }
        }
        catch (QpidException e)
        {
            throw JMSExceptionHelper.chainJMSException(new JMSException("Error closing consumer: " + e.getMessage()), e);
        }
        catch (FailoverException e)
        {
            throw JMSExceptionHelper.chainJMSException(new JMSException(
                    "FailoverException interrupted basic cancel."), e);
        }
        catch (TransportException e)
        {
            throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e);
        }
    }
    else
    {
        // FIXME?
        deregisterConsumer();
    }

    // session.close() closes all consumers, and one of them may be blocked in receive(); wake it up so it
    // can notice the close. (The former "_messageListener != null" test was dropped: the field is a final,
    // always-initialised AtomicReference, so the test could never be false.)
    if (_receiving.get())
    {
        // Read the volatile field once so the thread that is logged is the thread that is interrupted.
        final Thread receivingThread = _receivingThread;
        if (_logger.isInfoEnabled())
        {
            _logger.info("Interrupting thread: " + receivingThread);
        }
        if (receivingThread != null)
        {
            receivingThread.interrupt();
        }
    }

    if (!(isBrowseOnly() || getSession().isClosing()))
    {
        releasePendingMessages();
    }
}
/**
* Sends the protocol-specific cancel for this consumer to the broker. close(boolean) invokes it while
* holding the session's message delivery lock and the connection's failover mutex.
*
* @throws QpidException if the cancel fails
* @throws FailoverException if failover interrupts the cancel
*/
abstract void sendCancel() throws QpidException, FailoverException;
/**
 * Invalidates this consumer without contacting the broker: it is marked closed and deregistered from the
 * session. Used for example when failover has occurred and the client has vetoed automatic
 * resubscription. The caller must hold the failover mutex.
 */
void markClosed()
{
    this.setClosed();
    this.deregisterConsumer();
}
/**
 * Handles the marker that signals the browser should be closed.
 * <p>
 * Without a listener the marker is queued so that a synchronous receiver picks it up; with a listener
 * installed it is only logged, because an auto-close consumer combined with a listener is not supported.
 *
 * @param closeMessage
 * this message signals that we should close the browser
 */
public void notifyCloseMessage(CloseConsumerMessage closeMessage)
{
    if (!isMessageListenerSet())
    {
        try
        {
            _synchronousQueue.put(closeMessage);
        }
        catch (InterruptedException e)
        {
            _logger.info(" SynchronousQueue.put interupted. Usually result of connection closing,"
                         + "but we shouldn't have close yet");
        }
        return;
    }

    // Currently this message type can only arrive for a browser. Closing the consumer here would
    // probably be the right reaction, but an auto-close consumer with a listener is an odd combination,
    // so the situation is merely recorded.
    _logger.warn("Using an AutoCloseconsumer with message listener is not supported.");
}
/**
 * Called from the AMQSession when a message has arrived for this consumer. Close markers are routed to
 * {@link #notifyCloseMessage}; anything else is converted to a JMS message and handed on, which covers
 * both the message listener case and the synchronous receive() case. Failures are logged, not propagated.
 *
 * @param messageFrame the raw unprocessed message
 */
void notifyMessage(U messageFrame)
{
    if (messageFrame instanceof CloseConsumerMessage)
    {
        notifyCloseMessage((CloseConsumerMessage) messageFrame);
        return;
    }

    try
    {
        final AbstractJMSMessage converted =
                createJMSMessageFromUnprocessedMessage(_session.getMessageDelegateFactory(), messageFrame);
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Message is of type: " + converted.getClass().getName());
        }
        notifyMessage(converted);
    }
    catch (InterruptedException e)
    {
        _logger.info("SynchronousQueue.put interupted. Usually result of connection closing");
    }
    catch (Exception e)
    {
        _logger.error("Caught exception (dump follows) - ignoring...", e);
    }
}
/**
* Converts a raw, protocol-specific message frame into a JMS message. Implemented by the protocol-specific
* subclass and invoked from notifyMessage(U).
*
* @param delegateFactory the session's message delegate factory
* @param messageFrame the raw unprocessed message
* @return the JMS message built from the frame
* @throws Exception if the conversion fails; notifyMessage(U) logs the failure and drops the message
*/
public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, U messageFrame)
throws Exception;
/**
 * Hands an already converted message either to the installed listener (wrapped in preDeliver/postDeliver)
 * or, when no listener is installed, to the synchronous queue. Failures are logged, not propagated.
 *
 * @param jmsMessage this message has already been processed so can't redo preDeliver
 */
public void notifyMessage(AbstractJMSMessage jmsMessage)
{
    try
    {
        if (!isMessageListenerSet())
        {
            // NOTE: nothing here stops a message being queued on a consumer that is already closed.
            _synchronousQueue.put(jmsMessage);
            return;
        }

        preDeliver(jmsMessage);
        getMessageListener().onMessage(jmsMessage);
        postDeliver(jmsMessage);
    }
    catch (InterruptedException e)
    {
        _logger.info("reNotification : SynchronousQueue.put interupted. Usually result of connection closing");
    }
    catch (Exception e)
    {
        _logger.error("reNotification : Caught exception (dump follows) - ignoring...", e);
    }
}
/**
 * Bookkeeping performed just before a message is handed to the application: the session's recovery flag
 * is reset, the message is bound to the session, and the delivery tag is recorded (or acknowledged) as the
 * acknowledge mode demands.
 *
 * @param msg the message about to be delivered
 */
protected void preDeliver(AbstractJMSMessage msg)
{
    _session.setInRecovery(false);
    msg.setAMQSession(_session);

    final int mode = _acknowledgeMode;
    if (mode == Session.PRE_ACKNOWLEDGE)
    {
        _session.acknowledgeMessage(msg.getDeliveryTag(), false);
    }
    else if (mode == Session.AUTO_ACKNOWLEDGE || mode == Session.DUPS_OK_ACKNOWLEDGE)
    {
        _session.addUnacknowledgedMessage(msg.getDeliveryTag());
    }
    else if (mode == Session.CLIENT_ACKNOWLEDGE)
    {
        _session.addUnacknowledgedMessage(msg.getDeliveryTag());
        _session.markDirty();
    }
    else if (mode == Session.SESSION_TRANSACTED)
    {
        _session.addDeliveredMessage(msg.getDeliveryTag());
        _session.markDirty();
    }
    // Session.NO_ACKNOWLEDGE (NO-ACK consumers and browsers, see the constructor) and any other value:
    // nothing to do.
}
/**
 * Bookkeeping performed right after a message has been handed to the application. In AUTO_ACKNOWLEDGE and
 * DUPS_OK_ACKNOWLEDGE mode the message is acknowledged, unless the application called recover() in the
 * meantime; every other mode does nothing here.
 *
 * @param msg the message that has just been delivered
 */
void postDeliver(AbstractJMSMessage msg)
{
    final boolean acknowledgesAfterDelivery = _acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE
                                              || _acknowledgeMode == Session.AUTO_ACKNOWLEDGE;
    if (!acknowledgesAfterDelivery)
    {
        return;
    }

    // we do not auto ack a message if the application code called recover()
    if (!_session.isInRecovery())
    {
        _session.acknowledgeMessage(msg.getDeliveryTag(), false);
    }
}
/**
 * Closes this consumer because of an error and deregisters it from the session.
 * <p>
 * JMS offers no way to pass an exception to a message listener, so the error is only propagated to
 * synchronous receivers: when no listener is installed the cause is placed on the synchronous queue, where
 * a receive call turns it into a JMSException (see QPID-293).
 *
 * @param cause the error that invalidated the consumer
 */
void notifyError(Throwable cause)
{
    setClosed();

    final boolean synchronousConsumer = !isMessageListenerSet();
    if (synchronousConsumer && _synchronousQueue.offer(cause))
    {
        _logger.debug("Passed exception to synchronous queue for propagation to receive()");
    }

    deregisterConsumer();
}
/**
 * Removes this consumer from its session. Invoked both when the consumer is closed cleanly and when it is
 * closed because of an error.
 */
private void deregisterConsumer()
{
    final AMQSession owner = _session;
    owner.deregisterConsumer(this);
}
/**
 * @return the consumer tag, which allows the consumer to be closed by sending a cancel to the broker; may
 *         be null if no tag has been set
 */
public String getConsumerTag()
{
    return this._consumerTag;
}
/** @param consumerTag the tag identifying this consumer */
public void setConsumerTag(String consumerTag)
{
    this._consumerTag = consumerTag;
}
/** @return the session that owns this consumer */
public AMQSession getSession()
{
    return this._session;
}
/**
 * Verifies that this consumer is usable.
 *
 * @throws JMSException if the consumer is closed, or if the session is missing or closed
 */
private void checkPreConditions() throws JMSException
{
    checkNotClosed();

    final boolean sessionUsable = (_session != null) && !_session.isClosed();
    if (!sessionUsable)
    {
        throw new javax.jms.IllegalStateException("Invalid Session");
    }
}
/** @return true if this consumer cancels itself once the queue has no more messages (queue browsing) */
public boolean isAutoClose()
{
    return this._autoClose;
}
/** @return true if this consumer was created as browse-only */
public boolean isBrowseOnly()
{
    return this._browseOnly;
}
/**
 * Rejects (with requeue) every message still buffered for synchronous receive and empties the buffer.
 * Entries that are not messages - errors or close markers - cannot be rejected and are discarded with an
 * error log. Does nothing when the buffer is empty.
 * <p>
 * The buffer is processed in passes until it is observed empty. Earlier versions recursed for each extra
 * pass, which could exhaust the stack if deliveries kept arriving; they also logged an error whenever the
 * size was unchanged after a pass, which concurrent arrivals could trigger spuriously.
 */
void releasePendingMessages()
{
    if (_synchronousQueue.isEmpty())
    {
        return;
    }

    while (!_synchronousQueue.isEmpty())
    {
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Rejecting the messages(" + _synchronousQueue
                    .size() + ") in _syncQueue (PRQ)" + "for consumer with tag:" + _consumerTag);
        }

        final Iterator<?> pending = _synchronousQueue.iterator();
        while (pending.hasNext())
        {
            final Object queued = pending.next();
            if (queued instanceof AbstractJMSMessage)
            {
                final AbstractJMSMessage message = (AbstractJMSMessage) queued;
                _session.rejectMessage(message, true);
                if (_logger.isDebugEnabled())
                {
                    _logger.debug("Rejected message:" + message.getDeliveryTag());
                }
            }
            else
            {
                _logger.error("Queue contained a :" + queued.getClass()
                              + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
            }
            // Remove only after the reject call returned, so a failing reject leaves the entry queued.
            pending.remove();
        }

        if (!_synchronousQueue.isEmpty())
        {
            // More entries arrived while this pass was running; go round again.
            _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size());
        }
    }

    clearReceiveQueue();
}
/** @return a label for log output: the consumer tag followed by this object's identity hash in brackets */
public String debugIdentity()
{
    final StringBuilder identity = new StringBuilder();
    identity.append(String.valueOf(_consumerTag));
    identity.append("[").append(System.identityHashCode(this)).append("]");
    return identity.toString();
}
/** Discards everything buffered for synchronous receive, without rejecting or acknowledging anything. */
public void clearReceiveQueue()
{
    this._synchronousQueue.clear();
}
/**
 * Empties the synchronous receive buffer and returns the delivery tags of the messages that were in it,
 * in queue order.
 * <p>
 * The buffer can also hold entries that are not messages (an error placed by notifyError or a close marker
 * placed by notifyCloseMessage). Those have no delivery tag; they are removed and logged, in line with
 * releasePendingMessages(), instead of aborting the drain half-way with a ClassCastException.
 *
 * @return the delivery tags of the drained messages; empty if no messages were buffered
 */
public List<Long> drainReceiverQueueAndRetrieveDeliveryTags()
{
    final List<Long> tags = new ArrayList<Long>(_synchronousQueue.size());
    final Iterator<?> iterator = _synchronousQueue.iterator();
    while (iterator.hasNext())
    {
        final Object queued = iterator.next();
        if (queued instanceof AbstractJMSMessage)
        {
            tags.add(((AbstractJMSMessage) queued).getDeliveryTag());
        }
        else
        {
            _logger.warn("Queue contained a :" + queued.getClass()
                         + " which has no delivery tag as it is not an AbstractJMSMessage. Will be cleared");
        }
        iterator.remove();
    }
    return tags;
}
/** @return the queue name stored for this consumer, or null if none has been set */
public String getQueuename()
{
    return this._queuename;
}
/** @param queuename the queue name to store for this consumer */
public void setQueuename(String queuename)
{
    _queuename = queuename;
}
/**
 * Asks the session to add a binding key for this consumer.
 *
 * @param amqd       the destination the binding refers to
 * @param routingKey the routing key to bind
 * @throws QpidException if the session fails to add the binding
 */
public void addBindingKey(AMQDestination amqd, String routingKey) throws QpidException
{
    final AMQSession owner = _session;
    owner.addBindingKey(this, amqd, routingKey);
}
/** To be called when a failover has occurred: drops everything buffered for synchronous receive. */
public void failedOverPre()
{
    this.clearReceiveQueue();
}
/** Post-failover hook; intentionally empty in this base class. */
public void failedOverPost()
{
    // nothing to do here
}
/** @return the connection being used by this consumer */
protected AMQConnection getConnection()
{
    return this._connection;
}
/** @param destination the destination to associate with this consumer from now on */
protected void setDestination(AMQDestination destination)
{
    this._destination = destination;
}
/** @return the channel id, which is needed when constructing frames */
protected int getChannelId()
{
    return this._channelId;
}
/**
 * Returns the queue used by the blocking receive methods to obtain messages from the session thread.
 * <p>
 * The same queue is also used to notify receivers of errors.
 *
 * @return the live queue, not a copy
 */
protected BlockingQueue getSynchronousQueue()
{
    return this._synchronousQueue;
}
/** @return the message factory registry supplied at construction */
protected MessageFactoryRegistry getMessageFactory()
{
    return this._messageFactory;
}
/** @return true once {@code markAsDurableSubscriber()} has been called */
protected boolean isDurableSubscriber()
{
    return this._isDurableSubscriber;
}
/** Flags this consumer as a durable subscriber; there is no way to clear the flag again. */
protected void markAsDurableSubscriber()
{
    this._isDurableSubscriber = true;
}
/** @param addressType the address type to record, replacing the one taken from the destination */
void setAddressType(final int addressType)
{
    this._addressType = addressType;
}
/** @return the address type currently recorded for this consumer */
int getAddressType()
{
    return this._addressType;
}
}