QPID-7774: Improve locking when using failover latch
diff --git a/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 5ffb11e..c629414 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -43,7 +43,6 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
diff --git a/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index b58ff1e..ba3e4ae 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -331,53 +331,53 @@
final ConnectionClose close = exc.getClose();
if (close == null || close.getReplyCode() == ConnectionCloseCode.CONNECTION_FORCED)
{
- _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1));
-
- _qpidConnection.notifyFailoverRequired();
-
+ final CountDownLatch failoverLatch = new CountDownLatch(1);
+ _conn.getProtocolHandler().setFailoverLatch(failoverLatch);
final AtomicBoolean failoverDone = new AtomicBoolean();
-
- _conn.doWithAllLocks(new Runnable()
+ try
{
- @Override
- public void run()
+ _qpidConnection.notifyFailoverRequired();
+ _conn.doWithAllLocks(new Runnable()
{
- try
+ @Override
+ public void run()
{
- boolean preFailover = _conn.firePreFailover(false);
- if (preFailover)
+ try
{
- boolean reconnected;
- if(exc instanceof RedirectConnectionException)
+ boolean preFailover = _conn.firePreFailover(false);
+ if (preFailover)
{
- RedirectConnectionException redirect = (RedirectConnectionException)exc;
- reconnected = attemptRedirection(redirect.getHost(), redirect.getKnownHosts());
- }
- else
- {
- reconnected = _conn.attemptReconnection();
- }
- if(reconnected)
- {
- failoverPrep();
- _conn.resubscribeSessions();
- _conn.fireFailoverComplete();
- failoverDone.set(true);
+ boolean reconnected;
+ if (exc instanceof RedirectConnectionException)
+ {
+ RedirectConnectionException redirect = (RedirectConnectionException) exc;
+ reconnected = attemptRedirection(redirect.getHost(), redirect.getKnownHosts());
+ }
+ else
+ {
+ reconnected = _conn.attemptReconnection();
+ }
+ if (reconnected)
+ {
+ failoverPrep();
+ _conn.resubscribeSessions();
+ _conn.fireFailoverComplete();
+ failoverDone.set(true);
+ }
}
}
+ catch (Exception e)
+ {
+ _logger.error("error during failover", e);
+ }
}
- catch (Exception e)
- {
- _logger.error("error during failover", e);
- }
- finally
- {
- _conn.getProtocolHandler().getFailoverLatch().countDown();
- _conn.getProtocolHandler().setFailoverLatch(null);
- }
-
- }
- });
+ });
+ }
+ finally
+ {
+ failoverLatch.countDown();
+ _conn.getProtocolHandler().setFailoverLatch(null);
+ }
if (failoverDone.get())
diff --git a/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java b/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
index cb154ea..f8ec8a8 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
@@ -275,7 +275,8 @@
// Create a latch, upon which tasks that must not run in parallel with a failover can wait for completion of
// the fail over.
- setFailoverLatch(new CountDownLatch(1));
+ final CountDownLatch failoverLatch = new CountDownLatch(1);
+ setFailoverLatch(failoverLatch);
try
{
@@ -288,12 +289,8 @@
}
finally
{
- CountDownLatch failoverLatch = getFailoverLatch();
- if (failoverLatch != null)
- {
- failoverLatch.countDown();
- setFailoverLatch(null);
- }
+ failoverLatch.countDown();
+ setFailoverLatch(null);
}
}
});
@@ -737,14 +734,12 @@
public void blockUntilNotFailingOver() throws InterruptedException
{
- synchronized(_failoverLatchChange)
+ CountDownLatch failoverLatch = getFailoverLatch();
+ if (failoverLatch != null)
{
- if (_failoverLatch != null)
+ if (!failoverLatch.await(MAXIMUM_STATE_WAIT_TIME, TimeUnit.MILLISECONDS))
{
- if (!_failoverLatch.await(MAXIMUM_STATE_WAIT_TIME, TimeUnit.MILLISECONDS))
- {
- _logger.debug("Timed out after waiting {}ms for failover to complete.", MAXIMUM_STATE_WAIT_TIME);
- }
+ _logger.debug("Timed out after waiting {}ms for failover to complete.", MAXIMUM_STATE_WAIT_TIME);
}
}
}
@@ -762,7 +757,7 @@
return queueName.replaceAll("_+", "_");
}
- public CountDownLatch getFailoverLatch()
+ CountDownLatch getFailoverLatch()
{
synchronized (_failoverLatchChange)
{
@@ -770,7 +765,7 @@
}
}
- public void setFailoverLatch(CountDownLatch failoverLatch)
+ void setFailoverLatch(CountDownLatch failoverLatch)
{
synchronized (_failoverLatchChange)
{