QPID-7774: [AMQP 0-8..0-91] Ensure failover latch is nulled on all paths following a successful failover
diff --git a/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index c629414..5ffb11e 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -43,6 +43,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
diff --git a/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java b/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
index 33348a3..cb154ea 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
@@ -277,14 +277,24 @@
// the fail over.
setFailoverLatch(new CountDownLatch(1));
- // We wake up listeners. If they can handle failover, they will extend the
- // FailoverRetrySupport class and will in turn block on the latch until failover
- // has completed before retrying the operation.
- notifyFailoverStarting();
+ try
+ {
+ // We wake up listeners. If they can handle failover, they will extend the
+ // FailoverRetrySupport class and will in turn block on the latch until failover
+ // has completed before retrying the operation.
+ notifyFailoverStarting();
- getConnection().doWithAllLocks(_failoverHandler);
-
- getFailoverLatch().countDown();
+ getConnection().doWithAllLocks(_failoverHandler);
+ }
+ finally
+ {
+ CountDownLatch failoverLatch = getFailoverLatch();
+ if (failoverLatch != null)
+ {
+ failoverLatch.countDown();
+ setFailoverLatch(null);
+ }
+ }
}
});
}
@@ -731,9 +741,9 @@
{
if (_failoverLatch != null)
{
- if(!_failoverLatch.await(MAXIMUM_STATE_WAIT_TIME, TimeUnit.MILLISECONDS))
+ if (!_failoverLatch.await(MAXIMUM_STATE_WAIT_TIME, TimeUnit.MILLISECONDS))
{
-
+ _logger.debug("Timed out after waiting {}ms for failover to complete.", MAXIMUM_STATE_WAIT_TIME);
}
}
}
diff --git a/client/src/main/java/org/apache/qpid/client/FailoverHandler.java b/client/src/main/java/org/apache/qpid/client/FailoverHandler.java
index bc89dc6..d90115e 100644
--- a/client/src/main/java/org/apache/qpid/client/FailoverHandler.java
+++ b/client/src/main/java/org/apache/qpid/client/FailoverHandler.java
@@ -52,6 +52,7 @@
/**
* Performs the failover procedure.
*/
+ @Override
public void run()
{
AMQConnection connection = _amqProtocolHandler.getConnection();
@@ -82,10 +83,6 @@
AMQDisconnectedException cause = new AMQDisconnectedException("Failover was vetoed by client", null);
connection.closed(cause);
-
- _amqProtocolHandler.getFailoverLatch().countDown();
- _amqProtocolHandler.setFailoverLatch(null);
-
return;
}