QPID-7633: Pull up the processPendingIterator and associated methods.
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
index 7d38be8..39e497e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
@@ -25,8 +25,10 @@
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import javax.security.auth.Subject;
@@ -36,7 +38,9 @@
 import com.google.common.util.concurrent.ListenableFuture;
 
 import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
 import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.EventLoggerProvider;
 import org.apache.qpid.server.logging.LogSubject;
@@ -76,10 +80,15 @@
     protected final SecurityToken _token;
     protected final PublishAuthorisationCache _publishAuthCache;
 
+    protected final long _maxUncommittedInMemorySize;
+
     protected final LogSubject _logSubject;
 
     protected final List<Action<? super S>> _taskList = new CopyOnWriteArrayList<>();
 
+    protected final Set<AbstractConsumerTarget> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
+    private Iterator<AbstractConsumerTarget> _processPendingIterator;
+
     protected AbstractAMQPSession(final Connection<?> parent, final int sessionId)
     {
         super(parent, createAttributes(sessionId));
@@ -115,6 +124,9 @@
         final long authCacheTimeout = _connection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT);
         final int authCacheSize = _connection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE);
         _publishAuthCache = new PublishAuthorisationCache(_token, authCacheTimeout, authCacheSize);
+
+        _maxUncommittedInMemorySize = _connection.getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+
         _logSubject = new ChannelLogSubject(this);
 
         setState(State.ACTIVE);
@@ -349,9 +361,19 @@
         }
     }
 
-    public abstract void addTicker(final Ticker ticker);
+    @Override
+    public void addTicker(final Ticker ticker)
+    {
+        _connection.getAggregateTicker().addTicker(ticker);
+        // trigger a wakeup to ensure the ticker will be taken into account
+        getAMQPConnection().notifyWork();
+    }
 
-    public abstract void removeTicker(final Ticker ticker);
+    @Override
+    public void removeTicker(final Ticker ticker)
+    {
+        _connection.getAggregateTicker().removeTicker(ticker);
+    }
 
     public abstract void doTimeoutAction(final String idleTransactionTimeoutError);
 
@@ -364,11 +386,54 @@
 
     public abstract long getTransactionStartTimeLong();
 
-
     @Override
     protected void logOperation(final String operation)
     {
         getEventLogger().message(ChannelMessages.OPERATION(operation));
     }
 
+    @Override
+    public boolean processPending()
+    {
+        if (!getAMQPConnection().isIOThread() || isClosing())
+        {
+            return false;
+        }
+
+        updateBlockedStateIfNecessary();
+
+        if(!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
+        {
+            if (_processPendingIterator == null || !_processPendingIterator.hasNext())
+            {
+                _processPendingIterator = _consumersWithPendingWork.iterator();
+            }
+
+            if(_processPendingIterator.hasNext())
+            {
+                AbstractConsumerTarget target = _processPendingIterator.next();
+                _processPendingIterator.remove();
+                if (target.processPending())
+                {
+                    _consumersWithPendingWork.add(target);
+                }
+            }
+        }
+
+        return !_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting();
+    }
+
+    public void notifyWork(final X target)
+    {
+        if(_consumersWithPendingWork.add((AbstractConsumerTarget) target))
+        {
+            getAMQPConnection().notifyWork(this);
+        }
+    }
+
+    public abstract void transportStateChanged();
+
+    protected abstract void updateBlockedStateIfNecessary();
+
+    public abstract boolean isClosing();
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
index 3987a4b..13a3077 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
@@ -75,6 +75,7 @@
 
     boolean hasSessionWithName(byte[] name);
 
+    AggregateTicker getAggregateTicker();
 
     enum CloseReason
     {
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index ebe5aad..2e23c1b 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -482,7 +482,7 @@
     {
         for (ServerSession ssn : getSessionModels())
         {
-            ssn.transportStateChanged();
+            ssn.getModelObject().transportStateChanged();
         }
     }
 
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 5bb684f..ac98736 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -54,11 +54,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
-import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
@@ -93,10 +91,8 @@
 import org.apache.qpid.server.txn.TimeoutDtxException;
 import org.apache.qpid.server.txn.UnknownDtxBranchException;
 import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.Deletable;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.transport.*;
-import org.apache.qpid.transport.network.Ticker;
 
 public class ServerSession extends Session
         implements LogSubject, AsyncAutoCommitTransaction.FutureRecorder
@@ -125,8 +121,6 @@
     private long _blockTime;
     private long _blockingTimeout;
     private boolean _wireBlockingState;
-    private final Set<ConsumerTarget_0_10> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
-    private Iterator<ConsumerTarget_0_10> _processPendingIterator;
 
     public static interface MessageDispositionChangeListener
     {
@@ -154,13 +148,10 @@
     private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
     private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_0_10>> _consumers = new CopyOnWriteArrayList<>();
 
-    private final List<Action<? super ServerSession>> _taskList = new CopyOnWriteArrayList<Action<? super ServerSession>>();
-
     private AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>();
 
     private volatile long _uncommittedMessageSize;
     private final List<StoredMessage<MessageMetaData_0_10>> _uncommittedMessages = new ArrayList<>();
-    private long _maxUncommittedInMemorySize;
 
     public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
     {
@@ -170,7 +161,6 @@
         ServerConnection serverConnection = (ServerConnection) connection;
 
         _blockingTimeout = serverConnection.getBroker().getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
-        _maxUncommittedInMemorySize = getAMQPConnection().getContextProvider().getContextValue(Long.class, org.apache.qpid.server.model.Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
     }
 
     public AccessControlContext getAccessControllerContext()
@@ -832,16 +822,24 @@
         return b;
     }
 
-    public void transportStateChanged()
+    public void updateBlockedStateIfNecesssary()
     {
-        for(ConsumerTarget_0_10 consumerTarget : getSubscriptions())
+        boolean desiredBlockingState = _blocking.get();
+        if (desiredBlockingState != _wireBlockingState)
         {
-            consumerTarget.transportStateChanged();
+            _wireBlockingState = desiredBlockingState;
+
+            if (desiredBlockingState)
+            {
+                invokeBlock();
+            }
+            else
+            {
+                invokeUnblock();
+            }
+            _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
         }
-        if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
-        {
-            getAMQPConnection().notifyWork(_modelObject);
-        }
+
     }
 
     public Object getConnectionReference()
@@ -1100,70 +1098,6 @@
         }
     }
 
-    public boolean processPending()
-    {
-        if (!getAMQPConnection().isIOThread() || isClosing())
-        {
-            return false;
-        }
-
-        boolean desiredBlockingState = _blocking.get();
-        if (desiredBlockingState != _wireBlockingState)
-        {
-            _wireBlockingState = desiredBlockingState;
-
-            if (desiredBlockingState)
-            {
-                invokeBlock();
-            }
-            else
-            {
-                invokeUnblock();
-            }
-            _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
-        }
-
-        if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
-        {
-            if (_processPendingIterator == null || !_processPendingIterator.hasNext())
-            {
-                _processPendingIterator = _consumersWithPendingWork.iterator();
-            }
-
-            if (_processPendingIterator.hasNext())
-            {
-                ConsumerTarget_0_10 target = _processPendingIterator.next();
-                _processPendingIterator.remove();
-                if (target.processPending())
-                {
-                    _consumersWithPendingWork.add(target);
-                }
-            }
-        }
-
-        return !_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting();
-    }
-
-    public void addTicker(final Ticker ticker)
-    {
-        getAMQPConnection().getAggregateTicker().addTicker(ticker);
-        // trigger a wakeup to ensure the ticker will be taken into account
-        getAMQPConnection().notifyWork();
-    }
-
-    public void removeTicker(final Ticker ticker)
-    {
-        getAMQPConnection().getAggregateTicker().removeTicker(ticker);
-    }
-
-    public void notifyWork(final ConsumerTarget_0_10 target)
-    {
-        if(_consumersWithPendingWork.add(target))
-        {
-            getAMQPConnection().notifyWork(_modelObject);
-        }
-    }
-
     public void doTimeoutAction(final String reason)
     {
         getAMQPConnection().closeSessionAsync(_modelObject,
@@ -1172,7 +1106,7 @@
 
     public final long getMaxUncommittedInMemorySize()
     {
-        return _maxUncommittedInMemorySize;
+        return _modelObject.getMaxUncommittedInMemorySize();
     }
 
     public int compareTo(AMQSessionModel o)
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
index dd028c3..9b2e78a 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
@@ -35,9 +35,7 @@
 import org.apache.qpid.server.protocol.ConsumerListener;
 import org.apache.qpid.server.protocol.PublishAuthorisationCache;
 import org.apache.qpid.server.session.AbstractAMQPSession;
-import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.util.Action;
-import org.apache.qpid.transport.network.Ticker;
 
 public class Session_0_10 extends AbstractAMQPSession<Session_0_10, ConsumerTarget_0_10>
         implements AMQSessionModel<Session_0_10, ConsumerTarget_0_10>, LogSubject
@@ -115,19 +113,20 @@
     @Override
     public void transportStateChanged()
     {
-        _serverSession.transportStateChanged();
+        for(ConsumerTarget_0_10 consumerTarget : _serverSession.getSubscriptions())
+        {
+            consumerTarget.transportStateChanged();
+        }
+        if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
+        {
+            getAMQPConnection().notifyWork(this);
+        }
     }
 
     @Override
-    public boolean processPending()
+    protected void updateBlockedStateIfNecessary()
     {
-        return _serverSession.processPending();
-    }
-
-    @Override
-    public void notifyWork(final ConsumerTarget_0_10 target)
-    {
-        _serverSession.notifyWork(target);
+        _serverSession.updateBlockedStateIfNecesssary();
     }
 
     @Override
@@ -179,18 +178,6 @@
     }
 
     @Override
-    public void addTicker(final Ticker ticker)
-    {
-        _serverSession.addTicker(ticker);
-    }
-
-    @Override
-    public void removeTicker(final Ticker ticker)
-    {
-        _serverSession.removeTicker(ticker);
-    }
-
-    @Override
     public void doTimeoutAction(final String idleTransactionTimeoutError)
     {
         _serverSession.doTimeoutAction(idleTransactionTimeoutError);
@@ -242,4 +229,9 @@
     {
         return _serverSession;
     }
+
+    public long getMaxUncommittedInMemorySize()
+    {
+        return _maxUncommittedInMemorySize;
+    }
 }
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 784d9c4..93f6985 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -32,7 +32,6 @@
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -60,20 +59,16 @@
 import org.apache.qpid.protocol.ErrorCodes;
 import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.consumer.ConsumerTarget;
-import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
 import org.apache.qpid.server.filter.AMQInvalidArgumentException;
 import org.apache.qpid.server.filter.ArrivalTimeFilter;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.EventLoggerProvider;
 import org.apache.qpid.server.logging.LogMessage;
-import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
-import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
@@ -103,7 +98,6 @@
 import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
 import org.apache.qpid.server.virtualhost.RequiredExchangeException;
 import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
-import org.apache.qpid.transport.network.Ticker;
 
 public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0_8>
         implements AMQSessionModel<AMQChannel, ConsumerTarget_0_8>,
@@ -155,10 +149,6 @@
     /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
     private final Map<AMQShortString, ConsumerTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, ConsumerTarget_0_8>();
 
-    private final Set<ConsumerTarget_0_8> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
-
-    private Iterator<ConsumerTarget_0_8> _processPendingIterator;
-
     private final MessageStore _messageStore;
 
     private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>();
@@ -205,7 +195,6 @@
     private long _confirmedMessageCounter;
     private volatile long _uncommittedMessageSize;
     private final List<StoredMessage<MessageMetaData>> _uncommittedMessages = new ArrayList<>();
-    private long _maxUncommittedInMemorySize;
 
     private boolean _wireBlockingState;
 
@@ -231,8 +220,6 @@
         _connection = connection;
         _channelId = channelId;
 
-
-        _maxUncommittedInMemorySize = connection.getContextProvider().getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
         _messageStore = messageStore;
         _blockingTimeout = connection.getBroker().getContextValue(Long.class,
                                                                   Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
@@ -1294,7 +1281,8 @@
         return "("+ _suspended.get() + ", " + _closing.get() + ", " + _connection.isClosing() + ") "+"["+ _connection.toString()+":"+_channelId+"]";
     }
 
-    boolean isClosing()
+    @Override
+    public boolean isClosing()
     {
         return _closing.get();
     }
@@ -3666,14 +3654,8 @@
         }
     }
 
-    @Override
-    public boolean processPending()
+    protected void updateBlockedStateIfNecessary()
     {
-        if (!getAMQPConnection().isIOThread() || isClosing())
-        {
-            return false;
-        }
-
         boolean desiredBlockingState = _blocking.get();
         if (desiredBlockingState != _wireBlockingState)
         {
@@ -3681,49 +3663,6 @@
             sendFlow(!desiredBlockingState);
             _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
         }
-
-        if(!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
-        {
-            if (_processPendingIterator == null || !_processPendingIterator.hasNext())
-            {
-                _processPendingIterator = _consumersWithPendingWork.iterator();
-            }
-
-            if(_processPendingIterator.hasNext())
-            {
-                ConsumerTarget_0_8 target = _processPendingIterator.next();
-                _processPendingIterator.remove();
-                if (target.processPending())
-                {
-                    _consumersWithPendingWork.add(target);
-                }
-            }
-        }
-
-        return !_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting();
-    }
-
-    @Override
-    public void addTicker(final Ticker ticker)
-    {
-        getConnection().getAggregateTicker().addTicker(ticker);
-        // trigger a wakeup to ensure the ticker will be taken into account
-        getAMQPConnection().notifyWork();
-    }
-
-    @Override
-    public void removeTicker(final Ticker ticker)
-    {
-        getConnection().getAggregateTicker().removeTicker(ticker);
-    }
-
-    @Override
-    public void notifyWork(final ConsumerTarget_0_8 target)
-    {
-        if(_consumersWithPendingWork.add(target))
-        {
-            getAMQPConnection().notifyWork(this);
-        }
     }
 
     @Override
diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
index 2c7267b..55c7d9e 100644
--- a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
+++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
@@ -123,6 +123,7 @@
         when(_amqConnection.getContextProvider()).thenReturn(_virtualHost);
         when(_amqConnection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT);
         when(_amqConnection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT);
+        when(_amqConnection.getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE)).thenReturn(Connection.DEFAULT_MAX_UNCOMMITTED_IN_MEMORY_SIZE);
         when(_amqConnection.getTaskExecutor()).thenReturn(taskExecutor);
         when(_amqConnection.getChildExecutor()).thenReturn(taskExecutor);
         when(_amqConnection.getModel()).thenReturn(BrokerModel.getInstance());
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 1853e8e..4def54f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -125,8 +125,11 @@
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+<<<<<<< HEAD
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.transport.network.Ticker;
+=======
+>>>>>>> QPID-7633: Pull up the processPendingIterator and associated methods.
 
 public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget_1_0>
         implements AMQSessionModel<Session_1_0, ConsumerTarget_1_0>, LogSubject
@@ -149,8 +152,6 @@
     private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
     private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
     private Session<?> _modelObject = this;
-    private final Set<ConsumerTarget_1_0> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
-    private Iterator<ConsumerTarget_1_0> _processPendingIterator;
 
     private SessionState _sessionState;
 
@@ -1920,56 +1921,15 @@
     }
 
     @Override
-    public boolean processPending()
+    protected void updateBlockedStateIfNecessary()
     {
-        if (!getAMQPConnection().isIOThread() || END_STATES.contains(getSessionState()))
-        {
-            return false;
-        }
 
-
-        if(!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
-        {
-            if (_processPendingIterator == null || !_processPendingIterator.hasNext())
-            {
-                _processPendingIterator = _consumersWithPendingWork.iterator();
-            }
-
-            if(_processPendingIterator.hasNext())
-            {
-                ConsumerTarget_1_0 target = _processPendingIterator.next();
-                _processPendingIterator.remove();
-                if (target.processPending())
-                {
-                    _consumersWithPendingWork.add(target);
-                }
-            }
-        }
-
-        return !_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting();
     }
 
     @Override
-    public void addTicker(final Ticker ticker)
+    public boolean isClosing()
     {
-        getConnection().getAggregateTicker().addTicker(ticker);
-        // trigger a wakeup to ensure the ticker will be taken into account
-        getAMQPConnection().notifyWork();
-    }
-
-    @Override
-    public void removeTicker(final Ticker ticker)
-    {
-        getConnection().getAggregateTicker().removeTicker(ticker);
-    }
-
-    @Override
-    public void notifyWork(final ConsumerTarget_1_0 target)
-    {
-        if(_consumersWithPendingWork.add(target))
-        {
-            getAMQPConnection().notifyWork(this);
-        }
+        return END_STATES.contains(getSessionState());
     }
 
     @Override