blob: c3bc9b2d9981bd0510a1063ef632db1a7cabc102 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client;
import java.util.Locale;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.qpid.QpidException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_8;
import org.apache.qpid.client.util.JMSExceptionHelper;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicCancelBody;
import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.jms.ConnectionURL;
/**
 * Message consumer specialisation that works with {@link AMQSession_0_8} sessions and
 * {@link UnprocessedMessage_0_8} frames.
 * <p>
 * On top of the behaviour inherited from {@link BasicMessageConsumer} this class:
 * <ul>
 * <li>adds the auto-close / no-consume entries to the consumer arguments,</li>
 * <li>resolves the {@link RejectBehaviour} to use for this consumer,</li>
 * <li>sends the Basic.Cancel frame when the consumer is cancelled, and</li>
 * <li>asks the session to adjust credit around the synchronous receive calls when the
 * session reports that it is managing credit.</li>
 * </ul>
 */
public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8>
{
    private final Logger _logger = LoggerFactory.getLogger(getClass());

    /** Destination caches obtained from the owning session; passed to the message factory. */
    private final AMQSession_0_8.DestinationCache<AMQTopic> _topicDestinationCache;
    private final AMQSession_0_8.DestinationCache<AMQQueue> _queueDestinationCache;

    /** Reject behaviour resolved once at construction time, see {@link #resolveRejectBehaviour}. */
    private final RejectBehaviour _rejectBehaviour;

    /**
     * Creates the consumer; all arguments are forwarded unchanged to the superclass constructor.
     *
     * @param destination destination consumed from; also consulted for its reject behaviour
     * @param connection  connection whose URL may carry the reject-behaviour option
     * @param session     owning session; supplies the topic and queue destination caches
     * @throws JMSException declared by the superclass constructor
     * @throws IllegalArgumentException if a configured reject behaviour string does not name a
     *                                  {@link RejectBehaviour} constant
     */
    protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
                                       String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession_0_8 session,
                                       Map<String,Object> rawSelector, int prefetchHigh, int prefetchLow, boolean exclusive,
                                       int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
    {
        super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session,
              rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode,
              browseOnly, autoClose);

        final Map<String,Object> consumerArguments = getArguments();
        if (isAutoClose())
        {
            consumerArguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
        }
        if (isBrowseOnly())
        {
            consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
        }

        _topicDestinationCache = session.getTopicDestinationCache();
        _queueDestinationCache = session.getQueueDestinationCache();

        // This is due to the Destination carrying the temporary subscription name which is incorrect.
        // For a resolved topic destination without an explicitly named link, work on a copy of the
        // destination with the queue name cleared so the caller's instance is left untouched.
        if (session.isResolved(destination) && AMQDestination.TOPIC_TYPE == destination.getAddressType())
        {
            boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null;
            if (!namedQueue)
            {
                setDestination(destination.copyDestination());
                getDestination().setQueueName(null);
            }
        }

        _rejectBehaviour = resolveRejectBehaviour(destination, connection);
    }

    /**
     * Determines the reject behaviour for a consumer. Precedence, highest first:
     * <ol>
     * <li>the value set on the destination,</li>
     * <li>the {@link ConnectionURL#OPTIONS_REJECT_BEHAVIOUR} connection URL option,</li>
     * <li>the {@link ClientProperties#REJECT_BEHAVIOUR_PROP_NAME} system property, defaulting to
     * {@link RejectBehaviour#NORMAL}.</li>
     * </ol>
     * Textual values are upper-cased with {@link Locale#ROOT} so that the lookup does not depend
     * on the JVM's default locale.
     *
     * @throws IllegalArgumentException if the configured text does not name a constant
     */
    private static RejectBehaviour resolveRejectBehaviour(AMQDestination destination, AMQConnection connection)
    {
        final RejectBehaviour fromDestination = destination.getRejectBehaviour();
        if (fromDestination != null)
        {
            return fromDestination;
        }

        final ConnectionURL connectionURL = connection.getConnectionURL();
        String rejectBehaviour = connectionURL.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR);
        if (rejectBehaviour == null)
        {
            // use the default value for all connections, if not set
            rejectBehaviour = System.getProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME,
                                                 RejectBehaviour.NORMAL.toString());
        }
        return RejectBehaviour.valueOf(rejectBehaviour.toUpperCase(Locale.ROOT));
    }

    /** Returns the owning session narrowed to {@link AMQSession_0_8}. */
    @Override
    public AMQSession_0_8 getSession()
    {
        return (AMQSession_0_8) super.getSession();
    }

    /**
     * Writes a Basic.Cancel frame for this consumer's tag and waits for the matching
     * Basic.CancelOk, then runs {@link #postSubscription()} and syncs the session.
     *
     * @throws QpidException     if the write, the post-subscription clean-up or the sync fails
     * @throws FailoverException declared by the synchronous write
     */
    void sendCancel() throws QpidException, FailoverException
    {
        BasicCancelBody body = getSession().getMethodRegistry()
                                           .createBasicCancelBody(new AMQShortString(getConsumerTag()), false);
        final AMQFrame cancelFrame = body.generateFrame(getChannelId());

        getConnection().getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class);
        postSubscription();
        getSession().sync();

        if (_logger.isDebugEnabled())
        {
            _logger.debug("CancelOk'd for consumer:" + debugIdentity());
        }
    }

    /**
     * Clean-up performed after the subscription has been cancelled. Only acts on destinations
     * that use the ADDR syntax: deletes the node when the delete option is ALWAYS or RECEIVER,
     * always asks the session to handle link deletion, and deletes the subscription queue unless
     * this is a durable subscriber.
     *
     * @throws QpidException if one of the session operations fails
     */
    void postSubscription() throws QpidException
    {
        final AMQDestination dest = this.getDestination();
        if (dest == null || dest.getDestSyntax() != AMQDestination.DestSyntax.ADDR)
        {
            return;
        }

        if (dest.getDelete() == AMQDestination.AddressOption.ALWAYS ||
            dest.getDelete() == AMQDestination.AddressOption.RECEIVER)
        {
            getSession().handleNodeDelete(dest);
        }

        // Subscription queue is handled as part of linkDelete method.
        getSession().handleLinkDelete(dest);
        if (!isDurableSubscriber())
        {
            getSession().deleteSubscriptionQueue(dest);
        }
    }

    /**
     * Builds a JMS message from an unprocessed 0-8 frame using this consumer's message factory.
     * A {@code null} exchange on the frame is passed on as the empty string.
     *
     * @param delegateFactory not used by this implementation
     * @param messageFrame    frame carrying delivery tag, routing information, header and bodies
     * @return the message created by the message factory
     * @throws Exception if the message factory fails to create the message
     */
    public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
    {
        final String exchange = messageFrame.getExchange() == null
                ? ""
                : AMQShortString.toString(messageFrame.getExchange());
        final String routingKey = AMQShortString.toString(messageFrame.getRoutingKey());

        return getMessageFactory().createMessage(messageFrame.getDeliveryTag(),
                                                 messageFrame.isRedelivered(),
                                                 exchange,
                                                 routingKey,
                                                 messageFrame.getContentHeader(),
                                                 messageFrame.getBodies(),
                                                 _queueDestinationCache,
                                                 _topicDestinationCache,
                                                 getAddressType());
    }

    /** Browse receive; simply delegates to {@code receive()}. */
    Message receiveBrowse() throws JMSException
    {
        return receive();
    }

    /** Returns the reject behaviour resolved when this consumer was constructed. */
    public RejectBehaviour getRejectBehaviour()
    {
        return _rejectBehaviour;
    }

    /**
     * Receives a message via the superclass, with the timeout passed through unchanged. When the
     * session is managing credit, the session is first asked to ensure credit for the receive,
     * and the outcome is recorded afterwards (see {@link #afterReceive}).
     *
     * @param l timeout handed to the superclass {@code receive(long)}
     * @return the received message, or {@code null} if the superclass returned none
     * @throws JMSException if the superclass receive fails, or wrapping any {@link QpidException}
     *                      raised by the session credit operations
     */
    @Override
    public Message receive(final long l) throws JMSException
    {
        final boolean manageCredit = getSession().isManagingCredit();
        boolean creditModified = false;
        try
        {
            if (manageCredit)
            {
                creditModified = getSession().ensureCreditForReceive();
            }

            final Message message = super.receive(l);
            afterReceive(manageCredit, creditModified, message);
            return message;
        }
        catch (QpidException e)
        {
            throw JMSExceptionHelper.chainJMSException(new JMSException("BasicMessageConsumer.receive failed."), e);
        }
    }

    /**
     * Non-blocking variant of {@link #receive(long)}. Differs only in that the session is synced
     * after the credit was modified and before the superclass is polled.
     *
     * @return the received message, or {@code null} if none was available
     * @throws JMSException if the superclass receive fails, or wrapping any {@link QpidException}
     *                      raised by the session credit operations
     */
    @Override
    public Message receiveNoWait() throws JMSException
    {
        final boolean manageCredit = getSession().isManagingCredit();
        boolean creditModified = false;
        try
        {
            if (manageCredit)
            {
                creditModified = getSession().ensureCreditForReceive();
                if (creditModified)
                {
                    getSession().sync();
                }
            }

            final Message message = super.receiveNoWait();
            afterReceive(manageCredit, creditModified, message);
            return message;
        }
        catch (QpidException e)
        {
            throw JMSExceptionHelper.chainJMSException(new JMSException("BasicMessageConsumer.receiveNoWait failed"),
                                                       e);
        }
    }

    /**
     * Bookkeeping shared by the synchronous receive methods: reduces credit back to its original
     * size when credit was modified but nothing was received, and counts the message as
     * unacknowledged when credit is managed, the acknowledge mode is neither AUTO_ACKNOWLEDGE
     * nor DUPS_OK_ACKNOWLEDGE, and a message was received.
     */
    private void afterReceive(boolean manageCredit, boolean creditModified, Message message)
            throws QpidException, JMSException
    {
        if (creditModified && message == null)
        {
            getSession().reduceCreditToOriginalSize();
        }

        if (manageCredit && !(getSession().getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE
                              || getSession().getAcknowledgeMode() == Session.DUPS_OK_ACKNOWLEDGE) && message != null)
        {
            getSession().incUnacknowledgedMessages();
        }
    }

    /** Asks the session to stop flow if necessary before running the superclass post-delivery step. */
    @Override
    void postDeliver(AbstractJMSMessage msg)
    {
        getSession().stopFlowIfNeccessary();
        super.postDeliver(msg);
    }
}