Merge branch 'CURATOR-462' of https://github.com/krajcsovszkig-ms/curator into CURATOR-462
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index 7bc98f5..03e1088 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@ -331,6 +331,7 @@
static volatile CountDownLatch debugAcquireLatch = null;
static volatile CountDownLatch debugFailedGetChildrenLatch = null;
+ volatile CountDownLatch debugWaitLatch = null;
private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs) throws Exception
{
@@ -353,6 +354,7 @@
}
Lease lease = null;
+ boolean success = false;
try
{
@@ -383,13 +385,11 @@
{
debugFailedGetChildrenLatch.countDown();
}
- returnLease(lease); // otherwise the just created ZNode will be orphaned causing a dead lock
throw e;
}
if ( !children.contains(nodeName) )
{
log.error("Sequential path not found: " + path);
- returnLease(lease);
return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
}
@@ -402,20 +402,32 @@
long thisWaitMs = getThisWaitMs(startMs, waitMs);
if ( thisWaitMs <= 0 )
{
- returnLease(lease);
return InternalAcquireResult.RETURN_NULL;
}
+ if ( debugWaitLatch != null )
+ {
+ debugWaitLatch.countDown();
+ }
wait(thisWaitMs);
}
else
{
+ if ( debugWaitLatch != null )
+ {
+ debugWaitLatch.countDown();
+ }
wait();
}
}
+ success = true;
}
}
finally
{
+ if ( !success )
+ {
+ returnLease(lease);
+ }
client.removeWatchers();
}
}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index 73c76e8..50f6bce 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@ -778,4 +778,53 @@
TestCleanState.closeAndTestClean(client);
}
}
+
+ @Test
+ public void testInterruptAcquire() throws Exception
+ {
+ // CURATOR-462
+ final Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+ try
+ {
+ final InterProcessSemaphoreV2 s1 = new InterProcessSemaphoreV2(client, "/test", 1);
+ final InterProcessSemaphoreV2 s2 = new InterProcessSemaphoreV2(client, "/test", 1);
+ final InterProcessSemaphoreV2 s3 = new InterProcessSemaphoreV2(client, "/test", 1);
+
+ final CountDownLatch debugWaitLatch = s2.debugWaitLatch = new CountDownLatch(1);
+
+ // Acquire exclusive semaphore
+ Lease lease = s1.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
+ Assert.assertNotNull(lease);
+
+ // Queue up another semaphore on the same path
+ Future<Object> handle = Executors.newSingleThreadExecutor().submit(new Callable<Object>() {
+
+ @Override
+ public Object call() throws Exception {
+ s2.acquire();
+ return null;
+ }
+ });
+
+ // Wait until second lease is created and the wait is started for it to become active
+ Assert.assertTrue(timing.awaitLatch(debugWaitLatch));
+
+ // Interrupt the wait
+ handle.cancel(true);
+
+ // Assert that the second lease is gone
+ timing.sleepABit();
+ Assert.assertEquals(client.getChildren().forPath("/test/leases").size(), 1);
+
+ // Assert that after closing the first (current) semaphore, we can acquire a new one
+ s1.returnLease(lease);
+ Assert.assertNotNull(s3.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
+ }
+ finally
+ {
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
}