blob: aa7025e06886549ce480d48895ba278680ee7f45 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.server.queue;
import java.security.Principal;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.QueueActor;
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E, Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements AMQQueue<E, Q, QueueConsumer<?,E,Q,L>>,
StateChangeListener<QueueConsumer<?,E,Q,L>, QueueConsumer.State>,
MessageGroupManager.ConsumerResetHelper<E,Q,L>
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
public static final String SHARED_MSG_GROUP_ARG_VALUE = "1";
private static final String QPID_NO_GROUP = "qpid.no-group";
/** default group name used for shared message groups when the queue attributes do not supply one */
private static final String DEFAULT_SHARED_MESSAGE_GROUP = System.getProperty(BrokerProperties.PROPERTY_DEFAULT_SHARED_MESSAGE_GROUP, QPID_NO_GROUP);
// TODO - should make this configurable at the vhost / broker level
private static final int DEFAULT_MAX_GROUPS = 255;
private final VirtualHost _virtualHost;
private final String _name;
/** description taken from the Queue.DESCRIPTION attribute; null when none was supplied */
private String _description;
private final boolean _durable;
private Exchange _alternateExchange;
private final L _entries;
private final QueueConsumerList<E,Q,L> _consumerList = new QueueConsumerList<E,Q,L>();
/** the non-transient exclusive consumer currently attached, or null when there is none */
private volatile QueueConsumer<?,E,Q,L> _exclusiveSubscriber;
private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
private final AtomicLong _totalMessagesReceived = new AtomicLong();
private final AtomicLong _dequeueCount = new AtomicLong();
private final AtomicLong _dequeueSize = new AtomicLong();
private final AtomicLong _enqueueCount = new AtomicLong();
private final AtomicLong _enqueueSize = new AtomicLong();
private final AtomicLong _persistentMessageEnqueueSize = new AtomicLong();
private final AtomicLong _persistentMessageDequeueSize = new AtomicLong();
private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong();
private final AtomicLong _persistentMessageDequeueCount = new AtomicLong();
private final AtomicLong _unackedMsgCount = new AtomicLong(0);
private final AtomicLong _unackedMsgBytes = new AtomicLong();
/** high-water mark of the number of bindings, maintained by addBinding */
private final AtomicInteger _bindingCountHigh = new AtomicInteger();
/** max allowed size(KB) of a single message */
private long _maximumMessageSize;
/** max allowed number of messages on a queue. */
private long _maximumMessageCount;
/** max queue depth for the queue */
private long _maximumQueueDepth;
/** maximum message age before alerts occur */
private long _maximumMessageAge;
/** the minimum interval between sending out consecutive alerts of the same type */
private long _minimumAlertRepeatGap;
private long _capacity;
private long _flowResumeCapacity;
private ExclusivityPolicy _exclusivityPolicy;
private LifetimePolicy _lifetimePolicy;
private Object _exclusiveOwner; // could be connection, session or Principal
private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
static final int MAX_ASYNC_DELIVERIES = 80;
private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
/** executor on which asynchronous delivery tasks are run (see execute) */
private final Executor _asyncDelivery;
private AtomicInteger _deliveredMessages = new AtomicInteger();
private AtomicBoolean _stopped = new AtomicBoolean(false);
private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
private final AtomicBoolean _deleted = new AtomicBoolean(false);
private final List<Action<? super Q>> _deleteTaskList =
new CopyOnWriteArrayList<Action<? super Q>>();
private LogSubject _logSubject;
private LogActor _logActor;
private boolean _noLocal;
private final AtomicBoolean _overfull = new AtomicBoolean(false);
private boolean _deleteOnNoConsumers;
private final CopyOnWriteArrayList<Binding> _bindings = new CopyOnWriteArrayList<Binding>();
private UUID _id;
/** synchronized copy of the creation attributes, with the resolved exclusivity and lifetime policies added */
private final Map<String, Object> _arguments;
//TODO : persist creation time
private long _createTime = System.currentTimeMillis();
/** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
private int _maximumDeliveryCount;
/** message group manager, or null when the queue was created without a message group key */
private final MessageGroupManager<E,Q,L> _messageGroupManager;
/** listeners notified of consumer addition/removal; guarded by synchronizing on the collection itself */
private final Collection<ConsumerRegistrationListener<Q>> _consumerListeners =
new ArrayList<ConsumerRegistrationListener<Q>>();
private AMQQueue.NotificationListener _notificationListener;
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
/**
 * Builds a queue from the supplied attribute map.
 * <p>
 * The exclusivity and lifetime policies are read first so that the owner can be resolved and the
 * queue can be tied to the creating connection or session. Alerting thresholds, flow control
 * limits and the maximum delivery count are taken from the attributes when present and from the
 * virtual host defaults otherwise.
 *
 * @param virtualHost      the virtual host the queue belongs to; must not be null
 * @param creatingSession  the session creating the queue, or null when there is none
 * @param attributes       the queue attributes (Queue.ID, Queue.NAME, Queue.DURABLE, ...); must not be null
 * @param entryListFactory factory used to create the queue entry list
 * @throws IllegalArgumentException     if the name or virtual host is null, or if the lifetime policy
 *                                      ties the queue to a connection/session and no creating session
 *                                      was supplied
 * @throws ServerScopedRuntimeException if the exclusivity policy is not one this class knows about
 */
protected SimpleAMQQueue(VirtualHost virtualHost,
                         final AMQSessionModel<?,?> creatingSession,
                         Map<String, Object> attributes,
                         QueueEntryListFactory<E, Q, L> entryListFactory)
{
    UUID id = MapValueConverter.getUUIDAttribute(Queue.ID, attributes);
    String name = MapValueConverter.getStringAttribute(Queue.NAME, attributes);
    boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE,attributes,false);

    _exclusivityPolicy = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class,
                                                            Queue.EXCLUSIVE,
                                                            attributes,
                                                            ExclusivityPolicy.NONE);
    _lifetimePolicy = MapValueConverter.getEnumAttribute(LifetimePolicy.class,
                                                         Queue.LIFETIME_POLICY,
                                                         attributes,
                                                         LifetimePolicy.PERMANENT);

    // Resolve the exclusive owner: from the creating session when there is one, otherwise from
    // the Queue.OWNER attribute (PRINCIPAL and CONTAINER policies only).
    if(creatingSession != null)
    {
        switch(_exclusivityPolicy)
        {
            case PRINCIPAL:
                _exclusiveOwner = creatingSession.getConnectionModel().getAuthorizedPrincipal();
                break;
            case CONTAINER:
                _exclusiveOwner = creatingSession.getConnectionModel().getRemoteContainerName();
                break;
            case CONNECTION:
                _exclusiveOwner = creatingSession.getConnectionModel();
                addExclusivityConstraint(creatingSession.getConnectionModel());
                break;
            case SESSION:
                _exclusiveOwner = creatingSession;
                addExclusivityConstraint(creatingSession);
                break;
            case NONE:
            case LINK:
                // nothing to do as if link no link associated until there is a consumer associated
                break;
            default:
                throw new ServerScopedRuntimeException("Unknown exclusivity policy: "
                                                       + _exclusivityPolicy
                                                       + " this is a coding error inside Qpid");
        }
    }
    else if(_exclusivityPolicy == ExclusivityPolicy.PRINCIPAL)
    {
        String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null);
        if(owner != null)
        {
            _exclusiveOwner = new AuthenticatedPrincipal(owner);
        }
    }
    else if(_exclusivityPolicy == ExclusivityPolicy.CONTAINER)
    {
        String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null);
        if(owner != null)
        {
            _exclusiveOwner = owner;
        }
    }

    // Tie the queue's lifetime to the creating connection/session where the policy demands it.
    if(_lifetimePolicy == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE)
    {
        if(creatingSession != null)
        {
            addLifetimeConstraint(creatingSession.getConnectionModel());
        }
        else
        {
            throw new IllegalArgumentException("Queues created with a lifetime policy of "
                                               + _lifetimePolicy
                                               + " must be created from a connection.");
        }
    }
    else if(_lifetimePolicy == LifetimePolicy.DELETE_ON_SESSION_END)
    {
        if(creatingSession != null)
        {
            addLifetimeConstraint(creatingSession);
        }
        else
        {
            throw new IllegalArgumentException("Queues created with a lifetime policy of "
                                               + _lifetimePolicy
                                               + " must be created from a connection.");
        }
    }

    if (name == null)
    {
        throw new IllegalArgumentException("Queue name must not be null");
    }

    if (virtualHost == null)
    {
        throw new IllegalArgumentException("Virtual Host must not be null");
    }

    _name = name;
    _durable = durable;
    _virtualHost = virtualHost;
    _entries = entryListFactory.createQueueEntryList((Q) this);

    // Keep a synchronized copy of the attributes, overwritten with the policies actually in force.
    final LinkedHashMap<String, Object> arguments = new LinkedHashMap<String, Object>(attributes);
    arguments.put(Queue.EXCLUSIVE, _exclusivityPolicy);
    arguments.put(Queue.LIFETIME_POLICY, _lifetimePolicy);
    _arguments = Collections.synchronizedMap(arguments);

    _description = MapValueConverter.getStringAttribute(Queue.DESCRIPTION, attributes, null);
    _noLocal = MapValueConverter.getBooleanAttribute(Queue.NO_LOCAL, attributes, false);
    _id = id;
    _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
    _logSubject = new QueueLogSubject(this);
    _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger());

    // Each threshold below: explicit attribute wins, otherwise fall back to the virtual host default.
    if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_AGE))
    {
        setMaximumMessageAge(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_MESSAGE_AGE, attributes));
    }
    else
    {
        setMaximumMessageAge(virtualHost.getDefaultAlertThresholdMessageAge());
    }

    if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_SIZE))
    {
        setMaximumMessageSize(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, attributes));
    }
    else
    {
        setMaximumMessageSize(virtualHost.getDefaultAlertThresholdMessageSize());
    }

    if (attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES))
    {
        setMaximumMessageCount(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
                                                                  attributes));
    }
    else
    {
        setMaximumMessageCount(virtualHost.getDefaultAlertThresholdQueueDepthMessages());
    }

    if (attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES))
    {
        setMaximumQueueDepth(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES,
                                                                attributes));
    }
    else
    {
        setMaximumQueueDepth(virtualHost.getDefaultAlertThresholdQueueDepthBytes());
    }

    if (attributes.containsKey(Queue.ALERT_REPEAT_GAP))
    {
        setMinimumAlertRepeatGap(MapValueConverter.getLongAttribute(Queue.ALERT_REPEAT_GAP, attributes));
    }
    else
    {
        setMinimumAlertRepeatGap(virtualHost.getDefaultAlertRepeatGap());
    }

    if (attributes.containsKey(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES))
    {
        setCapacity(MapValueConverter.getLongAttribute(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, attributes));
    }
    else
    {
        setCapacity(virtualHost.getDefaultQueueFlowControlSizeBytes());
    }

    if (attributes.containsKey(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES))
    {
        setFlowResumeCapacity(MapValueConverter.getLongAttribute(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, attributes));
    }
    else
    {
        setFlowResumeCapacity(virtualHost.getDefaultQueueFlowResumeSizeBytes());
    }

    if (attributes.containsKey(Queue.MAXIMUM_DELIVERY_ATTEMPTS))
    {
        setMaximumDeliveryCount(MapValueConverter.getIntegerAttribute(Queue.MAXIMUM_DELIVERY_ATTEMPTS, attributes));
    }
    else
    {
        setMaximumDeliveryCount(virtualHost.getDefaultMaximumDeliveryAttempts());
    }

    final String ownerString;
    switch(_exclusivityPolicy)
    {
        case PRINCIPAL:
            // The owner can still be unresolved here (no creating session and no Queue.OWNER
            // attribute), so guard the dereference instead of failing queue creation with an NPE.
            ownerString = _exclusiveOwner == null ? null : ((Principal) _exclusiveOwner).getName();
            break;
        case CONTAINER:
            ownerString = (String) _exclusiveOwner;
            break;
        default:
            ownerString = null;
    }

    // Log the creation of this Queue.
    // The priorities display is toggled on if we set priorities > 0
    CurrentActor.get().message(_logSubject,
                               QueueMessages.CREATED(ownerString,
                                                     _entries.getPriorities(),
                                                     ownerString != null ,
                                                     _lifetimePolicy != LifetimePolicy.PERMANENT,
                                                     durable,
                                                     !durable,
                                                     _entries.getPriorities() > 0));

    // Message grouping is enabled only when a group key attribute is present; the "shared groups"
    // flag selects between the two manager implementations.
    if(attributes != null && attributes.containsKey(Queue.MESSAGE_GROUP_KEY))
    {
        if(attributes.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null
           && (Boolean)(attributes.get(Queue.MESSAGE_GROUP_SHARED_GROUPS)))
        {
            Object defaultGroup = attributes.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
            _messageGroupManager =
                    new DefinedGroupMessageGroupManager<E,Q,L>(String.valueOf(attributes.get(Queue.MESSAGE_GROUP_KEY)),
                            defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(),
                            this);
        }
        else
        {
            _messageGroupManager = new AssignedConsumerMessageGroupManager<E,Q,L>(String.valueOf(attributes.get(
                    Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
        }
    }
    else
    {
        _messageGroupManager = null;
    }

    resetNotifications();
}
/**
 * Ties the lifetime of this queue to the given object: when that object is deleted the queue is
 * removed from its virtual host. A matching delete task is also registered on the queue itself
 * (a {@code DeleteDeleteTask} wrapping the same action and lifetime object).
 *
 * @param lifetimeObject the object (connection or session) whose deletion should delete this queue
 */
private void addLifetimeConstraint(final Deletable<? extends Deletable> lifetimeObject)
{
    final Action<Deletable> deleteQueueTask = new Action<Deletable>()
    {
        @Override
        public void performAction(final Deletable object)
        {
            try
            {
                getVirtualHost().removeQueue(SimpleAMQQueue.this);
            }
            catch (QpidSecurityException e)
            {
                // keep the security exception as the cause so the reason for the failure is not lost
                throw new ConnectionScopedRuntimeException("Unable to delete a queue even though the queue's " +
                                                           "lifetime was tied to an object being deleted", e);
            }
        }
    };

    lifetimeObject.addDeleteTask(deleteQueueTask);
    addDeleteTask(new DeleteDeleteTask(lifetimeObject, deleteQueueTask));
}
/**
 * Registers a {@code ClearOwnerAction} on the given owner object so it runs when that object is
 * deleted, together with a companion {@code DeleteDeleteTask} registered on this queue.
 *
 * @param lifetimeObject the connection or session that currently owns the queue
 */
private void addExclusivityConstraint(final Deletable<? extends Deletable> lifetimeObject)
{
    final ClearOwnerAction ownerCleanup = new ClearOwnerAction(lifetimeObject);
    final DeleteDeleteTask queueSideCleanup = new DeleteDeleteTask(lifetimeObject, ownerCleanup);

    // the two tasks reference each other, so link them before either is registered
    ownerCleanup.setDeleteTask(queueSideCleanup);

    lifetimeObject.addDeleteTask(ownerCleanup);
    addDeleteTask(queueSideCleanup);
}
/**
 * Re-applies the current message age, message count, message size and queue depth thresholds
 * through their setters.
 */
public void resetNotifications()
{
// This ensure that the notification checks for the configured alerts are created.
setMaximumMessageAge(_maximumMessageAge);
setMaximumMessageCount(_maximumMessageCount);
setMaximumMessageSize(_maximumMessageSize);
setMaximumQueueDepth(_maximumQueueDepth);
}
// ------ Getters and Setters
/**
 * Submits a task to the queue's asynchronous delivery executor.
 *
 * @param runnable the task to run
 * @throws RejectedExecutionException if the executor rejects the task while the queue is not stopped
 */
public void execute(Runnable runnable)
{
    try
    {
        _asyncDelivery.execute(runnable);
    }
    catch (RejectedExecutionException ree)
    {
        if (_stopped.get())
        {
            // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped.
            return;
        }
        _logger.error("Unexpected rejected execution", ree);
        throw ree;
    }
}
/**
 * Sets the no-local flag; addConsumer applies it to consumers added while it is set.
 *
 * @param nolocal the new flag value
 */
public void setNoLocal(boolean nolocal)
{
_noLocal = nolocal;
}
/** @return the id read from the Queue.ID attribute at construction */
public UUID getId()
{
return _id;
}
/** @return the durable flag read from the Queue.DURABLE attribute at construction */
public boolean isDurable()
{
return _durable;
}
/** @return true when the exclusivity policy is anything other than {@code ExclusivityPolicy.NONE} */
public boolean isExclusive()
{
return _exclusivityPolicy != ExclusivityPolicy.NONE;
}
/** @return the alternate exchange, or null when none has been set */
public Exchange getAlternateExchange()
{
return _alternateExchange;
}
/**
 * Replaces the alternate exchange, releasing this queue's reference on the previous exchange
 * (if any) and taking a reference on the new one (if any).
 *
 * @param exchange the new alternate exchange, or null to clear it
 */
public void setAlternateExchange(Exchange exchange)
{
    final Exchange previous = _alternateExchange;
    if (previous != null)
    {
        previous.removeReference(this);
    }

    if (exchange != null)
    {
        exchange.addReference(this);
    }

    _alternateExchange = exchange;
}
/** @return a new list holding the names of all attributes recorded for this queue */
@Override
public Collection<String> getAvailableAttributes()
{
    final Set<String> attributeNames = _arguments.keySet();
    return new ArrayList<String>(attributeNames);
}
/**
 * Looks up a single attribute in the queue's attribute map.
 *
 * @param attrName the attribute name
 * @return the stored value, or null when the map holds no value for that name
 */
@Override
public Object getAttribute(String attrName)
{
return _arguments.get(attrName);
}
/** @return the lifetime policy of this queue */
@Override
public LifetimePolicy getLifetimePolicy()
{
return _lifetimePolicy;
}
/**
 * Returns the owner's name for the exclusivity policies that have a named owner.
 *
 * @return the container name (CONTAINER policy) or principal name (PRINCIPAL policy) of the
 *         exclusive owner; null when there is no owner or the policy is any other value
 */
public String getOwner()
{
    if (_exclusiveOwner == null)
    {
        return null;
    }

    switch (_exclusivityPolicy)
    {
        case CONTAINER:
            return (String) _exclusiveOwner;
        case PRINCIPAL:
            return ((Principal) _exclusiveOwner).getName();
        default:
            return null;
    }
}
/** @return the virtual host supplied at construction */
public VirtualHost getVirtualHost()
{
return _virtualHost;
}
/** @return the queue name read from the Queue.NAME attribute at construction */
public String getName()
{
return _name;
}
// ------ Manage Consumers
/**
 * Creates a consumer for the given target and attaches it to this queue.
 * <p>
 * The call is refused when the caller is not authorised to consume, when an exclusive consumer
 * is already attached, when the queue's exclusivity policy does not admit the target's
 * connection/session/principal/container, or when a non-transient exclusive consumer is
 * requested while other consumers are still attached.
 *
 * @param target       the consumer target that will receive messages
 * @param filters      filters to apply for the consumer
 * @param messageClass the message class the consumer accepts
 * @param consumerName the consumer's name
 * @param optionSet    consumer options (EXCLUSIVE, TRANSIENT, ACQUIRES, SEES_REQUEUES, ...)
 * @return the newly created consumer
 * @throws ExistingExclusiveConsumer         if an exclusive consumer is already attached
 * @throws ExistingConsumerPreventsExclusive if a non-transient exclusive consumer is requested while
 *                                           the queue already has consumers
 * @throws QpidSecurityException             if the security manager denies consume access
 * @throws ConsumerAccessRefused             if the queue's exclusivity policy excludes the target
 */
@Override
public synchronized <T extends ConsumerTarget> QueueConsumer<T,E,Q,L> addConsumer(final T target,
                                 final FilterManager filters,
                                 final Class<? extends ServerMessage> messageClass,
                                 final String consumerName,
                                 EnumSet<Consumer.Option> optionSet)
        throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, QpidSecurityException,
               ConsumerAccessRefused
{
    // Access control
    if (!getVirtualHost().getSecurityManager().authoriseConsume(this))
    {
        throw new QpidSecurityException("Permission denied");
    }

    if (hasExclusiveConsumer())
    {
        throw new ExistingExclusiveConsumer();
    }

    // Enforce the queue-level exclusivity policy; for the owner-based policies the first
    // consumer claims ownership when no owner has been recorded yet.
    switch(_exclusivityPolicy)
    {
        case CONNECTION:
            if(_exclusiveOwner == null)
            {
                _exclusiveOwner = target.getSessionModel().getConnectionModel();
                addExclusivityConstraint(target.getSessionModel().getConnectionModel());
            }
            else
            {
                if(_exclusiveOwner != target.getSessionModel().getConnectionModel())
                {
                    throw new ConsumerAccessRefused();
                }
            }
            break;
        case SESSION:
            if(_exclusiveOwner == null)
            {
                _exclusiveOwner = target.getSessionModel();
                addExclusivityConstraint(target.getSessionModel());
            }
            else
            {
                if(_exclusiveOwner != target.getSessionModel())
                {
                    throw new ConsumerAccessRefused();
                }
            }
            break;
        case LINK:
            if(getConsumerCount() != 0)
            {
                throw new ConsumerAccessRefused();
            }
            break;
        case PRINCIPAL:
            if(_exclusiveOwner == null)
            {
                _exclusiveOwner = target.getSessionModel().getConnectionModel().getAuthorizedPrincipal();
            }
            else
            {
                if(!_exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getAuthorizedPrincipal()))
                {
                    throw new ConsumerAccessRefused();
                }
            }
            break;
        case CONTAINER:
            if(_exclusiveOwner == null)
            {
                _exclusiveOwner = target.getSessionModel().getConnectionModel().getRemoteContainerName();
            }
            else
            {
                if(!_exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getRemoteContainerName()))
                {
                    throw new ConsumerAccessRefused();
                }
            }
            break;
        case NONE:
            break;
        default:
            throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy);
    }

    boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE);
    boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT);

    // A consumer cannot become the queue's exclusive subscriber while others are attached.
    // Checked before the consumer is created so a refused request has no side effects.
    if (exclusive && !isTransient && getConsumerCount() != 0)
    {
        throw new ExistingConsumerPreventsExclusive();
    }

    QueueConsumer<T,E,Q,L> consumer = new QueueConsumer<T,E,Q,L>(filters, messageClass,
                                                                 optionSet.contains(Consumer.Option.ACQUIRES),
                                                                 optionSet.contains(Consumer.Option.SEES_REQUEUES),
                                                                 consumerName, isTransient, target);
    target.consumerAdded(consumer);

    if (exclusive && !isTransient)
    {
        _exclusiveSubscriber = consumer;
    }

    if(consumer.isActive())
    {
        _activeSubscriberCount.incrementAndGet();
    }

    consumer.setStateListener(this);
    // the new consumer starts reading from the head of the entry list
    consumer.setQueueContext(new QueueContext<E,Q,L>(_entries.getHead()));

    if (!isDeleted())
    {
        consumer.setQueue((Q)this, exclusive);
        if(_noLocal)
        {
            consumer.setNoLocal(true);
        }

        synchronized (_consumerListeners)
        {
            for(ConsumerRegistrationListener<Q> listener : _consumerListeners)
            {
                listener.consumerAdded((Q)this, consumer);
            }
        }

        _consumerList.add(consumer);

        // re-check: the queue may have been deleted while the consumer was being registered
        if (isDeleted())
        {
            consumer.queueDeleted();
        }
    }
    else
    {
        // TODO
    }

    deliverAsync(consumer);

    return consumer;
}
/**
 * Detaches a consumer from this queue. Does nothing further when the consumer was not in the
 * consumer list. Otherwise the consumer is closed, exclusive-subscriber state is cleared,
 * listeners are notified, and the queue deletes itself when its lifetime policy requires it
 * and no consumers remain.
 *
 * @param consumer the consumer to remove; must not be null
 * @throws NullPointerException if {@code consumer} is null
 */
synchronized void unregisterConsumer(final QueueConsumer<?,E,Q,L> consumer)
{
    if (consumer == null)
    {
        throw new NullPointerException("consumer argument is null");
    }

    if (!_consumerList.remove(consumer))
    {
        return;
    }

    consumer.close();
    // No longer can the queue have an exclusive consumer
    setExclusiveSubscriber(null);
    consumer.setQueueContext(null);

    if (ExclusivityPolicy.LINK == _exclusivityPolicy)
    {
        _exclusiveOwner = null;
    }

    if (_messageGroupManager != null)
    {
        resetSubPointersForGroups(consumer, true);
    }

    synchronized (_consumerListeners)
    {
        for (ConsumerRegistrationListener<Q> registrationListener : _consumerListeners)
        {
            registrationListener.consumerRemoved((Q) this, consumer);
        }
    }

    // auto-delete queues must be deleted if there are no remaining subscribers
    if (!consumer.isTransient()
        && (_lifetimePolicy == LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
            || _lifetimePolicy == LifetimePolicy.DELETE_ON_NO_LINKS)
        && getConsumerCount() == 0)
    {
        if (_logger.isInfoEnabled())
        {
            _logger.info("Auto-deleting queue:" + this);
        }

        try
        {
            getVirtualHost().removeQueue(this);
        }
        catch (QpidSecurityException e)
        {
            throw new ConnectionScopedRuntimeException("Auto delete queue unable to delete itself", e);
        }

        // we need to manually fire the event to the removed consumer (which was the last one left for this
        // queue. This is because the delete method uses the consumer set which has just been cleared
        consumer.queueDeleted();
    }
}
/** @return a new list holding the consumers currently in the consumer list, in iteration order */
public Collection<QueueConsumer<?,E,Q,L>> getConsumers()
{
    final List<QueueConsumer<?,E,Q,L>> snapshot = new ArrayList<QueueConsumer<?,E,Q,L>>();
    for (QueueConsumerList.ConsumerNodeIterator<E,Q,L> nodes = _consumerList.iterator(); nodes.advance(); )
    {
        snapshot.add(nodes.getNode().getConsumer());
    }
    return snapshot;
}
/**
 * Registers a listener to be told when consumers are added to or removed from this queue.
 *
 * @param listener the listener to add
 */
public void addConsumerRegistrationListener(final ConsumerRegistrationListener<Q> listener)
{
// the listener collection is a plain ArrayList, so every access synchronizes on it
synchronized (_consumerListeners)
{
_consumerListeners.add(listener);
}
}
/**
 * Removes a previously registered consumer registration listener.
 *
 * @param listener the listener to remove
 */
public void removeConsumerRegistrationListener(final ConsumerRegistrationListener<Q> listener)
{
// the listener collection is a plain ArrayList, so every access synchronizes on it
synchronized (_consumerListeners)
{
_consumerListeners.remove(listener);
}
}
/**
 * Moves consumers back to the earliest available entry assigned to the given consumer's groups,
 * then triggers asynchronous delivery. Nothing is moved when the group manager finds no such entry.
 *
 * @param consumer         the consumer whose group assignments are being examined
 * @param clearAssignments whether the consumer's group assignments should also be cleared
 */
public void resetSubPointersForGroups(QueueConsumer<?,E,Q,L> consumer, boolean clearAssignments)
{
    // the lookup must happen before the assignments are cleared
    final E earliest = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
    if (clearAssignments)
    {
        _messageGroupManager.clearAssignments(consumer);
    }

    if (earliest == null)
    {
        return;
    }

    // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
    for (QueueConsumerList.ConsumerNodeIterator<E,Q,L> nodes = _consumerList.iterator(); nodes.advance(); )
    {
        final QueueConsumer<?,E,Q,L> candidate = nodes.getNode().getConsumer();
        // we don't make browsers send the same stuff twice
        if (candidate.seesRequeues())
        {
            updateSubRequeueEntry(candidate, earliest);
        }
    }

    deliverAsync();
}
/** @return the current value of the delete-on-no-consumers flag */
public boolean getDeleteOnNoConsumers()
{
return _deleteOnNoConsumers;
}
/**
 * Sets the delete-on-no-consumers flag.
 *
 * @param b the new flag value
 */
public void setDeleteOnNoConsumers(boolean b)
{
_deleteOnNoConsumers = b;
}
/**
 * Records a binding to this queue and raises the binding-count high-water mark if the new
 * binding count exceeds it.
 *
 * @param binding the binding to add
 */
public void addBinding(final Binding binding)
{
    _bindings.add(binding);
    final int currentCount = _bindings.size();

    // lock-free "set to max": retry only while our count is still above the recorded maximum
    for (;;)
    {
        final int recordedHigh = _bindingCountHigh.get();
        if (currentCount <= recordedHigh || _bindingCountHigh.compareAndSet(recordedHigh, currentCount))
        {
            return;
        }
    }
}
/**
 * Removes a binding from this queue. The binding-count high-water mark is not lowered.
 *
 * @param binding the binding to remove
 */
public void removeBinding(final Binding binding)
{
_bindings.remove(binding);
}
/** @return an unmodifiable view of the bindings to this queue */
public List<Binding> getBindings()
{
return Collections.unmodifiableList(_bindings);
}
/** @return the number of bindings currently held */
public int getBindingCount()
{
return getBindings().size();
}
/** @return the log subject created for this queue at construction */
public LogSubject getLogSubject()
{
return _logSubject;
}
// ------ Enqueue / Dequeue
/**
 * Adds a message to the queue, attempts an immediate delivery to a consumer where possible,
 * and otherwise schedules asynchronous delivery.
 *
 * @param message the message to enqueue
 * @param action  optional callback invoked with the new queue entry once enqueueing has
 *                completed; may be null
 */
public void enqueue(ServerMessage message, Action<? super MessageInstance<?, QueueConsumer<?,E,Q,L>>> action)
{
// update depth statistics before the entry becomes visible in the list
incrementQueueCount();
incrementQueueSize(message);
_totalMessagesReceived.incrementAndGet();
E entry;
// read the volatile field once so the rest of the method sees a consistent value
final QueueConsumer<?,E,Q,L> exclusiveSub = _exclusiveSubscriber;
entry = _entries.add(message);
// attempt synchronous delivery when a callback was supplied, or when there is no exclusive
// consumer and the queue runner reports itself idle
if(action != null || (exclusiveSub == null && _queueRunner.isIdle()))
{
/*
iterate over consumers and if any is at the end of the queue and can deliver this message, then deliver the message
*/
QueueConsumerList.ConsumerNode<E,Q,L> node = _consumerList.getMarkedNode();
QueueConsumerList.ConsumerNode<E,Q,L> nextNode = node.findNext();
if (nextNode == null)
{
nextNode = _consumerList.getHead().findNext();
}
// advance the marked node by one (wrapping to the head); retry from the fresh marked node
// whenever updateMarkedNode reports failure
while (nextNode != null)
{
if (_consumerList.updateMarkedNode(node, nextNode))
{
break;
}
else
{
node = _consumerList.getMarkedNode();
nextNode = node.findNext();
if (nextNode == null)
{
nextNode = _consumerList.getHead().findNext();
}
}
}
// always do one extra loop after we believe we've finished
// this catches the case where we *just* miss an update
int loops = 2;
while (entry.isAvailable() && loops != 0)
{
if (nextNode == null)
{
// reached the end of the consumer list: count one pass and wrap round to the head
loops--;
nextNode = _consumerList.getHead();
}
else
{
// if consumer at end, and active, offer
QueueConsumer<?,E,Q,L> sub = nextNode.getConsumer();
deliverToConsumer(sub, entry);
}
nextNode = nextNode.findNext();
}
}
// still undelivered: hand over to asynchronous delivery
if (entry.isAvailable())
{
checkConsumersNotAheadOfDelivery(entry);
if (exclusiveSub != null)
{
deliverAsync(exclusiveSub);
}
else
{
deliverAsync();
}
}
checkForNotification(entry.getMessage());
if(action != null)
{
action.performAction(entry);
}
}
/**
 * Offers an entry to a single consumer, provided the consumer's send lock can be taken without
 * waiting. The entry is delivered only when the consumer is not suspended, is positioned at
 * this entry and interested in it, may be assigned it, and would not be suspended by it.
 *
 * @param sub   the consumer to offer the entry to
 * @param entry the entry being offered
 */
private void deliverToConsumer(final QueueConsumer<?,E,Q,L> sub, final E entry)
{
    if (!sub.trySendLock())
    {
        return;
    }

    try
    {
        if (!sub.isSuspended()
            && consumerReadyAndHasInterest(sub, entry)
            && mightAssign(sub, entry)
            && !sub.wouldSuspend(entry))
        {
            if (!sub.acquires() || assign(sub, entry))
            {
                deliverMessage(sub, entry, false);
            }
            else
            {
                // restore credit here that would have been taken away by wouldSuspend since we didn't manage
                // to acquire the entry for this consumer
                sub.restoreCredit(entry);
            }
        }
    }
    finally
    {
        sub.releaseSendLock();
    }
}
/**
 * Tries to acquire an entry on behalf of a consumer.
 *
 * @param sub   the consumer
 * @param entry the entry to acquire
 * @return true when the entry was acquired for the consumer
 */
private boolean assign(final QueueConsumer<?,E,Q,L> sub, final E entry)
{
    // with no grouping, try to acquire immediately; otherwise the group manager is
    // responsible for acquiring the message if/when appropriate
    return _messageGroupManager == null
           ? entry.acquire(sub)
           : _messageGroupManager.acceptMessage(sub, entry);
}
/**
 * Checks whether message grouping leaves the entry open to this consumer.
 *
 * @param sub   the consumer
 * @param entry the entry under consideration
 * @return true when there is no group manager, the consumer does not acquire, or the entry's
 *         group is unassigned or already assigned to this consumer
 */
private boolean mightAssign(final QueueConsumer<?,E,Q,L> sub, final E entry)
{
    final boolean groupingApplies = _messageGroupManager != null && sub.acquires();
    if (!groupingApplies)
    {
        return true;
    }

    final QueueConsumer currentAssignee = _messageGroupManager.getAssignedConsumer(entry);
    return currentAssignee == null || currentAssignee == sub;
}
/**
 * Hook called by enqueue for an entry that is still available after the synchronous delivery
 * attempt. This implementation is intentionally empty.
 *
 * @param entry the newly enqueued entry
 */
protected void checkConsumersNotAheadOfDelivery(final E entry)
{
// This method is only required for queues which mess with ordering
// Simple Queues don't :-)
}
/**
 * Adds the message's size to the queue depth and updates the enqueue statistics; persistent
 * messages on a durable queue are additionally counted in the persistent enqueue statistics.
 *
 * @param message the message being enqueued
 */
private void incrementQueueSize(final ServerMessage message)
{
    final long messageBytes = message.getSize();

    getAtomicQueueSize().addAndGet(messageBytes);
    _enqueueCount.incrementAndGet();
    _enqueueSize.addAndGet(messageBytes);

    final boolean countedAsPersistent = message.isPersistent() && isDurable();
    if (countedAsPersistent)
    {
        _persistentMessageEnqueueSize.addAndGet(messageBytes);
        _persistentMessageEnqueueCount.incrementAndGet();
    }
}
/** @return the total number of dequeues counted so far */
public long getTotalDequeueCount()
{
return _dequeueCount.get();
}
/** @return the total number of enqueues counted so far */
public long getTotalEnqueueCount()
{
return _enqueueCount.get();
}
/** Increments the count of messages currently on the queue. */
private void incrementQueueCount()
{
getAtomicQueueCount().incrementAndGet();
}
/**
 * Sends an entry to a consumer, first recording it as the consumer's last seen entry and
 * updating the delivered and unacknowledged counters.
 *
 * @param sub   the consumer receiving the entry
 * @param entry the entry to send
 * @param batch passed through unchanged to the consumer's send call
 */
private void deliverMessage(final QueueConsumer<?,E,Q,L> sub, final E entry, boolean batch)
{
setLastSeenEntry(sub, entry);
_deliveredMessages.incrementAndGet();
incrementUnackedMsgCount(entry);
sub.send(entry, batch);
}
/**
 * @param sub   the consumer
 * @param entry the entry under consideration
 * @return true when the consumer is interested in the entry and the entry is the next one
 *         available to that consumer
 */
private boolean consumerReadyAndHasInterest(final QueueConsumer<?,E,Q,L> sub, final E entry)
{
    if (!sub.hasInterest(entry))
    {
        return false;
    }
    return getNextAvailableEntry(sub) == entry;
}
/**
 * Records the entry as the last one seen by the consumer. If the same entry was the consumer's
 * pending released entry, that marker is cleared (unless it has changed in the meantime).
 * Does nothing when the consumer has no queue context.
 *
 * @param sub   the consumer
 * @param entry the entry just delivered
 */
private void setLastSeenEntry(final QueueConsumer<?,E,Q,L> sub, final E entry)
{
    final QueueContext<E,Q,L> context = sub.getQueueContext();
    if (context == null)
    {
        return;
    }

    // capture the released entry before moving the last-seen pointer
    final E pendingRelease = context.getReleasedEntry();
    QueueContext._lastSeenUpdater.set(context, entry);

    if (pendingRelease == entry)
    {
        QueueContext._releasedUpdater.compareAndSet(context, pendingRelease, null);
    }
}
/**
 * Sets the consumer's released entry to the given entry when no released entry is recorded or
 * the recorded one compares greater than it. Does nothing when the consumer has no queue context.
 *
 * @param sub   the consumer
 * @param entry the entry that has become available again
 */
private void updateSubRequeueEntry(final QueueConsumer<?,E,Q,L> sub, final E entry)
{
    final QueueContext<E,Q,L> context = sub.getQueueContext();
    if (context == null)
    {
        return;
    }

    // compare-and-set retry loop: give up as soon as the recorded entry is at or before ours
    for (;;)
    {
        final E recorded = context.getReleasedEntry();
        if (recorded != null && recorded.compareTo(entry) <= 0)
        {
            return;
        }
        if (QueueContext._releasedUpdater.compareAndSet(context, recorded, entry))
        {
            return;
        }
    }
}
/**
 * Makes a released entry visible again to consumers that see requeues, then triggers
 * asynchronous delivery. Iteration stops early once the entry is no longer available.
 *
 * @param entry the entry being requeued
 */
public void requeue(E entry)
{
    // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
    for (QueueConsumerList.ConsumerNodeIterator<E,Q,L> nodes = _consumerList.iterator();
         nodes.advance() && entry.isAvailable(); )
    {
        final QueueConsumer<?,E,Q,L> candidate = nodes.getNode().getConsumer();
        // we don't make browsers send the same stuff twice
        if (candidate.seesRequeues())
        {
            updateSubRequeueEntry(candidate, entry);
        }
    }

    deliverAsync();
}
/**
 * Updates the queue statistics for an entry leaving the queue and then re-checks capacity.
 *
 * @param entry the entry being dequeued
 */
@Override
public void dequeue(E entry)
{
decrementQueueCount();
decrementQueueSize(entry);
// an entry acquired by a consumer was counted as delivered; undo that count
if (entry.acquiredByConsumer())
{
_deliveredMessages.decrementAndGet();
}
checkCapacity();
}
/**
 * Subtracts the entry's message size from the queue depth and updates the dequeue size
 * statistics; persistent messages on a durable queue are additionally counted in the
 * persistent dequeue statistics.
 *
 * @param entry the entry being dequeued
 */
private void decrementQueueSize(final E entry)
{
    final ServerMessage dequeued = entry.getMessage();
    final long messageBytes = dequeued.getSize();

    getAtomicQueueSize().addAndGet(-messageBytes);
    _dequeueSize.addAndGet(messageBytes);

    final boolean countedAsPersistent = dequeued.isPersistent() && isDurable();
    if (countedAsPersistent)
    {
        _persistentMessageDequeueSize.addAndGet(messageBytes);
        _persistentMessageDequeueCount.incrementAndGet();
    }
}
/** Decrements the count of messages on the queue and increments the total dequeue count. */
void decrementQueueCount()
{
getAtomicQueueCount().decrementAndGet();
_dequeueCount.incrementAndGet();
}
/**
 * Re-delivers an entry to the given consumer while holding the consumer's send lock.
 *
 * @param entry    the entry to resend
 * @param consumer the consumer to resend to
 * @return true when the entry was sent; false when the consumer is closed
 */
public boolean resend(final E entry, final QueueConsumer<?,E,Q,L> consumer)
{
    /* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message
              entry to resend and move back the consumer pointer. */
    consumer.getSendLock();
    try
    {
        if (consumer.isClosed())
        {
            return false;
        }
        deliverMessage(consumer, entry, false);
        return true;
    }
    finally
    {
        consumer.releaseSendLock();
    }
}
/** @return the size of the consumer list */
public int getConsumerCount()
{
return _consumerList.size();
}
/** @return the current value of the active consumer counter */
public int getActiveConsumerCount()
{
return _activeSubscriberCount.get();
}
/** @return true when the queue has no consumers */
public boolean isUnused()
{
return getConsumerCount() == 0;
}
/** @return true when the message count is zero */
public boolean isEmpty()
{
return getMessageCount() == 0;
}
/** @return the current count of messages on the queue */
public int getMessageCount()
{
return getAtomicQueueCount().get();
}
/** @return the sum of the sizes of the messages currently on the queue */
public long getQueueDepth()
{
return getAtomicQueueSize().get();
}
/**
 * @return the message count minus the delivered-message counter, clamped at zero (the two
 *         counters are read separately, so the raw difference can be momentarily negative)
 */
public int getUndeliveredMessageCount()
{
    final int undelivered = getMessageCount() - _deliveredMessages.get();
    return Math.max(0, undelivered);
}
/** @return the total number of messages passed to enqueue so far */
public long getReceivedMessageCount()
{
return _totalMessagesReceived.get();
}
/**
 * @return the arrival time of the oldest queue entry's message, or {@code Long.MAX_VALUE}
 *         when there is no such entry
 */
public long getOldestMessageArrivalTime()
{
    final E oldest = getOldestQueueEntry();
    if (oldest == null)
    {
        return Long.MAX_VALUE;
    }
    return oldest.getMessage().getArrivalTime();
}
/** @return the entry following the head of the entry list, as returned by the list's next() */
protected E getOldestQueueEntry()
{
return _entries.next(_entries.getHead());
}
/** @return the current value of the deleted flag */
public boolean isDeleted()
{
return _deleted.get();
}
/** @return a new list of all entries on the queue that are not marked deleted, in queue order */
public List<E> getMessagesOnTheQueue()
{
    final ArrayList<E> liveEntries = new ArrayList<E>();
    for (QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> cursor = _entries.iterator(); cursor.advance(); )
    {
        final E candidate = cursor.getNode();
        if (candidate == null || candidate.isDeleted())
        {
            continue;
        }
        liveEntries.add(candidate);
    }
    return liveEntries;
}
/**
 * Consumer state listener callback: keeps the active consumer counter in step with consumers
 * entering or leaving the ACTIVE state, and triggers asynchronous delivery for a consumer
 * whose new state is ACTIVE.
 *
 * @param sub      the consumer whose state changed
 * @param oldState the previous state
 * @param newState the new state
 */
public void stateChanged(QueueConsumer<?,E,Q,L> sub, QueueConsumer.State oldState, QueueConsumer.State newState)
{
    final boolean wasActive = oldState == QueueConsumer.State.ACTIVE;
    final boolean nowActive = newState == QueueConsumer.State.ACTIVE;

    if (nowActive)
    {
        if (!wasActive)
        {
            _activeSubscriberCount.incrementAndGet();
        }
        deliverAsync(sub);
    }
    else if (wasActive)
    {
        _activeSubscriberCount.decrementAndGet();
    }
}
/**
 * Orders queues by name.
 *
 * @param o the queue to compare with
 * @return the result of comparing this queue's name with the other queue's name
 */
public int compareTo(final Q o)
{
return _name.compareTo(o.getName());
}
/** @return the live counter of messages on the queue (the counter object itself, not a copy) */
public AtomicInteger getAtomicQueueCount()
{
return _atomicQueueCount;
}
/** @return the live counter of the queue depth (the counter object itself, not a copy) */
public AtomicLong getAtomicQueueSize()
{
return _atomicQueueSize;
}
/** @return true when an exclusive subscriber is currently recorded */
public boolean hasExclusiveConsumer()
{
return _exclusiveSubscriber != null;
}
/**
 * Records (or clears, when given {@code null}) the exclusive subscriber of this queue.
 *
 * @param exclusiveSubscriber the consumer to record as exclusive, or {@code null}
 */
private void setExclusiveSubscriber(QueueConsumer<?,E,Q,L> exclusiveSubscriber)
{
    this._exclusiveSubscriber = exclusiveSubscriber;
}
/**
 * Reads the state-change counter, which {@code deliverAsync()} increments on every call.
 *
 * @return the current state-change count
 */
long getStateChangeCount()
{
    final long changes = this._stateChangeCount.get();
    return changes;
}
/**
 * Gives subclasses access to the entry list backing this queue.
 *
 * @return the queue entry list
 */
protected L getEntries()
{
    return this._entries;
}
/**
 * Gives subclasses access to the list of consumers attached to this queue.
 *
 * @return the consumer list
 */
protected QueueConsumerList<E,Q,L> getConsumerList()
{
    return this._consumerList;
}
public static interface QueueEntryFilter<E extends QueueEntry>
{
public boolean accept(E entry);
public boolean filterComplete();
}
/**
 * Collects the non-deleted entries whose message number lies in the inclusive range
 * {@code [fromMessageId, toMessageId]}. The whole queue is scanned.
 *
 * @param fromMessageId lowest message number to include
 * @param toMessageId   highest message number to include
 * @return a new list of matching entries
 */
public List<E> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
{
    final QueueEntryFilter<E> idRangeFilter = new QueueEntryFilter<E>()
    {
        public boolean accept(E entry)
        {
            final long id = entry.getMessage().getMessageNumber();
            return !(id < fromMessageId || id > toMessageId);
        }

        public boolean filterComplete()
        {
            // never stop early: matching ids may appear anywhere on the queue
            return false;
        }
    };
    return getMessagesOnTheQueue(idRangeFilter);
}
/**
 * Finds the non-deleted entry whose message has the given message number.
 *
 * @param messageId the message number to look for
 * @return the matching entry, or {@code null} if none was found
 */
public E getMessageOnTheQueue(final long messageId)
{
    final QueueEntryFilter<E> singleIdFilter = new QueueEntryFilter<E>()
    {
        private boolean _complete;

        public boolean accept(E entry)
        {
            final long candidateId = entry.getMessage().getMessageNumber();
            _complete = (candidateId == messageId);
            return _complete;
        }

        public boolean filterComplete()
        {
            // the scan can stop as soon as the wanted message has been seen
            return _complete;
        }
    };

    final List<E> matches = getMessagesOnTheQueue(singleIdFilter);
    if (matches.isEmpty())
    {
        return null;
    }
    return matches.get(0);
}
/**
 * Scans the queue and collects the non-deleted entries accepted by the filter, stopping as soon
 * as the filter reports completion.
 *
 * @param filter decides which entries are collected and when to stop
 * @return a new list of the accepted entries, in iteration order
 */
public List<E> getMessagesOnTheQueue(QueueEntryFilter<E> filter)
{
    final ArrayList<E> accepted = new ArrayList<E>();
    final QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> cursor = _entries.iterator();
    while (cursor.advance())
    {
        if (filter.filterComplete())
        {
            break;
        }
        final E candidate = cursor.getNode();
        if (candidate.isDeleted())
        {
            continue;
        }
        if (filter.accept(candidate))
        {
            accepted.add(candidate);
        }
    }
    return accepted;
}
/**
 * Presents each non-deleted entry to the visitor in iteration order, stopping as soon as the
 * visitor returns {@code true}.
 *
 * @param visitor callback invoked for each non-deleted entry
 */
public void visit(final QueueEntryVisitor<E> visitor)
{
    final QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> cursor = _entries.iterator();
    while (cursor.advance())
    {
        final E candidate = cursor.getNode();
        if (candidate.isDeleted())
        {
            continue;
        }
        if (visitor.visit(candidate))
        {
            return;
        }
    }
}
/**
 * Returns the queue entries found at a given range of queue positions, e.g. messages 5 to 10 on
 * the queue. Only non-deleted entries are counted when determining a position.
 *
 * Positions start at 1. A 'from' value of 0 behaves the same as 1. A 'to' value of 0 yields an
 * empty list regardless of the 'from' value.
 *
 * @param fromPosition first message position (inclusive)
 * @param toPosition last message position (inclusive)
 * @return list of entries in the requested position range
 */
public List<E> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition)
{
    final QueueEntryFilter<E> positionFilter = new QueueEntryFilter<E>()
    {
        private long position = 0;

        public boolean accept(E entry)
        {
            final long current = ++position;
            return current >= fromPosition && current <= toPosition;
        }

        public boolean filterComplete()
        {
            // nothing beyond toPosition can match, so the scan may stop there
            return position >= toPosition;
        }
    };
    return getMessagesOnTheQueue(positionFilter);
}
/**
 * Removes messages from the queue by delegating to {@code clear(long)}; the number removed is
 * not reported to the caller.
 *
 * @param request the number of messages after which removal stops; 0 never matches the running
 *                count, so the whole queue is processed
 * @throws QpidSecurityException if the purge is not authorised
 */
public void purge(final long request) throws QpidSecurityException
{
    this.clear(request);
}
/**
 * Returns the creation time recorded for this queue.
 *
 * @return the stored creation time
 */
public long getCreateTime()
{
    return this._createTime;
}
// ------ Management functions
// TODO - now only used by the tests
/**
 * Dequeues the first entry (in iteration order) that can be acquired, then stops. Does nothing
 * if no entry can be acquired.
 */
public void deleteMessageFromTop()
{
    final QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> cursor = _entries.iterator();
    while (cursor.advance())
    {
        final E candidate = cursor.getNode();
        if (candidate.acquire())
        {
            dequeueEntry(candidate);
            return;
        }
    }
}
/**
 * Removes every message that can be acquired from the queue.
 *
 * @return the number of messages removed
 * @throws QpidSecurityException if the purge is not authorised
 */
public long clearQueue() throws QpidSecurityException
{
    final long removed = clear(0L);
    return removed;
}
/**
 * Dequeues acquirable entries inside a single local transaction, after checking that the caller
 * is authorised to purge this queue.
 *
 * @param request stop once this many entries have been dequeued; a value of 0 never matches the
 *                running count, so the whole queue is processed
 * @return the number of entries dequeued
 * @throws QpidSecurityException if the security manager refuses the purge
 */
private long clear(final long request) throws QpidSecurityException
{
    // Perform ACLs
    final boolean permitted = getVirtualHost().getSecurityManager().authorisePurge(this);
    if (!permitted)
    {
        throw new QpidSecurityException("Permission denied: queue " + getName());
    }

    final QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> cursor = _entries.iterator();
    final ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
    long removed = 0;
    while (cursor.advance())
    {
        final E candidate = cursor.getNode();
        if (!candidate.acquire())
        {
            continue;
        }
        dequeueEntry(candidate, txn);
        removed++;
        if (removed == request)
        {
            break;
        }
    }
    txn.commit();
    return removed;
}
/**
 * Dequeues a single entry using a fresh auto-commit transaction on the virtual host's message
 * store.
 *
 * @param node the (already acquired) entry to dequeue
 */
private void dequeueEntry(final E node)
{
    dequeueEntry(node, new AutoCommitTransaction(getVirtualHost().getMessageStore()));
}
/**
 * Registers the dequeue of an entry's message with the given transaction. The entry itself is
 * deleted only after the transaction commits; nothing is done on rollback.
 *
 * @param node the entry to dequeue
 * @param txn  the transaction in which the dequeue takes place
 */
private void dequeueEntry(final E node, ServerTransaction txn)
{
    final ServerTransaction.Action deleteOnCommit = new ServerTransaction.Action()
    {
        public void postCommit()
        {
            node.delete();
        }

        public void onRollback()
        {
            // intentionally empty: the entry is left untouched if the dequeue is rolled back
        }
    };
    txn.dequeue(this, node.getMessage(), deleteOnCommit);
}
/**
 * Registers a task to be run when this queue is deleted.
 *
 * @param task the action to add to the delete-task list
 */
@Override
public void addDeleteTask(final Action<? super Q> task)
{
    this._deleteTaskList.add(task);
}
/**
 * Unregisters a task previously added with {@code addDeleteTask}.
 *
 * @param task the action to remove from the delete-task list
 */
@Override
public void removeDeleteTask(final Action<? super Q> task)
{
    this._deleteTaskList.remove(task);
}
// TODO list all thrown exceptions
/**
 * Deletes this queue. The teardown is performed only by the first caller to flip the deleted
 * flag; later calls just pass the access check and return the current message count.
 *
 * Teardown steps, in order: remove all bindings, notify every consumer, acquire the remaining
 * entries and route them to the alternate exchange in one local transaction, drop the reference
 * held on the alternate exchange, run and clear the registered delete tasks, stop the queue and
 * log the deletion.
 *
 * @return the value of {@link #getMessageCount()} after the deletion work
 * @throws QpidSecurityException if the security manager refuses the delete
 */
public int delete() throws QpidSecurityException
{
// Check access
if (!_virtualHost.getSecurityManager().authoriseDelete(this))
{
throw new QpidSecurityException("Permission denied: " + getName());
}
// getAndSet guarantees that only one caller ever performs the teardown below
if (!_deleted.getAndSet(true))
{
// iterate over a copy because removing a binding may modify _bindings
final ArrayList<Binding> bindingCopy = new ArrayList<Binding>(_bindings);
for (Binding b : bindingCopy)
{
b.getExchange().removeBinding(b);
}
// tell every attached consumer that the queue has gone
QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
while (consumerNodeIterator.advance())
{
QueueConsumer s = consumerNodeIterator.getNode().getConsumer();
if (s != null)
{
s.queueDeleted();
}
}
// acquire every remaining entry so that it can be re-routed; the filter never completes early
List<E> entries = getMessagesOnTheQueue(new QueueEntryFilter<E>()
{
public boolean accept(E entry)
{
return entry.acquire();
}
public boolean filterComplete()
{
return false;
}
});
// route all acquired entries to the alternate destination within a single transaction
ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
for(final E entry : entries)
{
// TODO log requeues with a post enqueue action
int requeues = entry.routeToAlternate(null, txn);
if(requeues == 0)
{
// TODO log discard
}
}
txn.commit();
if(_alternateExchange != null)
{
_alternateExchange.removeReference(this);
}
// run the registered delete tasks, then forget them
for (Action<? super Q> task : _deleteTaskList)
{
task.performAction((Q)this);
}
_deleteTaskList.clear();
stop();
//Log Queue Deletion
CurrentActor.get().message(_logSubject, QueueMessages.DELETED());
}
return getMessageCount();
}
/**
 * Stops the queue. The first call releases this queue's hold on the shared reference-counting
 * executor service; subsequent calls do nothing.
 */
public void stop()
{
    final boolean alreadyStopped = _stopped.getAndSet(true);
    if (alreadyStopped)
    {
        return;
    }
    ReferenceCountingExecutorService.getInstance().releaseExecutorService();
}
/**
 * Applies producer flow control for the given channel. When a capacity is configured and the
 * queue size exceeds it, the queue is flagged overfull and the channel is recorded and blocked.
 * The size is then re-read: if it is already at or below the resume threshold the channel is
 * unblocked again straight away.
 *
 * @param channel the session to block if the queue is over capacity
 */
public void checkCapacity(AMQSessionModel channel)
{
    if (_capacity == 0l)
    {
        // a capacity of zero means no flow control is configured
        return;
    }
    if (_atomicQueueSize.get() <= _capacity)
    {
        return;
    }

    _overfull.set(true);
    //Overfull log message
    _logActor.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity));
    _blockedChannels.add(channel);
    channel.block(this);

    if (_atomicQueueSize.get() <= _flowResumeCapacity)
    {
        //Underfull log message
        _logActor.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity));
        channel.unblock(this);
        _blockedChannels.remove(channel);
    }
}
/**
 * Lifts producer flow control once the queue has drained. When a capacity is configured, the
 * queue is flagged overfull and its size is at or below the resume threshold, the overfull flag
 * is cleared (logging once, by whichever caller wins the compare-and-set) and every blocked
 * channel is unblocked and forgotten.
 */
private void checkCapacity()
{
    if (_capacity == 0L)
    {
        return;
    }
    if (!_overfull.get() || _atomicQueueSize.get() > _flowResumeCapacity)
    {
        return;
    }

    if (_overfull.compareAndSet(true, false))
    {
        //Underfull log message
        _logActor.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity));
    }
    for (final AMQSessionModel blocked : _blockedChannels)
    {
        blocked.unblock(this);
        _blockedChannels.remove(blocked);
    }
}
// Runner bound to this queue; deliverAsync() hands it to the _asyncDelivery executor.
private QueueRunner _queueRunner = new QueueRunner(this);
/**
 * Records a state change and asks the queue runner to execute on the asynchronous delivery
 * executor.
 */
public void deliverAsync()
{
    this._stateChangeCount.incrementAndGet();
    this._queueRunner.execute(_asyncDelivery);
}
/**
 * Requests asynchronous delivery on behalf of a consumer. When the queue has an exclusive
 * subscriber the given consumer's own flush runner is scheduled; otherwise the queue-wide
 * runner is used.
 *
 * @param sub the consumer that needs delivery
 */
public void deliverAsync(QueueConsumer<?,E,Q,L> sub)
{
    if (_exclusiveSubscriber != null)
    {
        sub.getRunner().execute(_asyncDelivery);
    }
    else
    {
        deliverAsync();
    }
}
/**
 * Flushes a consumer without any limit on the number of delivery iterations.
 *
 * @param sub the consumer to flush
 */
void flushConsumer(QueueConsumer<?,E,Q,L> sub)
{
    this.flushConsumer(sub, Long.MAX_VALUE);
}
/**
 * Repeatedly attempts delivery to a single consumer until it is suspended, has nothing more to
 * receive, or the iteration budget is used up.
 *
 * For small budgets (at most MAX_ASYNC_DELIVERIES) the consumer's send lock is taken once and
 * held for the whole flush; for larger budgets it is taken and released around each attempt.
 *
 * @param sub        the consumer to flush
 * @param iterations maximum number of successful delivery attempts to make
 * @return the last result of attemptDelivery, i.e. true if no further delivery to this consumer
 *         was possible when the loop ended
 */
boolean flushConsumer(QueueConsumer<?,E,Q,L> sub, long iterations)
{
boolean atTail = false;
final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
boolean queueEmpty = false;
try
{
if(keepSendLockHeld)
{
sub.getSendLock();
}
while (!sub.isSuspended() && !atTail && iterations != 0)
{
try
{
if(!keepSendLockHeld)
{
sub.getSendLock();
}
atTail = attemptDelivery(sub, true);
// only report "queue empty" if there really is no further entry available for this consumer
if (atTail && getNextAvailableEntry(sub) == null)
{
queueEmpty = true;
}
else if (!atTail)
{
// a delivery attempt was made and more may follow: consume one unit of the budget
iterations--;
}
}
finally
{
if(!keepSendLockHeld)
{
sub.releaseSendLock();
}
}
}
}
finally
{
if(keepSendLockHeld)
{
sub.releaseSendLock();
}
// notifications are issued after the loop, once any held send lock has been released
if(queueEmpty)
{
sub.queueEmpty();
}
sub.flushBatched();
}
// if there's (potentially) more than one consumer the others will potentially not have been advanced to the
// next entry they are interested in yet. This would lead to holding on to references to expired messages, etc
// which would give us memory "leak".
if (!hasExclusiveConsumer())
{
advanceAllConsumers();
}
return atTail;
}
/**
 * Attempt delivery for the given consumer.
 *
 * Looks up the next node for the consumer and attempts to deliver it. Nothing is attempted
 * unless the consumer is active and not suspended.
 *
 * @param sub the consumer
 * @param batch true if processing can be batched
 * @return true if we have completed all possible deliveries for this sub: either there is no
 *         node after the one examined, or the consumer is inactive or lacks credit.
 */
private boolean attemptDelivery(QueueConsumer<?,E,Q,L> sub, boolean batch)
{
boolean atTail = false;
boolean subActive = sub.isActive() && !sub.isSuspended();
if (subActive)
{
E node = getNextAvailableEntry(sub);
// the node may have been taken by someone else since it was looked up, so re-check availability
if (node != null && node.isAvailable())
{
if (sub.hasInterest(node) && mightAssign(sub, node))
{
if (!sub.wouldSuspend(node))
{
if (sub.acquires() && !assign(sub, node))
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
// to acquire the entry for this consumer
sub.restoreCredit(node);
}
else
{
deliverMessage(sub, node, batch);
}
}
else // Not enough Credit for message and wouldSuspend
{
//QPID-1187 - Treat the consumer as suspended for this message
// and wait for the message to be removed to continue delivery.
subActive = false;
node.addStateChangeListener(new QueueEntryListener(sub));
}
}
}
// at the tail when there was no node, or nothing follows the node just examined
atTail = (node == null) || (_entries.next(node) == null);
}
return atTail || !subActive;
}
/**
 * Walks the consumer list and, for every acquiring consumer, looks up its next available entry.
 * The result of the lookup is discarded; the call is made for its effect on the consumer's
 * queue context. Non-acquiring consumers are currently skipped.
 */
protected void advanceAllConsumers()
{
    final QueueConsumerList.ConsumerNodeIterator<E,Q,L> cursor = _consumerList.iterator();
    while (cursor.advance())
    {
        final QueueConsumerList.ConsumerNode<E,Q,L> holder = cursor.getNode();
        final QueueConsumer<?,E,Q,L> consumer = holder.getConsumer();
        if (!consumer.acquires())
        {
            // TODO
            continue;
        }
        getNextAvailableEntry(consumer);
    }
}
/**
 * Finds the next entry that could be delivered to the given consumer, advancing the consumer's
 * queue context past entries that are unavailable, expired, of no interest to the consumer, or
 * that may not be assigned to it. Expired entries that can be acquired are dequeued on the way.
 *
 * @param sub the consumer whose position is examined and advanced
 * @return the next candidate entry, or null if the consumer has no queue context or no suitable
 *         entry remains
 */
private E getNextAvailableEntry(final QueueConsumer<?,E,Q,L> sub)
{
QueueContext<E,Q,L> context = sub.getQueueContext();
if(context != null)
{
E lastSeen = context.getLastSeenEntry();
E releasedNode = context.getReleasedEntry();
// start from the released entry when it is not after the last seen entry, otherwise continue after lastSeen
E node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
boolean expired = false;
// 'expired' is assigned inside the condition so that the loop body knows why the node was rejected
while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node) ||
!mightAssign(sub,node)))
{
if (expired)
{
expired = false;
if (node.acquire())
{
dequeueEntry(node);
}
}
// move the context forward with compare-and-set; the released marker is only cleared if the advance succeeded
if(QueueContext._lastSeenUpdater.compareAndSet(context, lastSeen, node))
{
QueueContext._releasedUpdater.compareAndSet(context, releasedNode, null);
}
// re-read the context, which may have been changed by this thread or another one
lastSeen = context.getLastSeenEntry();
releasedNode = context.getReleasedEntry();
// NOTE(review): strict '>' here versus '>=' for the initial lookup - presumably so that a released
// entry equal to lastSeen is not selected again; confirm before changing
node = (releasedNode != null && lastSeen.compareTo(releasedNode)>0) ? releasedNode : _entries.next(lastSeen);
}
return node;
}
else
{
return null;
}
}
/**
 * Compares an entry with the released-entry marker held in the consumer's queue context.
 *
 * @param entry the entry to test
 * @param sub   the consumer whose context is consulted
 * @return {@code true} only if the consumer has a context with a released entry and that
 *         released entry orders before {@code entry}
 */
public boolean isEntryAheadOfConsumer(E entry, QueueConsumer<?,E,Q,L> sub)
{
    final QueueContext<E,Q,L> context = sub.getQueueContext();
    if (context == null)
    {
        return false;
    }
    final E released = context.getReleasedEntry();
    if (released == null)
    {
        return false;
    }
    return released.compareTo(entry) < 0;
}
/**
 * Used by queue Runners to asynchronously deliver messages to consumers.
 *
 * A queue Runner is started whenever a state change occurs, e.g. when a new
 * message arrives on the queue and cannot be immediately delivered to a
 * consumer (i.e. asynchronous delivery is required). Unless there are
 * SubFlushRunners operating (due to consumers unsuspending) which are
 * capable of accepting/delivering all messages then these messages would
 * otherwise remain on the queue.
 *
 * processQueue should be running while there are messages on the queue AND
 * there are consumers that can deliver them. If there are no
 * consumers capable of delivering the remaining messages on the queue
 * then processQueue should stop to prevent spinning.
 *
 * Since processQueue runs in a fixed size Executor, it should not run
 * indefinitely to prevent starving other tasks of CPU (e.g. jobs to process
 * incoming messages may not be able to be scheduled in the thread pool
 * because all threads are working on clearing down large queues). To solve
 * this problem, after an arbitrary number of message deliveries the
 * processQueue job stops iterating and returns 0 so that it can be run again.
 *
 * @param runner the Runner executing this call; used here only in the debug log message
 * @return 0 if the delivery budget was exhausted (more work may remain), otherwise the most
 *         recent state change count observed by the loop
 */
public long processQueue(QueueRunner runner)
{
long stateChangeCount;
long previousStateChangeCount = Long.MIN_VALUE;
long rVal = Long.MIN_VALUE;
boolean deliveryIncomplete = true;
boolean lastLoop = false;
int iterations = MAX_ASYNC_DELIVERIES;
final int numSubs = _consumerList.size();
// share the delivery budget between the consumers, giving each at least one attempt per pass
final int perSub = Math.max(iterations / Math.max(numSubs,1), 1);
// For every message enqueue/requeue we fire deliverAsync() which
// increases _stateChangeCount. If _sCC changes whilst we are in our loop
// (detected by setting previousStateChangeCount to stateChangeCount in the loop body)
// then we will continue to run for a maximum of iterations.
// So whilst delivery/rejection is going on a processQueue thread will be running
while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete))
{
// we want to have one extra loop after every consumer has reached the point where it cannot move
// further, just in case the advance of one consumer in the last loop allows a different consumer to
// move forward in the next iteration
if (previousStateChangeCount != stateChangeCount)
{
//further asynchronous delivery is required since the
//previous loop. keep going if iteration slicing allows.
lastLoop = false;
rVal = stateChangeCount;
}
previousStateChangeCount = stateChangeCount;
boolean allConsumersDone = true;
boolean consumerDone;
QueueConsumerList.ConsumerNodeIterator<E,Q,L> consumerNodeIterator = _consumerList.iterator();
//iterate over the subscribers and try to advance their pointer
while (consumerNodeIterator.advance())
{
QueueConsumer<?,E,Q,L> sub = consumerNodeIterator.getNode().getConsumer();
sub.getSendLock();
try
{
for(int i = 0 ; i < perSub; i++)
{
//attempt delivery. returns true if no further delivery currently possible to this sub
consumerDone = attemptDelivery(sub, true);
if (consumerDone)
{
sub.flushBatched();
if (lastLoop && !sub.isSuspended())
{
sub.queueEmpty();
}
break;
}
else
{
//this consumer can accept additional deliveries, so we must
//keep going after this (if iteration slicing allows it)
allConsumersDone = false;
lastLoop = false;
if(--iterations == 0)
{
sub.flushBatched();
break;
}
}
}
sub.flushBatched();
}
finally
{
sub.releaseSendLock();
}
}
if(allConsumersDone && lastLoop)
{
//We have done an extra loop already and there are again
//no further delivery attempts possible, only
//keep going if state change demands it.
deliveryIncomplete = false;
}
else if(allConsumersDone)
{
//All consumers reported being done, but we have to do
//an extra loop if the iterations are not exhausted and
//there is still any work to be done
deliveryIncomplete = _consumerList.size() != 0;
lastLoop = true;
}
else
{
//some consumers can still accept more messages,
//keep going if iteration count allows.
lastLoop = false;
deliveryIncomplete = true;
}
}
// If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
// therefore we should schedule this runner again (unless someone beats us to it :-) ).
if (iterations == 0)
{
if (_logger.isDebugEnabled())
{
_logger.debug("Rescheduling runner:" + runner);
}
return 0L;
}
return rVal;
}
/**
 * Housekeeping pass over the queue: expired entries that can be acquired are dequeued, and every
 * other non-deleted entry with a message is checked for alert notifications.
 */
public void checkMessageStatus()
{
    final QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> cursor = _entries.iterator();
    while (cursor.advance())
    {
        final E candidate = cursor.getNode();
        // Only process nodes that are not currently deleted and not dequeued
        if (candidate.isDeleted())
        {
            continue;
        }

        // If the node has expired then acquire it and dequeue it
        if (candidate.expired() && candidate.acquire())
        {
            if (_logger.isDebugEnabled())
            {
                _logger.debug("Dequeuing expired node " + candidate);
            }
            dequeueEntry(candidate);
            continue;
        }

        // There is a chance that the node could be deleted by the time the check actually
        // occurs, so verify we can actually get the message before performing the check.
        final ServerMessage message = candidate.getMessage();
        if (message != null)
        {
            checkForNotification(message);
        }
    }
}
/**
 * @return the configured minimum gap between repeated alerts, as subtracted from the current
 *         time in milliseconds by {@code checkForNotification}
 */
public long getMinimumAlertRepeatGap()
{
    return this._minimumAlertRepeatGap;
}
/**
 * Sets the minimum gap between repeated alerts.
 *
 * @param minimumAlertRepeatGap the new gap value
 */
public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap)
{
    this._minimumAlertRepeatGap = minimumAlertRepeatGap;
}
/**
 * @return the configured maximum message age alert threshold
 */
public long getMaximumMessageAge()
{
    return this._maximumMessageAge;
}
/**
 * Sets the maximum message age threshold and enables or disables the matching alert check:
 * a value of zero removes the MESSAGE_AGE_ALERT check, any other value adds it.
 *
 * @param maximumMessageAge the new threshold, or 0 to disable the alert
 */
public void setMaximumMessageAge(long maximumMessageAge)
{
    _maximumMessageAge = maximumMessageAge;
    if (maximumMessageAge != 0L)
    {
        _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT);
    }
    else
    {
        _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT);
    }
}
/**
 * @return the configured maximum message count alert threshold
 */
public long getMaximumMessageCount()
{
    return this._maximumMessageCount;
}
/**
 * Sets the maximum message count threshold and enables or disables the matching alert check:
 * a value of zero removes the MESSAGE_COUNT_ALERT check, any other value adds it.
 *
 * @param maximumMessageCount the new threshold, or 0 to disable the alert
 */
public void setMaximumMessageCount(final long maximumMessageCount)
{
    _maximumMessageCount = maximumMessageCount;
    if (maximumMessageCount != 0L)
    {
        _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT);
    }
    else
    {
        _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT);
    }
}
/**
 * @return the configured maximum queue depth alert threshold
 */
public long getMaximumQueueDepth()
{
    return this._maximumQueueDepth;
}
/**
 * Sets the queue depth (the maximum queue size) threshold and enables or disables the matching
 * alert check: a value of zero removes the QUEUE_DEPTH_ALERT check, any other value adds it.
 *
 * @param maximumQueueDepth the new threshold, or 0 to disable the alert
 */
public void setMaximumQueueDepth(final long maximumQueueDepth)
{
    _maximumQueueDepth = maximumQueueDepth;
    if (maximumQueueDepth != 0L)
    {
        _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT);
    }
    else
    {
        _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT);
    }
}
/**
 * @return the configured maximum message size alert threshold
 */
public long getMaximumMessageSize()
{
    return this._maximumMessageSize;
}
/**
 * Sets the maximum message size threshold and enables or disables the matching alert check:
 * a value of zero removes the MESSAGE_SIZE_ALERT check, any other value adds it.
 *
 * @param maximumMessageSize the new threshold, or 0 to disable the alert
 */
public void setMaximumMessageSize(final long maximumMessageSize)
{
    _maximumMessageSize = maximumMessageSize;
    if (maximumMessageSize != 0L)
    {
        _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT);
    }
    else
    {
        _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT);
    }
}
/**
 * @return the flow-control capacity; the capacity checks in this class treat 0 as "no limit"
 */
public long getCapacity()
{
    return this._capacity;
}
/**
 * Sets the flow-control capacity.
 *
 * @param capacity the new capacity
 */
public void setCapacity(long capacity)
{
    this._capacity = capacity;
}
/**
 * @return the queue size at or below which blocked channels are released again
 */
public long getFlowResumeCapacity()
{
    return this._flowResumeCapacity;
}
/**
 * Sets the flow resume threshold and immediately re-evaluates flow control, so that channels
 * blocked under the old threshold can be released if the new one allows it.
 *
 * @param flowResumeCapacity the new resume threshold
 */
public void setFlowResumeCapacity(long flowResumeCapacity)
{
    this._flowResumeCapacity = flowResumeCapacity;
    this.checkCapacity();
}
/**
 * @return the current value of the overfull flag maintained by the capacity checks
 */
public boolean isOverfull()
{
    final boolean overfull = this._overfull.get();
    return overfull;
}
/**
 * @return the set of alert checks currently enabled for this queue (the internal set itself,
 *         not a copy)
 */
public Set<NotificationCheck> getNotificationChecks()
{
    return this._notificationChecks;
}
/**
 * Delete task that, when performed, unregisters another delete task from a lifetime object.
 */
private static class DeleteDeleteTask implements Action<Deletable>
{
    private final Deletable<? extends Deletable> _lifetimeObject;
    private final Action<? super Deletable> _deleteQueueOwnerTask;

    /**
     * @param owner        the object from which the task will be unregistered
     * @param taskToRemove the task to unregister when this action is performed
     */
    public DeleteDeleteTask(final Deletable<? extends Deletable> owner,
                            final Action<? super Deletable> taskToRemove)
    {
        this._lifetimeObject = owner;
        this._deleteQueueOwnerTask = taskToRemove;
    }

    /**
     * Removes the stored task from the stored lifetime object; the argument is not used.
     */
    @Override
    public void performAction(final Deletable object)
    {
        this._lifetimeObject.removeDeleteTask(this._deleteQueueOwnerTask);
    }
}
/**
 * Listener attached to an entry that a consumer could not take for lack of credit. On the first
 * state change of that entry it detaches itself and requests asynchronous delivery for the
 * consumer. Two listeners are equal when they refer to the same consumer instance.
 */
private final class QueueEntryListener implements StateChangeListener<E, QueueEntry.State>
{
    private final QueueConsumer<?,E,Q,L> _sub;

    public QueueEntryListener(final QueueConsumer<?,E,Q,L> sub)
    {
        this._sub = sub;
    }

    public boolean equals(Object o)
    {
        if (!(o instanceof SimpleAMQQueue.QueueEntryListener))
        {
            return false;
        }
        return _sub == ((QueueEntryListener) o)._sub;
    }

    public int hashCode()
    {
        // identity based, to agree with the identity comparison in equals
        return System.identityHashCode(_sub);
    }

    public void stateChanged(E entry, QueueEntry.State oldSate, QueueEntry.State newState)
    {
        // one-shot: detach first, then kick delivery for the waiting consumer
        entry.removeStateChangeListener(this);
        deliverAsync(_sub);
    }
}
/**
 * Returns the message numbers of up to {@code num} entries, starting from the front of the
 * queue.
 *
 * @param num maximum number of message numbers to return
 * @return the message numbers found
 */
public List<Long> getMessagesOnTheQueue(int num)
{
    final List<Long> ids = getMessagesOnTheQueue(num, 0);
    return ids;
}
/**
 * Returns the message numbers of up to {@code num} entries, after first advancing past
 * {@code offset} entries from the front of the queue.
 *
 * Entries whose node or message can no longer be obtained (for example because they were removed
 * concurrently) are skipped instead of causing a {@link NullPointerException}; they still count
 * towards {@code num}.
 *
 * @param num    maximum number of entries to examine after the offset
 * @param offset number of entries to skip from the front of the queue
 * @return the message numbers found, in iteration order
 */
public List<Long> getMessagesOnTheQueue(int num, int offset)
{
    final ArrayList<Long> ids = new ArrayList<Long>(num);
    // typed iterator, as used by the other scans in this class, instead of the raw type
    final QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> it = _entries.iterator();
    for (int i = 0; i < offset; i++)
    {
        it.advance();
    }
    for (int i = 0; i < num && !it.atTail(); i++)
    {
        it.advance();
        final E node = it.getNode();
        final ServerMessage message = node == null ? null : node.getMessage();
        if (message != null)
        {
            ids.add(message.getMessageNumber());
        }
    }
    return ids;
}
/**
 * @return the current value of the enqueue-size counter
 */
public long getTotalEnqueueSize()
{
    final long enqueued = this._enqueueSize.get();
    return enqueued;
}
/**
 * @return the current value of the dequeue-size counter
 */
public long getTotalDequeueSize()
{
    final long dequeued = this._dequeueSize.get();
    return dequeued;
}
/**
 * @return the current value of the persistent-message enqueue size counter
 */
public long getPersistentByteEnqueues()
{
    final long enqueuedBytes = this._persistentMessageEnqueueSize.get();
    return enqueuedBytes;
}
/**
 * @return the current value of the persistent-message dequeue size counter
 */
public long getPersistentByteDequeues()
{
    final long dequeuedBytes = this._persistentMessageDequeueSize.get();
    return dequeuedBytes;
}
/**
 * @return the current value of the persistent-message enqueue counter
 */
public long getPersistentMsgEnqueues()
{
    final long enqueuedMessages = this._persistentMessageEnqueueCount.get();
    return enqueuedMessages;
}
/**
 * @return the current value of the persistent-message dequeue counter
 */
public long getPersistentMsgDequeues()
{
    final long dequeuedMessages = this._persistentMessageDequeueCount.get();
    return dequeuedMessages;
}
/**
 * @return the queue name, which serves as this queue's string form
 */
@Override
public String toString()
{
    final String name = getName();
    return name;
}
/**
 * @return the current value of the unacknowledged-message counter
 */
public long getUnackedMessageCount()
{
    final long unacked = this._unackedMsgCount.get();
    return unacked;
}
/**
 * @return the current value of the unacknowledged-bytes counter
 */
public long getUnackedMessageBytes()
{
    final long unackedBytes = this._unackedMsgBytes.get();
    return unackedBytes;
}
/**
 * Takes one message, and its size, off the unacknowledged totals.
 *
 * @param queueEntry the entry that is no longer unacknowledged
 */
public void decrementUnackedMsgCount(E queueEntry)
{
    this._unackedMsgCount.decrementAndGet();
    this._unackedMsgBytes.addAndGet(-queueEntry.getSize());
}
/**
 * Adds one message, and its size, to the unacknowledged totals.
 *
 * @param entry the entry that has become unacknowledged
 */
private void incrementUnackedMsgCount(E entry)
{
    this._unackedMsgCount.incrementAndGet();
    this._unackedMsgBytes.addAndGet(entry.getSize());
}
/**
 * @return the log actor this queue uses for its operational log messages
 */
public LogActor getLogActor()
{
    return this._logActor;
}
/**
 * @return the configured maximum delivery count
 */
public int getMaximumDeliveryCount()
{
    return this._maximumDeliveryCount;
}
/**
 * Sets the maximum delivery count.
 *
 * @param maximumDeliveryCount the new maximum delivery count
 */
public void setMaximumDeliveryCount(final int maximumDeliveryCount)
{
    this._maximumDeliveryCount = maximumDeliveryCount;
}
/**
 * Runs the enabled alert checks against a message and notifies the listener where necessary.
 *
 * A check is evaluated if it is message specific, or if its last notification is older than the
 * minimum alert repeat gap. The time of the last notification is recorded per check whenever a
 * check reports that it notified the listener.
 *
 * @param msg the message to evaluate the checks against
 */
private void checkForNotification(ServerMessage<?> msg)
{
    final Set<NotificationCheck> enabledChecks = getNotificationChecks();
    final AMQQueue.NotificationListener listener = _notificationListener;
    if (listener == null || enabledChecks.isEmpty())
    {
        return;
    }

    final long now = System.currentTimeMillis();
    final long repeatThreshold = now - getMinimumAlertRepeatGap();
    for (NotificationCheck check : enabledChecks)
    {
        final boolean due = check.isMessageSpecific()
                            || (_lastNotificationTimes[check.ordinal()] < repeatThreshold);
        if (due && check.notifyIfNecessary(msg, this, listener))
        {
            _lastNotificationTimes[check.ordinal()] = now;
        }
    }
}
/**
 * Installs the listener that receives alert notifications; {@code null} disables notification.
 *
 * @param listener the listener to notify, or {@code null}
 */
public void setNotificationListener(AMQQueue.NotificationListener listener)
{
    this._notificationListener = listener;
}
/**
 * Stores a free-text description for this queue.
 *
 * @param description the description to store
 */
@Override
public void setDescription(String description)
{
    this._description = description;
}
/**
 * @return the description previously stored for this queue
 */
@Override
public String getDescription()
{
    return this._description;
}
/**
 * Enqueues a message on this queue as part of the given transaction.
 *
 * A reference to the message is taken before the enqueue is registered and released once the
 * transaction has either committed (after the in-memory enqueue) or rolled back.
 *
 * @param message            the message to enqueue
 * @param instanceProperties not used by this implementation
 * @param txn                the transaction in which the enqueue takes place
 * @param postEnqueueAction  action handed on to the in-memory enqueue performed after commit
 * @return always 1
 */
public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
                                                                                   final InstanceProperties instanceProperties,
                                                                                   final ServerTransaction txn,
                                                                                   final Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction)
{
    final MessageReference heldReference = message.newReference();
    final ServerTransaction.Action enqueueOnCommit = new ServerTransaction.Action()
    {
        public void postCommit()
        {
            try
            {
                SimpleAMQQueue.this.enqueue(message, postEnqueueAction);
            }
            finally
            {
                heldReference.release();
            }
        }

        public void onRollback()
        {
            heldReference.release();
        }
    };
    txn.enqueue(this, message, enqueueOnCommit);
    return 1;
}
/**
 * Decides whether the given session may use this queue under the current exclusivity policy.
 * For every policy other than NONE, access is allowed when no owner (or, for LINK, no exclusive
 * subscriber) is recorded, or when the session matches the recorded owner at the policy's level.
 *
 * @param session the session requesting access
 * @return {@code true} if access is permitted
 * @throws ServerScopedRuntimeException if the exclusivity policy is not one of the known values
 */
@Override
public boolean verifySessionAccess(final AMQSessionModel<?, ?> session)
{
    switch(_exclusivityPolicy)
    {
        case NONE:
            return true;
        case SESSION:
            return _exclusiveOwner == null || _exclusiveOwner == session;
        case CONNECTION:
            return _exclusiveOwner == null || _exclusiveOwner == session.getConnectionModel();
        case PRINCIPAL:
            return _exclusiveOwner == null || _exclusiveOwner.equals(session.getConnectionModel().getAuthorizedPrincipal());
        case CONTAINER:
            return _exclusiveOwner == null || _exclusiveOwner.equals(session.getConnectionModel().getRemoteContainerName());
        case LINK:
            return _exclusiveSubscriber == null || _exclusiveSubscriber.getSessionModel() == session;
        default:
            throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy);
    }
}
/**
 * Changes the exclusivity policy of this queue, deriving the new exclusive owner from the
 * current owner or from the existing consumers.
 *
 * A {@code null} policy is treated as {@link ExclusivityPolicy#NONE}. Nothing happens when the
 * requested policy equals the current one.
 *
 * @param desiredPolicy the policy to switch to; {@code null} means NONE
 * @throws ExistingConsumerPreventsExclusive if the consumers already attached are incompatible
 *         with the requested policy; the current policy is left unchanged in that case
 */
@Override
public synchronized void setExclusivityPolicy(final ExclusivityPolicy desiredPolicy)
        throws ExistingConsumerPreventsExclusive
{
    // Normalise null up front: switching on a null enum would throw NullPointerException when the
    // current policy is anything other than NONE.
    final ExclusivityPolicy requestedPolicy = desiredPolicy == null ? ExclusivityPolicy.NONE : desiredPolicy;

    if (requestedPolicy != _exclusivityPolicy)
    {
        switch (requestedPolicy)
        {
            case NONE:
                _exclusiveOwner = null;
                break;
            case PRINCIPAL:
                switchToPrincipalExclusivity();
                break;
            case CONTAINER:
                switchToContainerExclusivity();
                break;
            case CONNECTION:
                switchToConnectionExclusivity();
                break;
            case SESSION:
                switchToSessionExclusivity();
                break;
            case LINK:
                switchToLinkExclusivity();
                break;
        }
        // only reached if the switch helper did not throw
        _exclusivityPolicy = requestedPolicy;
    }
}
/**
 * Prepares the queue for LINK exclusivity. With no consumers the owner is simply cleared; with
 * exactly one consumer, the consumer obtained from the head of the consumer list also becomes
 * the exclusive subscriber.
 *
 * @throws ExistingConsumerPreventsExclusive if the consumer count is anything other than 0 or 1
 */
private void switchToLinkExclusivity() throws ExistingConsumerPreventsExclusive
{
    final int consumers = getConsumerCount();
    if (consumers != 0 && consumers != 1)
    {
        throw new ExistingConsumerPreventsExclusive();
    }
    if (consumers == 1)
    {
        _exclusiveSubscriber = getConsumerList().getHead().getConsumer();
    }
    _exclusiveOwner = null;
}
/**
 * Derives the exclusive owner for SESSION exclusivity from the current policy.
 *
 * Coming from NONE, PRINCIPAL, CONTAINER or CONNECTION, all existing consumers must belong to
 * one session, which becomes the owner (or no owner if there are no consumers). Coming from
 * LINK, the session of the exclusive subscriber becomes the owner.
 *
 * @throws ExistingConsumerPreventsExclusive if existing consumers belong to different sessions
 */
private void switchToSessionExclusivity() throws ExistingConsumerPreventsExclusive
{
    switch(_exclusivityPolicy)
    {
        case NONE:
        case PRINCIPAL:
        case CONTAINER:
        case CONNECTION:
            AMQSessionModel session = null;
            for(Consumer c : getConsumers())
            {
                if(session == null)
                {
                    session = c.getSessionModel();
                }
                else if(!session.equals(c.getSessionModel()))
                {
                    throw new ExistingConsumerPreventsExclusive();
                }
            }
            _exclusiveOwner = session;
            break;
        case LINK:
            // The owner under SESSION exclusivity must be the session itself: verifySessionAccess
            // compares the owner with the session, so storing the connection model here would
            // refuse access to every session, including the subscriber's own.
            _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel();
            break;
    }
}
/**
 * Derives the exclusive owner for CONNECTION exclusivity from the current policy.
 *
 * Coming from NONE, CONTAINER or PRINCIPAL, all existing consumers must share one connection,
 * which becomes the owner. Coming from SESSION, the owning session's connection becomes the
 * owner. Coming from LINK, the exclusive subscriber's connection becomes the owner.
 *
 * @throws ExistingConsumerPreventsExclusive if existing consumers use different connections
 */
private void switchToConnectionExclusivity() throws ExistingConsumerPreventsExclusive
{
    switch(_exclusivityPolicy)
    {
        case NONE:
        case CONTAINER:
        case PRINCIPAL:
        {
            AMQConnectionModel sharedConnection = null;
            for(Consumer consumer : getConsumers())
            {
                final AMQConnectionModel consumerConnection = consumer.getSessionModel().getConnectionModel();
                if(sharedConnection == null)
                {
                    sharedConnection = consumerConnection;
                }
                else if(!sharedConnection.equals(consumerConnection))
                {
                    throw new ExistingConsumerPreventsExclusive();
                }
            }
            _exclusiveOwner = sharedConnection;
            break;
        }
        case SESSION:
            if(_exclusiveOwner != null)
            {
                _exclusiveOwner = ((AMQSessionModel)_exclusiveOwner).getConnectionModel();
            }
            break;
        case LINK:
            if(_exclusiveSubscriber == null)
            {
                _exclusiveOwner = null;
            }
            else
            {
                _exclusiveOwner = _exclusiveSubscriber.getSessionModel().getConnectionModel();
            }
            break;
    }
}
/**
 * Derives the exclusive owner for CONTAINER exclusivity from the current policy.
 *
 * Coming from NONE or PRINCIPAL, all existing consumers must report the same remote container
 * name, which becomes the owner. Coming from CONNECTION, SESSION or LINK, the container name is
 * taken from the current owner's (or exclusive subscriber's) connection.
 *
 * @throws ExistingConsumerPreventsExclusive if existing consumers come from different containers
 */
private void switchToContainerExclusivity() throws ExistingConsumerPreventsExclusive
{
    switch(_exclusivityPolicy)
    {
        case NONE:
        case PRINCIPAL:
        {
            String sharedContainer = null;
            for(Consumer consumer : getConsumers())
            {
                final String consumerContainer = consumer.getSessionModel().getConnectionModel().getRemoteContainerName();
                if(sharedContainer == null)
                {
                    sharedContainer = consumerContainer;
                }
                else if(!sharedContainer.equals(consumerContainer))
                {
                    throw new ExistingConsumerPreventsExclusive();
                }
            }
            _exclusiveOwner = sharedContainer;
            break;
        }
        case CONNECTION:
            if(_exclusiveOwner != null)
            {
                _exclusiveOwner = ((AMQConnectionModel)_exclusiveOwner).getRemoteContainerName();
            }
            break;
        case SESSION:
            if(_exclusiveOwner != null)
            {
                _exclusiveOwner = ((AMQSessionModel)_exclusiveOwner).getConnectionModel().getRemoteContainerName();
            }
            break;
        case LINK:
            if(_exclusiveSubscriber == null)
            {
                _exclusiveOwner = null;
            }
            else
            {
                _exclusiveOwner = _exclusiveSubscriber.getSessionModel().getConnectionModel().getRemoteContainerName();
            }
            break;
    }
}
/**
 * Derives the exclusive owner for PRINCIPAL exclusivity from the current policy.
 *
 * Coming from NONE or CONTAINER, all existing consumers must be authorised as the same
 * principal, which becomes the owner. Coming from CONNECTION, SESSION or LINK, the principal is
 * taken from the current owner's (or exclusive subscriber's) connection.
 *
 * @throws ExistingConsumerPreventsExclusive if existing consumers have different principals
 */
private void switchToPrincipalExclusivity() throws ExistingConsumerPreventsExclusive
{
    switch(_exclusivityPolicy)
    {
        case NONE:
        case CONTAINER:
        {
            Principal sharedPrincipal = null;
            for(Consumer consumer : getConsumers())
            {
                final Principal consumerPrincipal = consumer.getSessionModel().getConnectionModel().getAuthorizedPrincipal();
                if(sharedPrincipal == null)
                {
                    sharedPrincipal = consumerPrincipal;
                }
                else if(!sharedPrincipal.equals(consumerPrincipal))
                {
                    throw new ExistingConsumerPreventsExclusive();
                }
            }
            _exclusiveOwner = sharedPrincipal;
            break;
        }
        case CONNECTION:
            if(_exclusiveOwner != null)
            {
                _exclusiveOwner = ((AMQConnectionModel)_exclusiveOwner).getAuthorizedPrincipal();
            }
            break;
        case SESSION:
            if(_exclusiveOwner != null)
            {
                _exclusiveOwner = ((AMQSessionModel)_exclusiveOwner).getConnectionModel().getAuthorizedPrincipal();
            }
            break;
        case LINK:
            if(_exclusiveSubscriber == null)
            {
                _exclusiveOwner = null;
            }
            else
            {
                _exclusiveOwner = _exclusiveSubscriber.getSessionModel().getConnectionModel().getAuthorizedPrincipal();
            }
            break;
    }
}
/**
 * Action that clears this queue's exclusive owner if that owner is still the given lifetime
 * object, and then unregisters the companion delete task (if one was set) from this queue.
 */
private class ClearOwnerAction implements Action<Deletable>
{
    private final Deletable<? extends Deletable> _lifetimeObject;
    private DeleteDeleteTask _deleteTask;

    public ClearOwnerAction(final Deletable<? extends Deletable> lifetimeObject)
    {
        this._lifetimeObject = lifetimeObject;
    }

    @Override
    public void performAction(final Deletable object)
    {
        // only clear ownership if it has not been handed to a different owner in the meantime
        final boolean stillOwner = SimpleAMQQueue.this._exclusiveOwner == _lifetimeObject;
        if (stillOwner)
        {
            SimpleAMQQueue.this._exclusiveOwner = null;
        }

        final DeleteDeleteTask companion = _deleteTask;
        if (companion != null)
        {
            SimpleAMQQueue.this.removeDeleteTask(companion);
        }
    }

    /**
     * @param deleteTask the companion task to unregister from the queue when this action runs
     */
    public void setDeleteTask(final DeleteDeleteTask deleteTask)
    {
        this._deleteTask = deleteTask;
    }
}
}