QPID-8185: [JMS AMQP 0-x][AMQP 0-8..0-91] Make sure that client closes TCP connection on failure with sending connection.close
diff --git a/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index c629414..4426fb6 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -1203,7 +1203,7 @@
}
catch (JMSException e)
{
- _logger.error("Error closing connection", e);
+ _logger.warn("Error closing connection", e);
throw JMSExceptionHelper.chainJMSException(new JMSException("Error closing connection: " + e), e);
}
finally
@@ -1271,7 +1271,7 @@
}
catch (JMSException e)
{
- _logger.error("Error closing session: " + e);
+ _logger.warn("Error closing session: " + e);
sessionException = e;
}
}
diff --git a/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java b/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
index 5d59e50..6b50f90 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
@@ -672,10 +672,18 @@
/** More convenient method to write a frame and wait for it's response. */
public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws QpidException, FailoverException
{
- return writeCommandFrameAndWaitForReply(frame, new SpecificMethodFrameListener(frame.getChannel(), responseClass),
+ return writeCommandFrameAndWaitForReply(frame,
+ new SpecificMethodFrameListener(frame.getChannel(),
+ responseClass,
+ getConnectionDetails()),
timeout);
}
+ public String getConnectionDetails()
+ {
+ return getLocalAddress() + "-" + getRemoteAddress();
+ }
+
public void closeSession(AMQSession session) throws QpidException
{
_protocolSession.closeSession(session);
@@ -707,17 +715,19 @@
final AMQFrame frame = body.generateFrame(0);
syncWrite(frame, ConnectionCloseOkBody.class, timeout);
- _network.close();
- closed();
- }
+ }
catch (AMQTimeoutException e)
{
- closed();
+ _logger.debug("Timeout on sending connection close : " + e);
}
catch (FailoverException e)
{
_logger.debug("FailoverException interrupted connection close, ignoring as connection closed anyway.");
}
+ finally
+ {
+ _network.close();
+ }
}
}
diff --git a/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 1acb3a1..6c7738a 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -929,7 +929,7 @@
public QueueDeclareOkHandler()
{
- super(getChannelId(), QueueDeclareOkBody.class);
+ super(getChannelId(), QueueDeclareOkBody.class, getProtocolHandler().getConnectionDetails());
}
public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException
diff --git a/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index 41166e0..8412849 100644
--- a/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -362,17 +362,19 @@
&& (connectionDelegate80.isConfirmedPublishSupported()
|| (!getSession().isTransacted() && connectionDelegate80.isConfirmedPublishNonTransactionalSupported()));
+ AMQProtocolHandler protocolHandler = getConnection().getProtocolHandler();
if(!useConfirms)
{
- getConnection().getProtocolHandler().writeFrame(compositeFrame);
+ protocolHandler.writeFrame(compositeFrame);
}
else
{
- final PublishConfirmMessageListener frameListener = new PublishConfirmMessageListener(getChannelId());
+ final PublishConfirmMessageListener frameListener = new PublishConfirmMessageListener(getChannelId(),
+ protocolHandler.getConnectionDetails());
try
{
- getConnection().getProtocolHandler().writeCommandFrameAndWaitForReply(compositeFrame,
+ protocolHandler.writeCommandFrameAndWaitForReply(compositeFrame,
frameListener);
if(frameListener.isRejected())
@@ -468,9 +470,9 @@
*
* @param channelId The channel id to filter incoming methods with.
*/
- public PublishConfirmMessageListener(final int channelId)
+ public PublishConfirmMessageListener(final int channelId, String connectionDetails)
{
- super(channelId);
+ super(channelId, connectionDetails);
}
@Override
diff --git a/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index 6618b34..5c56ad7 100644
--- a/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -55,6 +55,7 @@
public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMethodEvent> implements AMQMethodListener
{
+ private final String _connectionDetails;
/** Holds the channel id for the channel upon which this listener is waiting for a response. */
private int _channelId;
@@ -62,10 +63,12 @@
* Creates a new method listener, that filters incoming method to just those that match the specified channel id.
*
* @param channelId The channel id to filter incoming methods with.
+ * @param connectionDetails
*/
- public BlockingMethodFrameListener(int channelId)
+ public BlockingMethodFrameListener(int channelId, final String connectionDetails)
{
_channelId = channelId;
+ _connectionDetails = connectionDetails;
}
/**
@@ -121,4 +124,9 @@
}
}
+ @Override
+ public String getConnectionDetails()
+ {
+ return _connectionDetails;
+ }
}
diff --git a/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
index 5f0a935..ce7c03a 100644
--- a/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
+++ b/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
@@ -82,6 +82,12 @@
return _awaitStates.contains(state);
}
+ @Override
+ public String getConnectionDetails()
+ {
+ return null;
+ }
+
/**
* Await for the required State to be achieved within the default timeout.
* @return The achieved state that was requested.
diff --git a/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java b/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
index f0d7feb..9a3f733 100644
--- a/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
+++ b/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
@@ -28,9 +28,9 @@
{
private final Class _expectedClass;
- public SpecificMethodFrameListener(int channelId, Class expectedClass)
+ public SpecificMethodFrameListener(int channelId, Class expectedClass, final String connectionDetails)
{
- super(channelId);
+ super(channelId, connectionDetails);
_expectedClass = expectedClass;
}
diff --git a/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
index 66be535..23adf3c 100644
--- a/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
+++ b/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
@@ -170,8 +170,8 @@
final String errorMsg = String.format(
"The server's response was not received within the time-out period of %d ms. "
+ "Possible reasons include: the server may be too busy, the network may be "
- + "overloaded, or this JVM itself may be too busy to process the response.",
- timeout);
+ + "overloaded, or this JVM itself may be too busy to process the response. [%s]",
+ timeout, getConnectionDetails() == null ? "" : getConnectionDetails());
_error = new AMQTimeoutException(errorMsg, null);
_ready = true;
}
@@ -338,4 +338,6 @@
return new QpidException("Waiter was closed.", null);
}
+ public abstract String getConnectionDetails();
+
}
diff --git a/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
index 74a1809..69d180e 100644
--- a/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
+++ b/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
@@ -286,7 +286,7 @@
*/
public BlockToAccessFrameListener(int channelId)
{
- super(channelId);
+ super(channelId, "Test");
_logger.info("Creating a listener:" + this);
}