blob: 8cd8aece3fd36f7ef27ba89778053a81520aa451 [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.client;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
import org.apache.qpid.client.util.JMSExceptionHelper;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.ErrorCodes;
import org.apache.qpid.transport.Acquired;
import org.apache.qpid.transport.MessageCreditUnit;
import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.RangeSetFactory;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.TransportException;
/**
* This is a 0.10 message consumer.
*/
public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedMessage_0_10>
{
/**
* This class logger
*/
private final Logger _logger = LoggerFactory.getLogger(getClass());
/**
* The underlying QpidSession
*/
private AMQSession_0_10 _0_10session;
/**
* Indicates whether this consumer receives pre-acquired messages
*/
private final boolean _preAcquire;
/**
* Specify whether this consumer is performing a sync receive
*/
private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
private final long _capacity;
/** Flag indicating if the server supports message selectors */
private final boolean _serverJmsSelectorSupport;
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession<?,?> session, Map<String,Object> rawSelector,
int prefetchHigh, int prefetchLow, boolean exclusive,
int acknowledgeMode, boolean browseOnly, boolean autoClose)
throws JMSException
{
super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, rawSelector,
prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
_0_10session = (AMQSession_0_10) session;
_serverJmsSelectorSupport = connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
_preAcquire = evaluatePreAcquire(browseOnly, destination, _serverJmsSelectorSupport);
_capacity = evaluateCapacity(destination);
// This is due to the Destination carrying the temporary subscription name which is incorrect.
if (_0_10session.isResolved(destination) && AMQDestination.TOPIC_TYPE == destination.getAddressType())
{
boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ;
if (!namedQueue)
{
setDestination(destination.copyDestination());
getDestination().setQueueName(null);
}
}
}
/**
*
* This is invoked by the session thread when emptying the session message queue.
* We first check if the message is valid (match the selector) and then deliver it to the
* message listener or to the sync consumer queue.
*
* @param jmsMessage this message has already been processed so can't redo preDeliver
*/
@Override public void notifyMessage(AbstractJMSMessage jmsMessage)
{
try
{
if (checkPreConditions(jmsMessage))
{
if (isMessageListenerSet() && _capacity == 0)
{
messageFlow();
}
_logger.debug("messageOk, trying to notify");
super.notifyMessage(jmsMessage);
}
else
{
// if we are synchronously waiting for a message
// and messages are not pre-fetched we then need to request another one
if(_capacity == 0)
{
messageFlow();
}
}
}
catch (QpidException e)
{
_logger.error("Received an Exception when receiving message", e);
getSession().getAMQConnection().closed(e);
}
}
/**
* This method is invoked when this consumer is stopped.
* It tells the broker to stop delivering messages to this consumer.
*/
@Override void sendCancel() throws QpidException
{
_0_10session.getQpidSession().messageCancel(getConsumerTag());
postSubscription();
try
{
_0_10session.getQpidSession().sync();
getSession().confirmConsumerCancelled(getConsumerTag()); // confirm cancel
}
catch (SessionException se)
{
_0_10session.setCurrentException(se);
}
QpidException amqe = _0_10session.getCurrentException();
if (amqe != null)
{
throw amqe;
}
}
@Override void notifyMessage(UnprocessedMessage_0_10 messageFrame)
{
super.notifyMessage(messageFrame);
}
@Override
protected void preDeliver(AbstractJMSMessage jmsMsg)
{
super.preDeliver(jmsMsg);
if (getAcknowledgeMode() == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
{
//For 0-10 we need to ensure that all messages are indicated processed in some way to
//ensure their AMQP command-id is marked completed, and so we must send a completion
//even for no-ack messages even though there isnt actually an 'acknowledgement' occurring.
//Add message to the unacked message list to ensure we dont lose record of it before
//sending a completion of some sort.
getSession().addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
}
}
@Override public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(
AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_10 msg) throws Exception
{
AMQMessageDelegate_0_10.updateExchangeTypeMapping(msg.getMessageTransfer().getHeader(), ((AMQSession_0_10)getSession()).getQpidSession());
return getMessageFactory().createMessage(msg.getMessageTransfer());
}
/**
* Check whether a message can be delivered to this consumer.
*
* @param message The message to be checked.
* @return true if the message matches the selector and can be acquired, false otherwise.
* @throws QpidException If the message preConditions cannot be checked due to some internal error.
*/
private boolean checkPreConditions(AbstractJMSMessage message) throws QpidException
{
boolean messageOk = true;
try
{
if (!_serverJmsSelectorSupport && getMessageSelectorFilter() != null)
{
messageOk = getMessageSelectorFilter().matches(message);
}
}
catch (Exception e)
{
throw new AMQException(ErrorCodes.INTERNAL_ERROR, "Error when evaluating message selector", e);
}
if (_logger.isDebugEnabled())
{
_logger.debug("messageOk " + messageOk);
_logger.debug("_preAcquire " + _preAcquire);
}
if (!messageOk)
{
if (_preAcquire)
{
// this is the case for topics
// We need to ack this message
if (_logger.isDebugEnabled())
{
_logger.debug("filterMessage - trying to ack message");
}
acknowledgeMessage(message);
}
else
{
if (_logger.isDebugEnabled())
{
_logger.debug("filterMessage - not ack'ing message as not acquired");
}
flushUnwantedMessage(message);
}
}
else if (!_preAcquire && !isBrowseOnly())
{
// now we need to acquire this message if needed
// this is the case of queue with a message selector set
if (_logger.isDebugEnabled())
{
_logger.debug("filterMessage - trying to acquire message");
}
messageOk = acquireMessage(message);
_logger.debug("filterMessage - message acquire status : " + messageOk);
}
return messageOk;
}
/**
* Acknowledge a message
*
* @param message The message to be acknowledged
* @throws QpidException If the message cannot be acquired due to some internal error.
*/
private void acknowledgeMessage(final AbstractJMSMessage message) throws QpidException
{
_0_10session.messageAcknowledge
(Range.newInstance((int) message.getDeliveryTag()),
getAcknowledgeMode() != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
final QpidException amqe = _0_10session.getCurrentException();
if (amqe != null)
{
throw amqe;
}
}
/**
* Flush an unwanted message. For 0-10 we need to ensure that all messages are indicated
* processed to ensure their AMQP command-id is marked completed.
*
* @param message The unwanted message to be flushed
* @throws QpidException If the unwanted message cannot be flushed due to some internal error.
*/
private void flushUnwantedMessage(final AbstractJMSMessage message) throws QpidException
{
_0_10session.flushProcessed(Range.newInstance((int) message.getDeliveryTag()),false);
final QpidException amqe = _0_10session.getCurrentException();
if (amqe != null)
{
throw amqe;
}
}
/**
* Acquire a message
*
* @param message The message to be acquired
* @return true if the message has been acquired, false otherwise.
* @throws QpidException If the message cannot be acquired due to some internal error.
*/
private boolean acquireMessage(final AbstractJMSMessage message) throws QpidException
{
boolean result = false;
final Acquired acq = _0_10session.getQpidSession().messageAcquire(Range.newInstance((int)message.getDeliveryTag())).get();
final RangeSet acquired = acq.getTransfers();
if (acquired != null && acquired.size() > 0)
{
result = true;
}
return result;
}
private void messageFlow()
{
_0_10session.getQpidSession().messageFlow(getConsumerTag(),
MessageCreditUnit.MESSAGE, 1,
Option.UNRELIABLE);
}
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
super.setMessageListener(messageListener);
try
{
if (messageListener != null && _capacity == 0)
{
messageFlow();
}
if (messageListener != null && !getSynchronousQueue().isEmpty())
{
Iterator messages= getSynchronousQueue().iterator();
while (messages.hasNext())
{
AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
messages.remove();
getSession().rejectMessage(message, true);
}
}
}
catch(TransportException e)
{
throw getSession().toJMSException("Exception while setting message listener:" + e.getMessage(), e);
}
}
public void failedOverPost()
{
if (_0_10session.isStarted() && _syncReceive.get())
{
messageFlow();
}
}
/**
* When messages are not prefetched we need to request a message from the
* broker.
* Note that if the timeout is too short a message may be queued in _synchronousQueue until
* this consumer closes or request it.
* @param l
* @return
* @throws InterruptedException
*/
public Object getMessageFromQueue(long l) throws InterruptedException
{
if (_capacity == 0)
{
_syncReceive.set(true);
}
if (_0_10session.isStarted() && isMessageListenerSet() && _capacity == 0 && getSynchronousQueue().isEmpty())
{
messageFlow();
}
Object o = super.getMessageFromQueue(l);
if (o == null && _0_10session.isStarted())
{
_0_10session.getQpidSession().messageFlush
(getConsumerTag(), Option.UNRELIABLE, Option.SYNC);
_0_10session.getQpidSession().messageFlow
(getConsumerTag(), MessageCreditUnit.BYTE,
0xFFFFFFFF, Option.UNRELIABLE);
if (_capacity > 0)
{
_0_10session.getQpidSession().messageFlow
(getConsumerTag(),
MessageCreditUnit.MESSAGE,
_capacity,
Option.UNRELIABLE);
}
_0_10session.getQpidSession().sync();
_0_10session.syncDispatchQueue(false);
o = super.getMessageFromQueue(-1);
}
if (_capacity == 0)
{
_syncReceive.set(false);
}
return o;
}
void postDeliver(AbstractJMSMessage msg)
{
super.postDeliver(msg);
switch (getAcknowledgeMode())
{
case Session.SESSION_TRANSACTED:
_0_10session.sendTxCompletionsIfNecessary();
break;
case Session.NO_ACKNOWLEDGE:
if (!getSession().isInRecovery())
{
getSession().acknowledgeMessage(msg.getDeliveryTag(), false);
}
break;
case Session.AUTO_ACKNOWLEDGE:
if (!getSession().isInRecovery() && getSession().getAMQConnection().getSyncAck())
{
((AMQSession_0_10) getSession()).getQpidSession().sync();
}
break;
}
}
Message receiveBrowse() throws JMSException
{
return receiveNoWait();
}
@Override void releasePendingMessages()
{
if (getSynchronousQueue().size() > 0)
{
RangeSet ranges = RangeSetFactory.createRangeSet();
Iterator iterator = getSynchronousQueue().iterator();
while (iterator.hasNext())
{
Object o = iterator.next();
if (o instanceof AbstractJMSMessage)
{
ranges.add((int) ((AbstractJMSMessage) o).getDeliveryTag());
iterator.remove();
}
else
{
_logger.error("Queue contained a :" + o.getClass()
+ " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
iterator.remove();
}
}
_0_10session.flushProcessed(ranges, false);
_0_10session.getQpidSession().messageRelease(ranges);
clearReceiveQueue();
}
}
void postSubscription() throws QpidException
{
AMQDestination dest = this.getDestination();
if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
{
if (dest.getDelete() == AddressOption.ALWAYS ||
dest.getDelete() == AddressOption.RECEIVER )
{
getSession().handleNodeDelete(dest);
}
// Subscription queue is handled as part of linkDelete method.
getSession().handleLinkDelete(dest);
if (!isDurableSubscriber())
{
((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest);
}
}
}
long getCapacity()
{
return _capacity;
}
boolean isPreAcquire()
{
return _preAcquire;
}
private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination destination, boolean serverJmsSelectorSupport)
{
boolean preAcquire;
if (browseOnly)
{
preAcquire = false;
}
else
{
boolean isQueue = (destination instanceof AMQQueue || getDestination().getAddressType() == AMQDestination.QUEUE_TYPE);
if (!serverJmsSelectorSupport && isQueue && getMessageSelectorFilter() != null)
{
preAcquire = false;
}
else
{
preAcquire = true;
}
}
return preAcquire;
}
private long evaluateCapacity(AMQDestination destination)
{
long capacity = 0;
if (destination.getLink() != null && destination.getLink().getConsumerCapacity() >= 0)
{
capacity = destination.getLink().getConsumerCapacity();
}
else if (getSession().prefetch())
{
capacity = getSession().getPrefetch();
}
return capacity;
}
@Override
public Message receive(final long l) throws JMSException
{
long capacity = getCapacity();
try
{
AMQSession_0_10 session = (AMQSession_0_10) getSession();
if (capacity == 0 && getMessageListener() == null)
{
session.getQpidSession().messageFlow(getConsumerTag(),
MessageCreditUnit.MESSAGE, 1,
Option.UNRELIABLE);
session.sync();
}
Message message = super.receive(l);
if (message == null && capacity == 0 && getMessageListener() == null)
{
session.getQpidSession().messageFlow(getConsumerTag(),
MessageCreditUnit.MESSAGE, 0,
Option.UNRELIABLE);
session.sync();
message = super.receiveNoWait();
}
return message;
}
catch (QpidException e)
{
throw JMSExceptionHelper.chainJMSException(new JMSException("BasicMessageConsumer.receive failed"), e);
}
}
@Override
public Message receiveNoWait() throws JMSException
{
long capacity = getCapacity();
try
{
AMQSession_0_10 session = (AMQSession_0_10) getSession();
if (capacity == 0 && getMessageListener() == null)
{
session.getQpidSession().messageFlow(getConsumerTag(),
MessageCreditUnit.MESSAGE, 1,
Option.UNRELIABLE);
session.sync();
}
Message message = super.receiveNoWait();
if (message == null && capacity == 0 && getMessageListener() == null)
{
session.getQpidSession().messageFlow(getConsumerTag(),
MessageCreditUnit.MESSAGE, 0,
Option.UNRELIABLE);
session.sync();
message = super.receiveNoWait();
}
return message;
}
catch (QpidException e)
{
throw JMSExceptionHelper.chainJMSException(new JMSException("BasicMessageConsumer.receiveNoWait failed."),
e);
}
}
}