QPID-8389 : [Broker-J] Support the ability to limit the number of active consumers
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java b/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
index 9db4b21..505a03e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
@@ -90,4 +90,5 @@
boolean isActive();
+ boolean isNonLive();
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 3920a14..bf4b724 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -88,6 +88,7 @@
String MAXIMUM_QUEUE_DEPTH_MESSAGES = "maximumQueueDepthMessages";
String MAXIMUM_QUEUE_DEPTH_BYTES = "maximumQueueDepthBytes";
String EXPIRY_POLICY = "expiryPolicy";
+ String MAXIMUM_LIVE_CONSUMERS = "maximumLiveConsumers";
String QUEUE_SCAVANGE_COUNT = "qpid.queue.scavenge_count";
@SuppressWarnings("unused")
@@ -349,6 +350,18 @@
mandatory = true)
ExpiryPolicy getExpiryPolicy();
+ @SuppressWarnings("unused")
+ @ManagedContextDefault(name = "queue.defaultMaximumLiveConsumers",
+ description = "Specifies the default value for maximum live consumers. ")
+ int DEFAULT_MAXIMUM_LIVE_CONSUMERS = 0;
+
+ @ManagedAttribute(defaultValue = "${queue.defaultMaximumLiveConsumers}",
+ description = "Maximum live consumers. If the maximum number of live consumers is set to a number greater "
+ + "than zero, then consumers in excess of this limit are held in reserve and will only become "
+ + "eligible to receive messages if a live consumer is removed")
+ int getMaximumLiveConsumers();
+
+
@ManagedOperation(nonModifying = true, changesConfiguredObjectState = false, skipAclCheck = true)
Collection<PublishingLink> getPublishingLinks();
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index cc6608d..a9e316e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -49,6 +49,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
@@ -257,6 +258,13 @@
@ManagedAttributeField
private ExpiryPolicy _expiryPolicy;
+ @ManagedAttributeField
+ private volatile int _maximumLiveConsumers;
+
+ private static final AtomicIntegerFieldUpdater<AbstractQueue> LIVE_CONSUMERS_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(AbstractQueue.class, "_liveConsumers");
+ private volatile int _liveConsumers;
+
private static final int RECOVERING = 1;
private static final int COMPLETING_RECOVERY = 2;
private static final int RECOVERED = 3;
@@ -266,7 +274,7 @@
private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>();
private final ConcurrentMap<String, Callable<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
private final List<HoldMethod> _holdMethods = new CopyOnWriteArrayList<>();
- private final Set<DestinationReferrer> _referrers = Collections.newSetFromMap(new ConcurrentHashMap<DestinationReferrer,Boolean>());
+ private final Set<DestinationReferrer> _referrers = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<LocalTransaction> _transactions = ConcurrentHashMap.newKeySet();
private final LocalTransaction.LocalTransactionListener _localTransactionListener = _transactions::remove;
@@ -282,7 +290,7 @@
private interface HoldMethod
{
- boolean isHeld(MessageReference<?> message, long evalutaionTime);
+ boolean isHeld(MessageReference<?> message, long evaluationTime);
}
protected AbstractQueue(Map<String, Object> attributes, QueueManagingVirtualHost<?> virtualHost)
@@ -302,30 +310,20 @@
|| getLifetimePolicy() == LifetimePolicy.DELETE_ON_SESSION_END))
{
Subject.doAs(getSubjectWithAddedSystemRights(),
- new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
- {
- setAttributes(Collections.<String, Object>singletonMap(AbstractConfiguredObject.DURABLE,
- false));
- return null;
- }
+ (PrivilegedAction<Object>) () -> {
+ setAttributes(Collections.<String, Object>singletonMap(AbstractConfiguredObject.DURABLE,
+ false));
+ return null;
});
}
if(!isDurable() && getMessageDurability() != MessageDurability.NEVER)
{
Subject.doAs(getSubjectWithAddedSystemRights(),
- new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
- {
- setAttributes(Collections.<String, Object>singletonMap(Queue.MESSAGE_DURABILITY,
- MessageDurability.NEVER));
- return null;
- }
+ (PrivilegedAction<Object>) () -> {
+ setAttributes(Collections.<String, Object>singletonMap(Queue.MESSAGE_DURABILITY,
+ MessageDurability.NEVER));
+ return null;
});
}
@@ -372,7 +370,7 @@
_queueHouseKeepingTask = new AdvanceConsumersTask();
Subject activeSubject = Subject.getSubject(AccessController.getContext());
- Set<SessionPrincipal> sessionPrincipals = activeSubject == null ? Collections.<SessionPrincipal>emptySet() : activeSubject.getPrincipals(SessionPrincipal.class);
+ Set<SessionPrincipal> sessionPrincipals = activeSubject == null ? Collections.emptySet() : activeSubject.getPrincipals(SessionPrincipal.class);
AMQPSession<?, ?> session;
if(sessionPrincipals.isEmpty())
{
@@ -774,7 +772,7 @@
@Override
public Collection<String> getAvailableAttributes()
{
- return new ArrayList<String>(_arguments.keySet());
+ return new ArrayList<>(_arguments.keySet());
}
@Override
@@ -849,7 +847,7 @@
});
target.consumerAdded(queueConsumer);
- if(isEmpty())
+ if(isEmpty() || queueConsumer.isNonLive())
{
target.noMessagesAvailable();
}
@@ -1041,6 +1039,31 @@
}
consumer.setQueueContext(queueContext);
+ // this level of care over concurrency in maintaining the correct value for live consumers is probable not
+ // necessary, as all this should take place serially in the configuration thread
+ int maximumLiveConsumers = _maximumLiveConsumers;
+ if(maximumLiveConsumers > 0)
+ {
+ boolean added = false;
+ int liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+ while(liveConsumers < maximumLiveConsumers)
+ {
+ if(LIVE_CONSUMERS_UPDATER.compareAndSet(this, liveConsumers, liveConsumers+1))
+ {
+ added = true;
+
+ break;
+ }
+ liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+ }
+
+ if(!added)
+ {
+ consumer.setNonLive(true);
+ }
+
+ }
+
_queueConsumerManager.addConsumer(consumer);
if (consumer.isNotifyWorkDesired())
{
@@ -1089,7 +1112,7 @@
{
consumer.closeAsync();
// No longer can the queue have an exclusive consumer
- setExclusiveSubscriber(null);
+ clearExclusiveSubscriber();
consumer.setQueueContext(null);
@@ -1103,6 +1126,47 @@
resetSubPointersForGroups(consumer);
}
+ // this level of care over concurrency in maintaining the correct value for live consumers is probable not
+ // necessary, as all this should take place serially in the configuration thread
+ int maximumLiveConsumers = _maximumLiveConsumers;
+ if(maximumLiveConsumers > 0 && !consumer.isNonLive())
+ {
+ boolean updated = false;
+ int liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+ while(liveConsumers > 0)
+ {
+ if(LIVE_CONSUMERS_UPDATER.compareAndSet(this, liveConsumers, liveConsumers-1))
+ {
+ updated = true;
+
+ break;
+ }
+ liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+ }
+
+ liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+
+ consumer.setNonLive(true);
+
+ Iterator<QueueConsumer<?,?>> consumerIterator = _queueConsumerManager.getAllIterator();
+
+ QueueConsumerImpl<?> otherConsumer;
+ while(consumerIterator.hasNext() && liveConsumers < maximumLiveConsumers)
+ {
+ otherConsumer = (QueueConsumerImpl<?>) consumerIterator.next();
+
+ if(otherConsumer != null && otherConsumer.isNonLive() && LIVE_CONSUMERS_UPDATER.compareAndSet(this, liveConsumers, liveConsumers+1))
+ {
+ otherConsumer.setNonLive(false);
+ otherConsumer.setNotifyWorkDesired(true);
+ break;
+ }
+ liveConsumers = LIVE_CONSUMERS_UPDATER.get(this);
+ maximumLiveConsumers = _maximumLiveConsumers;
+ }
+
+ }
+
// auto-delete queues must be deleted if there are no remaining subscribers
if(!consumer.isTransient()
@@ -1114,15 +1178,10 @@
LOGGER.debug("Auto-deleting queue: {}", this);
- Subject.doAs(getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
- {
- AbstractQueue.this.delete();
- return null;
- }
- });
+ Subject.doAs(getSubjectWithAddedSystemRights(), (PrivilegedAction<Object>) () -> {
+ AbstractQueue.this.delete();
+ return null;
+ });
// we need to manually fire the event to the removed consumer (which was the last one left for this
@@ -1583,6 +1642,12 @@
return _deleted.get();
}
+ @Override
+ public int getMaximumLiveConsumers()
+ {
+ return _maximumLiveConsumers;
+ }
+
boolean wouldExpire(final ServerMessage message)
{
long expiration = calculateExpiration(message);
@@ -1592,7 +1657,7 @@
@Override
public List<QueueEntry> getMessagesOnTheQueue()
{
- ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
+ ArrayList<QueueEntry> entryList = new ArrayList<>();
QueueEntryIterator queueListIterator = getEntries().iterator();
while (queueListIterator.advance())
{
@@ -1623,9 +1688,9 @@
return _exclusiveSubscriber != null;
}
- private void setExclusiveSubscriber(QueueConsumer<?,?> exclusiveSubscriber)
+ private void clearExclusiveSubscriber()
{
- _exclusiveSubscriber = exclusiveSubscriber;
+ _exclusiveSubscriber = null;
}
/** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
@@ -1679,7 +1744,7 @@
List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter)
{
- ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
+ ArrayList<QueueEntry> entryList = new ArrayList<>();
QueueEntryIterator queueListIterator = getEntries().iterator();
while (queueListIterator.advance() && !filter.filterComplete())
{
@@ -2077,14 +2142,20 @@
{
boolean queueEmpty = false;
MessageContainer messageContainer = null;
-
_queueConsumerManager.setNotified(consumer, false);
try
{
if (!consumer.isSuspended())
{
- messageContainer = attemptDelivery(consumer);
+ if(consumer.isNonLive())
+ {
+ messageContainer = NO_MESSAGES;
+ }
+ else
+ {
+ messageContainer = attemptDelivery(consumer);
+ }
if(messageContainer.getMessageInstance() == null)
{
@@ -2138,7 +2209,7 @@
{
// avoid referring old deleted queue entry in sub._queueContext._lastSeen
QueueEntry node = getNextAvailableEntry(sub);
- boolean subActive = sub.isActive() && !sub.isSuspended();
+ boolean subActive = sub.isActive() && !sub.isSuspended() && !sub.isNonLive();
if (node != null && subActive
&& (sub.getPriority() == Integer.MAX_VALUE || noHigherPriorityWithCredit(sub, node)))
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
index f9b054d..3aa8e8d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
@@ -63,6 +63,8 @@
public static final String X_QPID_DESCRIPTION = "x-qpid-description";
+ public static final String X_SINGLE_ACTIVE_CONSUMER = "x-single-active-consumer";
+
private static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
private static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key";
@@ -250,6 +252,17 @@
}
}
+ if(wireArguments.containsKey(X_SINGLE_ACTIVE_CONSUMER))
+ {
+ wireArgumentNames.remove(X_SINGLE_ACTIVE_CONSUMER);
+ Object argument = wireArguments.get(X_SINGLE_ACTIVE_CONSUMER);
+ if ((argument instanceof Boolean && ((Boolean) argument).booleanValue())
+ || (argument instanceof String && Boolean.parseBoolean((String)argument)))
+ {
+ modelArguments.putIfAbsent(Queue.MAXIMUM_LIVE_CONSUMERS, 1);
+ }
+ }
+
if (!wireArgumentNames.isEmpty())
{
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 9687433..8fe2342 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -102,6 +102,7 @@
private final String _linkName;
private volatile QueueConsumerNode _queueConsumerNode;
+ private volatile boolean _nonLive;
QueueConsumerImpl(final AbstractQueue<?> queue,
T target,
@@ -193,7 +194,7 @@
@Override
public boolean isNotifyWorkDesired()
{
- return _target.isNotifyWorkDesired();
+ return !isNonLive() && _target.isNotifyWorkDesired();
}
@Override
@@ -544,6 +545,16 @@
return _selector;
}
+ @Override
+ public boolean isNonLive()
+ {
+ return _nonLive;
+ }
+
+ public void setNonLive(final boolean nonLive)
+ {
+ _nonLive = nonLive;
+ }
@Override
public String toLogString()
diff --git a/doc/java-broker/src/docbkx/Java-Broker-Appendix-Queue-Declaration-Arguments.xml b/doc/java-broker/src/docbkx/Java-Broker-Appendix-Queue-Declaration-Arguments.xml
index 78ff31d..ad90280 100644
--- a/doc/java-broker/src/docbkx/Java-Broker-Appendix-Queue-Declaration-Arguments.xml
+++ b/doc/java-broker/src/docbkx/Java-Broker-Appendix-Queue-Declaration-Arguments.xml
@@ -170,6 +170,17 @@
and assigned as this queue's <literal>alternateBinding</literal>.</para>
</entry>
</row>
+ <row xml:id="Java-Broker-Appendix-Queue-Declare-Arguments-X-Single-Active-Consumer">
+ <entry>
+ <para>x-single-active-consumer</para>
+ </entry>
+ <entry>
+ <para>If set <literal>true</literal>, then of all consumers attached to a queue, only one will
+ be designated as <emphasis>active</emphasis>, and eligible to receive messages. If the active
+ consumer is detached, and other consumers are attached, one of these other consumers is selected
+ to become the single active consumer.</para>
+ </entry>
+ </row>
</tbody>
</tgroup>
</table>
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/limitliveconsumers/LimitLiveConsumersTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/limitliveconsumers/LimitLiveConsumersTest.java
new file mode 100644
index 0000000..edb18f2
--- /dev/null
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/limitliveconsumers/LimitLiveConsumersTest.java
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systests.jms_1_1.extensions.limitliveconsumers;
+
+import static org.apache.qpid.systests.Utils.INDEX;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.util.Collections;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.junit.Test;
+
+import org.apache.qpid.systests.JmsTestBase;
+import org.apache.qpid.systests.Utils;
+
+public class LimitLiveConsumersTest extends JmsTestBase
+{
+
+ @Test
+ public void testLimitLiveConsumers() throws Exception
+ {
+ String queueName = getTestName();
+ createEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue",
+ Collections.singletonMap("maximumLiveConsumers", 1));
+ Queue queue = createQueue(queueName);
+ int numberOfMessages = 5;
+ Connection connection = getConnectionBuilder().setSyncPublish(true).build();
+ try
+ {
+ Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ connection.start();
+
+
+ MessageConsumer consumer1 = session1.createConsumer(queue);
+
+ Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer consumer2 = session2.createConsumer(queue);
+
+
+ Utils.sendMessages(session1, queue, numberOfMessages);
+
+
+ for (int i = 0; i < 3; i++)
+ {
+ Message receivedMsg = consumer1.receive(getReceiveTimeout());
+ assertNotNull("Message " + i + " not received", receivedMsg);
+ assertEquals("Unexpected message", i, receivedMsg.getIntProperty(INDEX));
+ }
+
+ assertNull("Unexpected message arrived", consumer2.receive(getShortReceiveTimeout()));
+
+ consumer1.close();
+ session1.close();
+
+ for (int i = 3; i < numberOfMessages; i++)
+ {
+ Message receivedMsg = consumer2.receive(getReceiveTimeout());
+ assertNotNull("Message " + i + " not received", receivedMsg);
+ assertEquals("Unexpected message", i, receivedMsg.getIntProperty(INDEX));
+ }
+
+ assertNull("Unexpected message arrived", consumer2.receive(getShortReceiveTimeout()));
+
+ Session session3 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer consumer3 = session3.createConsumer(queue);
+
+ MessageProducer producer = session3.createProducer(queue);
+ producer.send(Utils.createNextMessage(session3, 6));
+ producer.send(Utils.createNextMessage(session3, 7));
+
+
+ assertNotNull("Message not received on second consumer", consumer2.receive(getReceiveTimeout()));
+ assertNull("Message unexpectedly received on third consumer", consumer3.receive(getShortReceiveTimeout()));
+
+ Session session4 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer consumer4 = session4.createConsumer(queue);
+
+ assertNull("Message unexpectedly received on fourth consumer", consumer4.receive(getShortReceiveTimeout()));
+ consumer3.close();
+ session3.close();
+
+ assertNull("Message unexpectedly received on fourth consumer", consumer4.receive(getShortReceiveTimeout()));
+ consumer2.close();
+ session2.close();
+
+ assertNotNull("Message not received on fourth consumer", consumer4.receive(getReceiveTimeout()));
+
+
+ assertNull("Unexpected message arrived", consumer4.receive(getShortReceiveTimeout()));
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
+ private long getShortReceiveTimeout()
+ {
+ return getReceiveTimeout() / 4;
+ }
+}