QPID-8185: [JMS AMQP 0-x][AMQP 0-8..0-91] Stop handling incoming frames on session after sending channel.close
Cherry-picked from f89f6c2f45d11fc63551d0d61c17eceedd6bd247
diff --git a/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 51eb4cf..9b5c6a8 100644
--- a/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -42,6 +42,8 @@
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
@@ -229,6 +231,12 @@
@Override
public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws QpidException
{
+ if (isClosedForInput(channelId))
+ {
+ _logger.debug("Ignoring content header as channel {} closed", channelId);
+ return;
+ }
+
final UnprocessedMessage_0_8 msg = (UnprocessedMessage_0_8) ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
: _channelId2UnprocessedMsgMap.get(channelId));
@@ -252,6 +260,11 @@
@Override
public void contentBodyReceived(final int channelId, ContentBody contentBody) throws QpidException
{
+ if (isClosedForInput(channelId))
+ {
+ _logger.debug("Ignoring content body as channel {} closed", channelId);
+ return;
+ }
UnprocessedMessage_0_8 msg;
final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0;
if (fastAccess)
@@ -469,7 +482,16 @@
@Override
public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws QpidException
{
- _protocolHandler.methodBodyReceived(channel, amqMethodBody);
+ if ( channel == 0
+ || !isClosedForInput(channel)
+ || (isClosing(channel) && (amqMethodBody instanceof ChannelCloseBody || amqMethodBody instanceof ChannelCloseOkBody)))
+ {
+ _protocolHandler.methodBodyReceived(channel, amqMethodBody);
+ }
+ else
+ {
+ _logger.debug("Ignoring method {} as channel {} closed on {}", amqMethodBody, channel);
+ }
}
public void notifyError(Exception error)
@@ -514,4 +536,16 @@
{
return _methodProcessor;
}
+
+ private boolean isClosing(final int channelId)
+ {
+ return _closingChannels.containsKey(channelId);
+ }
+
+ private boolean isClosedForInput(final int channelId)
+ {
+ AMQSession session;
+ return channelId > 0 && ((session = _connection.getSession(channelId)) == null || session.isClosed());
+ }
+
}