QPID-7642 : Add experimental pull consumers
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index 73883f9..b936b52 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -21,26 +21,32 @@
package org.apache.qpid.server.consumer;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.queue.SuspendedConsumerLoggingTicker;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.util.StateChangeListener;
-public abstract class AbstractConsumerTarget implements ConsumerTarget
+public abstract class AbstractConsumerTarget implements ConsumerTarget, LogSubject
{
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumerTarget.class);
+ protected static final String PULL_ONLY_CONSUMER = "x-pull-only";
private final AtomicReference<State> _state;
private final Set<StateChangeListener<ConsumerTarget, State>> _stateChangeListeners = new
@@ -48,12 +54,39 @@
private final Lock _stateChangeLock = new ReentrantLock();
private final AtomicInteger _stateActivates = new AtomicInteger();
+ private final boolean _isMultiQueue;
+ private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;
private ConcurrentLinkedQueue<ConsumerMessageInstancePair> _queue = new ConcurrentLinkedQueue();
+ private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
+
+ private final boolean _isPullOnly;
+ private Iterator<ConsumerImpl> _pullIterator;
- protected AbstractConsumerTarget(final State initialState)
+ protected AbstractConsumerTarget(final State initialState,
+ final boolean isPullOnly,
+ final boolean isMultiQueue,
+ final AMQPConnection<?> amqpConnection)
{
_state = new AtomicReference<State>(initialState);
+ _isPullOnly = isPullOnly;
+ _isMultiQueue = isMultiQueue;
+ _suspendedConsumerLoggingTicker = isMultiQueue
+ ? new SuspendedConsumerLoggingTicker(amqpConnection.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
+ {
+ @Override
+ protected void log(final long period)
+ {
+ amqpConnection.getEventLogger().message(AbstractConsumerTarget.this, SubscriptionMessages.STATE(period));
+ }
+ }
+ : null;
+
+ }
+
+ public boolean isMultiQueue()
+ {
+ return _isMultiQueue;
}
@Override
@@ -63,9 +96,8 @@
{
return false;
}
- if(hasMessagesToSend())
+ if(sendNextMessage())
{
- sendNextMessage();
return true;
}
else
@@ -91,6 +123,28 @@
protected abstract void processClosed();
@Override
+ public void consumerAdded(final ConsumerImpl sub)
+ {
+ _consumers.add(sub);
+ }
+
+ @Override
+ public void consumerRemoved(final ConsumerImpl sub)
+ {
+ _consumers.remove(sub);
+ if(_consumers.isEmpty())
+ {
+ close();
+ }
+ }
+
+ public List<ConsumerImpl> getConsumers()
+ {
+ return _consumers;
+ }
+
+
+ @Override
public final boolean isSuspended()
{
return getSessionModel().getAMQPConnection().isMessageAssignmentSuspended() || isFlowSuspended();
@@ -143,9 +197,24 @@
}
else
{
- for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
+ if(!_stateChangeListeners.isEmpty())
{
- listener.stateChanged(this, from, to);
+ for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
+ {
+ listener.stateChanged(this, from, to);
+ }
+ }
+ }
+ if(_suspendedConsumerLoggingTicker != null)
+ {
+ if (to == State.SUSPENDED)
+ {
+ _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
+ getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
+ }
+ else
+ {
+ getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
}
}
return true;
@@ -193,6 +262,12 @@
}
@Override
+ public boolean isPullOnly()
+ {
+ return _isPullOnly;
+ }
+
+ @Override
public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
AMQPConnection<?> amqpConnection = getSessionModel().getAMQPConnection();
@@ -207,12 +282,39 @@
@Override
public boolean hasMessagesToSend()
{
- return !_queue.isEmpty();
+ return !_queue.isEmpty() || (isPullOnly() && messagesAvailable());
+ }
+
+ private boolean messagesAvailable()
+ {
+ if(hasCredit())
+ {
+ for (ConsumerImpl consumer : _consumers)
+ {
+ if (consumer.hasAvailableMessages())
+ {
+ return true;
+ }
+ }
+ }
+ return false;
}
@Override
- public void sendNextMessage()
+ public boolean sendNextMessage()
{
+ if(isPullOnly())
+ {
+ if(_pullIterator == null || !_pullIterator.hasNext())
+ {
+ _pullIterator = getConsumers().iterator();
+ }
+ if(_pullIterator.hasNext())
+ {
+ ConsumerImpl consumer = _pullIterator.next();
+ consumer.pullMessage();
+ }
+ }
ConsumerMessageInstancePair consumerMessage = _queue.poll();
if (consumerMessage != null)
{
@@ -233,7 +335,13 @@
{
consumerMessage.release();
}
+ return true;
}
+ else
+ {
+ return false;
+ }
+
}
@@ -267,12 +375,26 @@
releaseSendLock();
}
- afterCloseInternal();
+ for (ConsumerImpl consumer : _consumers)
+ {
+ consumer.close();
+ }
+ if(_suspendedConsumerLoggingTicker != null)
+ {
+ getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
+ }
+
return closed;
}
- protected abstract void afterCloseInternal();
+ @Override
+ public String toLogString()
+ {
+
+ return "[(** Multi-Queue **)] ";
+ }
+
protected abstract void doCloseInternal();
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
index fd6f338..cf91f75 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
@@ -33,6 +33,10 @@
ConsumerTarget getTarget();
+ boolean hasAvailableMessages();
+
+ void pullMessage();
+
enum Option
{
ACQUIRES,
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index 3d9cef8..7b9ac65 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -41,6 +41,8 @@
boolean hasCredit();
+ boolean isMultiQueue();
+
enum State
{
ACTIVE, SUSPENDED, CLOSED
@@ -66,7 +68,7 @@
boolean hasMessagesToSend();
- void sendNextMessage();
+ boolean sendNextMessage();
void flushBatched();
@@ -88,4 +90,6 @@
void releaseSendLock();
+ boolean isPullOnly();
+
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 6221da5..49e2fab 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -290,6 +290,7 @@
private final ConcurrentMap<String, Callable<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
private final List<HoldMethod> _holdMethods = new CopyOnWriteArrayList<>();
private Map<String, String> _mimeTypeToFileExtension = Collections.emptyMap();
+ private volatile boolean _hasPullOnlyConsumers;
private interface HoldMethod
{
@@ -945,6 +946,10 @@
if (!isDeleted())
{
+ if(consumer.isPullOnly())
+ {
+ _hasPullOnlyConsumers = true;
+ }
_consumerList.add(consumer);
if (isDeleted())
@@ -960,7 +965,14 @@
childAdded(consumer);
consumer.addChangeListener(_deletedChildListener);
- deliverAsync();
+ if(consumer.isPullOnly())
+ {
+ consumer.getSessionModel().getAMQPConnection().notifyWork();
+ }
+ else
+ {
+ deliverAsync();
+ }
return consumer;
}
@@ -1001,6 +1013,18 @@
resetSubPointersForGroups(consumer);
}
+ if(consumer.isPullOnly())
+ {
+ boolean hasOnlyPushConsumers = true;
+ ConsumerNode consumerNode = _consumerList.getHead().findNext();
+ while (consumerNode != null && hasOnlyPushConsumers)
+ {
+ hasOnlyPushConsumers = !consumerNode.getConsumer().isPullOnly();
+ consumerNode = consumerNode.findNext();
+ }
+ _hasPullOnlyConsumers = !hasOnlyPushConsumers;
+ }
+
// auto-delete queues must be deleted if there are no remaining subscribers
if(!consumer.isTransient()
@@ -1071,7 +1095,7 @@
updateSubRequeueEntry(sub, entry);
}
}
-
+ notifyPullOnlyConsumers();
deliverAsync();
}
@@ -1224,7 +1248,7 @@
if (entry.isAvailable())
{
checkConsumersNotAheadOfDelivery(entry);
-
+ notifyPullOnlyConsumers();
deliverAsync();
}
@@ -1518,7 +1542,7 @@
updateSubRequeueEntry(sub, entry);
}
}
-
+ notifyPullOnlyConsumers();
deliverAsync();
}
@@ -1713,9 +1737,16 @@
if (oldState != State.ACTIVE)
{
_activeSubscriberCount.incrementAndGet();
+ if(sub.isPullOnly())
+ {
+ sub.getSessionModel().getAMQPConnection().notifyWork();
+ }
}
- deliverAsync();
+ if(!sub.isPullOnly())
+ {
+ deliverAsync();
+ }
}
}
@@ -2138,6 +2169,23 @@
}
+ void notifyPullOnlyConsumers()
+ {
+ if(_hasPullOnlyConsumers)
+ {
+ ConsumerNode consumerNode = _consumerList.getHead().findNext();
+ while (consumerNode != null)
+ {
+ QueueConsumer<?> consumer = consumerNode.getConsumer();
+ if (consumer.isActive() && consumer.isPullOnly() && getNextAvailableEntry(consumer) != null)
+ {
+ consumer.getSessionModel().getAMQPConnection().notifyWork();
+ }
+ consumerNode = consumerNode.findNext();
+ }
+ }
+ }
+
void flushConsumer(QueueConsumer<?> sub)
{
@@ -2384,6 +2432,12 @@
}
}
+
+ boolean hasAvailableMessages(final QueueConsumer queueConsumer)
+ {
+ return getNextAvailableEntry(queueConsumer) != null;
+ }
+
/**
* Used by queue Runners to asynchronously deliver messages to consumers.
*
@@ -2450,44 +2504,46 @@
QueueConsumer<?> sub = consumerNodeIterator.getNode().getConsumer();
- sub.getSendLock();
-
- try
+ if(!sub.isPullOnly())
{
- for(int i = 0 ; i < perSub; i++)
+ sub.getSendLock();
+
+ try
{
- //attempt delivery. returns true if no further delivery currently possible to this sub
- consumerDone = attemptDelivery(sub, true);
- if (consumerDone)
+ for (int i = 0; i < perSub; i++)
{
- sub.flushBatched();
- boolean noMore = getNextAvailableEntry(sub) == null;
- if (lastLoop && noMore)
- {
- sub.queueEmpty();
- }
- break;
- }
- else
- {
- //this consumer can accept additional deliveries, so we must
- //keep going after this (if iteration slicing allows it)
- allConsumersDone = false;
- lastLoop = false;
- if(--iterations == 0)
+ //attempt delivery. returns true if no further delivery currently possible to this sub
+ consumerDone = attemptDelivery(sub, true);
+ if (consumerDone)
{
sub.flushBatched();
+ boolean noMore = getNextAvailableEntry(sub) == null;
+ if (lastLoop && noMore)
+ {
+ sub.queueEmpty();
+ }
break;
}
+ else
+ {
+ //this consumer can accept additional deliveries, so we must
+ //keep going after this (if iteration slicing allows it)
+ allConsumersDone = false;
+ lastLoop = false;
+ if (--iterations == 0)
+ {
+ sub.flushBatched();
+ break;
+ }
+ }
}
+ sub.flushBatched();
}
-
- sub.flushBatched();
- }
- finally
- {
- sub.releaseSendLock();
+ finally
+ {
+ sub.releaseSendLock();
+ }
}
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
index e7bee4f..771097a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
@@ -53,4 +53,6 @@
void awaitCredit(QueueEntry entry);
boolean hasCredit();
+
+ boolean isPullOnly();
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index adedebd..804bde1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -120,7 +120,8 @@
private int _priority;
QueueConsumerImpl(final AbstractQueue<?> queue,
- ConsumerTarget target, final String consumerName,
+ ConsumerTarget target,
+ final String consumerName,
final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
EnumSet<Option> optionSet,
@@ -157,14 +158,16 @@
};
_target.addStateListener(_listener);
- _suspendedConsumerLoggingTicker = new SuspendedConsumerLoggingTicker(queue.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
- {
- @Override
- protected void log(final long period)
- {
- getEventLogger().message(getLogSubject(), SubscriptionMessages.STATE(period));
- }
- };
+ _suspendedConsumerLoggingTicker = target.isMultiQueue()
+ ? null
+ : new SuspendedConsumerLoggingTicker(queue.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
+ {
+ @Override
+ protected void log(final long period)
+ {
+ getEventLogger().message(getLogSubject(), SubscriptionMessages.STATE(period));
+ }
+ };
}
private static Map<String, Object> createAttributeMap(String name,
@@ -213,15 +216,17 @@
}
}
-
- if(newState == ConsumerTarget.State.SUSPENDED)
+ if(_suspendedConsumerLoggingTicker != null)
{
- _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
- getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
- }
- else
- {
- getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
+ if (newState == ConsumerTarget.State.SUSPENDED)
+ {
+ _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
+ getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
+ }
+ else
+ {
+ getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
+ }
}
}
@@ -257,7 +262,20 @@
@Override
public void externalStateChange()
{
- _queue.deliverAsync();
+ if(isPullOnly())
+ {
+ getSessionModel().getAMQPConnection().notifyWork();
+ }
+ else
+ {
+ _queue.deliverAsync();
+ }
+ }
+
+ @Override
+ public boolean hasAvailableMessages()
+ {
+ return !_queue.isEmpty() && _queue.hasAvailableMessages(this);
}
@Override
@@ -302,6 +320,10 @@
_target.consumerRemoved(this);
_target.removeStateChangeListener(_listener);
_queue.unregisterConsumer(this);
+ if(_suspendedConsumerLoggingTicker != null)
+ {
+ getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
+ }
deleted();
}
finally
@@ -323,6 +345,12 @@
_target.flushBatched();
}
+ @Override
+ public boolean isPullOnly()
+ {
+ return _target.isPullOnly();
+ }
+
public void queueDeleted()
{
_target.queueDeleted();
@@ -385,6 +413,22 @@
}
+ @Override
+ public void pullMessage()
+ {
+ AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
+ try
+ {
+ connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
+ _queue.flushConsumer(this, 1);
+ }
+ finally
+ {
+ connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
+ }
+
+ }
+
public boolean resend(final QueueEntry entry)
{
boolean messageWasResent = getQueue().resend(entry, this);
@@ -661,7 +705,14 @@
entry.addStateChangeListener(this);
if(!entry.isAvailable())
{
- _queue.deliverAsync();
+ if(isPullOnly())
+ {
+ getSessionModel().getAMQPConnection().notifyWork();
+ }
+ else
+ {
+ _queue.deliverAsync();
+ }
remove();
}
}
@@ -681,7 +732,14 @@
{
entry.removeStateChangeListener(this);
_entry.compareAndSet(entry, null);
- _queue.deliverAsync();
+ if(isPullOnly())
+ {
+ getSessionModel().getAMQPConnection().notifyWork();
+ }
+ else
+ {
+ _queue.deliverAsync();
+ }
}
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
index 5b6415d..2f96140 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
@@ -28,11 +28,12 @@
import javax.security.auth.Subject;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.util.Deletable;
-public interface AMQPConnection<C extends AMQPConnection<C>> extends Connection<C>, Deletable<C>
+public interface AMQPConnection<C extends AMQPConnection<C>> extends Connection<C>, Deletable<C>, EventLoggerProvider
{
boolean isMessageAssignmentSuspended();
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
index 6c27737..4a69f00 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
@@ -140,6 +140,18 @@
}
@Override
+ public boolean hasAvailableMessages()
+ {
+ return !_queue.isEmpty();
+ }
+
+ @Override
+ public void pullMessage()
+ {
+
+ }
+
+ @Override
public long getBytesOut()
{
return 0;
diff --git a/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index 0b26f41..50ade31 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -154,9 +154,9 @@
}
@Override
- public void sendNextMessage()
+ public boolean sendNextMessage()
{
-
+ return false;
}
public void flushBatched()
@@ -278,6 +278,17 @@
_stateChangeLock.unlock();
}
+ @Override
+ public boolean isPullOnly()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isMultiQueue()
+ {
+ return false;
+ }
private static class MockSessionModel implements AMQSessionModel<MockSessionModel>
{
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 396879a..cfb6770 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -22,9 +22,7 @@
import java.io.IOException;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -88,7 +86,6 @@
private int _deferredMessageCredit;
private long _deferredSizeCredit;
- private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
private final StateChangeListener<MessageInstance, EntryState> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, EntryState>()
{
@@ -116,9 +113,10 @@
MessageAcquireMode acquireMode,
MessageFlowMode flowMode,
FlowCreditManager_0_10 creditManager,
- Map<String, Object> arguments)
+ Map<String, Object> arguments,
+ boolean multiQueue)
{
- super(State.SUSPENDED);
+ super(State.SUSPENDED, isPullOnly(arguments), multiQueue, session.getAMQPConnection());
_session = session;
_postIdSettingAction = new AddMessageDispositionListenerAction(session);
_acceptMode = acceptMode;
@@ -137,6 +135,12 @@
}
}
+ private static boolean isPullOnly(Map<String, Object> arguments)
+ {
+ return arguments.containsKey(PULL_ONLY_CONSUMER)
+ && Boolean.valueOf(String.valueOf(arguments.get(PULL_ONLY_CONSUMER)));
+ }
+
@Override
public boolean isFlowSuspended()
{
@@ -145,16 +149,6 @@
}
@Override
- protected void afterCloseInternal()
- {
-
- for (ConsumerImpl consumer : _consumers)
- {
- consumer.close();
- }
- }
-
- @Override
protected void doCloseInternal()
{
_creditManager.removeListener(this);
@@ -620,7 +614,7 @@
public void flush()
{
flushCreditState(true);
- for(ConsumerImpl consumer : _consumers)
+ for(ConsumerImpl consumer : getConsumers())
{
consumer.flush();
}
@@ -651,22 +645,6 @@
return _targetAddress;
}
- @Override
- public void consumerAdded(final ConsumerImpl sub)
- {
- _consumers.add(sub);
- }
-
- @Override
- public void consumerRemoved(final ConsumerImpl sub)
- {
- _consumers.remove(sub);
- if(_consumers.isEmpty())
- {
- close();
- }
- }
-
public long getUnacknowledgedBytes()
{
return _unacknowledgedBytes.longValue();
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 4925ddf..8afa6aa 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -1255,7 +1255,10 @@
Collection<ConsumerTarget_0_10> consumerTargets = getSubscriptions();
for(ConsumerTarget_0_10 consumerTarget: consumerTargets)
{
- consumerTarget.notifyCurrentState();
+ if(!consumerTarget.isPullOnly())
+ {
+ consumerTarget.notifyCurrentState();
+ }
}
}
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 99ac3d0..898edd6 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -217,13 +217,8 @@
final Collection<MessageSource> sources = new HashSet<>();
final MessageSource queue = addressSpace.getAttainedMessageSource(queueName);
- if(queue != null)
- {
- sources.add(queue);
- }
- else if(getContextValue(session, Boolean.class, "qpid.enableMultiQueueConsumers")
- && method.getArguments() != null
- && method.getArguments().get("x-multiqueue") instanceof Collection)
+
+ if(method.getArguments() != null && method.getArguments().get("x-multiqueue") instanceof Collection)
{
for(Object object : (Collection<Object>)method.getArguments().get("x-multiqueue"))
{
@@ -245,6 +240,10 @@
}
queueName = method.getArguments().get("x-multiqueue").toString();
}
+ else if(queue != null)
+ {
+ sources.add(queue);
+ }
if(sources.isEmpty())
{
@@ -306,13 +305,14 @@
}
-
+ boolean multiQueue = sources.size()>1;
ConsumerTarget_0_10 target = new ConsumerTarget_0_10((ServerSession)session, destination,
- method.getAcceptMode(),
- method.getAcquireMode(),
- MessageFlowMode.WINDOW,
- creditManager,
- method.getArguments()
+ method.getAcceptMode(),
+ method.getAcquireMode(),
+ MessageFlowMode.WINDOW,
+ creditManager,
+ method.getArguments(),
+ multiQueue
);
Integer priority = null;
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index e1530fe..ccfb99e 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -103,7 +103,6 @@
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
@@ -728,20 +727,20 @@
ConsumerTarget_0_8 target;
EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
-
+ final boolean multiQueue = sources.size()>1;
if(arguments != null && Boolean.TRUE.equals(arguments.get(AMQPFilterTypes.NO_CONSUME.getValue())))
{
- target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments, _noAckCreditManager);
+ target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments, _noAckCreditManager, multiQueue);
}
else if(acks)
{
- target = ConsumerTarget_0_8.createAckTarget(this, tag, arguments, _creditManager);
+ target = ConsumerTarget_0_8.createAckTarget(this, tag, arguments, _creditManager, multiQueue);
options.add(ConsumerImpl.Option.ACQUIRES);
options.add(ConsumerImpl.Option.SEES_REQUEUES);
}
else
{
- target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, _noAckCreditManager);
+ target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, _noAckCreditManager, multiQueue);
options.add(ConsumerImpl.Option.ACQUIRES);
options.add(ConsumerImpl.Option.SEES_REQUEUES);
}
@@ -1416,7 +1415,7 @@
}
@Override
- public AMQPConnection<?> getAMQPConnection()
+ public AMQPConnection_0_8 getAMQPConnection()
{
return _connection;
}
@@ -2142,13 +2141,8 @@
MessageSource queue1 = queueName == null ? getDefaultQueue() : vHost.getAttainedMessageSource(queueName);
final Collection<MessageSource> sources = new HashSet<>();
- if (queue1 != null)
- {
- sources.add(queue1);
- }
- else if (_connection.getContextProvider().getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers")
- && arguments != null
- && arguments.get("x-multiqueue") instanceof Collection)
+
+ if (arguments != null && arguments.get("x-multiqueue") instanceof Collection)
{
for (Object object : (Collection<Object>) arguments.get("x-multiqueue"))
{
@@ -2170,6 +2164,11 @@
}
queueName = arguments.get("x-multiqueue").toString();
}
+ else if (queue1 != null)
+ {
+ sources.add(queue1);
+ }
+
if (sources.isEmpty())
{
@@ -3822,7 +3821,10 @@
{
for(ConsumerTarget_0_8 consumerTarget : getConsumerTargets())
{
- consumerTarget.notifyCurrentState();
+ if(!consumerTarget.isPullOnly())
+ {
+ consumerTarget.notifyCurrentState();
+ }
}
}
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 557ac50..8736eca 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -56,16 +54,16 @@
private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
- private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
private final AtomicBoolean _needToClose = new AtomicBoolean();
private final String _targetAddress;
public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager creditManager)
+ FlowCreditManager creditManager, final boolean multiQueue)
{
- return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
+ return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod(),
+ multiQueue);
}
public static ConsumerTarget_0_8 createGetNoAckTarget(final AMQChannel channel,
@@ -78,22 +76,18 @@
return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
}
- public List<ConsumerImpl> getConsumers()
- {
- return _consumers;
- }
-
-
static final class BrowserConsumer extends ConsumerTarget_0_8
{
public BrowserConsumer(AMQChannel channel,
- AMQShortString consumerTag, FieldTable filters,
+ AMQShortString consumerTag,
+ FieldTable filters,
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ RecordDeliveryMethod recordMethod,
+ boolean multiQueue)
{
super(channel, consumerTag,
- filters, creditManager, deliveryMethod, recordMethod);
+ filters, creditManager, deliveryMethod, recordMethod, multiQueue);
}
/**
@@ -124,19 +118,12 @@
}
public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
- AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager creditManager)
- {
- return new NoAckConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
- }
-
- public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod) throws QpidException
+ boolean multiQueue)
{
- return new NoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+ return new NoAckConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod(),
+ multiQueue);
}
public static class NoAckConsumer extends ConsumerTarget_0_8
@@ -144,12 +131,14 @@
private final AutoCommitTransaction _txn;
public NoAckConsumer(AMQChannel channel,
- AMQShortString consumerTag, FieldTable filters,
+ AMQShortString consumerTag,
+ FieldTable filters,
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ RecordDeliveryMethod recordMethod,
+ boolean multiQueue)
{
- super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+ super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod, multiQueue);
_txn = new AutoCommitTransaction(channel.getAddressSpace().getMessageStore());
}
@@ -219,17 +208,24 @@
ClientDeliveryMethod deliveryMethod,
RecordDeliveryMethod recordMethod)
{
- super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+ super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod, false);
}
}
public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel,
- AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager creditManager)
+ AMQShortString consumerTag,
+ FieldTable filters,
+ FlowCreditManager creditManager,
+ boolean multiQueue)
{
- return new AckConsumer(channel,consumerTag,filters,creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
+ return new AckConsumer(channel,
+ consumerTag,
+ filters, creditManager,
+ channel.getClientDeliveryMethod(),
+ channel.getRecordDeliveryMethod(),
+ multiQueue);
}
@@ -239,7 +235,7 @@
ClientDeliveryMethod deliveryMethod,
RecordDeliveryMethod recordMethod)
{
- return new AckConsumer(channel,consumerTag,filters,creditManager, deliveryMethod, recordMethod);
+ return new AckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod, false);
}
static final class AckConsumer extends ConsumerTarget_0_8
@@ -248,9 +244,10 @@
AMQShortString consumerTag, FieldTable filters,
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ RecordDeliveryMethod recordMethod,
+ boolean multiQueue)
{
- super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+ super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod, multiQueue);
}
/**
@@ -305,9 +302,10 @@
FieldTable arguments,
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ RecordDeliveryMethod recordMethod,
+ boolean multiQueue)
{
- super(State.ACTIVE);
+ super(State.ACTIVE, isPullOnly(arguments), multiQueue, channel.getAMQPConnection());
_channel = channel;
_consumerTag = consumerTag;
@@ -346,21 +344,13 @@
}
}
- @Override
- public void consumerRemoved(final ConsumerImpl sub)
+ private static boolean isPullOnly(FieldTable arguments)
{
- _consumers.remove(sub);
- if(_consumers.isEmpty())
- {
- close();
- }
+ return arguments.containsKey(PULL_ONLY_CONSUMER)
+ && Boolean.valueOf(String.valueOf(arguments.get(PULL_ONLY_CONSUMER)));
}
- @Override
- public void consumerAdded(final ConsumerImpl sub)
- {
- _consumers.add( sub );
- }
+
@Override
public String getTargetAddress()
@@ -407,12 +397,6 @@
}
@Override
- protected void afterCloseInternal()
- {
-
- }
-
- @Override
protected void doCloseInternal()
{
_creditManager.removeListener(this);
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 6905220..876f91a 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v1_0;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Collection;
import org.slf4j.Logger;
@@ -33,6 +34,7 @@
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.Target;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
@@ -40,6 +42,7 @@
import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Released;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
@@ -67,22 +70,22 @@
private Binary _transactionId;
private final AMQPDescribedTypeRegistry _typeRegistry;
private final SectionEncoder _sectionEncoder;
- private ConsumerImpl _consumer;
private boolean _queueEmpty;
public ConsumerTarget_1_0(final SendingLink_1_0 link,
boolean acquires)
{
- super(State.SUSPENDED);
+ super(State.SUSPENDED, isPullOnly(link), false, link.getSession().getAMQPConnection());
_link = link;
_typeRegistry = link.getEndpoint().getSession().getConnection().getDescribedTypeRegistry();
_sectionEncoder = new SectionEncoderImpl(_typeRegistry);
_acquires = acquires;
}
- public ConsumerImpl getConsumer()
+ private static boolean isPullOnly(SendingLink_1_0 link)
{
- return _consumer;
+ Source source = (Source) link.getEndpoint().getSource();
+ return Arrays.asList(source.getCapabilities()).contains(Symbol.getSymbol("QPID:PULL-ONLY"));
}
private SendingLinkEndpoint getEndpoint()
@@ -98,12 +101,6 @@
}
@Override
- protected void afterCloseInternal()
- {
-
- }
-
- @Override
protected void doCloseInternal()
{
@@ -212,7 +209,7 @@
else
{
UnsettledAction action = _acquires
- ? new DispositionAction(tag, entry)
+ ? new DispositionAction(tag, entry, consumer)
: new DoNothingAction(tag, entry);
_link.addUnsettled(tag, action, entry);
@@ -239,7 +236,7 @@
public void onRollback()
{
- entry.release(getConsumer());
+ entry.release(consumer);
_link.getEndpoint().updateDisposition(tag, (DeliveryState) null, true);
}
});
@@ -251,7 +248,7 @@
}
else
{
- entry.release(getConsumer());
+ entry.release(consumer);
}
}
@@ -332,7 +329,10 @@
public void flush()
{
- _consumer.flush();
+ for(ConsumerImpl consumer : getConsumers())
+ {
+ consumer.flush();
+ }
}
private class DispositionAction implements UnsettledAction
@@ -340,11 +340,18 @@
private final MessageInstance _queueEntry;
private final Binary _deliveryTag;
+ private final ConsumerImpl _consumer;
- public DispositionAction(Binary tag, MessageInstance queueEntry)
+ public DispositionAction(Binary tag, MessageInstance queueEntry, final ConsumerImpl consumer)
{
_deliveryTag = tag;
_queueEntry = queueEntry;
+ _consumer = consumer;
+ }
+
+ public ConsumerImpl getConsumer()
+ {
+ return _consumer;
}
public boolean process(DeliveryState state, final Boolean settled)
@@ -516,18 +523,6 @@
}
@Override
- public void consumerAdded(final ConsumerImpl sub)
- {
- _consumer = sub;
- }
-
- @Override
- public void consumerRemoved(final ConsumerImpl sub)
- {
- close();
- }
-
- @Override
public long getUnacknowledgedBytes()
{
// TODO
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 4ef66ee..744f009 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -1581,7 +1581,11 @@
{
for(SendingLink_1_0 link : _sendingLinks)
{
- link.getConsumerTarget().notifyCurrentState();
+ ConsumerTarget_1_0 consumerTarget = link.getConsumerTarget();
+ if(!consumerTarget.isPullOnly())
+ {
+ consumerTarget.notifyCurrentState();
+ }
}
}
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index e018070..9890992 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -67,6 +67,18 @@
}
@Override
+ public boolean hasAvailableMessages()
+ {
+ return !_queue.isEmpty();
+ }
+
+ @Override
+ public void pullMessage()
+ {
+
+ }
+
+ @Override
public long getBytesOut()
{
return 0;