Issue 257: Fixed a race condition in LeaderLatch that caused the recipe to create
two nodes in some edge cases.
diff --git a/CHANGES.txt b/CHANGES.txt
index b3386cc..de9f3a4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,8 @@
+1.3.4 - xxxxxxxxxxxxx
+=====================
+* Issue 257: Fixed a race condition in LeaderLatch that caused the recipe to create
+two nodes in some edge cases.
+
1.3.3 - March 6, 2013
=====================
* Issue 250: Restore support for passing null to usingWatcher().
diff --git a/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/leader/LeaderLatch.java
index ae0f464..13b8851 100644
--- a/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/leader/LeaderLatch.java
@@ -16,6 +16,7 @@
package com.netflix.curator.framework.recipes.leader;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.api.BackgroundCallback;
@@ -37,6 +38,7 @@
import java.io.IOException;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -57,6 +59,7 @@
private final String id;
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
+ private final AtomicReference<String> ourPath = new AtomicReference<String>();
private final ConnectionStateListener listener = new ConnectionStateListener()
{
@@ -67,8 +70,6 @@
}
};
- private volatile String ourPath = null;
-
private static final String LOCK_NAME = "latch-";
private static final LockInternalsSorter sorter = new LockInternalsSorter()
@@ -135,7 +136,7 @@
try
{
- deleteNode();
+ setNode(null);
}
catch ( Exception e )
{
@@ -308,19 +309,29 @@
return (state.get() == State.STARTED) && hasLeadership.get();
}
- private void reset() throws Exception
+ @VisibleForTesting
+ volatile CountDownLatch debugResetWaitLatch = null;
+
+ @VisibleForTesting
+ void reset() throws Exception
{
setLeadership(false);
- deleteNode();
+ setNode(null);
BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
+ if ( debugResetWaitLatch != null )
+ {
+ debugResetWaitLatch.await();
+ debugResetWaitLatch = null;
+ }
+
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
- ourPath = event.getName();
+ setNode(event.getName());
getChildren();
}
else
@@ -334,8 +345,9 @@
private void checkLeadership(List<String> children) throws Exception
{
+ final String localOurPath = ourPath.get();
List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
- int ourIndex = (ourPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(ourPath)) : -1;
+ int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
if ( ourIndex < 0 )
{
log.error("Can't find our node. Resetting. Index: " + ourIndex);
@@ -353,7 +365,7 @@
@Override
public void process(WatchedEvent event)
{
- if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (ourPath != null) )
+ if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
{
try
{
@@ -438,19 +450,12 @@
notifyAll();
}
- private void deleteNode() throws Exception
+ private void setNode(String newValue) throws Exception
{
- String localPath = ourPath;
- if ( localPath != null )
+ String oldPath = ourPath.getAndSet(newValue);
+ if ( oldPath != null )
{
- try
- {
- client.delete().guaranteed().inBackground().forPath(localPath);
- }
- finally
- {
- ourPath = null;
- }
+ client.delete().guaranteed().inBackground().forPath(oldPath);
}
}
}
diff --git a/curator-recipes/src/test/java/com/netflix/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/com/netflix/curator/framework/recipes/leader/TestLeaderLatch.java
index 90f2837..b292948 100644
--- a/curator-recipes/src/test/java/com/netflix/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/com/netflix/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -43,6 +43,36 @@
private static final int MAX_LOOPS = 5;
@Test
+ public void testResetRace() throws Exception
+ {
+ Timing timing = new Timing();
+ LeaderLatch latch = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ latch = new LeaderLatch(client, PATH_NAME);
+
+ latch.debugResetWaitLatch = new CountDownLatch(1);
+ latch.start(); // will call reset()
+ latch.reset(); // should not result in two nodes
+
+ timing.sleepABit();
+
+ latch.debugResetWaitLatch.countDown();
+
+ timing.sleepABit();
+
+ Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(), 1);
+ }
+ finally
+ {
+ Closeables.closeQuietly(latch);
+ Closeables.closeQuietly(client);
+ }
+ }
+
+ @Test
public void testLostConnection() throws Exception
{
final int PARTICIPANT_QTY = 10;