QPID-8389 : [Broker-J] Support the ability to limit the number of active consumers
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java b/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
index 9db4b21..505a03e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
@@ -90,4 +90,5 @@
 
     boolean isActive();
 
+    boolean isNonLive();
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 3920a14..bf4b724 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -88,6 +88,7 @@
     String MAXIMUM_QUEUE_DEPTH_MESSAGES = "maximumQueueDepthMessages";
     String MAXIMUM_QUEUE_DEPTH_BYTES = "maximumQueueDepthBytes";
     String EXPIRY_POLICY = "expiryPolicy";
+    String MAXIMUM_LIVE_CONSUMERS = "maximumLiveConsumers";
 
     String QUEUE_SCAVANGE_COUNT = "qpid.queue.scavenge_count";
     @SuppressWarnings("unused")
@@ -349,6 +350,18 @@
             mandatory = true)
     ExpiryPolicy getExpiryPolicy();
 
+    @SuppressWarnings("unused")
+    @ManagedContextDefault(name = "queue.defaultMaximumLiveConsumers",
+            description = "Specifies the default value for maximum live consumers. ")
+    int DEFAULT_MAXIMUM_LIVE_CONSUMERS = 0;
+
+    @ManagedAttribute(defaultValue = "${queue.defaultMaximumLiveConsumers}",
+            description = "Maximum live consumers. If the maximum number of live consumers is set to a number greater "
+                          + "than zero, then consumers in excess of this limit are held in reserve and will only become "
+                          + "eligible to receive messages if a live consumer is removed")
+    int getMaximumLiveConsumers();
+
+
     @ManagedOperation(nonModifying = true, changesConfiguredObjectState = false, skipAclCheck = true)
     Collection<PublishingLink> getPublishingLinks();
 
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index cc6608d..a9e316e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -49,6 +49,7 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.zip.GZIPInputStream;
@@ -257,6 +258,13 @@
     @ManagedAttributeField
     private ExpiryPolicy _expiryPolicy;
 
+    @ManagedAttributeField
+    private volatile int _maximumLiveConsumers;
+
+    private static final AtomicIntegerFieldUpdater<AbstractQueue> LIVE_CONSUMERS_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(AbstractQueue.class, "_liveConsumers");
+    private volatile int _liveConsumers;
+
     private static final int RECOVERING = 1;
     private static final int COMPLETING_RECOVERY = 2;
     private static final int RECOVERED = 3;
@@ -266,7 +274,7 @@
     private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>();
     private final ConcurrentMap<String, Callable<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
     private final List<HoldMethod> _holdMethods = new CopyOnWriteArrayList<>();
-    private final Set<DestinationReferrer> _referrers = Collections.newSetFromMap(new ConcurrentHashMap<DestinationReferrer,Boolean>());
+    private final Set<DestinationReferrer> _referrers = Collections.newSetFromMap(new ConcurrentHashMap<>());
     private final Set<LocalTransaction> _transactions = ConcurrentHashMap.newKeySet();
     private final LocalTransaction.LocalTransactionListener _localTransactionListener = _transactions::remove;
 
@@ -282,7 +290,7 @@
 
     private interface HoldMethod
     {
-        boolean isHeld(MessageReference<?> message, long evalutaionTime);
+        boolean isHeld(MessageReference<?> message, long evaluationTime);
     }
 
     protected AbstractQueue(Map<String, Object> attributes, QueueManagingVirtualHost<?> virtualHost)
@@ -302,30 +310,20 @@
                             || getLifetimePolicy() == LifetimePolicy.DELETE_ON_SESSION_END))
         {
             Subject.doAs(getSubjectWithAddedSystemRights(),
-                         new PrivilegedAction<Object>()
-                         {
-                             @Override
-                             public Object run()
-                             {
-                                 setAttributes(Collections.<String, Object>singletonMap(AbstractConfiguredObject.DURABLE,
-                                                                                        false));
-                                 return null;
-                             }
+                         (PrivilegedAction<Object>) () -> {
+                             setAttributes(Collections.<String, Object>singletonMap(AbstractConfiguredObject.DURABLE,
+                                                                                    false));
+                             return null;
                          });
         }
 
         if(!isDurable() && getMessageDurability() != MessageDurability.NEVER)
         {
             Subject.doAs(getSubjectWithAddedSystemRights(),
-                         new PrivilegedAction<Object>()
-                         {
-                             @Override
-                             public Object run()
-                             {
-                                 setAttributes(Collections.<String, Object>singletonMap(Queue.MESSAGE_DURABILITY,
-                                                                                        MessageDurability.NEVER));
-                                 return null;
-                             }
+                         (PrivilegedAction<Object>) () -> {
+                             setAttributes(Collections.<String, Object>singletonMap(Queue.MESSAGE_DURABILITY,
+                                                                                    MessageDurability.NEVER));
+                             return null;
                          });
         }
 
@@ -372,7 +370,7 @@
 
         _queueHouseKeepingTask = new AdvanceConsumersTask();
         Subject activeSubject = Subject.getSubject(AccessController.getContext());
-        Set<SessionPrincipal> sessionPrincipals = activeSubject == null ? Collections.<SessionPrincipal>emptySet() : activeSubject.getPrincipals(SessionPrincipal.class);
+        Set<SessionPrincipal> sessionPrincipals = activeSubject == null ? Collections.emptySet() : activeSubject.getPrincipals(SessionPrincipal.class);
         AMQPSession<?, ?> session;
         if(sessionPrincipals.isEmpty())
         {
@@ -774,7 +772,7 @@
     @Override
     public Collection<String> getAvailableAttributes()
     {
-        return new ArrayList<String>(_arguments.keySet());
+        return new ArrayList<>(_arguments.keySet());
     }
 
     @Override
@@ -849,7 +847,7 @@
             });
 
             target.consumerAdded(queueConsumer);
-            if(isEmpty())
+            if(isEmpty() || queueConsumer.isNonLive())
             {
                 target.noMessagesAvailable();
             }
@@ -1041,6 +1039,31 @@
         }
         consumer.setQueueContext(queueContext);
 
+        // this level of care over concurrency in maintaining the correct value for live consumers is probable not
+        // necessary, as all this should take place serially in the configuration thread
+        int maximumLiveConsumers = _maximumLiveConsumers;
+        if(maximumLiveConsumers > 0)
+        {
+            boolean added = false;
+            int liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+            while(liveConsumers < maximumLiveConsumers)
+            {
+                if(LIVE_CONSUMERS_UPDATER.compareAndSet(this, liveConsumers, liveConsumers+1))
+                {
+                    added = true;
+
+                    break;
+                }
+                liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+            }
+
+            if(!added)
+            {
+                consumer.setNonLive(true);
+            }
+
+        }
+
         _queueConsumerManager.addConsumer(consumer);
         if (consumer.isNotifyWorkDesired())
         {
@@ -1089,7 +1112,7 @@
         {
             consumer.closeAsync();
             // No longer can the queue have an exclusive consumer
-            setExclusiveSubscriber(null);
+            clearExclusiveSubscriber();
 
             consumer.setQueueContext(null);
 
@@ -1103,6 +1126,47 @@
                 resetSubPointersForGroups(consumer);
             }
 
+            // this level of care over concurrency in maintaining the correct value for live consumers is probable not
+            // necessary, as all this should take place serially in the configuration thread
+            int maximumLiveConsumers = _maximumLiveConsumers;
+            if(maximumLiveConsumers > 0 && !consumer.isNonLive())
+            {
+                boolean updated = false;
+                int liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+                while(liveConsumers > 0)
+                {
+                    if(LIVE_CONSUMERS_UPDATER.compareAndSet(this, liveConsumers, liveConsumers-1))
+                    {
+                        updated = true;
+
+                        break;
+                    }
+                    liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+                }
+
+                liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+
+                consumer.setNonLive(true);
+
+                Iterator<QueueConsumer<?,?>> consumerIterator = _queueConsumerManager.getAllIterator();
+
+                QueueConsumerImpl<?> otherConsumer;
+                while(consumerIterator.hasNext() && liveConsumers < maximumLiveConsumers)
+                {
+                    otherConsumer = (QueueConsumerImpl<?>) consumerIterator.next();
+
+                    if(otherConsumer != null && otherConsumer.isNonLive() && LIVE_CONSUMERS_UPDATER.compareAndSet(this, liveConsumers, liveConsumers+1))
+                    {
+                        otherConsumer.setNonLive(false);
+                        otherConsumer.setNotifyWorkDesired(true);
+                        break;
+                    }
+                    liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+                    maximumLiveConsumers = _maximumLiveConsumers;
+                }
+
+            }
+
             // auto-delete queues must be deleted if there are no remaining subscribers
 
             if(!consumer.isTransient()
@@ -1114,15 +1178,10 @@
 
                 LOGGER.debug("Auto-deleting queue: {}", this);
 
-                Subject.doAs(getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>()
-                             {
-                                 @Override
-                                 public Object run()
-                                 {
-                                     AbstractQueue.this.delete();
-                                     return null;
-                                 }
-                             });
+                Subject.doAs(getSubjectWithAddedSystemRights(), (PrivilegedAction<Object>) () -> {
+                    AbstractQueue.this.delete();
+                    return null;
+                });
 
 
                 // we need to manually fire the event to the removed consumer (which was the last one left for this
@@ -1583,6 +1642,12 @@
         return _deleted.get();
     }
 
+    @Override
+    public int getMaximumLiveConsumers()
+    {
+        return _maximumLiveConsumers;
+    }
+
     boolean wouldExpire(final ServerMessage message)
     {
         long expiration = calculateExpiration(message);
@@ -1592,7 +1657,7 @@
     @Override
     public List<QueueEntry> getMessagesOnTheQueue()
     {
-        ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
+        ArrayList<QueueEntry> entryList = new ArrayList<>();
         QueueEntryIterator queueListIterator = getEntries().iterator();
         while (queueListIterator.advance())
         {
@@ -1623,9 +1688,9 @@
         return _exclusiveSubscriber != null;
     }
 
-    private void setExclusiveSubscriber(QueueConsumer<?,?> exclusiveSubscriber)
+    private void clearExclusiveSubscriber()
     {
-        _exclusiveSubscriber = exclusiveSubscriber;
+        _exclusiveSubscriber = null;
     }
 
     /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
@@ -1679,7 +1744,7 @@
 
     List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter)
     {
-        ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
+        ArrayList<QueueEntry> entryList = new ArrayList<>();
         QueueEntryIterator queueListIterator = getEntries().iterator();
         while (queueListIterator.advance() && !filter.filterComplete())
         {
@@ -2077,14 +2142,20 @@
     {
         boolean queueEmpty = false;
         MessageContainer messageContainer = null;
-
         _queueConsumerManager.setNotified(consumer, false);
         try
         {
 
             if (!consumer.isSuspended())
             {
-                messageContainer = attemptDelivery(consumer);
+                if(consumer.isNonLive())
+                {
+                    messageContainer = NO_MESSAGES;
+                }
+                else
+                {
+                    messageContainer = attemptDelivery(consumer);
+                }
 
                 if(messageContainer.getMessageInstance() == null)
                 {
@@ -2138,7 +2209,7 @@
     {
         // avoid referring old deleted queue entry in sub._queueContext._lastSeen
         QueueEntry node = getNextAvailableEntry(sub);
-        boolean subActive = sub.isActive() && !sub.isSuspended();
+        boolean subActive = sub.isActive() && !sub.isSuspended() && !sub.isNonLive();
 
         if (node != null && subActive
             && (sub.getPriority() == Integer.MAX_VALUE || noHigherPriorityWithCredit(sub, node)))
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
index f9b054d..3aa8e8d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
@@ -63,6 +63,8 @@
 
     public static final String X_QPID_DESCRIPTION = "x-qpid-description";
 
+    public static final String X_SINGLE_ACTIVE_CONSUMER = "x-single-active-consumer";
+
     private static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
 
     private static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key";
@@ -250,6 +252,17 @@
                 }
             }
 
+            if(wireArguments.containsKey(X_SINGLE_ACTIVE_CONSUMER))
+            {
+                wireArgumentNames.remove(X_SINGLE_ACTIVE_CONSUMER);
+                Object argument = wireArguments.get(X_SINGLE_ACTIVE_CONSUMER);
+                if ((argument instanceof Boolean && ((Boolean) argument).booleanValue())
+                    || (argument instanceof String && Boolean.parseBoolean((String)argument)))
+                {
+                    modelArguments.putIfAbsent(Queue.MAXIMUM_LIVE_CONSUMERS, 1);
+                }
+            }
+
             if (!wireArgumentNames.isEmpty())
             {
 
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 9687433..8fe2342 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -102,6 +102,7 @@
     private final String _linkName;
 
     private volatile QueueConsumerNode _queueConsumerNode;
+    private volatile boolean _nonLive;
 
     QueueConsumerImpl(final AbstractQueue<?> queue,
                       T target,
@@ -193,7 +194,7 @@
     @Override
     public boolean isNotifyWorkDesired()
     {
-        return _target.isNotifyWorkDesired();
+        return !isNonLive() && _target.isNotifyWorkDesired();
     }
 
     @Override
@@ -544,6 +545,16 @@
         return _selector;
     }
 
+    @Override
+    public boolean isNonLive()
+    {
+        return _nonLive;
+    }
+
+    public void setNonLive(final boolean nonLive)
+    {
+        _nonLive = nonLive;
+    }
 
     @Override
     public String toLogString()
diff --git a/doc/java-broker/src/docbkx/Java-Broker-Appendix-Queue-Declaration-Arguments.xml b/doc/java-broker/src/docbkx/Java-Broker-Appendix-Queue-Declaration-Arguments.xml
index 78ff31d..ad90280 100644
--- a/doc/java-broker/src/docbkx/Java-Broker-Appendix-Queue-Declaration-Arguments.xml
+++ b/doc/java-broker/src/docbkx/Java-Broker-Appendix-Queue-Declaration-Arguments.xml
@@ -170,6 +170,17 @@
                         and assigned as this queue's <literal>alternateBinding</literal>.</para>
                     </entry>
                 </row>
+                <row xml:id="Java-Broker-Appendix-Queue-Declare-Arguments-X-Single-Active-Consumer">
+                    <entry>
+                        <para>x-single-active-consumer</para>
+                    </entry>
+                    <entry>
+                        <para>If set <literal>true</literal>, then of all consumers attached to a queue, only one will
+                            be designated as <emphasis>active</emphasis>, and eligible to receive messages.  If the active
+                            consumer is detached, and other consumers are attached, one of these other consumers is selected
+                            to become the single active consumer.</para>
+                    </entry>
+                </row>
             </tbody>
         </tgroup>
     </table>
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/limitliveconsumers/LimitLiveConsumersTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/limitliveconsumers/LimitLiveConsumersTest.java
new file mode 100644
index 0000000..edb18f2
--- /dev/null
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/limitliveconsumers/LimitLiveConsumersTest.java
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systests.jms_1_1.extensions.limitliveconsumers;
+
+import static org.apache.qpid.systests.Utils.INDEX;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.util.Collections;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.junit.Test;
+
+import org.apache.qpid.systests.JmsTestBase;
+import org.apache.qpid.systests.Utils;
+
+public class LimitLiveConsumersTest extends JmsTestBase
+{
+
+    @Test
+    public void testLimitLiveConsumers() throws Exception
+    {
+        String queueName = getTestName();
+        createEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue",
+                                        Collections.singletonMap("maximumLiveConsumers", 1));
+        Queue queue = createQueue(queueName);
+        int numberOfMessages = 5;
+        Connection connection = getConnectionBuilder().setSyncPublish(true).build();
+        try
+        {
+            Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            connection.start();
+
+
+            MessageConsumer consumer1 = session1.createConsumer(queue);
+
+            Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            MessageConsumer consumer2 = session2.createConsumer(queue);
+
+
+            Utils.sendMessages(session1, queue, numberOfMessages);
+
+
+            for (int i = 0; i < 3; i++)
+            {
+                Message receivedMsg = consumer1.receive(getReceiveTimeout());
+                assertNotNull("Message " + i + " not received", receivedMsg);
+                assertEquals("Unexpected message", i, receivedMsg.getIntProperty(INDEX));
+            }
+
+            assertNull("Unexpected message arrived", consumer2.receive(getShortReceiveTimeout()));
+
+            consumer1.close();
+            session1.close();
+
+            for (int i = 3; i < numberOfMessages; i++)
+            {
+                Message receivedMsg = consumer2.receive(getReceiveTimeout());
+                assertNotNull("Message " + i + " not received", receivedMsg);
+                assertEquals("Unexpected message", i, receivedMsg.getIntProperty(INDEX));
+            }
+
+            assertNull("Unexpected message arrived", consumer2.receive(getShortReceiveTimeout()));
+
+            Session session3 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            MessageConsumer consumer3 = session3.createConsumer(queue);
+
+            MessageProducer producer = session3.createProducer(queue);
+            producer.send(Utils.createNextMessage(session3, 6));
+            producer.send(Utils.createNextMessage(session3, 7));
+
+
+            assertNotNull("Message not received on second consumer", consumer2.receive(getReceiveTimeout()));
+            assertNull("Message unexpectedly received on third consumer", consumer3.receive(getShortReceiveTimeout()));
+
+            Session session4 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            MessageConsumer consumer4 = session4.createConsumer(queue);
+
+            assertNull("Message unexpectedly received on fourth consumer", consumer4.receive(getShortReceiveTimeout()));
+            consumer3.close();
+            session3.close();
+
+            assertNull("Message unexpectedly received on fourth consumer", consumer4.receive(getShortReceiveTimeout()));
+            consumer2.close();
+            session2.close();
+
+            assertNotNull("Message not received on fourth consumer", consumer4.receive(getReceiveTimeout()));
+
+
+            assertNull("Unexpected message arrived", consumer4.receive(getShortReceiveTimeout()));
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+    private long getShortReceiveTimeout()
+    {
+        return getReceiveTimeout() / 4;
+    }
+}