QPID-7642 : Add experimental pull consumers
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index 73883f9..b936b52 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -21,26 +21,32 @@
 package org.apache.qpid.server.consumer;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.messages.SubscriptionMessages;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.queue.SuspendedConsumerLoggingTicker;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.util.StateChangeListener;
 
-public abstract class AbstractConsumerTarget implements ConsumerTarget
+public abstract class AbstractConsumerTarget implements ConsumerTarget, LogSubject
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumerTarget.class);
+    protected static final String PULL_ONLY_CONSUMER = "x-pull-only";
     private final AtomicReference<State> _state;
 
     private final Set<StateChangeListener<ConsumerTarget, State>> _stateChangeListeners = new
@@ -48,12 +54,39 @@
 
     private final Lock _stateChangeLock = new ReentrantLock();
     private final AtomicInteger _stateActivates = new AtomicInteger();
+    private final boolean _isMultiQueue;
+    private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;
     private ConcurrentLinkedQueue<ConsumerMessageInstancePair> _queue = new ConcurrentLinkedQueue();
+    private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
+
+    private final boolean _isPullOnly;
+    private Iterator<ConsumerImpl> _pullIterator;
 
 
-    protected AbstractConsumerTarget(final State initialState)
+    protected AbstractConsumerTarget(final State initialState,
+                                     final boolean isPullOnly,
+                                     final boolean isMultiQueue,
+                                     final AMQPConnection<?> amqpConnection)
     {
         _state = new AtomicReference<State>(initialState);
+        _isPullOnly = isPullOnly;
+        _isMultiQueue = isMultiQueue;
+        _suspendedConsumerLoggingTicker = isMultiQueue
+                ? new SuspendedConsumerLoggingTicker(amqpConnection.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
+                {
+                    @Override
+                    protected void log(final long period)
+                    {
+                        amqpConnection.getEventLogger().message(AbstractConsumerTarget.this, SubscriptionMessages.STATE(period));
+                    }
+                }
+                : null;
+
+    }
+
+    public boolean isMultiQueue()
+    {
+        return _isMultiQueue;
     }
 
     @Override
@@ -63,9 +96,8 @@
         {
             return false;
         }
-        if(hasMessagesToSend())
+        if(sendNextMessage())
         {
-            sendNextMessage();
             return true;
         }
         else
@@ -91,6 +123,28 @@
     protected abstract void processClosed();
 
     @Override
+    public void consumerAdded(final ConsumerImpl sub)
+    {
+        _consumers.add(sub);
+    }
+
+    @Override
+    public void consumerRemoved(final ConsumerImpl sub)
+    {
+        _consumers.remove(sub);
+        if(_consumers.isEmpty())
+        {
+            close();
+        }
+    }
+
+    public List<ConsumerImpl> getConsumers()
+    {
+        return _consumers;
+    }
+
+
+    @Override
     public final boolean isSuspended()
     {
         return getSessionModel().getAMQPConnection().isMessageAssignmentSuspended() || isFlowSuspended();
@@ -143,9 +197,24 @@
             }
             else
             {
-                for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
+                if(!_stateChangeListeners.isEmpty())
                 {
-                    listener.stateChanged(this, from, to);
+                    for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
+                    {
+                        listener.stateChanged(this, from, to);
+                    }
+                }
+            }
+            if(_suspendedConsumerLoggingTicker != null)
+            {
+                if (to == State.SUSPENDED)
+                {
+                    _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
+                    getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
+                }
+                else
+                {
+                    getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
                 }
             }
             return true;
@@ -193,6 +262,12 @@
     }
 
     @Override
+    public boolean isPullOnly()
+    {
+        return _isPullOnly;
+    }
+
+    @Override
     public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
     {
         AMQPConnection<?> amqpConnection = getSessionModel().getAMQPConnection();
@@ -207,12 +282,39 @@
     @Override
     public boolean hasMessagesToSend()
     {
-        return !_queue.isEmpty();
+        return !_queue.isEmpty() || (isPullOnly() && messagesAvailable());
+    }
+
+    private boolean messagesAvailable()
+    {
+        if(hasCredit())
+        {
+            for (ConsumerImpl consumer : _consumers)
+            {
+                if (consumer.hasAvailableMessages())
+                {
+                    return true;
+                }
+            }
+        }
+        return false;
     }
 
     @Override
-    public void sendNextMessage()
+    public boolean sendNextMessage()
     {
+        if(isPullOnly())
+        {
+            if(_pullIterator == null || !_pullIterator.hasNext())
+            {
+                _pullIterator = getConsumers().iterator();
+            }
+            if(_pullIterator.hasNext())
+            {
+                ConsumerImpl consumer = _pullIterator.next();
+                consumer.pullMessage();
+            }
+        }
         ConsumerMessageInstancePair consumerMessage = _queue.poll();
         if (consumerMessage != null)
         {
@@ -233,7 +335,13 @@
             {
                 consumerMessage.release();
             }
+            return true;
         }
+        else
+        {
+            return false;
+        }
+
 
     }
 
@@ -267,12 +375,26 @@
             releaseSendLock();
         }
 
-        afterCloseInternal();
+        for (ConsumerImpl consumer : _consumers)
+        {
+            consumer.close();
+        }
+        if(_suspendedConsumerLoggingTicker != null)
+        {
+            getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
+        }
+
         return closed;
 
     }
 
-    protected abstract void afterCloseInternal();
+    @Override
+    public String toLogString()
+    {
+
+        return "[(** Multi-Queue **)] ";
+    }
+
 
     protected abstract void doCloseInternal();
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
index fd6f338..cf91f75 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
@@ -33,6 +33,10 @@
 
     ConsumerTarget getTarget();
 
+    boolean hasAvailableMessages();
+
+    void pullMessage();
+
     enum Option
     {
         ACQUIRES,
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index 3d9cef8..7b9ac65 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -41,6 +41,8 @@
 
     boolean hasCredit();
 
+    boolean isMultiQueue();
+
     enum State
     {
         ACTIVE, SUSPENDED, CLOSED
@@ -66,7 +68,7 @@
 
     boolean hasMessagesToSend();
 
-    void sendNextMessage();
+    boolean sendNextMessage();
 
     void flushBatched();
 
@@ -88,4 +90,6 @@
 
     void releaseSendLock();
 
+    boolean isPullOnly();
+
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 6221da5..49e2fab 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -290,6 +290,7 @@
     private final ConcurrentMap<String, Callable<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
     private final List<HoldMethod> _holdMethods = new CopyOnWriteArrayList<>();
     private Map<String, String> _mimeTypeToFileExtension = Collections.emptyMap();
+    private volatile boolean _hasPullOnlyConsumers;
 
     private interface HoldMethod
     {
@@ -945,6 +946,10 @@
 
         if (!isDeleted())
         {
+            if(consumer.isPullOnly())
+            {
+                _hasPullOnlyConsumers = true;
+            }
             _consumerList.add(consumer);
 
             if (isDeleted())
@@ -960,7 +965,14 @@
         childAdded(consumer);
         consumer.addChangeListener(_deletedChildListener);
 
-        deliverAsync();
+        if(consumer.isPullOnly())
+        {
+            consumer.getSessionModel().getAMQPConnection().notifyWork();
+        }
+        else
+        {
+            deliverAsync();
+        }
 
         return consumer;
     }
@@ -1001,6 +1013,18 @@
                 resetSubPointersForGroups(consumer);
             }
 
+            if(consumer.isPullOnly())
+            {
+                boolean hasOnlyPushConsumers = true;
+                ConsumerNode consumerNode = _consumerList.getHead().findNext();
+                while (consumerNode != null && hasOnlyPushConsumers)
+                {
+                    hasOnlyPushConsumers = !consumerNode.getConsumer().isPullOnly();
+                    consumerNode = consumerNode.findNext();
+                }
+                _hasPullOnlyConsumers = !hasOnlyPushConsumers;
+            }
+
             // auto-delete queues must be deleted if there are no remaining subscribers
 
             if(!consumer.isTransient()
@@ -1071,7 +1095,7 @@
                 updateSubRequeueEntry(sub, entry);
             }
         }
-
+        notifyPullOnlyConsumers();
         deliverAsync();
     }
 
@@ -1224,7 +1248,7 @@
             if (entry.isAvailable())
             {
                 checkConsumersNotAheadOfDelivery(entry);
-
+                notifyPullOnlyConsumers();
                 deliverAsync();
             }
 
@@ -1518,7 +1542,7 @@
                 updateSubRequeueEntry(sub, entry);
             }
         }
-
+        notifyPullOnlyConsumers();
         deliverAsync();
 
     }
@@ -1713,9 +1737,16 @@
             if (oldState != State.ACTIVE)
             {
                 _activeSubscriberCount.incrementAndGet();
+                if(sub.isPullOnly())
+                {
+                    sub.getSessionModel().getAMQPConnection().notifyWork();
+                }
 
             }
-            deliverAsync();
+            if(!sub.isPullOnly())
+            {
+                deliverAsync();
+            }
         }
     }
 
@@ -2138,6 +2169,23 @@
 
     }
 
+    void notifyPullOnlyConsumers()
+    {
+        if(_hasPullOnlyConsumers)
+        {
+            ConsumerNode consumerNode = _consumerList.getHead().findNext();
+            while (consumerNode != null)
+            {
+                QueueConsumer<?> consumer = consumerNode.getConsumer();
+                if (consumer.isActive() && consumer.isPullOnly() && getNextAvailableEntry(consumer) != null)
+                {
+                    consumer.getSessionModel().getAMQPConnection().notifyWork();
+                }
+                consumerNode = consumerNode.findNext();
+            }
+        }
+    }
+
     void flushConsumer(QueueConsumer<?> sub)
     {
 
@@ -2384,6 +2432,12 @@
         }
     }
 
+
+    boolean hasAvailableMessages(final QueueConsumer queueConsumer)
+    {
+        return getNextAvailableEntry(queueConsumer) != null;
+    }
+
     /**
      * Used by queue Runners to asynchronously deliver messages to consumers.
      *
@@ -2450,44 +2504,46 @@
 
                 QueueConsumer<?> sub = consumerNodeIterator.getNode().getConsumer();
 
-                sub.getSendLock();
-
-                try
+                if(!sub.isPullOnly())
                 {
-                    for(int i = 0 ; i < perSub; i++)
+                    sub.getSendLock();
+
+                    try
                     {
-                        //attempt delivery. returns true if no further delivery currently possible to this sub
-                        consumerDone = attemptDelivery(sub, true);
-                        if (consumerDone)
+                        for (int i = 0; i < perSub; i++)
                         {
-                            sub.flushBatched();
-                            boolean noMore = getNextAvailableEntry(sub) == null;
-                            if (lastLoop && noMore)
-                            {
-                                sub.queueEmpty();
-                            }
-                            break;
-                        }
-                        else
-                        {
-                            //this consumer can accept additional deliveries, so we must
-                            //keep going after this (if iteration slicing allows it)
-                            allConsumersDone = false;
-                            lastLoop = false;
-                            if(--iterations == 0)
+                            //attempt delivery. returns true if no further delivery currently possible to this sub
+                            consumerDone = attemptDelivery(sub, true);
+                            if (consumerDone)
                             {
                                 sub.flushBatched();
+                                boolean noMore = getNextAvailableEntry(sub) == null;
+                                if (lastLoop && noMore)
+                                {
+                                    sub.queueEmpty();
+                                }
                                 break;
                             }
+                            else
+                            {
+                                //this consumer can accept additional deliveries, so we must
+                                //keep going after this (if iteration slicing allows it)
+                                allConsumersDone = false;
+                                lastLoop = false;
+                                if (--iterations == 0)
+                                {
+                                    sub.flushBatched();
+                                    break;
+                                }
+                            }
                         }
 
+                        sub.flushBatched();
                     }
-
-                    sub.flushBatched();
-                }
-                finally
-                {
-                    sub.releaseSendLock();
+                    finally
+                    {
+                        sub.releaseSendLock();
+                    }
                 }
             }
 
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
index e7bee4f..771097a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
@@ -53,4 +53,6 @@
     void awaitCredit(QueueEntry entry);
 
     boolean hasCredit();
+
+    boolean isPullOnly();
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index adedebd..804bde1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -120,7 +120,8 @@
     private int _priority;
 
     QueueConsumerImpl(final AbstractQueue<?> queue,
-                      ConsumerTarget target, final String consumerName,
+                      ConsumerTarget target,
+                      final String consumerName,
                       final FilterManager filters,
                       final Class<? extends ServerMessage> messageClass,
                       EnumSet<Option> optionSet,
@@ -157,14 +158,16 @@
         };
         _target.addStateListener(_listener);
 
-        _suspendedConsumerLoggingTicker = new SuspendedConsumerLoggingTicker(queue.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
-        {
-            @Override
-            protected void log(final long period)
-            {
-                getEventLogger().message(getLogSubject(), SubscriptionMessages.STATE(period));
-            }
-        };
+        _suspendedConsumerLoggingTicker = target.isMultiQueue()
+                ? null
+                : new SuspendedConsumerLoggingTicker(queue.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
+                    {
+                        @Override
+                        protected void log(final long period)
+                        {
+                            getEventLogger().message(getLogSubject(), SubscriptionMessages.STATE(period));
+                        }
+                    };
     }
 
     private static Map<String, Object> createAttributeMap(String name,
@@ -213,15 +216,17 @@
                 }
             }
 
-
-            if(newState == ConsumerTarget.State.SUSPENDED)
+            if(_suspendedConsumerLoggingTicker != null)
             {
-                _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
-                getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
-            }
-            else
-            {
-                getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
+                if (newState == ConsumerTarget.State.SUSPENDED)
+                {
+                    _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
+                    getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
+                }
+                else
+                {
+                    getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
+                }
             }
         }
 
@@ -257,7 +262,20 @@
     @Override
     public void externalStateChange()
     {
-        _queue.deliverAsync();
+        if(isPullOnly())
+        {
+            getSessionModel().getAMQPConnection().notifyWork();
+        }
+        else
+        {
+            _queue.deliverAsync();
+        }
+    }
+
+    @Override
+    public boolean hasAvailableMessages()
+    {
+        return !_queue.isEmpty() && _queue.hasAvailableMessages(this);
     }
 
     @Override
@@ -302,6 +320,10 @@
                 _target.consumerRemoved(this);
                 _target.removeStateChangeListener(_listener);
                 _queue.unregisterConsumer(this);
+                if(_suspendedConsumerLoggingTicker != null)
+                {
+                    getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
+                }
                 deleted();
             }
             finally
@@ -323,6 +345,12 @@
         _target.flushBatched();
     }
 
+    @Override
+    public boolean isPullOnly()
+    {
+        return _target.isPullOnly();
+    }
+
     public void queueDeleted()
     {
         _target.queueDeleted();
@@ -385,6 +413,22 @@
 
     }
 
+    @Override
+    public void pullMessage()
+    {
+        AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
+        try
+        {
+            connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
+            _queue.flushConsumer(this, 1);
+        }
+        finally
+        {
+            connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
+        }
+
+    }
+
     public boolean resend(final QueueEntry entry)
     {
         boolean messageWasResent = getQueue().resend(entry, this);
@@ -661,7 +705,14 @@
             entry.addStateChangeListener(this);
             if(!entry.isAvailable())
             {
-                _queue.deliverAsync();
+                if(isPullOnly())
+                {
+                    getSessionModel().getAMQPConnection().notifyWork();
+                }
+                else
+                {
+                    _queue.deliverAsync();
+                }
                 remove();
             }
         }
@@ -681,7 +732,14 @@
         {
             entry.removeStateChangeListener(this);
             _entry.compareAndSet(entry, null);
-            _queue.deliverAsync();
+            if(isPullOnly())
+            {
+                getSessionModel().getAMQPConnection().notifyWork();
+            }
+            else
+            {
+                _queue.deliverAsync();
+            }
         }
 
     }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
index 5b6415d..2f96140 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
@@ -28,11 +28,12 @@
 import javax.security.auth.Subject;
 
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.logging.EventLoggerProvider;
 import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.util.Deletable;
 
-public interface AMQPConnection<C extends AMQPConnection<C>> extends Connection<C>, Deletable<C>
+public interface AMQPConnection<C extends AMQPConnection<C>> extends Connection<C>, Deletable<C>, EventLoggerProvider
 {
     boolean isMessageAssignmentSuspended();
 
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
index 6c27737..4a69f00 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
@@ -140,6 +140,18 @@
         }
 
         @Override
+        public boolean hasAvailableMessages()
+        {
+            return !_queue.isEmpty();
+        }
+
+        @Override
+        public void pullMessage()
+        {
+
+        }
+
+        @Override
         public long getBytesOut()
         {
             return 0;
diff --git a/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index 0b26f41..50ade31 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -154,9 +154,9 @@
     }
 
     @Override
-    public void sendNextMessage()
+    public boolean sendNextMessage()
     {
-
+        return false;
     }
 
     public void flushBatched()
@@ -278,6 +278,17 @@
         _stateChangeLock.unlock();
     }
 
+    @Override
+    public boolean isPullOnly()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean isMultiQueue()
+    {
+        return false;
+    }
 
     private static class MockSessionModel implements AMQSessionModel<MockSessionModel>
     {
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 396879a..cfb6770 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -22,9 +22,7 @@
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -88,7 +86,6 @@
 
     private int _deferredMessageCredit;
     private long _deferredSizeCredit;
-    private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
 
     private final StateChangeListener<MessageInstance, EntryState> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, EntryState>()
     {
@@ -116,9 +113,10 @@
                                MessageAcquireMode acquireMode,
                                MessageFlowMode flowMode,
                                FlowCreditManager_0_10 creditManager,
-                               Map<String, Object> arguments)
+                               Map<String, Object> arguments,
+                               boolean multiQueue)
     {
-        super(State.SUSPENDED);
+        super(State.SUSPENDED, isPullOnly(arguments), multiQueue, session.getAMQPConnection());
         _session = session;
         _postIdSettingAction = new AddMessageDispositionListenerAction(session);
         _acceptMode = acceptMode;
@@ -137,6 +135,12 @@
         }
     }
 
+    private static boolean isPullOnly(Map<String, Object> arguments)
+    {
+        return arguments.containsKey(PULL_ONLY_CONSUMER)
+               && Boolean.valueOf(String.valueOf(arguments.get(PULL_ONLY_CONSUMER)));
+    }
+
     @Override
     public boolean isFlowSuspended()
     {
@@ -145,16 +149,6 @@
     }
 
     @Override
-    protected void afterCloseInternal()
-    {
-
-        for (ConsumerImpl consumer : _consumers)
-        {
-            consumer.close();
-        }
-    }
-
-    @Override
     protected void doCloseInternal()
     {
         _creditManager.removeListener(this);
@@ -620,7 +614,7 @@
     public void flush()
     {
         flushCreditState(true);
-        for(ConsumerImpl consumer : _consumers)
+        for(ConsumerImpl consumer : getConsumers())
         {
             consumer.flush();
         }
@@ -651,22 +645,6 @@
         return _targetAddress;
     }
 
-    @Override
-    public void consumerAdded(final ConsumerImpl sub)
-    {
-        _consumers.add(sub);
-    }
-
-    @Override
-    public void consumerRemoved(final ConsumerImpl sub)
-    {
-        _consumers.remove(sub);
-        if(_consumers.isEmpty())
-        {
-            close();
-        }
-    }
-
     public long getUnacknowledgedBytes()
     {
         return _unacknowledgedBytes.longValue();
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 4925ddf..8afa6aa 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -1255,7 +1255,10 @@
         Collection<ConsumerTarget_0_10> consumerTargets = getSubscriptions();
         for(ConsumerTarget_0_10 consumerTarget: consumerTargets)
         {
-            consumerTarget.notifyCurrentState();
+            if(!consumerTarget.isPullOnly())
+            {
+                consumerTarget.notifyCurrentState();
+            }
         }
     }
 
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 99ac3d0..898edd6 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -217,13 +217,8 @@
 
                 final Collection<MessageSource> sources = new HashSet<>();
                 final MessageSource queue = addressSpace.getAttainedMessageSource(queueName);
-                if(queue != null)
-                {
-                    sources.add(queue);
-                }
-                else if(getContextValue(session, Boolean.class, "qpid.enableMultiQueueConsumers")
-                        && method.getArguments() != null
-                        && method.getArguments().get("x-multiqueue") instanceof Collection)
+
+                if(method.getArguments() != null && method.getArguments().get("x-multiqueue") instanceof Collection)
                 {
                     for(Object object : (Collection<Object>)method.getArguments().get("x-multiqueue"))
                     {
@@ -245,6 +240,10 @@
                     }
                     queueName = method.getArguments().get("x-multiqueue").toString();
                 }
+                else if(queue != null)
+                {
+                    sources.add(queue);
+                }
 
                 if(sources.isEmpty())
                 {
@@ -306,13 +305,14 @@
 
                     }
 
-
+                    boolean multiQueue = sources.size()>1;
                     ConsumerTarget_0_10 target = new ConsumerTarget_0_10((ServerSession)session, destination,
-                                                                                 method.getAcceptMode(),
-                                                                                 method.getAcquireMode(),
-                                                                                 MessageFlowMode.WINDOW,
-                                                                                 creditManager,
-                                                                                 method.getArguments()
+                                                                         method.getAcceptMode(),
+                                                                         method.getAcquireMode(),
+                                                                         MessageFlowMode.WINDOW,
+                                                                         creditManager,
+                                                                         method.getArguments(),
+                                                                         multiQueue
                     );
 
                     Integer priority = null;
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index e1530fe..ccfb99e 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -103,7 +103,6 @@
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
@@ -728,20 +727,20 @@
 
         ConsumerTarget_0_8 target;
         EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
-
+        final boolean multiQueue = sources.size()>1;
         if(arguments != null && Boolean.TRUE.equals(arguments.get(AMQPFilterTypes.NO_CONSUME.getValue())))
         {
-            target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments, _noAckCreditManager);
+            target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments, _noAckCreditManager, multiQueue);
         }
         else if(acks)
         {
-            target = ConsumerTarget_0_8.createAckTarget(this, tag, arguments, _creditManager);
+            target = ConsumerTarget_0_8.createAckTarget(this, tag, arguments, _creditManager, multiQueue);
             options.add(ConsumerImpl.Option.ACQUIRES);
             options.add(ConsumerImpl.Option.SEES_REQUEUES);
         }
         else
         {
-            target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, _noAckCreditManager);
+            target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, _noAckCreditManager, multiQueue);
             options.add(ConsumerImpl.Option.ACQUIRES);
             options.add(ConsumerImpl.Option.SEES_REQUEUES);
         }
@@ -1416,7 +1415,7 @@
     }
 
     @Override
-    public AMQPConnection<?> getAMQPConnection()
+    public AMQPConnection_0_8 getAMQPConnection()
     {
         return _connection;
     }
@@ -2142,13 +2141,8 @@
 
         MessageSource queue1 = queueName == null ? getDefaultQueue() : vHost.getAttainedMessageSource(queueName);
         final Collection<MessageSource> sources = new HashSet<>();
-        if (queue1 != null)
-        {
-            sources.add(queue1);
-        }
-        else if (_connection.getContextProvider().getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers")
-                 && arguments != null
-                 && arguments.get("x-multiqueue") instanceof Collection)
+
+        if (arguments != null && arguments.get("x-multiqueue") instanceof Collection)
         {
             for (Object object : (Collection<Object>) arguments.get("x-multiqueue"))
             {
@@ -2170,6 +2164,11 @@
             }
             queueName = arguments.get("x-multiqueue").toString();
         }
+        else if (queue1 != null)
+        {
+            sources.add(queue1);
+        }
+
 
         if (sources.isEmpty())
         {
@@ -3822,7 +3821,10 @@
     {
         for(ConsumerTarget_0_8 consumerTarget : getConsumerTargets())
         {
-            consumerTarget.notifyCurrentState();
+            if(!consumerTarget.isPullOnly())
+            {
+                consumerTarget.notifyCurrentState();
+            }
         }
     }
 
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 557ac50..8736eca 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -20,8 +20,6 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -56,16 +54,16 @@
 
     private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
     private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
-    private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
     private final AtomicBoolean _needToClose = new AtomicBoolean();
     private final String _targetAddress;
 
 
     public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
                                                          AMQShortString consumerTag, FieldTable filters,
-                                                         FlowCreditManager creditManager)
+                                                         FlowCreditManager creditManager, final boolean multiQueue)
     {
-        return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
+        return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod(),
+                                   multiQueue);
     }
 
     public static ConsumerTarget_0_8 createGetNoAckTarget(final AMQChannel channel,
@@ -78,22 +76,18 @@
         return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
     }
 
-    public List<ConsumerImpl> getConsumers()
-    {
-        return _consumers;
-    }
-
-
     static final class BrowserConsumer extends ConsumerTarget_0_8
     {
         public BrowserConsumer(AMQChannel channel,
-                               AMQShortString consumerTag, FieldTable filters,
+                               AMQShortString consumerTag,
+                               FieldTable filters,
                                FlowCreditManager creditManager,
                                ClientDeliveryMethod deliveryMethod,
-                               RecordDeliveryMethod recordMethod)
+                               RecordDeliveryMethod recordMethod,
+                               boolean multiQueue)
         {
             super(channel, consumerTag,
-                  filters, creditManager, deliveryMethod, recordMethod);
+                  filters, creditManager, deliveryMethod, recordMethod, multiQueue);
         }
 
         /**
@@ -124,19 +118,12 @@
     }
 
     public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
-                                                           AMQShortString consumerTag, FieldTable filters,
-                                                           FlowCreditManager creditManager)
-    {
-        return new NoAckConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
-    }
-
-    public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
                                                        AMQShortString consumerTag, FieldTable filters,
                                                        FlowCreditManager creditManager,
-                                                       ClientDeliveryMethod deliveryMethod,
-                                                       RecordDeliveryMethod recordMethod) throws QpidException
+                                                       boolean multiQueue)
     {
-        return new NoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+        return new NoAckConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod(),
+                                 multiQueue);
     }
 
     public static class NoAckConsumer extends ConsumerTarget_0_8
@@ -144,12 +131,14 @@
         private final AutoCommitTransaction _txn;
 
         public NoAckConsumer(AMQChannel channel,
-                             AMQShortString consumerTag, FieldTable filters,
+                             AMQShortString consumerTag,
+                             FieldTable filters,
                              FlowCreditManager creditManager,
                              ClientDeliveryMethod deliveryMethod,
-                             RecordDeliveryMethod recordMethod)
+                             RecordDeliveryMethod recordMethod,
+                             boolean multiQueue)
         {
-            super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+            super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod, multiQueue);
 
             _txn = new AutoCommitTransaction(channel.getAddressSpace().getMessageStore());
         }
@@ -219,17 +208,24 @@
                                 ClientDeliveryMethod deliveryMethod,
                                 RecordDeliveryMethod recordMethod)
         {
-            super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+            super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod, false);
         }
 
     }
 
 
     public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel,
-                                                         AMQShortString consumerTag, FieldTable filters,
-                                                         FlowCreditManager creditManager)
+                                                     AMQShortString consumerTag,
+                                                     FieldTable filters,
+                                                     FlowCreditManager creditManager,
+                                                     boolean multiQueue)
     {
-        return new AckConsumer(channel,consumerTag,filters,creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
+        return new AckConsumer(channel,
+                               consumerTag,
+                               filters, creditManager,
+                               channel.getClientDeliveryMethod(),
+                               channel.getRecordDeliveryMethod(),
+                               multiQueue);
     }
 
 
@@ -239,7 +235,7 @@
                                                          ClientDeliveryMethod deliveryMethod,
                                                          RecordDeliveryMethod recordMethod)
     {
-        return new AckConsumer(channel,consumerTag,filters,creditManager, deliveryMethod, recordMethod);
+        return new AckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod, false);
     }
 
     static final class AckConsumer extends ConsumerTarget_0_8
@@ -248,9 +244,10 @@
                            AMQShortString consumerTag, FieldTable filters,
                            FlowCreditManager creditManager,
                            ClientDeliveryMethod deliveryMethod,
-                           RecordDeliveryMethod recordMethod)
+                           RecordDeliveryMethod recordMethod,
+                           boolean multiQueue)
         {
-            super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+            super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod, multiQueue);
         }
 
         /**
@@ -305,9 +302,10 @@
                               FieldTable arguments,
                               FlowCreditManager creditManager,
                               ClientDeliveryMethod deliveryMethod,
-                              RecordDeliveryMethod recordMethod)
+                              RecordDeliveryMethod recordMethod,
+                              boolean multiQueue)
     {
-        super(State.ACTIVE);
+        super(State.ACTIVE, isPullOnly(arguments), multiQueue, channel.getAMQPConnection());
 
         _channel = channel;
         _consumerTag = consumerTag;
@@ -346,21 +344,13 @@
         }
     }
 
-    @Override
-    public void consumerRemoved(final ConsumerImpl sub)
+    private static boolean isPullOnly(FieldTable arguments)
     {
-        _consumers.remove(sub);
-        if(_consumers.isEmpty())
-        {
-            close();
-        }
+        return arguments.containsKey(PULL_ONLY_CONSUMER)
+               && Boolean.valueOf(String.valueOf(arguments.get(PULL_ONLY_CONSUMER)));
     }
 
-    @Override
-    public void consumerAdded(final ConsumerImpl sub)
-    {
-        _consumers.add( sub );
-    }
+
 
     @Override
     public String getTargetAddress()
@@ -407,12 +397,6 @@
     }
 
     @Override
-    protected void afterCloseInternal()
-    {
-
-    }
-
-    @Override
     protected void doCloseInternal()
     {
         _creditManager.removeListener(this);
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 6905220..876f91a 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol.v1_0;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collection;
 
 import org.slf4j.Logger;
@@ -33,6 +34,7 @@
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.Target;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
@@ -40,6 +42,7 @@
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Released;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
@@ -67,22 +70,22 @@
     private Binary _transactionId;
     private final AMQPDescribedTypeRegistry _typeRegistry;
     private final SectionEncoder _sectionEncoder;
-    private ConsumerImpl _consumer;
     private boolean _queueEmpty;
 
     public ConsumerTarget_1_0(final SendingLink_1_0 link,
                               boolean acquires)
     {
-        super(State.SUSPENDED);
+        super(State.SUSPENDED, isPullOnly(link), false, link.getSession().getAMQPConnection());
         _link = link;
         _typeRegistry = link.getEndpoint().getSession().getConnection().getDescribedTypeRegistry();
         _sectionEncoder = new SectionEncoderImpl(_typeRegistry);
         _acquires = acquires;
     }
 
-    public ConsumerImpl getConsumer()
+    private static boolean isPullOnly(SendingLink_1_0 link)
     {
-        return _consumer;
+        Source source = (Source) link.getEndpoint().getSource();
+        return Arrays.asList(source.getCapabilities()).contains(Symbol.getSymbol("QPID:PULL-ONLY"));
     }
 
     private SendingLinkEndpoint getEndpoint()
@@ -98,12 +101,6 @@
     }
 
     @Override
-    protected void afterCloseInternal()
-    {
-
-    }
-
-    @Override
     protected void doCloseInternal()
     {
 
@@ -212,7 +209,7 @@
                 else
                 {
                     UnsettledAction action = _acquires
-                            ? new DispositionAction(tag, entry)
+                            ? new DispositionAction(tag, entry, consumer)
                             : new DoNothingAction(tag, entry);
 
                     _link.addUnsettled(tag, action, entry);
@@ -239,7 +236,7 @@
 
                             public void onRollback()
                             {
-                                entry.release(getConsumer());
+                                entry.release(consumer);
                                 _link.getEndpoint().updateDisposition(tag, (DeliveryState) null, true);
                             }
                         });
@@ -251,7 +248,7 @@
             }
             else
             {
-                entry.release(getConsumer());
+                entry.release(consumer);
             }
 
         }
@@ -332,7 +329,10 @@
 
     public void flush()
     {
-        _consumer.flush();
+        for(ConsumerImpl consumer : getConsumers())
+        {
+            consumer.flush();
+        }
     }
 
     private class DispositionAction implements UnsettledAction
@@ -340,11 +340,18 @@
 
         private final MessageInstance _queueEntry;
         private final Binary _deliveryTag;
+        private final ConsumerImpl _consumer;
 
-        public DispositionAction(Binary tag, MessageInstance queueEntry)
+        public DispositionAction(Binary tag, MessageInstance queueEntry, final ConsumerImpl consumer)
         {
             _deliveryTag = tag;
             _queueEntry = queueEntry;
+            _consumer = consumer;
+        }
+
+        public ConsumerImpl getConsumer()
+        {
+            return _consumer;
         }
 
         public boolean process(DeliveryState state, final Boolean settled)
@@ -516,18 +523,6 @@
     }
 
     @Override
-    public void consumerAdded(final ConsumerImpl sub)
-    {
-        _consumer = sub;
-    }
-
-    @Override
-    public void consumerRemoved(final ConsumerImpl sub)
-    {
-        close();
-    }
-
-    @Override
     public long getUnacknowledgedBytes()
     {
         // TODO
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 4ef66ee..744f009 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -1581,7 +1581,11 @@
     {
         for(SendingLink_1_0 link : _sendingLinks)
         {
-            link.getConsumerTarget().notifyCurrentState();
+            ConsumerTarget_1_0 consumerTarget = link.getConsumerTarget();
+            if(!consumerTarget.isPullOnly())
+            {
+                consumerTarget.notifyCurrentState();
+            }
         }
     }
 
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index e018070..9890992 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -67,6 +67,18 @@
     }
 
     @Override
+    public boolean hasAvailableMessages()
+    {
+        return !_queue.isEmpty();
+    }
+
+    @Override
+    public void pullMessage()
+    {
+
+    }
+
+    @Override
     public long getBytesOut()
     {
         return 0;