blob: 695a69505ddccf79319d425e0931b60636379c94 [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.client;
import static org.apache.qpid.transport.Option.BATCH;
import static org.apache.qpid.transport.Option.NONE;
import static org.apache.qpid.transport.Option.SYNC;
import static org.apache.qpid.transport.Option.UNRELIABLE;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.Binding;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
import org.apache.qpid.client.messaging.address.AddressHelper;
import org.apache.qpid.client.messaging.address.Link;
import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue;
import org.apache.qpid.client.messaging.address.Node;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.protocol.ErrorCodes;
import org.apache.qpid.transport.*;
import org.apache.qpid.util.Serial;
import org.apache.qpid.util.Strings;
/**
* JMS session implementation for the AMQP 0-10 protocol, backed by a Qpid transport {@link Session}.
*/
public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, BasicMessageProducer_0_10>
implements SessionListener
{
/**
* Logger for this class; also used by the nested {@code Flusher} task to report
* failures while flushing acknowledgements.
*/
private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class);
// Optional session name supplied at construction; when null, createSession() asks the
// connection for a session without passing a name.
private final String _name;
/**
 * Periodic task that flushes the pending acknowledgements of a session.
 * <p>
 * The session is only weakly referenced so that a scheduled flusher does not keep an otherwise
 * unreachable session alive. Once the session has been collected, the task cancels its own
 * schedule (provided the future has been handed over via {@link #setFuture(ScheduledFuture)}).
 */
private static class Flusher implements Runnable
{
    private final WeakReference<AMQSession_0_10> session;
    // Written by the thread that schedules the task and read by the thread that runs it;
    // volatile guarantees the running thread eventually observes the handle.
    private volatile ScheduledFuture<?> _future;

    /**
     * @param session the session whose acknowledgements are flushed; held weakly
     */
    public Flusher(AMQSession_0_10 session)
    {
        this.session = new WeakReference<AMQSession_0_10>(session);
    }

    /**
     * Hands over the handle of the periodic schedule so the task can cancel itself.
     *
     * @param future the future returned when this task was scheduled
     */
    public void setFuture(final ScheduledFuture<?> future)
    {
        _future = future;
    }

    /**
     * Flushes the session's acknowledgements with the sync bit set, or cancels the schedule
     * when the session no longer exists. Flush failures are logged, never propagated.
     */
    public void run()
    {
        final AMQSession_0_10 ssn = session.get();
        if (ssn == null)
        {
            // Session is gone: stop the periodic execution if we already know our handle.
            final ScheduledFuture<?> future = _future;
            if (future != null)
            {
                future.cancel(false);
            }
            return;
        }
        try
        {
            ssn.flushAcknowledgments(true);
        }
        catch (Exception t)
        {
            _logger.error("error flushing acks", t);
        }
    }
}
/**
* The underlying QpidSession
*/
private Session _qpidSession;
/**
* Monitor guarding reads and writes of {@code _currentException}.
*/
private Object _currentExceptionLock = new Object();
// The latest qpid exception that has been raised; cleared when read through getCurrentException().
private QpidException _currentException;
// a ref on the qpid connection
private org.apache.qpid.transport.Connection _qpidConnection;
// Period, in milliseconds, of the background acknowledgement flush; read from the system property
// "qpid.session.max_ack_delay" (default 1000). No flush task is scheduled when it is not positive.
private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000);
// Handle of the scheduled flush task, or null when none is scheduled / it has been cancelled.
private ScheduledFuture<?> _flushTaskFuture = null;
// Ids of messages acknowledged by the application but not yet flushed to the broker.
// This object also serves as the monitor protecting itself and unackedCount.
private RangeSet unacked = RangeSetFactory.createRangeSet();
// Number of ids added to 'unacked' since it was last cleared.
private int unackedCount = 0;
/**
* Used to store the range of in tx messages
*/
private final RangeSet _txRangeSet = RangeSetFactory.createRangeSet();
// Number of ids added to _txRangeSet since it was last cleared.
private int _txSize = 0;
// Read from the system property "qpid.session.legacy_exception_behaviour". When true, a session
// exception closes the whole connection; otherwise only this session is closed and the
// connection is notified of the exception (see setCurrentException).
private boolean _isHardError = Boolean.getBoolean("qpid.session.legacy_exception_behaviour");
//--- constructors
/**
* Creates a new session on a connection.
*
* @param con The connection on which to create the session.
* @param channelId The unique identifier for the session.
* @param transacted Indicates whether or not the session is transactional.
* @param acknowledgeMode The acknowledgement mode for the session.
* @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
* @param qpidConnection The qpid connection
*/
AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
boolean transacted, int acknowledgeMode, int defaultPrefetchHighMark, int defaultPrefetchLowMark,String name)
{
super(con, channelId, transacted, acknowledgeMode, defaultPrefetchHighMark,
defaultPrefetchLowMark);
_qpidConnection = qpidConnection;
_name = name;
_qpidSession = createSession();
if (maxAckDelay > 0)
{
Flusher flusher = new Flusher(this);
_flushTaskFuture = con.scheduleTask(flusher, 0, maxAckDelay, TimeUnit.MILLISECONDS);
flusher.setFuture(_flushTaskFuture);
}
}
/**
 * Creates the transport-level session on the qpid connection, selecting transactional mode
 * when this JMS session is transacted, and registers this object as its listener.
 *
 * @return the newly created transport session
 */
protected Session createSession()
{
    final Session created = (_name == null)
            ? _qpidConnection.createSession(1)
            : _qpidConnection.createSession(_name, 1);
    if (isTransacted())
    {
        created.txSelect();
        created.setTransacted(true);
    }
    created.setSessionListener(this);
    return created;
}
/**
 * Records a message id as acknowledged-but-not-yet-flushed.
 *
 * @param id the transfer id to remember
 */
private void addUnacked(int id)
{
    synchronized (unacked)
    {
        unacked.add(id);
        unackedCount += 1;
    }
}
/**
 * Forgets all ids that are waiting to be flushed and resets their counter.
 */
private void clearUnacked()
{
    synchronized (unacked)
    {
        unackedCount = 0;
        unacked.clear();
    }
}
/**
 * @return the qpid transport connection currently referenced by this session
 */
protected Connection getQpidConnection()
{
    final Connection transportConnection = _qpidConnection;
    return transportConnection;
}
//------- overridden methods of class AMQSession
/**
 * Prepares the session for failover: syncs the dispatch queue and then drops every
 * acknowledgement that was still waiting to be flushed.
 */
void failoverPrep()
{
    final boolean holdDispatchLock = true;
    syncDispatchQueue(holdDispatchLock);
    clearUnacked();
}
/**
 * Acknowledge one or many messages.
 * <p>
 * The acknowledged ids are queued locally and flushed immediately when at least half of the
 * connection's prefetch is pending, when no acknowledgement delay is configured, or when the
 * session is in AUTO_ACKNOWLEDGE mode; otherwise they wait for a later flush.
 *
 * @param deliveryTag The tag of the last message to be acknowledged.
 * @param multiple    <tt>true</tt> to acknowledge all messages up to and including the one specified by the
 *                    delivery tag, <tt>false</tt> to just acknowledge that message.
 */
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
    if (_logger.isDebugEnabled())
    {
        _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + getChannelId());
    }

    if (!multiple)
    {
        // single message: queue just this tag
        addUnacked((int) deliveryTag);
        getUnacknowledgedMessageTags().remove(deliveryTag);
    }
    else
    {
        // queue every delivered-but-unacknowledged tag up to and including deliveryTag
        for (Long tag : getUnacknowledgedMessageTags())
        {
            if (tag > deliveryTag)
            {
                continue;
            }
            addUnacked(tag.intValue());
            getUnacknowledgedMessageTags().remove(tag);
        }
    }

    final long prefetch = getAMQConnection().getMaxPrefetch();
    final boolean flushNow = unackedCount >= prefetch / 2
                             || maxAckDelay <= 0
                             || getAcknowledgeMode() == javax.jms.Session.AUTO_ACKNOWLEDGE;
    if (flushNow)
    {
        flushAcknowledgments();
    }
}
/**
 * Flushes the queued acknowledgements without setting the sync bit.
 */
protected void flushAcknowledgments()
{
    final boolean setSyncBit = false;
    flushAcknowledgments(setSyncBit);
}
/**
 * Sends the queued acknowledgements, if any, and clears the queue.
 * Messages are accepted unless the session is in NO_ACKNOWLEDGE mode.
 *
 * @param setSyncBit whether the accept command should carry the sync option
 */
void flushAcknowledgments(boolean setSyncBit)
{
    synchronized (unacked)
    {
        if (unackedCount <= 0)
        {
            return;
        }
        final boolean accept = getAcknowledgeMode() != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE;
        messageAcknowledge(unacked, accept, setSyncBit);
        clearUnacked();
    }
}
/**
 * Acknowledges the given ranges without setting the sync bit.
 *
 * @param ranges the transfer ids to mark as processed
 * @param accept whether a message-accept is sent for the ranges
 */
void messageAcknowledge(final RangeSet ranges, final boolean accept)
{
    final boolean setSyncBit = false;
    messageAcknowledge(ranges, accept, setSyncBit);
}
/**
 * Marks the given ranges as processed and, when requested, sends a message-accept for them.
 *
 * @param ranges     the transfer ids concerned
 * @param accept     whether a message-accept is sent; also selects batch mode for the completion flush
 * @param setSyncBit whether the accept carries the sync option
 */
void messageAcknowledge(final RangeSet ranges, final boolean accept, final boolean setSyncBit)
{
    final Session transportSession = getQpidSession();
    flushProcessed(ranges, accept);
    if (!accept)
    {
        return;
    }
    final Option syncOption = setSyncBit ? SYNC : NONE;
    transportSession.messageAccept(ranges, UNRELIABLE, syncOption);
}
/**
 * Marks each range as processed on the transport session and then flushes the processed
 * state. This causes session complete to be sent.
 *
 * @param ranges the range of command ids.
 * @param batch  true to flush with the BATCH option, false to flush with no option.
 */
void flushProcessed(final RangeSet ranges, final boolean batch)
{
    final Session transportSession = getQpidSession();
    final Iterator<Range> it = ranges.iterator();
    while (it.hasNext())
    {
        transportSession.processed(it.next());
    }
    final Option flushOption = batch ? BATCH : NONE;
    transportSession.flushProcessed(flushOption);
}
/**
 * Bind a queue with an exchange.
 * <p>
 * Without a destination only {@code routingKey} is bound. For a BURL destination every binding
 * key of the destination is bound, plus {@code routingKey} when it is not among them. For any
 * other destination syntax the bindings declared on the destination's node are applied.
 *
 * @param queueName    Specifies the name of the queue to bind.
 * @param routingKey   Specifies the routing key for the binding.
 * @param arguments    Binding arguments (used on the BURL / no-destination paths).
 * @param exchangeName The exchange name.
 * @param destination  The destination being bound, may be null.
 * @param nowait       When false the session is synced afterwards so that errors are reported.
 * @throws QpidException if the sync reports a session error
 */
@Override
public void sendQueueBind(final String queueName, final String routingKey,
                          final Map<String,Object> arguments, final String exchangeName,
                          final AMQDestination destination, final boolean nowait)
        throws QpidException
{
    if (destination == null)
    {
        doSendQueueBind(queueName, exchangeName, arguments, routingKey);
    }
    else if (destination.getDestSyntax() == DestSyntax.BURL)
    {
        for (String bindingKey : destination.getBindingKeys())
        {
            doSendQueueBind(queueName, exchangeName, arguments, bindingKey);
        }
        final boolean routingKeyAlreadyBound = Arrays.asList(destination.getBindingKeys()).contains(routingKey);
        if (!routingKeyAlreadyBound)
        {
            doSendQueueBind(queueName, exchangeName, arguments, routingKey);
        }
    }
    else
    {
        // Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected.
        final List<Binding> nodeBindings = new ArrayList<Binding>(destination.getNode().getBindings());
        final String fallbackExchange;
        if (destination.getAddressType() == AMQDestination.TOPIC_TYPE)
        {
            fallbackExchange = destination.getAddressName();
        }
        else
        {
            fallbackExchange = "amq.topic";
        }

        for (Binding binding : nodeBindings)
        {
            final String queue = binding.getQueue() != null ? binding.getQueue() : queueName;
            // Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link.
            // Skipping bindings without any queue side steps that issue while fixing QPID-4146.
            // Note this issue only affects producers.
            if (queue == null)
            {
                continue;
            }
            final String exchange = binding.getExchange() != null ? binding.getExchange() : fallbackExchange;
            _logger.debug("Binding queue : " + queue +
                          " exchange: " + exchange +
                          " using binding key " + binding.getBindingKey() +
                          " with args " + Strings.printMap(binding.getArgs()));
            doBind(destination, binding, queue, exchange);
        }
    }

    if (nowait)
    {
        return;
    }
    // We need to sync so that we get notified of an error.
    sync();
}
/**
 * Logs and issues a single exchange-bind command on the transport session.
 *
 * @param queueName    the queue to bind
 * @param exchangeName the exchange to bind it to
 * @param args         the binding arguments
 * @param rk           the binding key
 */
private void doSendQueueBind(final String queueName,
                             final String exchangeName,
                             final Map args,
                             final String rk)
{
    final String description = "Binding queue : " + queueName +
                               " exchange: " + exchangeName +
                               " using binding key " + rk;
    _logger.debug(description);
    getQpidSession().exchangeBind(queueName, exchangeName, rk, args);
}
/**
 * Close this session: cancels the flush task, flushes pending acknowledgements, then syncs and
 * closes the transport session. A session exception raised while doing so is recorded and the
 * latest recorded exception, if any, is thrown.
 *
 * @param timeout not used / 0_8 specific
 * @throws QpidException     if a session exception has been recorded
 * @throws FailoverException declared for interface compatibility
 */
public void sendClose(long timeout) throws QpidException, FailoverException
{
    cancelTimerTask();
    flushAcknowledgments();
    try
    {
        getQpidSession().sync();
        getQpidSession().close();
    }
    catch (SessionException sessionFailure)
    {
        setCurrentException(sessionFailure);
    }

    final QpidException pending = getCurrentException();
    if (pending == null)
    {
        return;
    }
    throw pending;
}
/**
 * Create a queue with a given name, then sync so that errors are reported.
 *
 * @param name       The queue name
 * @param autoDelete If set, the queue is declared with the auto-delete option.
 * @param durable    If set when creating a new queue,
 *                   the queue will be marked as durable.
 * @param exclusive  Exclusive queues can only be used from one connection at a time.
 * @param arguments  Additional declare arguments passed through to the queue-declare command.
 * @throws QpidException     if the sync reports a session error
 * @throws FailoverException declared for interface compatibility
 */
public void sendCreateQueue(String name, final boolean autoDelete, final boolean durable,
                            final boolean exclusive, Map<String, Object> arguments) throws QpidException, FailoverException
{
    final Option durableOption = durable ? Option.DURABLE : Option.NONE;
    final Option autoDeleteOption = autoDelete ? Option.AUTO_DELETE : Option.NONE;
    final Option exclusiveOption = exclusive ? Option.EXCLUSIVE : Option.NONE;
    getQpidSession().queueDeclare(name, null, arguments, durableOption, autoDeleteOption, exclusiveOption);
    // We need to sync so that we get notified of an error.
    sync();
}
/**
 * This method asks the broker to redeliver all unacknowledged messages.
 * <p>
 * Delivered messages are released with the redelivered flag set, prefetched ones without it.
 * Both tag queues are drained in the process.
 *
 * @throws QpidException     if the sync reports a session error
 * @throws FailoverException declared for interface compatibility
 */
public void sendRecover() throws QpidException, FailoverException
{
    // release all unacked messages
    final RangeSet delivered = gatherRangeSet(getUnacknowledgedMessageTags());
    final RangeSet prefetched = gatherRangeSet(getPrefetchedMessageTags());

    final RangeSet everything = RangeSetFactory.createRangeSet();
    for (Range range : delivered)
    {
        everything.add(range);
    }
    for (Range range : prefetched)
    {
        everything.add(range);
    }

    flushProcessed(everything, false);
    getQpidSession().messageRelease(delivered, Option.SET_REDELIVERED);
    getQpidSession().messageRelease(prefetched);
    // We need to sync so that we get notified of an error.
    sync();
}
/**
 * Drains the given tag queue into a new range set.
 *
 * @param messageTags the queue to empty; each tag is narrowed to an int id
 * @return a range set containing every polled tag
 */
private RangeSet gatherRangeSet(ConcurrentLinkedQueue<Long> messageTags)
{
    final RangeSet collected = RangeSetFactory.createRangeSet();
    for (Long tag = messageTags.poll(); tag != null; tag = messageTags.poll())
    {
        collected.add(tag.intValue());
    }
    return collected;
}
/**
 * Releases, with the redelivered flag set, every message received in the current transaction
 * and resets the in-transaction bookkeeping. Does nothing when no such message is recorded.
 */
public void releaseForRollback()
{
    if (_txSize <= 0)
    {
        return;
    }
    flushProcessed(_txRangeSet, false);
    getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED);
    _txRangeSet.clear();
    _txSize = 0;
}
/**
 * Release (0_8 notion of Reject) an acquired message.
 *
 * @param deliveryTag the message ID
 * @param requeue     when true the message is released without options, otherwise it is released
 *                    with the redelivered flag set (callers are expected to always pass true)
 */
public void rejectMessage(long deliveryTag, boolean requeue)
{
    final RangeSet single = RangeSetFactory.createRangeSet();
    single.add((int) deliveryTag);
    flushProcessed(single, false);
    if (!requeue)
    {
        getQpidSession().messageRelease(single, Option.SET_REDELIVERED);
    }
    else
    {
        getQpidSession().messageRelease(single);
    }
    // no sync is performed here
}
/**
 * Create an 0_10 message consumer bound to this session, its channel id, connection,
 * message factory registry and acknowledge mode.
 */
public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
                                                       final int prefetchLow, final boolean noLocal,
                                                       final boolean exclusive, String messageSelector,
                                                       final Map<String,Object> rawSelector, final boolean noConsume,
                                                       final boolean autoClose) throws JMSException
{
    final BasicMessageConsumer_0_10 consumer =
            new BasicMessageConsumer_0_10(getChannelId(), getAMQConnection(), destination, messageSelector,
                                          noLocal, getMessageFactoryRegistry(), this, rawSelector,
                                          prefetchHigh, prefetchLow, exclusive, getAcknowledgeMode(),
                                          noConsume, autoClose);
    return consumer;
}
/**
 * Checks whether a queue is bound to an exchange with the given routing key,
 * without matching any binding arguments.
 */
public boolean isQueueBound(final String exchangeName, final String queueName, final String routingKey)
{
    final Map<String,Object> noArguments = null;
    return isQueueBound(exchangeName, queueName, routingKey, noArguments);
}
/**
 * Checks whether the destination's queue is bound to the destination's exchange, using the
 * destination's binding keys and routing key.
 */
public boolean isQueueBound(final AMQDestination destination)
{
    final String exchange = destination.getExchangeName();
    final String queue = destination.getAMQQueueName();
    final String routingKey = destination.getRoutingKey();
    final String[] bindingKeys = destination.getBindingKeys();
    return isQueueBound(exchange, queue, routingKey, bindingKeys);
}
/**
 * Checks whether a queue is bound to an exchange. The first binding key is used as the key to
 * test when any are given; otherwise the routing key (possibly null) is used.
 */
public boolean isQueueBound(final String exchangeName, final String queueName, final String routingKey, String[] bindingKeys)
{
    final boolean hasBindingKeys = bindingKeys != null && bindingKeys.length > 0;
    final String keyToTest = hasBindingKeys ? bindingKeys[0] : routingKey;
    final Map<String,Object> noArguments = null;
    return isQueueBound(exchangeName, queueName, keyToTest, noArguments);
}
/**
 * Queries the broker for a binding and interprets the result.
 * <p>
 * Without a binding key only the existence of exchange and queue is required. With a binding
 * key, key and queue must also match, and the arguments as well when {@code args} is given.
 *
 * @return true when the binding query reports no mismatch relevant to the supplied parameters
 */
public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args)
{
    final ExchangeBoundResult outcome =
            getQpidSession().exchangeBound(exchangeName, queueName, bindingKey, args).get();

    if (outcome.getExchangeNotFound() || outcome.getQueueNotFound())
    {
        return false;
    }
    if (bindingKey == null)
    {
        return true;
    }
    if (outcome.getKeyNotMatched() || outcome.getQueueNotMatched())
    {
        return false;
    }
    return args == null || !outcome.getArgsNotMatched();
}
/**
 * Delegates to {@link #isQueueBound(String, String, String)}.
 */
@Override
protected boolean isBound(String exchangeName, String amqQueueName, String routingKey)
{
    final boolean bound = isQueueBound(exchangeName, amqQueueName, routingKey);
    return bound;
}
/**
 * This method is invoked when a consumer is created.
 * Registers the consumer with the broker: subscribes, selects the flow mode (credit when the
 * consumer's capacity is zero, window otherwise), grants byte credit and, when applicable,
 * message credit, then syncs. The {@code nowait} flag is not consulted.
 */
public void sendConsume(BasicMessageConsumer_0_10 consumer, String queueName,
                        boolean nowait)
        throws QpidException, FailoverException
{
    queueName = preprocessAddressTopic(consumer, queueName);
    final boolean preAcquire = consumer.isPreAcquire();
    final AMQDestination destination = consumer.getDestination();
    final long capacity = consumer.getCapacity();

    final Map<String, Object> arguments = consumer.getArguments();
    final Link link = destination.getLink();
    final boolean hasSubscriptionArgs =
            link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null;
    if (hasSubscriptionArgs)
    {
        arguments.putAll(link.getSubscription().getArgs());
    }

    final MessageAcceptMode acceptMode =
            getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT;
    final MessageAcquireMode acquireMode =
            preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED;
    final String queue = (queueName != null) ? queueName : destination.getAddressName();
    final String consumerTag = consumer.getConsumerTag();

    getQpidSession().messageSubscribe(queue, consumerTag, acceptMode, acquireMode, null, 0, arguments,
                                      consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);

    final MessageFlowMode flowMode = (capacity == 0) ? MessageFlowMode.CREDIT : MessageFlowMode.WINDOW;
    getQpidSession().messageSetFlowMode(consumerTag, flowMode);

    getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF, Option.UNRELIABLE);

    final boolean grantMessageCredit =
            capacity > 0 && getDispatcher() != null && (isStarted() || isImmediatePrefetch());
    if (grantMessageCredit)
    {
        // set the flow
        getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, capacity, Option.UNRELIABLE);
    }
    sync();
}
/**
 * Create an 0_10 message producer. Qpid and transport failures raised during construction
 * are converted to a {@link JMSException}.
 */
public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final Boolean mandatory,
                                                       final Boolean immediate, final long producerId) throws JMSException
{
    try
    {
        final AMQDestination target = (AMQDestination) destination;
        return new BasicMessageProducer_0_10(getAMQConnection(), target, isTransacted(), getChannelId(), this,
                                             producerId, immediate, mandatory);
    }
    catch (QpidException qpidFailure)
    {
        throw toJMSException("Error creating producer",qpidFailure);
    }
    catch (TransportException transportFailure)
    {
        throw toJMSException("Exception while creating message producer:" + transportFailure.getMessage(),
                             transportFailure);
    }
}
/**
 * Declares an exchange without alternate exchange or arguments.
 * The 'internal' parameter is ignored on the 0-10 path, the protocol does not support it.
 */
public void sendExchangeDeclare(final String name, final String type, final boolean nowait,
                                boolean durable, boolean autoDelete, boolean internal) throws QpidException, FailoverException
{
    final String noAlternateExchange = null;
    final Map<String, Object> noArguments = null;
    sendExchangeDeclare(name, type, noAlternateExchange, noArguments, nowait, durable, autoDelete);
}
/**
 * Declares an exchange with the given arguments and no alternate exchange.
 * The {@code passive} parameter is not forwarded by this overload.
 */
public void sendExchangeDeclare(final String name, final String type, final boolean nowait,
                                boolean durable, boolean autoDelete, Map<String,Object> arguments, final boolean passive) throws
                                                                                 QpidException, FailoverException
{
    final String noAlternateExchange = null;
    sendExchangeDeclare(name, type, noAlternateExchange, arguments, nowait, durable, autoDelete);
}
/**
 * Issues the exchange-declare command. Exchanges whose name starts with "amq." are declared
 * with the passive option. Unless {@code nowait} is set, the session is synced afterwards.
 *
 * @throws QpidException if the sync reports a session error
 */
public void sendExchangeDeclare(final String name, final String type,
                                final String alternateExchange, final Map<String, Object> args,
                                final boolean nowait, boolean durable, boolean autoDelete) throws QpidException
{
    final Option passiveOption = name.startsWith("amq.") ? Option.PASSIVE : Option.NONE;
    final Option durableOption = durable ? Option.DURABLE : Option.NONE;
    final Option autoDeleteOption = autoDelete ? Option.AUTO_DELETE : Option.NONE;
    getQpidSession().exchangeDeclare(name, type, alternateExchange, args,
                                     passiveOption, durableOption, autoDeleteOption);
    if (nowait)
    {
        return;
    }
    // We need to sync so that we get notified of an error.
    sync();
}
/**
 * Deletes an exchange and, unless {@code nowait} is set, syncs so that errors are reported.
 */
public void sendExchangeDelete(final String name, final boolean nowait)
        throws QpidException, FailoverException
{
    getQpidSession().exchangeDelete(name);
    if (nowait)
    {
        return;
    }
    // We need to sync so that we get notified of an error.
    sync();
}
/**
 * Declare the queue of the given destination, generating a temporary queue name when the
 * destination has none (the generated name is stored on the destination).
 * <p>
 * For BURL destinations the queue properties come from the destination itself and
 * {@code passive} is honoured; for other destinations they come from the destination's node
 * and {@code passive} is not applied.
 *
 * @return the name of the declared queue
 * @throws QpidException if the sync reports a session error
 */
public String send0_10QueueDeclare(final AMQDestination amqd, final boolean noLocal,
                                   final boolean nowait, boolean passive)
        throws QpidException
{
    String queueName = amqd.getAMQQueueName();
    if (queueName == null)
    {
        // generate a name for this queue
        queueName = createTemporaryQueueName();
        amqd.setQueueName(queueName);
    }

    if (amqd.getDestSyntax() != DestSyntax.BURL)
    {
        // This code is here to ensure address based destination work with the declareQueue public method in AMQSession.java
        final Node node = amqd.getNode();
        final Map<String,Object> declareArgs = new HashMap<String,Object>(node.getDeclareArgs());
        if (declareArgs.get(AddressHelper.NO_LOCAL) == null)
        {
            declareArgs.put(AddressHelper.NO_LOCAL, noLocal);
        }
        // passive --> false
        getQpidSession().queueDeclare(queueName, node.getAlternateExchange(), declareArgs,
                                      node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
                                      node.isDurable() ? Option.DURABLE : Option.NONE,
                                      node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
    }
    else
    {
        final Map<String,Object> declareArgs = new HashMap<String,Object>();
        if (noLocal)
        {
            declareArgs.put(AddressHelper.NO_LOCAL, true);
        }
        getQpidSession().queueDeclare(queueName, "", declareArgs,
                                      amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
                                      amqd.isDurable() ? Option.DURABLE : Option.NONE,
                                      amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE,
                                      passive ? Option.PASSIVE : Option.NONE);
    }

    if (!nowait)
    {
        // We need to sync so that we get notified of an error.
        sync();
    }
    return queueName;
}
/**
 * Deletes a queue and syncs so that errors are reported.
 */
public void sendQueueDelete(final String queueName) throws QpidException, FailoverException
{
    final Session transportSession = getQpidSession();
    transportSession.queueDelete(queueName);
    // We need to sync so that we get notified of an error.
    sync();
}
/**
 * Activate/deactivate the message flow for all the consumers of this session.
 * <p>
 * Suspending stops every consumer. Resuming grants message credit (the consumer's capacity, or
 * a single message for zero-capacity consumers that have a message listener) followed by byte
 * credit. The session is synced at the end; the suspend path additionally syncs right after
 * the stop commands.
 */
public void sendSuspendChannel(boolean suspend) throws QpidException, FailoverException
{
    if (!suspend)
    {
        for (BasicMessageConsumer_0_10 consumer : getConsumers())
        {
            final String tag = consumer.getConsumerTag();
            try
            {
                final long capacity = consumer.getCapacity();
                if (capacity != 0)
                {
                    getQpidSession().messageFlow(tag, MessageCreditUnit.MESSAGE, capacity, Option.UNRELIABLE);
                }
                else if (consumer.getMessageListener() != null)
                {
                    // zero capacity: only a listener-driven consumer gets a single credit
                    getQpidSession().messageFlow(tag, MessageCreditUnit.MESSAGE, 1, Option.UNRELIABLE);
                }
                getQpidSession().messageFlow(tag, MessageCreditUnit.BYTE, 0xFFFFFFFF, Option.UNRELIABLE);
            }
            catch (Exception e)
            {
                throw new AMQException(ErrorCodes.INTERNAL_ERROR, "Error while trying to get the listener", e);
            }
        }
    }
    else
    {
        for (BasicMessageConsumer consumer : getConsumers())
        {
            getQpidSession().messageStop(consumer.getConsumerTag(), Option.UNRELIABLE);
        }
        sync();
    }
    // We need to sync so that we get notified of an error.
    sync();
}
/**
 * Rolls back the transaction on the transport session and syncs so that errors are reported.
 */
public void sendRollback() throws QpidException, FailoverException
{
    final Session transportSession = getQpidSession();
    transportSession.txRollback();
    // We need to sync so that we get notified of an error.
    sync();
}
//------ Private methods
/**
 * Gives access to the transport-level Qpid session backing this JMS session.
 *
 * @return The associated Qpid Session.
 */
protected Session getQpidSession()
{
    final Session transportSession = _qpidSession;
    return transportSession;
}
/**
 * Returns the latest recorded exception and clears it, so that a subsequent call returns
 * null until a new exception is recorded.
 *
 * @return the latest recorded exception, or null when there is none
 */
public QpidException getCurrentException()
{
    synchronized (_currentExceptionLock)
    {
        final QpidException pending = _currentException;
        _currentException = null;
        return pending;
    }
}
/** Session-opened callback; nothing to do. */
public void opened(Session ssn)
{
    // intentionally empty
}
/** Session-resumed callback: adopts the connection the resumed session now belongs to. */
public void resumed(Session ssn)
{
    final org.apache.qpid.transport.Connection current = ssn.getConnection();
    _qpidConnection = current;
}
/** Message-transfer callback: wraps the transfer and hands it to the message-received path. */
public void message(Session ssn, MessageTransfer xfr)
{
    final UnprocessedMessage_0_10 unprocessed = new UnprocessedMessage_0_10(xfr);
    messageReceived(unprocessed);
}
/** Session-exception callback: records the exception as the current one. */
public void exception(Session ssn, SessionException exc)
{
    final SessionException raised = exc;
    setCurrentException(raised);
}
/**
 * Session-closed callback: closes the JMS session (no cause) and cancels the periodic
 * acknowledgement flush task.
 * <p>
 * The flush task is cancelled even when closing the JMS session fails, so that the scheduled
 * task does not outlive a session whose close raised an exception. Close failures are logged
 * and not propagated.
 *
 * @param ssn the transport session that was closed (not used)
 */
public void closed(Session ssn)
{
    try
    {
        super.closed(null);
    }
    catch (Exception e)
    {
        _logger.error("Error closing JMS session", e);
    }
    finally
    {
        // shared helper: cancels the scheduled flusher and clears the handle
        cancelTimerTask();
    }
}
/**
 * Same as {@link #getCurrentException()}: returns and clears the latest recorded exception.
 */
public QpidException getLastException()
{
    final QpidException latest = getCurrentException();
    return latest;
}
/**
 * Declares the destination's queue inside a failover no-op wrapper. When the destination
 * requires a client generated name, one is built from its binding keys, exchange name and a
 * random UUID before the declare is sent.
 */
@Override
protected String declareQueue(final AMQDestination amqd,
                              final boolean noLocal, final boolean nowait, final boolean passive)
        throws QpidException
{
    final FailoverProtectedOperation<String, QpidException> declareOperation =
            new FailoverProtectedOperation<String, QpidException>()
            {
                public String execute() throws QpidException, FailoverException
                {
                    // Generate the queue name if the destination indicates that a client generated name is to be used.
                    if (amqd.isNameRequired())
                    {
                        final StringBuilder keys = new StringBuilder();
                        for (String key : amqd.getBindingKeys())
                        {
                            keys.append("_").append(key);
                        }
                        amqd.setQueueName(keys.toString() + "@"
                                          + amqd.getExchangeName() + "_" + UUID.randomUUID());
                    }
                    return send0_10QueueDeclare(amqd, noLocal, nowait, passive);
                }
            };
    return new FailoverNoopSupport<String, QpidException>(declareOperation, getAMQConnection()).execute();
}
/**
 * Flushes pending acknowledgements, optionally syncs the transport session, and queries the
 * message count of the destination's queue.
 *
 * @param amqd the destination whose queue is queried
 * @param sync whether to sync the transport session before querying
 * @return the message count reported by the queue query
 */
protected Long requestQueueDepth(AMQDestination amqd, boolean sync)
{
    flushAcknowledgments();
    if (sync)
    {
        getQpidSession().sync();
    }
    final String queue = amqd.getQueueName();
    return getQpidSession().queueQuery(queue).get().getMessageCount();
}
/**
 * Store a not yet committed message for this session.
 *
 * @param id the message id; narrowed to an int before being recorded
 */
@Override protected void addDeliveredMessage(long id)
{
    final int transferId = (int) id;
    _txRangeSet.add(transferId);
    _txSize += 1;
}
/**
 * With 0.10 messages are consumed with window mode, we must send a completion
 * before the window size is reached so credits don't dry up.
 * <p>
 * Completions are sent when the prefetch is 1, or each time the number of in-transaction
 * messages reaches a multiple of half the (non-zero) prefetch.
 */
protected void sendTxCompletionsIfNecessary()
{
    if (_txSize <= 0)
    {
        return;
    }
    // this is a heuristic, we may want to have that configurable
    final long maxPrefetch = getAMQConnection().getMaxPrefetch();
    final boolean completionDue =
            maxPrefetch == 1 || (maxPrefetch != 0 && _txSize % (maxPrefetch / 2) == 0);
    if (completionDue)
    {
        // send completed so consumer credits don't dry up
        messageAcknowledge(_txRangeSet, false);
    }
}
/**
 * Commits the transaction: accepts the messages received in it (if any), issues the commit
 * with auto-sync temporarily enabled, and finally syncs so that errors are reported.
 */
public void commitImpl() throws QpidException, FailoverException, TransportException
{
    final boolean hasTxMessages = _txSize > 0;
    if (hasTxMessages)
    {
        messageAcknowledge(_txRangeSet, true);
        _txRangeSet.clear();
        _txSize = 0;
    }

    getQpidSession().setAutoSync(true);
    try
    {
        getQpidSession().txCommit();
    }
    finally
    {
        // always restore the auto-sync setting, even when the commit fails
        getQpidSession().setAutoSync(false);
    }
    // We need to sync so that we get notified of an error.
    sync();
}
/**
 * Compares two delivery tags via {@code Serial.le} after narrowing both to int.
 */
protected final boolean tagLE(long tag1, long tag2)
{
    final int first = (int) tag1;
    final int second = (int) tag2;
    return Serial.le(first, second);
}
/**
 * Tells whether the rollback mark should move to the given delivery tag, i.e. whether
 * {@code Serial.lt} holds for the current mark and the tag after narrowing both to int.
 */
protected final boolean updateRollbackMark(long currentMark, long deliveryTag)
{
    final int mark = (int) currentMark;
    final int tag = (int) deliveryTag;
    return Serial.lt(mark, tag);
}
/**
 * Syncs the transport session. A session exception raised by the sync is recorded, and the
 * latest recorded exception, if any, is then thrown (and cleared).
 *
 * @throws QpidException the latest recorded session error
 */
public void sync() throws QpidException
{
    try
    {
        getQpidSession().sync();
    }
    catch (SessionException sessionFailure)
    {
        setCurrentException(sessionFailure);
    }

    final QpidException pending = getCurrentException();
    if (pending == null)
    {
        return;
    }
    throw pending;
}
/**
 * Records a session exception as the current exception and reacts to it.
 * <p>
 * The recorded exception carries the error code of the underlying execution exception, or
 * INTERNAL_ERROR when there is none. With legacy (hard error) behaviour the connection is
 * closed; otherwise the flush task and dispatcher are stopped, this session is closed and the
 * connection is notified of the exception.
 *
 * @param se the session exception reported by the transport layer
 */
public void setCurrentException(SessionException se)
{
    QpidException amqe;
    synchronized (_currentExceptionLock)
    {
        final ExecutionException executionFailure = se.getException();
        final int code = (executionFailure != null)
                ? executionFailure.getErrorCode().getValue()
                : ErrorCodes.INTERNAL_ERROR;
        amqe = new AMQException(code, _isHardError, se.getMessage(), se.getCause());
        _currentException = amqe;
    }

    if (_isHardError)
    {
        getAMQConnection().closed(amqe);
        return;
    }

    cancelTimerTask();
    stopDispatcherThread();
    try
    {
        closed(amqe);
    }
    catch (Exception e)
    {
        _logger.warn("Error closing session", e);
    }
    getAMQConnection().exceptionReceived(amqe);
}
/**
 * @return the 0-10 message delegate factory
 */
public AMQMessageDelegateFactory getMessageDelegateFactory()
{
    final AMQMessageDelegateFactory factory = AMQMessageDelegateFactory.FACTORY_0_10;
    return factory;
}
/**
 * Queries the broker for the exchange named by the destination's address.
 * <p>
 * When asserting, durability, exchange type and declare arguments of the destination's node
 * must match the query result, otherwise an exception is thrown. When not asserting and the
 * exchange exists, the reported exchange type is stored on the node and the destination.
 *
 * @return true when the exchange exists (and, if asserting, matches the node)
 * @throws QpidException when {@code assertNode} is set and the exchange is missing or differs
 */
@Override
public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws QpidException
{
    final ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get();
    boolean match = !result.getNotFound();
    final Node node = dest.getNode();

    if (match && assertNode)
    {
        match = (result.getDurable() == node.isDurable())
                && (node.getExchangeType() != null && node.getExchangeType().equals(result.getType()))
                && matchProps(result.getArguments(), node.getDeclareArgs());
    }
    else if (match)
    {
        _logger.debug("Setting Exchange type " + result.getType());
        node.setExchangeType(result.getType());
        dest.setExchangeClass(result.getType());
    }

    if (assertNode && !match)
    {
        throw new QpidException("Assert failed for address : " + dest +", Result was : " + result);
    }
    return match;
}
/**
 * Checks the queue named by the destination's address against the properties of the
 * destination's node.
 */
@Override
public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws QpidException
{
    final Node node = dest.getNode();
    final String queueName = dest.getAddressName();
    return isQueueExist(queueName, assertNode,
                        node.isDurable(), node.isAutoDelete(),
                        node.isExclusive(), node.getDeclareArgs());
}
/**
 * Queries the broker for a queue and optionally asserts its properties.
 * <p>
 * A session exception whose execution error code is RESOURCE_DELETED or NOT_FOUND is treated
 * as "queue does not exist"; any other session exception is rethrown as a Qpid exception.
 *
 * @return true when the query reports a queue of that name (and, if asserting, it matches)
 * @throws QpidException when the assertion fails or the query fails for another reason
 */
public boolean isQueueExist(String queueName, boolean assertNode,
                            boolean durable, boolean autoDelete,
                            boolean exclusive, Map<String, Object> args) throws QpidException
{
    try
    {
        final QueueQueryResult result = getQpidSession().queueQuery(queueName, Option.NONE).get();
        boolean match = queueName.equals(result.getQueue());
        if (assertNode)
        {
            match = match
                    && (result.getDurable() == durable)
                    && (result.getAutoDelete() == autoDelete)
                    && (result.getExclusive() == exclusive)
                    && matchProps(result.getArguments(), args);
            if (!match)
            {
                throw new QpidException("Assert failed for queue : " + queueName +", Result was : " + result);
            }
        }
        return match;
    }
    catch (SessionException e)
    {
        final ExecutionException underlying = e.getException();
        if (underlying == null)
        {
            throw new QpidException("Error querying queue", e);
        }
        final boolean queueAbsent = underlying.getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED
                                    || underlying.getErrorCode() == ExecutionErrorCode.NOT_FOUND;
        if (!queueAbsent)
        {
            throw new AMQException(underlying.getErrorCode().getValue(),
                                   "Error querying queue", e);
        }
        return false;
    }
}
/**
 * Checks that every property requested in {@code source} is present in {@code target} with an
 * equal value. Two {@link Number} values are also considered equal when their {@code long}
 * values are equal, so that e.g. an Integer and a Long holding the same value match.
 * <p>
 * The comparison is null-safe: a null or empty {@code source} requests nothing and therefore
 * matches; a null {@code target} matches nothing; a key mapped to null in {@code target} only
 * matches a null expectation. The first mismatch is logged at debug level.
 *
 * @param target the properties reported by the broker, may be null
 * @param source the properties given in the address, may be null
 * @return true when all requested properties match
 */
private boolean matchProps(Map<String,Object> target,Map<String,Object> source)
{
    if (source == null || source.isEmpty())
    {
        return true;
    }
    for (Map.Entry<String,Object> requested : source.entrySet())
    {
        final String key = requested.getKey();
        final Object expected = requested.getValue();
        final boolean present = target != null && target.containsKey(key);
        final Object actual = present ? target.get(key) : null;

        final boolean match;
        if (!present)
        {
            match = false;
        }
        else if (actual == null)
        {
            // key is present but mapped to null: only a null expectation can match
            match = expected == null;
        }
        else
        {
            match = actual.equals(expected)
                    || (actual instanceof Number
                        && expected instanceof Number
                        && ((Number) actual).longValue() == ((Number) expected).longValue());
        }

        if (!match)
        {
            StringBuilder buf = new StringBuilder();
            buf.append("Property given in address did not match with the args sent by the broker.");
            buf.append(" Expected { ").append(key).append(" : ").append(expected).append(" }, ");
            buf.append(" Actual { ").append(key).append(" : ").append(actual).append(" }");
            _logger.debug(buf.toString());
            return false;
        }
    }
    return true;
}
/**
 * Resolves the address type of a destination. A type already set on the destination is
 * returned as is. Otherwise the broker is asked whether a queue and/or an exchange exists
 * under the address name, and the resolved type is stored on the destination:
 * an existing exchange without a queue of that name resolves to a topic, everything else
 * that is not ambiguous resolves to a queue.
 *
 * @throws QpidException when both a queue and an exchange exist under that name
 */
@Override
public int resolveAddressType(AMQDestination dest) throws QpidException
{
    final int declaredType = dest.getAddressType();
    final String name = dest.getAddressName();
    if (declaredType != AMQDestination.UNKNOWN_TYPE)
    {
        return declaredType;
    }

    final ExchangeBoundResult result = getQpidSession().exchangeBound(name, name, null, null).get();
    final boolean queueMissing = result.getQueueNotFound();
    final boolean exchangeMissing = result.getExchangeNotFound();
    if (!queueMissing && !exchangeMissing)
    {
        //both a queue and exchange exist for that name
        throw new QpidException("Ambiguous address, please specify queue or topic as node type");
    }

    // exchange missing: either the name refers to a queue, or nothing exists and it is treated
    // as a queue; otherwise only the exchange exists and the name refers to a topic
    final int resolvedType = exchangeMissing ? AMQDestination.QUEUE_TYPE : AMQDestination.TOPIC_TYPE;
    dest.setAddressType(resolvedType);
    return resolvedType;
}
/**
 * Declares the subscription queue described by the destination's link and binds it to the
 * destination's address using the destination's subject and the JMS selector.
 * <p>
 * When the destination has no queue name, the link name is used, or a generated
 * "TempQueue..." name when the link is unnamed; the chosen name is stored on the destination.
 *
 * @throws QpidException when the link is durable but the queue name is a generated one
 */
@Override
void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws QpidException
{
    final Link link = dest.getLink();
    String queueName = dest.getQueueName();
    if (queueName == null)
    {
        if (link.getName() == null)
        {
            queueName = "TempQueue" + UUID.randomUUID();
        }
        else
        {
            queueName = link.getName();
        }
        dest.setQueueName(queueName);
    }

    final SubscriptionQueue queueProps = link.getSubscriptionQueue();
    final Map<String,Object> declareArgs = queueProps.getDeclareArgs();
    if (!declareArgs.containsKey(AddressHelper.NO_LOCAL))
    {
        declareArgs.put(AddressHelper.NO_LOCAL, noLocal);
    }

    final boolean durableWithGeneratedName = link.isDurable() && queueName.startsWith("TempQueue");
    if (durableWithGeneratedName)
    {
        throw new QpidException("You cannot mark a subscription queue as durable without providing a name for the link.");
    }

    final Option autoDeleteOption = queueProps.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE;
    final Option durableOption = link.isDurable() ? Option.DURABLE : Option.NONE;
    final Option exclusiveOption = queueProps.isExclusive() ? Option.EXCLUSIVE : Option.NONE;
    getQpidSession().queueDeclare(queueName, queueProps.getAlternateExchange(), declareArgs,
                                  autoDeleteOption, durableOption, exclusiveOption);

    final Map<String,Object> bindingArguments = new HashMap<String, Object>();
    final String selector = (messageSelector != null) ? messageSelector : "";
    bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), selector);
    getQpidSession().exchangeBind(queueName, dest.getAddressName(), dest.getSubject(), bindingArguments);
}
/**
 * Acknowledges the ranges gathered from the session's unacknowledged message tags.
 * <p>
 * Nothing is sent when no ranges were gathered. After acknowledging, the underlying session
 * is synced if the connection reports that client acks should be synchronous.
 */
protected void acknowledgeImpl()
{
    final RangeSet pending = gatherRangeSet(getUnacknowledgedMessageTags());
    if (pending.size() <= 0)
    {
        // Nothing outstanding, so there is nothing to acknowledge.
        return;
    }

    messageAcknowledge(pending, true);

    final boolean syncAfterAck = getAMQConnection().getSyncClientAck();
    if (syncAfterAck)
    {
        getQpidSession().sync();
    }
}
/**
 * Resets the transactional bookkeeping of this session, delegates to the superclass
 * resubscribe logic and then syncs the underlying session.
 *
 * @throws QpidException propagated from the superclass implementation
 */
@Override
void resubscribe() throws QpidException
{
    // Drop the recorded transactional ranges (and their count) so that commands belonging
    // to messages received from the old broker are not completed.
    _txRangeSet.clear();
    _txSize = 0;

    super.resubscribe();

    getQpidSession().sync();
}
/**
 * Stops delivery to this session and releases every message that is still outstanding.
 * <p>
 * The steps, in order: suspend the channel (unless closing), drain the dispatch queue, stop
 * the dispatcher, move each consumer's queued delivery tags into the prefetched tag
 * collection, flush the combined ranges as processed, release the delivered ranges with
 * {@code Option.SET_REDELIVERED}, release the prefetched ranges without it, and finally sync.
 *
 * @throws QpidException if one of the invoked session operations fails
 */
@Override
void stop() throws QpidException
{
    // Stop the server delivering messages to this session.
    suspendChannelIfNotClosing();
    drainDispatchQueueWithDispatcher();
    stopExistingDispatcher();

    // Anything still sitting in a consumer's receiver queue counts as prefetched.
    for (BasicMessageConsumer each : getConsumers())
    {
        List<Long> drainedTags = each.drainReceiverQueueAndRetrieveDeliveryTags();
        getPrefetchedMessageTags().addAll(drainedTags);
    }

    final RangeSet delivered = gatherRangeSet(getUnacknowledgedMessageTags());
    final RangeSet prefetched = gatherRangeSet(getPrefetchedMessageTags());

    // Build the union of both sets, delivered ranges first.
    final RangeSet combined = RangeSetFactory.createRangeSet(delivered.size() + prefetched.size());

    Iterator<Range> cursor = delivered.iterator();
    while (cursor.hasNext())
    {
        combined.add(cursor.next());
    }

    cursor = prefetched.iterator();
    while (cursor.hasNext())
    {
        combined.add(cursor.next());
    }

    flushProcessed(combined, false);

    // Only the delivered ranges are released with the redelivered option set.
    getQpidSession().messageRelease(delivered, Option.SET_REDELIVERED);
    getQpidSession().messageRelease(prefetched);

    sync();
}
/**
 * Reports whether flow is currently blocked, as answered by the underlying Qpid session.
 *
 * @return the value of {@code isFlowBlocked()} on the underlying session
 */
@Override
public boolean isFlowBlocked()
{
    final boolean blocked = _qpidSession.isFlowBlocked();
    return blocked;
}
/**
 * Always rejects the request: flow control via this call is supported by the 0-8..0-9-1
 * protocols only.
 *
 * @param active ignored
 * @throws UnsupportedOperationException always
 */
@Override
public void setFlowControl(boolean active)
{
    final String reason = "Operation not supported by this protocol";
    throw new UnsupportedOperationException(reason);
}
/**
 * Cancels the scheduled flush task, if one is registered, and clears the reference to it.
 * The task is cancelled without requesting interruption ({@code cancel(false)}).
 */
private void cancelTimerTask()
{
    if (_flushTaskFuture == null)
    {
        // No task registered, nothing to cancel.
        return;
    }

    _flushTaskFuture.cancel(false);
    _flushTaskFuture = null;
}
/**
 * Declares the queue named by the destination's address, creates the bindings configured
 * on the destination's node and then syncs.
 *
 * @param dest    destination whose node supplies the declare arguments, alternate exchange
 *                and auto-delete / durable / exclusive flags
 * @param noLocal value stored under {@code AddressHelper.NO_LOCAL} in the declare arguments,
 *                unless those arguments already contain that key
 * @throws QpidException if creating the bindings or syncing fails
 */
@Override
protected void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws QpidException
{
    final Node queueNode = dest.getNode();

    // Only supply no-local when the address did not specify it explicitly.
    final Map<String, Object> declareArgs = queueNode.getDeclareArgs();
    if (!declareArgs.containsKey(AddressHelper.NO_LOCAL))
    {
        declareArgs.put(AddressHelper.NO_LOCAL, noLocal);
    }

    final Option autoDeleteOption = queueNode.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE;
    final Option durableOption = queueNode.isDurable() ? Option.DURABLE : Option.NONE;
    final Option exclusiveOption = queueNode.isExclusive() ? Option.EXCLUSIVE : Option.NONE;
    getQpidSession().queueDeclare(dest.getAddressName(),
                                  queueNode.getAlternateExchange(),
                                  declareArgs,
                                  autoDeleteOption,
                                  durableOption,
                                  exclusiveOption);

    createBindings(dest, queueNode.getBindings());
    sync();
}
/**
 * Declares the exchange named by the destination's address using the type, alternate
 * exchange, declare arguments and durable / auto-delete flags of the destination's node,
 * then creates the node's bindings and syncs.
 *
 * @param dest destination describing the exchange node to create
 * @throws QpidException if declaring the exchange, creating the bindings or syncing fails
 */
@Override
void handleExchangeNodeCreation(AMQDestination dest) throws QpidException
{
    final Node exchangeNode = dest.getNode();
    final String exchangeName = dest.getAddressName();

    sendExchangeDeclare(exchangeName,
                        exchangeNode.getExchangeType(),
                        exchangeNode.getAlternateExchange(),
                        exchangeNode.getDeclareArgs(),
                        false,
                        exchangeNode.isDurable(),
                        exchangeNode.isAutoDelete());

    // When bindings are given without a queue name and the caller is a producer, the
    // broker is expected to answer with an exception.
    createBindings(dest, exchangeNode.getBindings());
    sync();
}
/**
 * Binds a queue to an exchange on the underlying session using the key and arguments of the
 * supplied binding.
 *
 * @param dest     destination the binding belongs to; not used by this implementation
 * @param binding  source of the binding key and binding arguments
 * @param queue    name of the queue to bind
 * @param exchange name of the exchange to bind the queue to
 */
protected void doBind(final AMQDestination dest, final Binding binding, final String queue, final String exchange)
{
    final String bindingKey = binding.getBindingKey();
    getQpidSession().exchangeBind(queue, exchange, bindingKey, binding.getArgs());
}
/**
 * Removes every binding configured on the destination's link.
 * <p>
 * A binding that does not name an exchange falls back to the destination's address name for
 * topic destinations and to {@code "amq.topic"} otherwise. A binding that does not name a
 * queue falls back to the destination's queue name for queue destinations; for other
 * destinations the link name is preferred and the destination's queue name is used only when
 * the link has no name.
 *
 * @param dest destination whose link bindings are to be removed
 * @throws QpidException declared by the method contract
 */
void handleLinkDelete(AMQDestination dest) throws QpidException
{
    // We need to destroy link bindings
    final String fallbackExchange;
    if (dest.getAddressType() == AMQDestination.TOPIC_TYPE)
    {
        fallbackExchange = dest.getAddressName();
    }
    else
    {
        fallbackExchange = "amq.topic";
    }

    final Link link = dest.getLink();

    final String fallbackQueue;
    if (AMQDestination.QUEUE_TYPE == dest.getAddressType())
    {
        fallbackQueue = dest.getQueueName();
    }
    else if (link.getName() != null)
    {
        fallbackQueue = link.getName();
    }
    else
    {
        fallbackQueue = dest.getQueueName();
    }

    for (Binding binding : link.getBindings())
    {
        String queue = binding.getQueue();
        if (queue == null)
        {
            queue = fallbackQueue;
        }

        String exchange = binding.getExchange();
        if (exchange == null)
        {
            exchange = fallbackExchange;
        }

        final String bindingKey = binding.getBindingKey();

        if (_logger.isDebugEnabled())
        {
            final String message = "Unbinding queue : " + queue +
                                   " exchange: " + exchange +
                                   " using binding key " + bindingKey +
                                   " with args " + Strings.printMap(binding.getArgs());
            _logger.debug(message);
        }

        getQpidSession().exchangeUnbind(queue, exchange, bindingKey);
    }
}
/**
 * Deletes the destination's subscription queue, but only when all of the following hold:
 * the destination is a topic, the link's subscription queue is exclusive, and the queue is
 * reported as existing.
 *
 * @param dest destination whose subscription queue may be deleted
 * @throws QpidException if the queue existence check fails
 */
void deleteSubscriptionQueue(AMQDestination dest) throws QpidException
{
    // We need to delete the subscription queue.
    if (dest.getAddressType() != AMQDestination.TOPIC_TYPE)
    {
        return;
    }

    if (!dest.getLink().getSubscriptionQueue().isExclusive())
    {
        return;
    }

    // The existence check is made last, after the cheaper local checks have passed.
    if (!isQueueExist(dest.getQueueName(), false, false, false, false, null))
    {
        return;
    }

    getQpidSession().queueDelete(dest.getQueueName());
}
/**
 * Deletes the node behind the destination's address if it exists: the exchange for topic
 * destinations, the queue for all other destinations. After a delete has been issued the
 * destination is marked as unresolved.
 *
 * @param dest destination whose node is to be deleted
 * @throws QpidException if the existence check fails
 */
@Override
void handleNodeDelete(AMQDestination dest) throws QpidException
{
    if (AMQDestination.TOPIC_TYPE == dest.getAddressType())
    {
        if (!isExchangeExist(dest, false))
        {
            return;
        }
        getQpidSession().exchangeDelete(dest.getAddressName());
        setUnresolved(dest);
        return;
    }

    if (!isQueueExist(dest, false))
    {
        return;
    }
    getQpidSession().queueDelete(dest.getAddressName());
    setUnresolved(dest);
}
}