QPID-7633: Pull up the processPendingIterator and associated methods.
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
index 7d38be8..39e497e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
@@ -25,8 +25,10 @@
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.security.auth.Subject;
@@ -36,7 +38,9 @@
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.logging.LogSubject;
@@ -76,10 +80,15 @@
protected final SecurityToken _token;
protected final PublishAuthorisationCache _publishAuthCache;
+ protected final long _maxUncommittedInMemorySize;
+
protected final LogSubject _logSubject;
protected final List<Action<? super S>> _taskList = new CopyOnWriteArrayList<>();
+ protected final Set<AbstractConsumerTarget> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
+ private Iterator<AbstractConsumerTarget> _processPendingIterator;
+
protected AbstractAMQPSession(final Connection<?> parent, final int sessionId)
{
super(parent, createAttributes(sessionId));
@@ -115,6 +124,9 @@
final long authCacheTimeout = _connection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT);
final int authCacheSize = _connection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE);
_publishAuthCache = new PublishAuthorisationCache(_token, authCacheTimeout, authCacheSize);
+
+ _maxUncommittedInMemorySize = _connection.getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+
_logSubject = new ChannelLogSubject(this);
setState(State.ACTIVE);
@@ -349,9 +361,19 @@
}
}
- public abstract void addTicker(final Ticker ticker);
+ @Override
+ public void addTicker(final Ticker ticker)
+ {
+ _connection.getAggregateTicker().addTicker(ticker);
+ // trigger a wakeup to ensure the ticker will be taken into account
+ getAMQPConnection().notifyWork();
+ }
- public abstract void removeTicker(final Ticker ticker);
+ @Override
+ public void removeTicker(final Ticker ticker)
+ {
+ _connection.getAggregateTicker().removeTicker(ticker);
+ }
public abstract void doTimeoutAction(final String idleTransactionTimeoutError);
@@ -364,11 +386,54 @@
public abstract long getTransactionStartTimeLong();
-
@Override
protected void logOperation(final String operation)
{
getEventLogger().message(ChannelMessages.OPERATION(operation));
}
+ @Override
+ public boolean processPending()
+ {
+ if (!getAMQPConnection().isIOThread() || isClosing())
+ {
+ return false;
+ }
+
+ updateBlockedStateIfNecessary();
+
+ if(!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
+ {
+ if (_processPendingIterator == null || !_processPendingIterator.hasNext())
+ {
+ _processPendingIterator = _consumersWithPendingWork.iterator();
+ }
+
+ if(_processPendingIterator.hasNext())
+ {
+ AbstractConsumerTarget target = _processPendingIterator.next();
+ _processPendingIterator.remove();
+ if (target.processPending())
+ {
+ _consumersWithPendingWork.add(target);
+ }
+ }
+ }
+
+ return !_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting();
+ }
+
+ public void notifyWork(final X target)
+ {
+ if(_consumersWithPendingWork.add((AbstractConsumerTarget) target))
+ {
+ getAMQPConnection().notifyWork(this);
+ }
+ }
+
+ public abstract void transportStateChanged();
+
+ protected abstract void updateBlockedStateIfNecessary();
+
+ public abstract boolean isClosing();
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
index 3987a4b..13a3077 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
@@ -75,6 +75,7 @@
boolean hasSessionWithName(byte[] name);
+ AggregateTicker getAggregateTicker();
enum CloseReason
{
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index ebe5aad..2e23c1b 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -482,7 +482,7 @@
{
for (ServerSession ssn : getSessionModels())
{
- ssn.transportStateChanged();
+ ssn.getModelObject().transportStateChanged();
}
}
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 5bb684f..ac98736 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -54,11 +54,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
-import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
@@ -93,10 +91,8 @@
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.*;
-import org.apache.qpid.transport.network.Ticker;
public class ServerSession extends Session
implements LogSubject, AsyncAutoCommitTransaction.FutureRecorder
@@ -125,8 +121,6 @@
private long _blockTime;
private long _blockingTimeout;
private boolean _wireBlockingState;
- private final Set<ConsumerTarget_0_10> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
- private Iterator<ConsumerTarget_0_10> _processPendingIterator;
public static interface MessageDispositionChangeListener
{
@@ -154,13 +148,10 @@
private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_0_10>> _consumers = new CopyOnWriteArrayList<>();
- private final List<Action<? super ServerSession>> _taskList = new CopyOnWriteArrayList<Action<? super ServerSession>>();
-
private AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>();
private volatile long _uncommittedMessageSize;
private final List<StoredMessage<MessageMetaData_0_10>> _uncommittedMessages = new ArrayList<>();
- private long _maxUncommittedInMemorySize;
public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
@@ -170,7 +161,6 @@
ServerConnection serverConnection = (ServerConnection) connection;
_blockingTimeout = serverConnection.getBroker().getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
- _maxUncommittedInMemorySize = getAMQPConnection().getContextProvider().getContextValue(Long.class, org.apache.qpid.server.model.Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
}
public AccessControlContext getAccessControllerContext()
@@ -832,16 +822,24 @@
return b;
}
- public void transportStateChanged()
+ public void updateBlockedStateIfNecesssary()
{
- for(ConsumerTarget_0_10 consumerTarget : getSubscriptions())
+ boolean desiredBlockingState = _blocking.get();
+ if (desiredBlockingState != _wireBlockingState)
{
- consumerTarget.transportStateChanged();
+ _wireBlockingState = desiredBlockingState;
+
+ if (desiredBlockingState)
+ {
+ invokeBlock();
+ }
+ else
+ {
+ invokeUnblock();
+ }
+ _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
}
- if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
- {
- getAMQPConnection().notifyWork(_modelObject);
- }
+
}
public Object getConnectionReference()
@@ -1100,70 +1098,6 @@
}
}
- public boolean processPending()
- {
- if (!getAMQPConnection().isIOThread() || isClosing())
- {
- return false;
- }
-
- boolean desiredBlockingState = _blocking.get();
- if (desiredBlockingState != _wireBlockingState)
- {
- _wireBlockingState = desiredBlockingState;
-
- if (desiredBlockingState)
- {
- invokeBlock();
- }
- else
- {
- invokeUnblock();
- }
- _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
- }
-
- if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
- {
- if (_processPendingIterator == null || !_processPendingIterator.hasNext())
- {
- _processPendingIterator = _consumersWithPendingWork.iterator();
- }
-
- if (_processPendingIterator.hasNext())
- {
- ConsumerTarget_0_10 target = _processPendingIterator.next();
- _processPendingIterator.remove();
- if (target.processPending())
- {
- _consumersWithPendingWork.add(target);
- }
- }
- }
-
- return !_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting();
- }
-
- public void addTicker(final Ticker ticker)
- {
- getAMQPConnection().getAggregateTicker().addTicker(ticker);
- // trigger a wakeup to ensure the ticker will be taken into account
- getAMQPConnection().notifyWork();
- }
-
- public void removeTicker(final Ticker ticker)
- {
- getAMQPConnection().getAggregateTicker().removeTicker(ticker);
- }
-
- public void notifyWork(final ConsumerTarget_0_10 target)
- {
- if(_consumersWithPendingWork.add(target))
- {
- getAMQPConnection().notifyWork(_modelObject);
- }
- }
-
public void doTimeoutAction(final String reason)
{
getAMQPConnection().closeSessionAsync(_modelObject,
@@ -1172,7 +1106,7 @@
public final long getMaxUncommittedInMemorySize()
{
- return _maxUncommittedInMemorySize;
+ return _modelObject.getMaxUncommittedInMemorySize();
}
public int compareTo(AMQSessionModel o)
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
index dd028c3..9b2e78a 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
@@ -35,9 +35,7 @@
import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.protocol.PublishAuthorisationCache;
import org.apache.qpid.server.session.AbstractAMQPSession;
-import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.transport.network.Ticker;
public class Session_0_10 extends AbstractAMQPSession<Session_0_10, ConsumerTarget_0_10>
implements AMQSessionModel<Session_0_10, ConsumerTarget_0_10>, LogSubject
@@ -115,19 +113,20 @@
@Override
public void transportStateChanged()
{
- _serverSession.transportStateChanged();
+ for(ConsumerTarget_0_10 consumerTarget : _serverSession.getSubscriptions())
+ {
+ consumerTarget.transportStateChanged();
+ }
+ if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
+ {
+ getAMQPConnection().notifyWork(this);
+ }
}
@Override
- public boolean processPending()
+ protected void updateBlockedStateIfNecessary()
{
- return _serverSession.processPending();
- }
-
- @Override
- public void notifyWork(final ConsumerTarget_0_10 target)
- {
- _serverSession.notifyWork(target);
+ _serverSession.updateBlockedStateIfNecesssary();
}
@Override
@@ -179,18 +178,6 @@
}
@Override
- public void addTicker(final Ticker ticker)
- {
- _serverSession.addTicker(ticker);
- }
-
- @Override
- public void removeTicker(final Ticker ticker)
- {
- _serverSession.removeTicker(ticker);
- }
-
- @Override
public void doTimeoutAction(final String idleTransactionTimeoutError)
{
_serverSession.doTimeoutAction(idleTransactionTimeoutError);
@@ -242,4 +229,9 @@
{
return _serverSession;
}
+
+ public long getMaxUncommittedInMemorySize()
+ {
+ return _maxUncommittedInMemorySize;
+ }
}
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 784d9c4..93f6985 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -32,7 +32,6 @@
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -60,20 +59,16 @@
import org.apache.qpid.protocol.ErrorCodes;
import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.ConsumerTarget;
-import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.ArrivalTimeFilter;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.logging.LogMessage;
-import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
-import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
@@ -103,7 +98,6 @@
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
-import org.apache.qpid.transport.network.Ticker;
public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0_8>
implements AMQSessionModel<AMQChannel, ConsumerTarget_0_8>,
@@ -155,10 +149,6 @@
/** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
private final Map<AMQShortString, ConsumerTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, ConsumerTarget_0_8>();
- private final Set<ConsumerTarget_0_8> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
-
- private Iterator<ConsumerTarget_0_8> _processPendingIterator;
-
private final MessageStore _messageStore;
private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>();
@@ -205,7 +195,6 @@
private long _confirmedMessageCounter;
private volatile long _uncommittedMessageSize;
private final List<StoredMessage<MessageMetaData>> _uncommittedMessages = new ArrayList<>();
- private long _maxUncommittedInMemorySize;
private boolean _wireBlockingState;
@@ -231,8 +220,6 @@
_connection = connection;
_channelId = channelId;
-
- _maxUncommittedInMemorySize = connection.getContextProvider().getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
_messageStore = messageStore;
_blockingTimeout = connection.getBroker().getContextValue(Long.class,
Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
@@ -1294,7 +1281,8 @@
return "("+ _suspended.get() + ", " + _closing.get() + ", " + _connection.isClosing() + ") "+"["+ _connection.toString()+":"+_channelId+"]";
}
- boolean isClosing()
+ @Override
+ public boolean isClosing()
{
return _closing.get();
}
@@ -3666,14 +3654,8 @@
}
}
- @Override
- public boolean processPending()
+ protected void updateBlockedStateIfNecessary()
{
- if (!getAMQPConnection().isIOThread() || isClosing())
- {
- return false;
- }
-
boolean desiredBlockingState = _blocking.get();
if (desiredBlockingState != _wireBlockingState)
{
@@ -3681,49 +3663,6 @@
sendFlow(!desiredBlockingState);
_blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
}
-
- if(!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
- {
- if (_processPendingIterator == null || !_processPendingIterator.hasNext())
- {
- _processPendingIterator = _consumersWithPendingWork.iterator();
- }
-
- if(_processPendingIterator.hasNext())
- {
- ConsumerTarget_0_8 target = _processPendingIterator.next();
- _processPendingIterator.remove();
- if (target.processPending())
- {
- _consumersWithPendingWork.add(target);
- }
- }
- }
-
- return !_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting();
- }
-
- @Override
- public void addTicker(final Ticker ticker)
- {
- getConnection().getAggregateTicker().addTicker(ticker);
- // trigger a wakeup to ensure the ticker will be taken into account
- getAMQPConnection().notifyWork();
- }
-
- @Override
- public void removeTicker(final Ticker ticker)
- {
- getConnection().getAggregateTicker().removeTicker(ticker);
- }
-
- @Override
- public void notifyWork(final ConsumerTarget_0_8 target)
- {
- if(_consumersWithPendingWork.add(target))
- {
- getAMQPConnection().notifyWork(this);
- }
}
@Override
diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
index 2c7267b..55c7d9e 100644
--- a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
+++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
@@ -123,6 +123,7 @@
when(_amqConnection.getContextProvider()).thenReturn(_virtualHost);
when(_amqConnection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT);
when(_amqConnection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT);
+ when(_amqConnection.getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE)).thenReturn(Connection.DEFAULT_MAX_UNCOMMITTED_IN_MEMORY_SIZE);
when(_amqConnection.getTaskExecutor()).thenReturn(taskExecutor);
when(_amqConnection.getChildExecutor()).thenReturn(taskExecutor);
when(_amqConnection.getModel()).thenReturn(BrokerModel.getInstance());
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 1853e8e..4def54f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -125,8 +125,11 @@
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+<<<<<<< HEAD
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.transport.network.Ticker;
+=======
+>>>>>>> QPID-7633: Pull up the processPendingIterator and associated methods.
public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget_1_0>
implements AMQSessionModel<Session_1_0, ConsumerTarget_1_0>, LogSubject
@@ -149,8 +152,6 @@
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
private Session<?> _modelObject = this;
- private final Set<ConsumerTarget_1_0> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
- private Iterator<ConsumerTarget_1_0> _processPendingIterator;
private SessionState _sessionState;
@@ -1920,56 +1921,15 @@
}
@Override
- public boolean processPending()
+ protected void updateBlockedStateIfNecessary()
{
- if (!getAMQPConnection().isIOThread() || END_STATES.contains(getSessionState()))
- {
- return false;
- }
-
- if(!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
- {
- if (_processPendingIterator == null || !_processPendingIterator.hasNext())
- {
- _processPendingIterator = _consumersWithPendingWork.iterator();
- }
-
- if(_processPendingIterator.hasNext())
- {
- ConsumerTarget_1_0 target = _processPendingIterator.next();
- _processPendingIterator.remove();
- if (target.processPending())
- {
- _consumersWithPendingWork.add(target);
- }
- }
- }
-
- return !_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting();
}
@Override
- public void addTicker(final Ticker ticker)
+ public boolean isClosing()
{
- getConnection().getAggregateTicker().addTicker(ticker);
- // trigger a wakeup to ensure the ticker will be taken into account
- getAMQPConnection().notifyWork();
- }
-
- @Override
- public void removeTicker(final Ticker ticker)
- {
- getConnection().getAggregateTicker().removeTicker(ticker);
- }
-
- @Override
- public void notifyWork(final ConsumerTarget_1_0 target)
- {
- if(_consumersWithPendingWork.add(target))
- {
- getAMQPConnection().notifyWork(this);
- }
+ return END_STATES.contains(getSessionState());
}
@Override