QPID-8389: Address some of review comments
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java b/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
index 505a03e..d516f51 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
@@ -24,7 +24,6 @@
 
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.message.MessageInstanceConsumer;
-import org.apache.qpid.server.session.AMQPSession;
 
 @ManagedObject(creatable = false, amqpName = "org.apache.qpid.Consumer")
 public interface Consumer<X extends Consumer<X,T>, T extends ConsumerTarget> extends ConfiguredObject<X>, MessageInstanceConsumer<T>
@@ -90,5 +89,6 @@
 
     boolean isActive();
 
+    @DerivedAttribute(description = "Indicates whether the consumer is held in reserve after reaching the maximum number of live consumers")
     boolean isNonLive();
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 9d5602c..543ad1f 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1038,30 +1038,9 @@
             queueContext = new QueueContext(getEntries().getTail());
         }
         consumer.setQueueContext(queueContext);
-
-        // this level of care over concurrency in maintaining the correct value for live consumers is probable not
-        // necessary, as all this should take place serially in the configuration thread
-        int maximumLiveConsumers = _maximumLiveConsumers;
-        if(maximumLiveConsumers > 0)
+        if (_maximumLiveConsumers > 0 && !incrementNumberOfLiveConsumersIfApplicable())
         {
-            boolean added = false;
-            int liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
-            while(liveConsumers < maximumLiveConsumers)
-            {
-                if(LIVE_CONSUMERS_UPDATER.compareAndSet(this, liveConsumers, liveConsumers+1))
-                {
-                    added = true;
-
-                    break;
-                }
-                liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
-            }
-
-            if(!added)
-            {
-                consumer.setNonLive(true);
-            }
-
+            consumer.setNonLive(true);
         }
 
         _queueConsumerManager.addConsumer(consumer);
@@ -1099,7 +1078,7 @@
 
 
 
-    void unregisterConsumer(final QueueConsumerImpl consumer)
+    <T extends ConsumerTarget<T>> void unregisterConsumer(final QueueConsumerImpl<T> consumer)
     {
         if (consumer == null)
         {
@@ -1126,45 +1105,11 @@
                 resetSubPointersForGroups(consumer);
             }
 
-            // this level of care over concurrency in maintaining the correct value for live consumers is probable not
-            // necessary, as all this should take place serially in the configuration thread
-            int maximumLiveConsumers = _maximumLiveConsumers;
-            if(maximumLiveConsumers > 0 && !consumer.isNonLive())
+            if (_maximumLiveConsumers > 0 && !consumer.isNonLive())
             {
-                boolean updated = false;
-                int liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
-                while(liveConsumers > 0)
-                {
-                    if(LIVE_CONSUMERS_UPDATER.compareAndSet(this, liveConsumers, liveConsumers-1))
-                    {
-                        updated = true;
-
-                        break;
-                    }
-                    liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
-                }
-
-                liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
-
+                decrementNumberOfLiveConsumersIfApplicable();
                 consumer.setNonLive(true);
-
-                Iterator<QueueConsumer<?,?>> consumerIterator = _queueConsumerManager.getAllIterator();
-
-                QueueConsumerImpl<?> otherConsumer;
-                while(consumerIterator.hasNext() && liveConsumers < maximumLiveConsumers)
-                {
-                    otherConsumer = (QueueConsumerImpl<?>) consumerIterator.next();
-
-                    if(otherConsumer != null && otherConsumer.isNonLive() && LIVE_CONSUMERS_UPDATER.compareAndSet(this, liveConsumers, liveConsumers+1))
-                    {
-                        otherConsumer.setNonLive(false);
-                        otherConsumer.setNotifyWorkDesired(true);
-                        break;
-                    }
-                    liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
-                    maximumLiveConsumers = _maximumLiveConsumers;
-                }
-
+                assignNextLiveConsumerIfApplicable();
             }
 
             // auto-delete queues must be deleted if there are no remaining subscribers
@@ -1193,6 +1138,68 @@
 
     }
 
+    private boolean incrementNumberOfLiveConsumersIfApplicable()
+    {
+        // this level of care over concurrency in maintaining the correct value for live consumers is probable not
+        // necessary, as all this should take place serially in the configuration thread
+        int maximumLiveConsumers = _maximumLiveConsumers;
+        boolean added = false;
+        int liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+        while (liveConsumers < maximumLiveConsumers)
+        {
+            if (LIVE_CONSUMERS_UPDATER.compareAndSet(this, liveConsumers, liveConsumers + 1))
+            {
+                added = true;
+                break;
+            }
+            liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+        }
+
+        return added;
+    }
+
+    private boolean decrementNumberOfLiveConsumersIfApplicable()
+    {
+        // this level of care over concurrency in maintaining the correct value for live consumers is probable not
+        // necessary, as all this should take place serially in the configuration thread
+        boolean updated = false;
+        int liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+        while (liveConsumers > 0)
+        {
+            if (LIVE_CONSUMERS_UPDATER.compareAndSet(this, liveConsumers, liveConsumers - 1))
+            {
+                updated = true;
+                break;
+            }
+            liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+        }
+        return updated;
+    }
+
+    private void assignNextLiveConsumerIfApplicable()
+    {
+        int maximumLiveConsumers = _maximumLiveConsumers;
+        int liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+        final Iterator<QueueConsumer<?, ?>> consumerIterator = _queueConsumerManager.getAllIterator();
+
+        QueueConsumerImpl<?> otherConsumer;
+        while (consumerIterator.hasNext() && liveConsumers < maximumLiveConsumers)
+        {
+            otherConsumer = (QueueConsumerImpl<?>) consumerIterator.next();
+
+            if (otherConsumer != null
+                && otherConsumer.isNonLive()
+                && LIVE_CONSUMERS_UPDATER.compareAndSet(this, liveConsumers, liveConsumers + 1))
+            {
+                otherConsumer.setNonLive(false);
+                otherConsumer.setNotifyWorkDesired(true);
+                break;
+            }
+            liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+            maximumLiveConsumers = _maximumLiveConsumers;
+        }
+    }
+
     @Override
     public Collection<QueueConsumer<?,?>> getConsumers()
     {
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
index 89444ed..9a0ea22 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
@@ -44,6 +44,7 @@
 import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
+
 public class StandardQueueTest extends AbstractQueueTestBase
 {