blob: f733e6bbcaf42f30b4232e540c7812fe3fe39539 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_8;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicCancelBody;
import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.ConnectionURL;
import javax.jms.JMSException;
import javax.jms.Message;
public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8>
{
private final Logger _logger = LoggerFactory.getLogger(getClass());
private AMQSession_0_8.DestinationCache<AMQTopic> _topicDestinationCache;
private AMQSession_0_8.DestinationCache<AMQQueue> _queueDestinationCache;
private final RejectBehaviour _rejectBehaviour;
protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession_0_8 session,
FieldTable rawSelector, int prefetchHigh, int prefetchLow, boolean exclusive,
int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
{
super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session,
rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode,
browseOnly, autoClose);
final FieldTable consumerArguments = getArguments();
if (isAutoClose())
{
consumerArguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
}
if (isBrowseOnly())
{
consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
}
_topicDestinationCache = session.getTopicDestinationCache();
_queueDestinationCache = session.getQueueDestinationCache();
if (destination.getRejectBehaviour() != null)
{
_rejectBehaviour = destination.getRejectBehaviour();
}
else
{
ConnectionURL connectionURL = connection.getConnectionURL();
String rejectBehaviour = connectionURL.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR);
if (rejectBehaviour != null)
{
_rejectBehaviour = RejectBehaviour.valueOf(rejectBehaviour.toUpperCase());
}
else
{
// use the default value for all connections, if not set
rejectBehaviour = System.getProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, RejectBehaviour.NORMAL.toString());
_rejectBehaviour = RejectBehaviour.valueOf( rejectBehaviour.toUpperCase());
}
}
}
@Override
public AMQSession_0_8 getSession()
{
return (AMQSession_0_8) super.getSession();
}
void sendCancel() throws AMQException, FailoverException
{
BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(getConsumerTag())), false);
final AMQFrame cancelFrame = body.generateFrame(getChannelId());
getConnection().getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class);
if (_logger.isDebugEnabled())
{
_logger.debug("CancelOk'd for consumer:" + debugIdentity());
}
}
public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
{
return getMessageFactory().createMessage(messageFrame.getDeliveryTag(),
messageFrame.isRedelivered(), messageFrame.getExchange(),
messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies(),
_queueDestinationCache, _topicDestinationCache);
}
Message receiveBrowse() throws JMSException
{
return receive();
}
public RejectBehaviour getRejectBehaviour()
{
return _rejectBehaviour;
}
}