QPID-8389: Address some of review comments
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java b/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
index 505a03e..d516f51 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
@@ -24,7 +24,6 @@
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.MessageInstanceConsumer;
-import org.apache.qpid.server.session.AMQPSession;
@ManagedObject(creatable = false, amqpName = "org.apache.qpid.Consumer")
public interface Consumer<X extends Consumer<X,T>, T extends ConsumerTarget> extends ConfiguredObject<X>, MessageInstanceConsumer<T>
@@ -90,5 +89,6 @@
boolean isActive();
+ @DerivedAttribute(description = "Indicates whether the consumer is held in reserve after reaching the maximum number of live consumers")
boolean isNonLive();
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 9d5602c..543ad1f 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1038,30 +1038,9 @@
queueContext = new QueueContext(getEntries().getTail());
}
consumer.setQueueContext(queueContext);
-
- // this level of care over concurrency in maintaining the correct value for live consumers is probable not
- // necessary, as all this should take place serially in the configuration thread
- int maximumLiveConsumers = _maximumLiveConsumers;
- if(maximumLiveConsumers > 0)
+ if (_maximumLiveConsumers > 0 && !incrementNumberOfLiveConsumersIfApplicable())
{
- boolean added = false;
- int liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
- while(liveConsumers < maximumLiveConsumers)
- {
- if(LIVE_CONSUMERS_UPDATER.compareAndSet(this, liveConsumers, liveConsumers+1))
- {
- added = true;
-
- break;
- }
- liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
- }
-
- if(!added)
- {
- consumer.setNonLive(true);
- }
-
+ consumer.setNonLive(true);
}
_queueConsumerManager.addConsumer(consumer);
@@ -1099,7 +1078,7 @@
- void unregisterConsumer(final QueueConsumerImpl consumer)
+ <T extends ConsumerTarget<T>> void unregisterConsumer(final QueueConsumerImpl<T> consumer)
{
if (consumer == null)
{
@@ -1126,45 +1105,11 @@
resetSubPointersForGroups(consumer);
}
- // this level of care over concurrency in maintaining the correct value for live consumers is probable not
- // necessary, as all this should take place serially in the configuration thread
- int maximumLiveConsumers = _maximumLiveConsumers;
- if(maximumLiveConsumers > 0 && !consumer.isNonLive())
+ if (_maximumLiveConsumers > 0 && !consumer.isNonLive())
{
- boolean updated = false;
- int liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
- while(liveConsumers > 0)
- {
- if(LIVE_CONSUMERS_UPDATER.compareAndSet(this, liveConsumers, liveConsumers-1))
- {
- updated = true;
-
- break;
- }
- liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
- }
-
- liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
-
+ decrementNumberOfLiveConsumersIfApplicable();
consumer.setNonLive(true);
-
- Iterator<QueueConsumer<?,?>> consumerIterator = _queueConsumerManager.getAllIterator();
-
- QueueConsumerImpl<?> otherConsumer;
- while(consumerIterator.hasNext() && liveConsumers < maximumLiveConsumers)
- {
- otherConsumer = (QueueConsumerImpl<?>) consumerIterator.next();
-
- if(otherConsumer != null && otherConsumer.isNonLive() && LIVE_CONSUMERS_UPDATER.compareAndSet(this, liveConsumers, liveConsumers+1))
- {
- otherConsumer.setNonLive(false);
- otherConsumer.setNotifyWorkDesired(true);
- break;
- }
- liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
- maximumLiveConsumers = _maximumLiveConsumers;
- }
-
+ assignNextLiveConsumerIfApplicable();
}
// auto-delete queues must be deleted if there are no remaining subscribers
@@ -1193,6 +1138,68 @@
}
+ private boolean incrementNumberOfLiveConsumersIfApplicable()
+ {
+ // this level of care over concurrency in maintaining the correct value for live consumers is probable not
+ // necessary, as all this should take place serially in the configuration thread
+ int maximumLiveConsumers = _maximumLiveConsumers;
+ boolean added = false;
+ int liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+ while (liveConsumers < maximumLiveConsumers)
+ {
+ if (LIVE_CONSUMERS_UPDATER.compareAndSet(this, liveConsumers, liveConsumers + 1))
+ {
+ added = true;
+ break;
+ }
+ liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+ }
+
+ return added;
+ }
+
+ private boolean decrementNumberOfLiveConsumersIfApplicable()
+ {
+ // this level of care over concurrency in maintaining the correct value for live consumers is probable not
+ // necessary, as all this should take place serially in the configuration thread
+ boolean updated = false;
+ int liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+ while (liveConsumers > 0)
+ {
+ if (LIVE_CONSUMERS_UPDATER.compareAndSet(this, liveConsumers, liveConsumers - 1))
+ {
+ updated = true;
+ break;
+ }
+ liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+ }
+ return updated;
+ }
+
+ private void assignNextLiveConsumerIfApplicable()
+ {
+ int maximumLiveConsumers = _maximumLiveConsumers;
+ int liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+ final Iterator<QueueConsumer<?, ?>> consumerIterator = _queueConsumerManager.getAllIterator();
+
+ QueueConsumerImpl<?> otherConsumer;
+ while (consumerIterator.hasNext() && liveConsumers < maximumLiveConsumers)
+ {
+ otherConsumer = (QueueConsumerImpl<?>) consumerIterator.next();
+
+ if (otherConsumer != null
+ && otherConsumer.isNonLive()
+ && LIVE_CONSUMERS_UPDATER.compareAndSet(this, liveConsumers, liveConsumers + 1))
+ {
+ otherConsumer.setNonLive(false);
+ otherConsumer.setNotifyWorkDesired(true);
+ break;
+ }
+ liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+ maximumLiveConsumers = _maximumLiveConsumers;
+ }
+ }
+
@Override
public Collection<QueueConsumer<?,?>> getConsumers()
{
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
index 89444ed..9a0ea22 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
@@ -44,6 +44,7 @@
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
+
public class StandardQueueTest extends AbstractQueueTestBase
{