blob: 216c2a2ffab6a9342f554dddee14b7949d4cb339 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.recipes.locks;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"})
public class TestInterProcessSemaphore extends BaseClassForTests
{
/**
 * Regression test for CURATOR-335: semaphores must keep working after the server is lost and restarted.
 *
 * <p>{@code NUM_CLIENTS} worker threads repeatedly acquire and return a lease on a single-lease semaphore.
 * The server is stopped, the semaphore held at the debug latch is released so that its getChildren() fails,
 * the server is restarted, and the test then requires {@code NUM_CLIENTS} further successful acquisitions.
 *
 * @throws Exception on any unexpected failure
 */
@Test
public void testAcquireAfterLostServer() throws Exception
{
    // CURATOR-335
    final String SEMAPHORE_PATH = "/test";
    final int MAX_SEMAPHORES = 1;
    final int NUM_CLIENTS = 10;

    // Local handles to the debug latches. They are installed into static fields below, so they outlive
    // this method; holding them locally lets the finally block open the acquire gate unconditionally.
    final CountDownLatch acquireLatch = new CountDownLatch(1);
    final CountDownLatch failedGetChildrenLatch = new CountDownLatch(1);

    ExecutorService executor = Executors.newFixedThreadPool(NUM_CLIENTS);
    final Timing timing = new Timing();
    final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.forWaiting().milliseconds(), timing.connection(), new RetryOneTime(1)); // long session time on purpose
    try
    {
        client.start();

        InterProcessSemaphoreV2.debugAcquireLatch = acquireLatch; // cause one of the semaphores to create its node and then wait
        InterProcessSemaphoreV2.debugFailedGetChildrenLatch = failedGetChildrenLatch; // semaphore will notify when getChildren() fails

        final CountDownLatch isReadyLatch = new CountDownLatch(NUM_CLIENTS);
        final BlockingQueue<Boolean> acquiredQueue = Queues.newLinkedBlockingQueue();
        Runnable runner = new Runnable()
        {
            @Override
            public void run()
            {
                while ( !Thread.currentThread().isInterrupted() )
                {
                    InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, SEMAPHORE_PATH, MAX_SEMAPHORES);
                    Lease lease = null;
                    try
                    {
                        isReadyLatch.countDown();
                        lease = semaphore.acquire();
                        acquiredQueue.add(true);
                        timing.sleepABit();
                    }
                    catch ( InterruptedException e )
                    {
                        Thread.currentThread().interrupt();
                        break;
                    }
                    catch ( KeeperException e )
                    {
                        // expected while the server is down - back off briefly and try again
                        try
                        {
                            timing.sleepABit();
                        }
                        catch ( InterruptedException e2 )
                        {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                    catch ( Exception ignore )
                    {
                        // ignore
                    }
                    finally
                    {
                        if ( lease != null )
                        {
                            semaphore.returnLease(lease);
                        }
                    }
                }
            }
        };
        for ( int i = 0; i < NUM_CLIENTS; ++i )
        {
            executor.execute(runner);
        }
        Assert.assertTrue(timing.awaitLatch(isReadyLatch));
        timing.sleepABit();

        final CountDownLatch lostLatch = new CountDownLatch(1);
        final CountDownLatch restartedLatch = new CountDownLatch(1);
        client.getConnectionStateListenable().addListener(new ConnectionStateListener()
        {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState)
            {
                if ( newState == ConnectionState.LOST )
                {
                    lostLatch.countDown();
                }
                else if ( newState == ConnectionState.RECONNECTED )
                {
                    restartedLatch.countDown();
                }
            }
        });

        timing.sleepABit();
        server.stop();
        Assert.assertTrue(timing.awaitLatch(lostLatch));

        acquireLatch.countDown(); // the waiting semaphore proceeds to getChildren - which should fail
        Assert.assertTrue(timing.awaitLatch(failedGetChildrenLatch)); // wait until getChildren fails

        server.restart();
        Assert.assertTrue(timing.awaitLatch(restartedLatch));

        for ( int i = 0; i < NUM_CLIENTS; ++i )
        {
            // acquires should continue as normal after server restart
            Boolean polled = acquiredQueue.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
            if ( (polled == null) || !polled )
            {
                Assert.fail("Semaphores not reacquired after restart");
            }
        }
    }
    finally
    {
        // The acquire latch lives in a static field. If an assertion above failed before the normal
        // countDown(), it would stay closed and keep semaphores waiting after this test has ended.
        // Counting down an already-open latch is a no-op, so this is safe on the success path too.
        acquireLatch.countDown();
        executor.shutdownNow();
        CloseableUtils.closeQuietly(client);
    }
}
/**
 * Verifies that a thread blocked in acquire() is released when the shared lease count is raised.
 *
 * <p>The semaphore starts with a {@link SharedCount} of 1. One task takes the only lease and then blocks
 * on a second acquire; the other task raises the count to 2 and expects the second acquire to complete
 * shortly afterwards.
 *
 * @throws Exception on any unexpected failure, including failures raised inside the worker tasks
 */
@Test
public void testThreadedLeaseIncrease() throws Exception
{
    final Timing timing = new Timing();
    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
    try
    {
        client.start();

        final SharedCount count = new SharedCount(client, "/foo/count", 1);
        final ExecutorService service = Executors.newCachedThreadPool();
        try
        {
            count.start();

            final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", count);

            final CountDownLatch latch1 = new CountDownLatch(1);
            final CountDownLatch latch2 = new CountDownLatch(1);

            Future<Object> future1 = service.submit
            (
                new Callable<Object>()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        Lease lease = semaphore.acquire(timing.seconds(), TimeUnit.SECONDS);
                        Assert.assertNotNull(lease);
                        latch1.countDown();

                        // only one lease exists at this point, so this blocks until the count is raised
                        lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
                        Assert.assertNotNull(lease);
                        latch2.countDown();
                        return null;
                    }
                }
            );
            Future<Object> future2 = service.submit
            (
                new Callable<Object>()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        Assert.assertTrue(latch1.await(timing.forWaiting().seconds(), TimeUnit.SECONDS));
                        timing.sleepABit(); // make sure second acquire is waiting
                        Assert.assertTrue(count.trySetCount(2));
                        //Make sure second acquire takes less than full waiting time:
                        timing.sleepABit();
                        Assert.assertTrue(latch2.await(0, TimeUnit.SECONDS));
                        return null;
                    }
                }
            );

            // get() rethrows any assertion failure raised inside the tasks
            future1.get();
            future2.get();
        }
        finally
        {
            // release the worker threads and the shared count before the client they depend on is closed
            service.shutdownNow();
            CloseableUtils.closeQuietly(count);
        }
    }
    finally
    {
        CloseableUtils.closeQuietly(client);
    }
}
/**
 * Verifies that closing a client frees the lease it holds: once the first client is closed while
 * holding the only lease, a semaphore on the second client must be able to acquire it.
 *
 * @throws Exception on any unexpected failure
 */
@Test
public void testClientClose() throws Exception
{
    final Timing timing = new Timing();
    CuratorFramework closingClient = null;
    CuratorFramework survivingClient = null;
    try
    {
        closingClient = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
        survivingClient = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));

        closingClient.start();
        survivingClient.start();

        InterProcessSemaphoreV2 closingSemaphore = new InterProcessSemaphoreV2(closingClient, "/test", 1);
        InterProcessSemaphoreV2 survivingSemaphore = new InterProcessSemaphoreV2(survivingClient, "/test", 1);

        // take and immediately give back the single lease through the surviving client
        Lease warmUpLease = survivingSemaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
        Assert.assertNotNull(warmUpLease);
        warmUpLease.close();

        // now hold the single lease through the client that is about to be closed
        Lease heldLease = closingSemaphore.acquire(10, TimeUnit.SECONDS);
        Assert.assertNotNull(heldLease);

        closingClient.close(); // should release any held leases
        closingClient = null;  // already closed - keep the finally block from closing it again

        Lease reacquired = survivingSemaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
        Assert.assertNotNull(reacquired);
    }
    finally
    {
        CloseableUtils.closeQuietly(closingClient);
        CloseableUtils.closeQuietly(survivingClient);
    }
}
/**
 * Stress test with one client (session) per worker: the number of leases held at any instant must
 * never exceed the semaphore maximum.
 *
 * <p>{@code CLIENT_QTY} workers each run {@code LOOP_QTY} rounds. In every round a worker reserves a
 * random number of permits from the local {@code available} budget, acquires that many leases,
 * records the running total in {@code counter}, sleeps briefly and returns everything.
 *
 * @throws Exception on any unexpected failure, including failures raised inside the workers
 */
@Test
public void testMaxPerSession() throws Exception
{
    final int CLIENT_QTY = 10;
    final int LOOP_QTY = 100;
    final long MAX_BUDGET_WAIT_MS = 10000;
    final Random random = new Random();
    final int SESSION_MAX = random.nextInt(75) + 25;
    final Timing timing = new Timing();

    List<Future<Object>> futures = Lists.newArrayList();
    ExecutorService service = Executors.newCachedThreadPool();
    final Counter counter = new Counter();
    final AtomicInteger available = new AtomicInteger(SESSION_MAX);
    try
    {
        for ( int i = 0; i < CLIENT_QTY; ++i )
        {
            futures.add
            (
                service.submit
                (
                    new Callable<Object>()
                    {
                        @Override
                        public Object call() throws Exception
                        {
                            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
                            client.start();
                            try
                            {
                                InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", SESSION_MAX);

                                for ( int i = 0; i < LOOP_QTY; ++i )
                                {
                                    long start = System.currentTimeMillis();
                                    int thisQty;
                                    synchronized(available)
                                    {
                                        while ( available.get() == 0 )
                                        {
                                            // The deadline is checked on every pass so that a worker cannot
                                            // wait forever when a failed peer never gives its permits back.
                                            long remainingMs = MAX_BUDGET_WAIT_MS - (System.currentTimeMillis() - start);
                                            if ( remainingMs <= 0 )
                                            {
                                                throw new TimeoutException("Timed out waiting for an available permit");
                                            }
                                            available.wait(Math.min(remainingMs, timing.forWaiting().milliseconds()));
                                        }

                                        thisQty = (available.get() > 1) ? (random.nextInt(available.get()) + 1) : 1;

                                        available.addAndGet(-1 * thisQty);
                                        Assert.assertTrue(available.get() >= 0);
                                    }
                                    Collection<Lease> leases = semaphore.acquire(thisQty, timing.forWaiting().seconds(), TimeUnit.SECONDS);
                                    Assert.assertNotNull(leases);
                                    try
                                    {
                                        synchronized(counter)
                                        {
                                            counter.currentCount += thisQty;
                                            if ( counter.currentCount > counter.maxCount )
                                            {
                                                counter.maxCount = counter.currentCount;
                                            }
                                        }
                                        Thread.sleep(random.nextInt(25));
                                    }
                                    finally
                                    {
                                        synchronized(counter)
                                        {
                                            counter.currentCount -= thisQty;
                                        }
                                        semaphore.returnAll(leases);
                                        synchronized(available)
                                        {
                                            available.addAndGet(thisQty);
                                            available.notifyAll();
                                        }
                                    }
                                }
                            }
                            finally
                            {
                                client.close();
                            }
                            return null;
                        }
                    }
                )
            );
        }

        // get() rethrows the first failure raised inside a worker
        for ( Future<Object> f : futures )
        {
            f.get();
        }
    }
    finally
    {
        // do not leave pool threads behind, whether the test passed or failed
        service.shutdownNow();
    }

    synchronized(counter)
    {
        Assert.assertTrue(counter.currentCount == 0);
        Assert.assertTrue(counter.maxCount > 0);
        Assert.assertTrue(counter.maxCount <= SESSION_MAX);
        System.out.println(counter.maxCount);
    }
}
/**
 * Runs {@code CLIENT_QTY} clients against a semaphore with {@code CLIENT_QTY / 2} leases, each taking
 * and releasing a single lease, and checks that every client got a lease and that the highest number
 * of simultaneously active holders equals the semaphore maximum.
 *
 * @throws Exception on any unexpected failure, including failures raised inside the workers
 */
@Test
public void testRelease1AtATime() throws Exception
{
    final Timing timing = new Timing();
    final int CLIENT_QTY = 10;
    final int MAX = CLIENT_QTY / 2;
    final AtomicInteger maxLeases = new AtomicInteger(0);
    final AtomicInteger activeQty = new AtomicInteger(0);
    final AtomicInteger uses = new AtomicInteger(0);

    List<Future<Object>> futures = Lists.newArrayList();
    ExecutorService service = Executors.newFixedThreadPool(CLIENT_QTY);
    try
    {
        for ( int i = 0; i < CLIENT_QTY; ++i )
        {
            Future<Object> f = service.submit
            (
                new Callable<Object>()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
                        client.start();
                        try
                        {
                            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", MAX);
                            Lease lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
                            Assert.assertNotNull(lease);
                            uses.incrementAndGet();
                            try
                            {
                                // the monitor makes "increment, then raise the recorded maximum" one atomic step
                                synchronized(maxLeases)
                                {
                                    int qty = activeQty.incrementAndGet();
                                    if ( qty > maxLeases.get() )
                                    {
                                        maxLeases.set(qty);
                                    }
                                }

                                timing.sleepABit();
                            }
                            finally
                            {
                                activeQty.decrementAndGet();
                                lease.close();
                            }
                        }
                        finally
                        {
                            client.close();
                        }
                        return null;
                    }
                }
            );
            futures.add(f);
        }

        // get() rethrows the first failure raised inside a worker
        for ( Future<Object> f : futures )
        {
            f.get();
        }
    }
    finally
    {
        // do not leave pool threads behind, whether the test passed or failed
        service.shutdownNow();
    }

    Assert.assertEquals(uses.get(), CLIENT_QTY);
    Assert.assertEquals(maxLeases.get(), MAX);
}
/**
 * Starts {@code THREADS} workers that each take one lease from a semaphore with {@code MAX_LEASES}
 * leases and hold it until the shared {@code Stepper} lets them go. The main thread releases the
 * workers in random chunks of 1 to 5 and finally checks that the number of concurrent holders never
 * exceeded {@code MAX_LEASES}.
 *
 * @throws Exception on any unexpected failure, including a worker that timed out acquiring its lease
 */
@Test
public void testReleaseInChunks() throws Exception
{
    final Timing timing = new Timing();
    final int MAX_LEASES = 11;
    final int THREADS = 100;

    final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
    final ExecutorService service = Executors.newCachedThreadPool();
    client.start();
    try
    {
        final Stepper latch = new Stepper();
        final Random random = new Random();
        final Counter counter = new Counter();
        ExecutorCompletionService<Object> completionService = new ExecutorCompletionService<Object>(service);
        for ( int i = 0; i < THREADS; ++i )
        {
            completionService.submit
            (
                new Callable<Object>()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", MAX_LEASES);
                        Lease lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
                        if ( lease == null )
                        {
                            throw new Exception("timed out");
                        }
                        try
                        {
                            synchronized(counter)
                            {
                                ++counter.currentCount;
                                if ( counter.currentCount > counter.maxCount )
                                {
                                    counter.maxCount = counter.currentCount;
                                }
                                counter.notifyAll();
                            }

                            // hold the lease until the main thread steps this worker forward
                            latch.await();
                        }
                        finally
                        {
                            synchronized(counter)
                            {
                                --counter.currentCount;
                            }
                            semaphore.returnLease(lease);
                        }
                        return null;
                    }
                }
            );
        }

        int remaining = THREADS;
        while ( remaining > 0 )
        {
            int times = Math.min(random.nextInt(5) + 1, remaining);
            latch.countDown(times);
            remaining -= times;
            Thread.sleep(random.nextInt(100) + 1);
        }

        for ( int i = 0; i < THREADS; ++i )
        {
            // get() rethrows a worker failure; take() alone would discard it and let the test pass
            completionService.take().get();
        }

        timing.sleepABit();
        synchronized(counter)
        {
            Assert.assertTrue(counter.currentCount == 0);
            Assert.assertTrue(counter.maxCount > 0);
            Assert.assertTrue(counter.maxCount <= MAX_LEASES);
            System.out.println(counter.maxCount);
        }
    }
    finally
    {
        // stop the workers first: they use the client that is closed next
        service.shutdownNow();
        client.close();
    }
}
/**
 * Has {@code THREAD_QTY} threads share one single-lease semaphore instance; every thread acquires the
 * lease, holds it for a millisecond and releases it. All threads must finish within 10 seconds and
 * none of them may fail.
 *
 * @throws Exception on any unexpected failure, including failures raised inside the workers
 */
@Test
public void testThreads() throws Exception
{
    final int THREAD_QTY = 10;

    Timing timing = new Timing();
    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
    ExecutorService service = Executors.newFixedThreadPool(THREAD_QTY);
    client.start();
    try
    {
        final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", 1);
        List<Future<Object>> futures = Lists.newArrayList();
        for ( int i = 0; i < THREAD_QTY; ++i )
        {
            futures.add
            (
                service.submit
                (
                    new Callable<Object>()
                    {
                        @Override
                        public Object call() throws Exception
                        {
                            Lease lease = semaphore.acquire();
                            try
                            {
                                Thread.sleep(1);
                            }
                            finally
                            {
                                lease.close();
                            }
                            return null;
                        }
                    }
                )
            );
        }
        service.shutdown();
        Assert.assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));

        // All tasks have finished, so get() does not block; it only rethrows a worker failure that
        // would otherwise be dropped together with the discarded Future.
        for ( Future<Object> f : futures )
        {
            f.get();
        }
    }
    finally
    {
        // no-op after a clean termination; interrupts stuck workers if the test failed early
        service.shutdownNow();
        client.close();
    }
}
/**
 * Smallest possible scenario: with a single-lease semaphore the first timed acquire must return a
 * lease and a second timed acquire, made while the first lease is still held, must return null.
 *
 * @throws Exception on any unexpected failure
 */
@Test
public void testSimple() throws Exception
{
    Timing timing = new Timing();
    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
    client.start();
    try
    {
        InterProcessSemaphoreV2 singleLeaseSemaphore = new InterProcessSemaphoreV2(client, "/test", 1);

        Lease granted = singleLeaseSemaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
        Assert.assertNotNull(granted);

        // the only lease is taken, so this call has to wait out its timeout and come back empty
        Lease refused = singleLeaseSemaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
        Assert.assertNull(refused);
    }
    finally
    {
        client.close();
    }
}
/**
 * Takes all {@code MAX_LEASES} leases through separate semaphore instances on the same path, checks
 * that one more acquire times out, then closes one lease and checks that the acquire now succeeds.
 *
 * @throws Exception on any unexpected failure
 */
@Test
public void testSimple2() throws Exception
{
    final int MAX_LEASES = 3;
    Timing timing = new Timing();

    List<Lease> leases = Lists.newArrayList();

    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
    client.start();
    try
    {
        for ( int i = 0; i < MAX_LEASES; ++i )
        {
            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", MAX_LEASES);
            Lease lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
            Assert.assertNotNull(lease);
            leases.add(lease);
        }

        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", MAX_LEASES);
        Lease lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
        Assert.assertNull(lease);

        leases.remove(0).close();

        Lease reacquired = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
        Assert.assertNotNull(reacquired);
        // track the new lease as well so that the cleanup below releases every lease this test took
        leases.add(reacquired);
    }
    finally
    {
        for ( Lease l : leases )
        {
            CloseableUtils.closeQuietly(l);
        }
        CloseableUtils.closeQuietly(client);
    }
}
/**
 * Acquires {@code LEASES} leases from one semaphore and checks that getParticipantNodes() reports
 * exactly that many entries.
 *
 * @throws Exception on any unexpected failure
 */
@Test
public void testGetParticipantNodes() throws Exception
{
    final int LEASES = 3;

    Timing timing = new Timing();
    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
    List<Lease> heldLeases = Lists.newArrayList();
    client.start();
    try
    {
        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", LEASES);

        // keep acquiring until every lease of the semaphore is held
        while ( heldLeases.size() < LEASES )
        {
            heldLeases.add(semaphore.acquire());
        }

        int participantQty = semaphore.getParticipantNodes().size();
        Assert.assertEquals(participantQty, LEASES);
    }
    finally
    {
        for ( Lease held : heldLeases )
        {
            CloseableUtils.closeQuietly(held);
        }
        CloseableUtils.closeQuietly(client);
    }
}
/**
 * Uses a semaphore at "/test/lock" once, confirms that this left at least one child under "/test",
 * then runs a ChildReaper over "/test" and expects "/test" to have no children afterwards.
 *
 * @throws Exception on any unexpected failure
 */
@Test
public void testChildReaperCleansUpLockNodes() throws Exception
{
    Timing timing = new Timing();
    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
    client.start();

    ChildReaper reaper = null;
    try
    {
        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test/lock", 1);

        // one acquire/return cycle is enough to leave nodes behind under /test
        Lease lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
        semaphore.returnLease(lease);

        List<String> beforeReaping = client.getChildren().forPath("/test");
        Assert.assertTrue(!beforeReaping.isEmpty());

        reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_GONE, ChildReaper.newExecutorService(), 1, "/test-leader", InterProcessSemaphoreV2.LOCK_SCHEMA);
        reaper.start();

        // give the reaper time to work before looking at the tree again
        timing.forWaiting().sleepABit();

        List<String> afterReaping = client.getChildren().forPath("/test");
        Assert.assertEquals(afterReaping.size(), 0, "All children of /test should have been reaped");
    }
    finally
    {
        CloseableUtils.closeQuietly(reaper);
        CloseableUtils.closeQuietly(client);
    }
}
/**
 * Verifies that no lease node is left behind when the node of a waiting acquire is deleted underneath it.
 *
 * <p>One lease is held, a second acquire is started in the background, and the node that the second
 * acquire created under "/test/leases" is deleted by the test. After the first lease is released the
 * second acquire must still succeed, and once that lease is closed "/test/leases" must be empty.
 *
 * @throws Exception on any unexpected failure
 */
@Test
public void testNoOrphanedNodes() throws Exception
{
    final Timing timing = new Timing();
    final ExecutorService executor = Executors.newFixedThreadPool(1);
    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
    client.start();
    try
    {
        final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", 1);
        Lease lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
        Assert.assertNotNull(lease);
        final List<String> childNodes = client.getChildren().forPath("/test/leases");
        Assert.assertEquals(childNodes.size(), 1);

        final CountDownLatch nodeCreatedLatch = new CountDownLatch(1);
        client.getChildren().usingWatcher(new CuratorWatcher()
        {
            @Override
            public void process(WatchedEvent event) throws Exception
            {
                // A watch registered through getChildren() is a child watch: a new child of the watched
                // node is reported as NodeChildrenChanged. NodeCreated is never delivered to it, so
                // matching on NodeCreated would leave the latch closed until it times out.
                if ( event.getType() == Watcher.Event.EventType.NodeChildrenChanged )
                {
                    nodeCreatedLatch.countDown();
                }
            }
        }).forPath("/test/leases");

        final Future<Lease> leaseFuture = executor.submit(new Callable<Lease>()
        {
            @Override
            public Lease call() throws Exception
            {
                return semaphore.acquire(timing.forWaiting().multiple(2).seconds(), TimeUnit.SECONDS);
            }
        });

        // wait for second lease to create its node
        Assert.assertTrue(timing.awaitLatch(nodeCreatedLatch));
        String newNode = null;
        for ( String c : client.getChildren().forPath("/test/leases") )
        {
            if ( !childNodes.contains(c) )
            {
                newNode = c;
            }
        }
        Assert.assertNotNull(newNode);

        // delete the ephemeral node to trigger a retry
        client.delete().forPath("/test/leases/" + newNode);

        // release first lease so second one can be acquired
        lease.close();
        lease = leaseFuture.get();
        Assert.assertNotNull(lease);
        lease.close();
        Assert.assertEquals(client.getChildren().forPath("/test/leases").size(), 0);

        // no more lease exist. must be possible to acquire a new one
        Assert.assertNotNull(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
    }
    finally
    {
        client.close();
        executor.shutdownNow();
    }
}
}