Revert previous change and instead ensure that next() after hasNext() is always safe
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index da161a9..2900acd 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -58,6 +58,7 @@
import javax.security.auth.Subject;
+import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -284,7 +285,7 @@
while (consumerIterator.hasNext())
{
QueueConsumer<?> queueConsumer = consumerIterator.next();
- if (queueConsumer != null && queueConsumer.getPriority() < highestNotifiedPriority || notifyConsumer(queueConsumer))
+ if (queueConsumer.getPriority() < highestNotifiedPriority || notifyConsumer(queueConsumer))
{
break;
}
@@ -1042,7 +1043,7 @@
@Override
public Collection<QueueConsumer<?>> getConsumers()
{
- return getQueueConsumersAsList();
+ return Lists.newArrayList(_queueConsumerManager.getAllIterator());
}
@@ -1323,7 +1324,7 @@
QueueConsumer<?> sub = consumerIterator.next();
// we don't make browsers send the same stuff twice
- if (sub != null && sub.seesRequeues())
+ if (sub.seesRequeues())
{
updateSubRequeueEntry(sub, entry);
}
@@ -1829,7 +1830,7 @@
while (nonAcquiringIterator.hasNext())
{
QueueConsumer<?> consumer = nonAcquiringIterator.next();
- if(consumer != null && consumer.hasInterest(entry))
+ if(consumer.hasInterest(entry))
{
notifyConsumer(consumer);
}
@@ -1839,7 +1840,7 @@
while (entry.isAvailable() && interestedIterator.hasNext())
{
QueueConsumer<?> consumer = interestedIterator.next();
- if(consumer != null && consumer.hasInterest(entry))
+ if(consumer.hasInterest(entry))
{
if(notifyConsumer(consumer))
{
@@ -1861,14 +1862,12 @@
while (hasAvailableMessages() && interestedIterator.hasNext())
{
QueueConsumer<?> consumer = interestedIterator.next();
- if(consumer != null)
+
+ if (excludedConsumer != consumer)
{
- if (excludedConsumer != consumer)
+ if (notifyConsumer(consumer))
{
- if (notifyConsumer(consumer))
- {
- break;
- }
+ break;
}
}
}
@@ -2015,7 +2014,7 @@
while (consumerIterator.hasNext())
{
QueueConsumer<?> consumer = consumerIterator.next();
- if(consumer != null && consumer.getPriority() > sub.getPriority())
+ if(consumer.getPriority() > sub.getPriority())
{
if(consumer.isNotifyWorkDesired()
&& consumer.acquires()
@@ -2959,26 +2958,11 @@
{
return _queueConsumerManager == null
? Collections.<C>emptySet()
- : (Collection<C>) getQueueConsumersAsList();
+ : (Collection<C>) Lists.newArrayList(_queueConsumerManager.getAllIterator());
}
else return Collections.emptySet();
}
- private List<QueueConsumer<?>> getQueueConsumersAsList()
- {
- List<QueueConsumer<?>> consumers = new ArrayList<>(_queueConsumerManager.getAllSize());
- final Iterator<QueueConsumer<?>> iter = _queueConsumerManager.getAllIterator();
- while(iter.hasNext())
- {
- final QueueConsumer<?> consumer = iter.next();
- if(consumer != null)
- {
- consumers.add(consumer);
- }
- }
- return consumers;
- }
-
@Override
protected <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(final Class<C> childClass,
final Map<String, Object> attributes,
@@ -3441,7 +3425,7 @@
while (consumerIterator.hasNext() && !isDeleted())
{
QueueConsumer<?> sub = consumerIterator.next();
- if(sub != null && sub.acquires())
+ if(sub.acquires())
{
getNextAvailableEntry(sub);
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
index 89415e8..8e69837 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
@@ -20,9 +20,11 @@
*/
package org.apache.qpid.server.queue;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.concurrent.CopyOnWriteArrayList;
public class QueueConsumerManagerImpl implements QueueConsumerManager
@@ -263,8 +265,7 @@
@Override
public QueueConsumer<?> next()
{
- QueueConsumerNode next = _underlying.next();
- return next == null ? null : next.getQueueConsumer();
+ return _underlying.next().getQueueConsumer();
}
@Override
@@ -347,6 +348,7 @@
private PrioritisedQueueConsumerNodeIterator(List<PriorityConsumerListPair> list)
{
_outerIterator = list.iterator();
+ _innerIterator = Collections.emptyIterator();
}
@Override
@@ -354,7 +356,7 @@
{
while (true)
{
- if (_innerIterator != null && _innerIterator.hasNext())
+ if (_innerIterator.hasNext())
{
return true;
}
@@ -373,15 +375,14 @@
@Override
public QueueConsumerNode next()
{
- if (hasNext())
+ try
{
return _innerIterator.next();
}
- else
+ catch (NoSuchElementException e)
{
- // throwing exceptions is expensive, and due to concurrency a caller might get here even though they
- // had previously checked with hasNext()
- return null;
+ hasNext();
+ return _innerIterator.next();
}
}