Merge branch 'CURATOR-331' of https://github.com/julnamoo/curator into CURATOR-331
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index 3d96be2..2b9d48d 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@ -44,6 +44,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.curator.utils.PathUtils;
@@ -327,6 +328,9 @@
RETRY_DUE_TO_MISSING_NODE
}
+ static volatile CountDownLatch debugAcquireLatch = null;
+ static volatile CountDownLatch debugFailedGetChildrenLatch = null;
+
private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs) throws Exception
{
if ( client.getState() != CuratorFrameworkState.STARTED )
@@ -356,11 +360,33 @@
String nodeName = ZKPaths.getNodeFromPath(path);
lease = makeLease(path);
+ // debug hook: read the volatile field once so a concurrent reset to null cannot cause an NPE between the check and the await
+ CountDownLatch acquireLatch = debugAcquireLatch;
+ if ( acquireLatch != null )
+ {
+ acquireLatch.await();
+ }
+
synchronized(this)
{
for(;;)
{
- List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
+ List<String> children;
+ try
+ {
+ children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
+ }
+ catch ( Exception e )
+ {
+ // debug hook: single volatile read so the null check and the countDown() act on the same latch
+ CountDownLatch failedGetChildrenLatch = debugFailedGetChildrenLatch;
+ if ( failedGetChildrenLatch != null )
+ {
+ failedGetChildrenLatch.countDown();
+ }
+ returnLease(lease); // otherwise the just created ZNode will be orphaned, causing a deadlock
+ throw e;
+ }
if ( !children.contains(nodeName) )
{
log.error("Sequential path not found: " + path);
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index ad45d90..216c2a2 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@ -20,14 +20,19 @@
package org.apache.curator.framework.recipes.locks;
import com.google.common.collect.Lists;
-import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
+import com.google.common.collect.Queues;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.testng.Assert;
@@ -35,6 +40,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
@@ -49,6 +55,132 @@
public class TestInterProcessSemaphore extends BaseClassForTests
{
@Test
+ public void testAcquireAfterLostServer() throws Exception
+ {
+ // CURATOR-335
+ // Scenario: one runner is held by debugAcquireLatch right after creating its lease node, the server is
+ // stopped so that the runner's subsequent getChildren() fails, and after the server restart
+ // NUM_CLIENTS acquisitions must succeed - i.e. the failed acquire must not leave an orphaned node behind.
+
+ final String SEMAPHORE_PATH = "/test";
+ final int MAX_SEMAPHORES = 1;
+ final int NUM_CLIENTS = 10;
+
+ ExecutorService executor = Executors.newFixedThreadPool(NUM_CLIENTS);
+
+ final Timing timing = new Timing();
+
+ final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.forWaiting().milliseconds(), timing.connection(), new RetryOneTime(1)); // long session time on purpose
+ try
+ {
+ client.start();
+
+ InterProcessSemaphoreV2.debugAcquireLatch = new CountDownLatch(1); // cause one of the semaphores to create its node and then wait
+ InterProcessSemaphoreV2.debugFailedGetChildrenLatch = new CountDownLatch(1); // semaphore will notify when getChildren() fails
+ final CountDownLatch isReadyLatch = new CountDownLatch(NUM_CLIENTS);
+ final BlockingQueue<Boolean> acquiredQueue = Queues.newLinkedBlockingQueue();
+ Runnable runner = new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ while ( !Thread.currentThread().isInterrupted() )
+ {
+ InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, SEMAPHORE_PATH, MAX_SEMAPHORES);
+ Lease lease = null;
+ try
+ {
+ isReadyLatch.countDown();
+ lease = semaphore.acquire();
+ acquiredQueue.add(true);
+ timing.sleepABit();
+ }
+ catch ( InterruptedException e )
+ {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ catch ( KeeperException e )
+ {
+ try
+ {
+ timing.sleepABit();
+ }
+ catch ( InterruptedException e2 )
+ {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ catch ( Exception ignore )
+ {
+ // ignore
+ }
+ finally
+ {
+ if ( lease != null )
+ {
+ semaphore.returnLease(lease);
+ }
+ }
+ }
+ }
+ };
+ for ( int i = 0; i < NUM_CLIENTS; ++i )
+ {
+ executor.execute(runner);
+ }
+ Assert.assertTrue(timing.awaitLatch(isReadyLatch));
+ timing.sleepABit();
+
+ final CountDownLatch lostLatch = new CountDownLatch(1);
+ final CountDownLatch restartedLatch = new CountDownLatch(1);
+ client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if ( newState == ConnectionState.LOST )
+ {
+ lostLatch.countDown();
+ }
+ else if ( newState == ConnectionState.RECONNECTED )
+ {
+ restartedLatch.countDown();
+ }
+ }
+ });
+
+ timing.sleepABit();
+ server.stop();
+ Assert.assertTrue(timing.awaitLatch(lostLatch));
+ InterProcessSemaphoreV2.debugAcquireLatch.countDown(); // the waiting semaphore proceeds to getChildren - which should fail
+ Assert.assertTrue(timing.awaitLatch(InterProcessSemaphoreV2.debugFailedGetChildrenLatch)); // wait until getChildren fails
+
+ server.restart();
+
+ Assert.assertTrue(timing.awaitLatch(restartedLatch));
+ for ( int i = 0; i < NUM_CLIENTS; ++i )
+ {
+ // acquires should continue as normal after server restart
+ Boolean polled = acquiredQueue.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
+ if ( (polled == null) || !polled )
+ {
+ Assert.fail("Semaphores not reacquired after restart");
+ }
+ }
+ }
+ finally
+ {
+ executor.shutdownNow();
+ // reset the static debug hooks: if this test fails before debugAcquireLatch is counted down, a stale latch would block every later acquire in this JVM
+ InterProcessSemaphoreV2.debugAcquireLatch = null;
+ InterProcessSemaphoreV2.debugFailedGetChildrenLatch = null;
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
public void testThreadedLeaseIncrease() throws Exception
{
final Timing timing = new Timing();
@@ -551,13 +677,13 @@
Assert.assertTrue(client.getChildren().forPath("/test").size() > 0);
childReaper = new ChildReaper(
- client,
- "/test",
- Reaper.Mode.REAP_UNTIL_GONE,
- ChildReaper.newExecutorService(),
- 1,
- "/test-leader",
- InterProcessSemaphoreV2.LOCK_SCHEMA
+ client,
+ "/test",
+ Reaper.Mode.REAP_UNTIL_GONE,
+ ChildReaper.newExecutorService(),
+ 1,
+ "/test-leader",
+ InterProcessSemaphoreV2.LOCK_SCHEMA
);
childReaper.start();
@@ -591,18 +717,23 @@
Assert.assertEquals(childNodes.size(), 1);
final CountDownLatch nodeCreatedLatch = new CountDownLatch(1);
- client.getChildren().usingWatcher(new CuratorWatcher() {
+ client.getChildren().usingWatcher(new CuratorWatcher()
+ {
@Override
- public void process(WatchedEvent event) throws Exception {
- if (event.getType() == Watcher.Event.EventType.NodeCreated) {
+ public void process(WatchedEvent event) throws Exception
+ {
+ if ( event.getType() == Watcher.Event.EventType.NodeCreated )
+ {
nodeCreatedLatch.countDown();
}
}
}).forPath("/test/leases");
- final Future<Lease> leaseFuture = executor.submit(new Callable<Lease>() {
+ final Future<Lease> leaseFuture = executor.submit(new Callable<Lease>()
+ {
@Override
- public Lease call() throws Exception {
+ public Lease call() throws Exception
+ {
return semaphore.acquire(timing.forWaiting().multiple(2).seconds(), TimeUnit.SECONDS);
}
});
@@ -610,8 +741,10 @@
// wait for second lease to create its node
timing.awaitLatch(nodeCreatedLatch);
String newNode = null;
- for (String c : client.getChildren().forPath("/test/leases")) {
- if (!childNodes.contains(c)) {
+ for ( String c : client.getChildren().forPath("/test/leases") )
+ {
+ if ( !childNodes.contains(c) )
+ {
newNode = c;
}
}