#noissue - Test case for interprocess mutex not reconnecting.
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexNotReconnecting.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexNotReconnecting.java new file mode 100755 index 0000000..87468bd --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexNotReconnecting.java
@@ -0,0 +1,147 @@ +package org.apache.curator.framework.recipes.locks; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.utils.CloseableUtils; +import org.apache.zookeeper.KeeperException; +import org.testng.Assert; + +public class TestInterProcessMutexNotReconnecting extends BaseClassForTests +{ + @org.testng.annotations.Test + public void test() throws Exception + { + final String SEMAPHORE_PATH = "/test"; + final int MAX_SEMAPHORES = 1; + final int NUM_CLIENTS = 10; + + server.start(); + + CuratorFramework client = null; + + ExecutorService executor = Executors.newFixedThreadPool(NUM_CLIENTS); + + final AtomicInteger counter = new AtomicInteger(0); + final AtomicBoolean run = new AtomicBoolean(true); + + try { + client = CuratorFrameworkFactory.newClient(server.getConnectString(), 5000, 5000, new RetryOneTime(1)); + client.start(); + + final CuratorFramework lClient = client; + + for(int i = 0; i < NUM_CLIENTS; ++i) + { + executor.execute(new Runnable() + { + + @Override + public void run() + { + while(run.get()) + { + InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(lClient, SEMAPHORE_PATH, MAX_SEMAPHORES); + System.err.println(Thread.currentThread() + "Acquiring"); + Lease lease = null; + try + { + lease = semaphore.acquire(); + System.err.println(Thread.currentThread() + "Acquired"); + counter.incrementAndGet(); + Thread.sleep(2000); + } + catch(InterruptedException e) + { + System.err.println("Interrupted"); + Thread.currentThread().interrupt(); + break; + } + catch(KeeperException e) + { + try + { + Thread.sleep(2000); + } + catch(InterruptedException e2) + { + System.err.println("Interrupted"); + Thread.currentThread().interrupt(); + break; + } + } + catch(Exception e) + { + e.printStackTrace(); + } + finally + { + if(lease != null) { + semaphore.returnLease(lease); + } + } + } + } + }); + } + + + final AtomicBoolean lost = new AtomicBoolean(false); + client.getConnectionStateListenable().addListener(new ConnectionStateListener() { + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) { + System.err.println("New state : " + newState); + + if(newState == ConnectionState.LOST) { + lost.set(true); + } + } + }); + + Thread.sleep(2000); + + System.err.println("Stopping server"); + server.stop(); + System.err.println("Stopped server"); + + while(!lost.get()) + { + Thread.sleep(1000); + } + + int preRestartCount = counter.get(); + + System.err.println("Restarting server"); + server.restart(); + + long startCheckTime = System.currentTimeMillis(); + while(true) + { + if(counter.get() > preRestartCount) + { + break; + } + else if((System.currentTimeMillis() - startCheckTime) > 30000) + { + Assert.fail("Semaphores not reacquired after restart"); + } + } + + } + finally + { + run.set(false); + executor.shutdownNow(); + CloseableUtils.closeQuietly(client); + } + } +}