use Timing
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index 6eeba94..b930cc0 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.recipes.locks;
import com.google.common.collect.Lists;
@@ -47,21 +48,21 @@
@Test
public void testThreadedLeaseIncrease() throws Exception
{
- final Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ final Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
try
{
client.start();
- final SharedCount count = new SharedCount(client, "/foo/count", 1);
+ final SharedCount count = new SharedCount(client, "/foo/count", 1);
count.start();
- final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", count);
+ final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", count);
- ExecutorService service = Executors.newCachedThreadPool();
+ ExecutorService service = Executors.newCachedThreadPool();
- final CountDownLatch latch = new CountDownLatch(1);
- Future<Object> future1 = service.submit
+ final CountDownLatch latch = new CountDownLatch(1);
+ Future<Object> future1 = service.submit
(
new Callable<Object>()
{
@@ -71,26 +72,26 @@
Lease lease = semaphore.acquire(timing.seconds(), TimeUnit.SECONDS);
Assert.assertNotNull(lease);
latch.countDown();
- lease = semaphore.acquire(timing.seconds(), TimeUnit.SECONDS);
+ lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
Assert.assertNotNull(lease);
return null;
}
}
);
- Future<Object> future2 = service.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
+ Future<Object> future2 = service.submit
+ (
+ new Callable<Object>()
{
- Assert.assertTrue(latch.await(timing.seconds(), TimeUnit.SECONDS));
- timing.sleepABit(); // make sure second acquire is waiting
- Assert.assertTrue(count.trySetCount(2));
- return null;
+ @Override
+ public Object call() throws Exception
+ {
+ Assert.assertTrue(latch.await(timing.forWaiting().seconds(), TimeUnit.SECONDS));
+ timing.sleepABit(); // make sure second acquire is waiting
+ Assert.assertTrue(count.trySetCount(2));
+ return null;
+ }
}
- }
- );
+ );
future1.get();
future2.get();
@@ -104,6 +105,7 @@
@Test
public void testClientClose() throws Exception
{
+ final Timing timing = new Timing();
CuratorFramework client1 = null;
CuratorFramework client2 = null;
InterProcessSemaphoreV2 semaphore1;
@@ -119,7 +121,7 @@
semaphore1 = new InterProcessSemaphoreV2(client1, "/test", 1);
semaphore2 = new InterProcessSemaphoreV2(client2, "/test", 1);
- Lease lease = semaphore2.acquire(10, TimeUnit.SECONDS);
+ Lease lease = semaphore2.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
Assert.assertNotNull(lease);
lease.close();
@@ -128,8 +130,8 @@
client1.close(); // should release any held leases
client1 = null;
-
- Assert.assertNotNull(semaphore2.acquire(10, TimeUnit.SECONDS));
+
+ Assert.assertNotNull(semaphore2.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
}
finally
{
@@ -139,92 +141,93 @@
}
@Test
- public void testMaxPerSession() throws Exception
+ public void testMaxPerSession() throws Exception
{
- final int CLIENT_QTY = 10;
- final int LOOP_QTY = 100;
- final Random random = new Random();
- final int SESSION_MAX = random.nextInt(75) + 25;
+ final int CLIENT_QTY = 10;
+ final int LOOP_QTY = 100;
+ final Random random = new Random();
+ final int SESSION_MAX = random.nextInt(75) + 25;
+ final Timing timing = new Timing();
- List<Future<Object>> futures = Lists.newArrayList();
- ExecutorService service = Executors.newCachedThreadPool();
- final Counter counter = new Counter();
- final AtomicInteger available = new AtomicInteger(SESSION_MAX);
+ List<Future<Object>> futures = Lists.newArrayList();
+ ExecutorService service = Executors.newCachedThreadPool();
+ final Counter counter = new Counter();
+ final AtomicInteger available = new AtomicInteger(SESSION_MAX);
for ( int i = 0; i < CLIENT_QTY; ++i )
{
futures.add
- (
- service.submit
(
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- client.start();
- try
+ service.submit
+ (
+ new Callable<Object>()
{
- InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", SESSION_MAX);
-
- for ( int i = 0; i < LOOP_QTY; ++i )
+ @Override
+ public Object call() throws Exception
{
- long start = System.currentTimeMillis();
- int thisQty;
- synchronized(available)
- {
- if ( (System.currentTimeMillis() - start) > 10000 )
- {
- throw new TimeoutException();
- }
- while ( available.get() == 0 )
- {
- available.wait(10000);
- }
-
- thisQty = (available.get() > 1) ? (random.nextInt(available.get()) + 1) : 1;
-
- available.addAndGet(-1 * thisQty);
- Assert.assertTrue(available.get() >= 0);
- }
- Collection<Lease> leases = semaphore.acquire(thisQty, 10, TimeUnit.SECONDS);
- Assert.assertNotNull(leases);
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
try
{
- synchronized(counter)
+ InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", SESSION_MAX);
+
+ for ( int i = 0; i < LOOP_QTY; ++i )
{
- counter.currentCount += thisQty;
- if ( counter.currentCount > counter.maxCount )
+ long start = System.currentTimeMillis();
+ int thisQty;
+ synchronized(available)
{
- counter.maxCount = counter.currentCount;
+ if ( (System.currentTimeMillis() - start) > 10000 )
+ {
+ throw new TimeoutException();
+ }
+ while ( available.get() == 0 )
+ {
+ available.wait(timing.forWaiting().milliseconds());
+ }
+
+ thisQty = (available.get() > 1) ? (random.nextInt(available.get()) + 1) : 1;
+
+ available.addAndGet(-1 * thisQty);
+ Assert.assertTrue(available.get() >= 0);
+ }
+ Collection<Lease> leases = semaphore.acquire(thisQty, timing.forWaiting().seconds(), TimeUnit.SECONDS);
+ Assert.assertNotNull(leases);
+ try
+ {
+ synchronized(counter)
+ {
+ counter.currentCount += thisQty;
+ if ( counter.currentCount > counter.maxCount )
+ {
+ counter.maxCount = counter.currentCount;
+ }
+ }
+ Thread.sleep(random.nextInt(25));
+ }
+ finally
+ {
+ synchronized(counter)
+ {
+ counter.currentCount -= thisQty;
+ }
+ semaphore.returnAll(leases);
+ synchronized(available)
+ {
+ available.addAndGet(thisQty);
+ available.notifyAll();
+ }
}
}
- Thread.sleep(random.nextInt(25));
}
finally
{
- synchronized(counter)
- {
- counter.currentCount -= thisQty;
- }
- semaphore.returnAll(leases);
- synchronized(available)
- {
- available.addAndGet(thisQty);
- available.notifyAll();
- }
+ client.close();
}
+ return null;
}
}
- finally
- {
- client.close();
- }
- return null;
- }
- }
- )
- );
+ )
+ );
}
for ( Future<Object> f : futures )
@@ -242,60 +245,61 @@
}
@Test
- public void testRelease1AtATime() throws Exception
+ public void testRelease1AtATime() throws Exception
{
- final int CLIENT_QTY = 10;
- final int MAX = CLIENT_QTY / 2;
- final AtomicInteger maxLeases = new AtomicInteger(0);
- final AtomicInteger activeQty = new AtomicInteger(0);
- final AtomicInteger uses = new AtomicInteger(0);
+ final Timing timing = new Timing();
+ final int CLIENT_QTY = 10;
+ final int MAX = CLIENT_QTY / 2;
+ final AtomicInteger maxLeases = new AtomicInteger(0);
+ final AtomicInteger activeQty = new AtomicInteger(0);
+ final AtomicInteger uses = new AtomicInteger(0);
- List<Future<Object>> futures = Lists.newArrayList();
- ExecutorService service = Executors.newFixedThreadPool(CLIENT_QTY);
+ List<Future<Object>> futures = Lists.newArrayList();
+ ExecutorService service = Executors.newFixedThreadPool(CLIENT_QTY);
for ( int i = 0; i < CLIENT_QTY; ++i )
{
- Future<Object> f = service.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
+ Future<Object> f = service.submit
+ (
+ new Callable<Object>()
{
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- client.start();
- try
+ @Override
+ public Object call() throws Exception
{
- InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", MAX);
- Lease lease = semaphore.acquire(10, TimeUnit.SECONDS);
- Assert.assertNotNull(lease);
- uses.incrementAndGet();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ client.start();
try
{
- synchronized(maxLeases)
+ InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", MAX);
+ Lease lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
+ Assert.assertNotNull(lease);
+ uses.incrementAndGet();
+ try
{
- int qty = activeQty.incrementAndGet();
- if ( qty > maxLeases.get() )
+ synchronized(maxLeases)
{
- maxLeases.set(qty);
+ int qty = activeQty.incrementAndGet();
+ if ( qty > maxLeases.get() )
+ {
+ maxLeases.set(qty);
+ }
}
- }
- Thread.sleep(500);
+ timing.sleepABit();
+ }
+ finally
+ {
+ activeQty.decrementAndGet();
+ lease.close();
+ }
}
finally
{
- activeQty.decrementAndGet();
- lease.close();
+ client.close();
}
+ return null;
}
- finally
- {
- client.close();
- }
- return null;
}
- }
- );
+ );
futures.add(f);
}
@@ -307,66 +311,67 @@
Assert.assertEquals(uses.get(), CLIENT_QTY);
Assert.assertEquals(maxLeases.get(), MAX);
}
-
- @Test
- public void testReleaseInChunks() throws Exception
- {
- final int MAX_LEASES = 11;
- final int THREADS = 100;
- final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ @Test
+ public void testReleaseInChunks() throws Exception
+ {
+ final Timing timing = new Timing();
+ final int MAX_LEASES = 11;
+ final int THREADS = 100;
+
+ final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
client.start();
try
{
- final Stepper latch = new Stepper();
- final Random random = new Random();
- final Counter counter = new Counter();
- ExecutorService service = Executors.newCachedThreadPool();
- ExecutorCompletionService<Object> completionService = new ExecutorCompletionService<Object>(service);
+ final Stepper latch = new Stepper();
+ final Random random = new Random();
+ final Counter counter = new Counter();
+ ExecutorService service = Executors.newCachedThreadPool();
+ ExecutorCompletionService<Object> completionService = new ExecutorCompletionService<Object>(service);
for ( int i = 0; i < THREADS; ++i )
{
completionService.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
+ (
+ new Callable<Object>()
{
- InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", MAX_LEASES);
- Lease lease = semaphore.acquire(10, TimeUnit.SECONDS);
- if ( lease == null )
+ @Override
+ public Object call() throws Exception
{
- throw new Exception("timed out");
- }
- try
- {
- synchronized(counter)
+ InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", MAX_LEASES);
+ Lease lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
+ if ( lease == null )
{
- ++counter.currentCount;
- if ( counter.currentCount > counter.maxCount )
+ throw new Exception("timed out");
+ }
+ try
+ {
+ synchronized(counter)
{
- counter.maxCount = counter.currentCount;
+ ++counter.currentCount;
+ if ( counter.currentCount > counter.maxCount )
+ {
+ counter.maxCount = counter.currentCount;
+ }
+ counter.notifyAll();
}
- counter.notifyAll();
- }
- latch.await();
- }
- finally
- {
- synchronized(counter)
- {
- --counter.currentCount;
+ latch.await();
}
- semaphore.returnLease(lease);
+ finally
+ {
+ synchronized(counter)
+ {
+ --counter.currentCount;
+ }
+ semaphore.returnLease(lease);
+ }
+ return null;
}
- return null;
}
- }
- );
+ );
}
- int remaining = THREADS;
+ int remaining = THREADS;
while ( remaining > 0 )
{
int times = Math.min(random.nextInt(5) + 1, remaining);
@@ -374,7 +379,7 @@
remaining -= times;
Thread.sleep(random.nextInt(100) + 1);
}
-
+
for ( int i = 0; i < THREADS; ++i )
{
completionService.take();
@@ -393,40 +398,40 @@
client.close();
}
}
-
+
@Test
- public void testThreads() throws Exception
+ public void testThreads() throws Exception
{
- final int THREAD_QTY = 10;
+ final int THREAD_QTY = 10;
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
client.start();
try
{
- final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", 1);
- ExecutorService service = Executors.newFixedThreadPool(THREAD_QTY);
+ final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", 1);
+ ExecutorService service = Executors.newFixedThreadPool(THREAD_QTY);
for ( int i = 0; i < THREAD_QTY; ++i )
{
service.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
+ (
+ new Callable<Object>()
{
- Lease lease = semaphore.acquire();
- try
+ @Override
+ public Object call() throws Exception
{
- Thread.sleep(1);
+ Lease lease = semaphore.acquire();
+ try
+ {
+ Thread.sleep(1);
+ }
+ finally
+ {
+ lease.close();
+ }
+ return null;
}
- finally
- {
- lease.close();
- }
- return null;
}
- }
- );
+ );
}
service.shutdown();
Assert.assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
@@ -437,17 +442,17 @@
}
}
-
@Test
- public void testSimple() throws Exception
+ public void testSimple() throws Exception
{
+ Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
client.start();
try
{
- InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", 1);
- Assert.assertNotNull(semaphore.acquire(10, TimeUnit.SECONDS));
- Assert.assertNull(semaphore.acquire(3, TimeUnit.SECONDS));
+ InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", 1);
+ Assert.assertNotNull(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
+ Assert.assertNull(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
}
finally
{
@@ -456,29 +461,30 @@
}
@Test
- public void testSimple2() throws Exception
+ public void testSimple2() throws Exception
{
- final int MAX_LEASES = 3;
+ final int MAX_LEASES = 3;
+ Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
client.start();
try
{
- List<Lease> leases = Lists.newArrayList();
+ List<Lease> leases = Lists.newArrayList();
for ( int i = 0; i < MAX_LEASES; ++i )
{
- InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", MAX_LEASES);
- Lease lease = semaphore.acquire(10, TimeUnit.SECONDS);
+ InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", MAX_LEASES);
+ Lease lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
Assert.assertNotNull(lease);
leases.add(lease);
}
- InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", MAX_LEASES);
- Lease lease = semaphore.acquire(3, TimeUnit.SECONDS);
+ InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", MAX_LEASES);
+ Lease lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
Assert.assertNull(lease);
leases.remove(0).close();
- Assert.assertNotNull(semaphore.acquire(10, TimeUnit.SECONDS));
+ Assert.assertNotNull(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
}
finally
{