blob: d64e7cfee5a68697da91b22db994682480935885 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.recipes.leader;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.framework.state.ConnectionStateListenerManagerFactory;
import org.apache.curator.framework.state.SessionConnectionStateErrorPolicy;
import org.apache.curator.framework.state.StandardConnectionStateErrorPolicy;
import org.apache.curator.retry.RetryForever;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.TestingServer;
import org.apache.curator.test.Timing;
import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.test.compatibility.Timing2;
import org.apache.curator.utils.CloseableUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@Tag(CuratorTestBase.zk35TestCompatibilityGroup)
public class TestLeaderLatch extends BaseClassForTests
{
// Latch path shared by most of the tests in this class.
private static final String PATH_NAME = "/one/two/me";
// Maximum number of polling iterations performed by waitForALeader() before it gives up.
private static final int MAX_LOOPS = 5;
/**
 * Bundles one {@link LeaderLatch} with the bookkeeping a test needs for it: a latch that is
 * counted down once leadership was acquired and a queue recording the connection state changes
 * that were handed to the leader latch.
 */
private static class Holder
{
    // assigned after construction, read from executor threads
    volatile LeaderLatch latch;
    // counted down once the latch above has been awaited successfully
    final CountDownLatch isLockedLatch = new CountDownLatch(1);
    // connection states in the order they were offered
    final BlockingQueue<ConnectionState> stateChanges = new LinkedBlockingQueue<ConnectionState>();
}
/**
 * Starts several latches (each on its own path) on a client that uses a circuit-breaking
 * connection state listener manager, stops and restarts the server a few times, and then checks
 * that awaiting each latch still succeeds and that the first three state changes handed to each
 * latch were SUSPENDED, LOST and RECONNECTED, in that order.
 */
@Test
public void testWithCircuitBreaker() throws Exception
{
    final int threadQty = 5;
    // arbitrary number of stop/restart cycles used to simulate disconnections
    final int disconnectionQty = 4;
    ExecutorService executorService = Executors.newFixedThreadPool(threadQty);
    List<Holder> holders = Collections.emptyList();
    Timing2 timing = new Timing2();
    ConnectionStateListenerManagerFactory managerFactory = ConnectionStateListenerManagerFactory.circuitBreaking(new RetryForever(timing.multiple(2).milliseconds()));
    CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString(server.getConnectString())
        .retryPolicy(new RetryOneTime(1))
        .connectionStateListenerManagerFactory(managerFactory)
        .connectionTimeoutMs(timing.connection())
        .sessionTimeoutMs(timing.session())
        .build();
    try
    {
        client.start();
        client.create().forPath("/hey");

        // Listener that opts out of proxying (doNotProxy() == true) and signals every LOST event.
        Semaphore lostSemaphore = new Semaphore(0);
        ConnectionStateListener unProxiedListener = new ConnectionStateListener()
        {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState)
            {
                if ( newState == ConnectionState.LOST )
                {
                    lostSemaphore.release();
                }
            }

            @Override
            public boolean doNotProxy()
            {
                return true;
            }
        };
        client.getConnectionStateListenable().addListener(unProxiedListener);

        holders = IntStream.range(0, threadQty)
            .mapToObj(index -> {
                Holder holder = new Holder();
                holder.latch = new LeaderLatch(client, "/foo/bar/" + index)
                {
                    @Override
                    protected void handleStateChange(ConnectionState newState)
                    {
                        // record what the latch is told before letting it react
                        holder.stateChanges.offer(newState);
                        super.handleStateChange(newState);
                    }
                };
                return holder;
            })
            .collect(Collectors.toList());

        holders.forEach(holder -> {
            executorService.submit(() -> {
                holder.latch.start();
                assertTrue(holder.latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
                holder.isLockedLatch.countDown();
                return null;
            });
            // fail here, with a clear cause, if the latch never acquired leadership
            assertTrue(timing.awaitLatch(holder.isLockedLatch));
        });

        for ( int i = 0; i < disconnectionQty; ++i )
        {
            server.stop();
            assertTrue(timing.acquireSemaphore(lostSemaphore));
            server.restart();
            timing.sleepABit();
        }

        for ( Holder holder : holders )
        {
            assertTrue(holder.latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
            assertEquals(ConnectionState.SUSPENDED, timing.takeFromQueue(holder.stateChanges));
            assertEquals(ConnectionState.LOST, timing.takeFromQueue(holder.stateChanges));
            assertEquals(ConnectionState.RECONNECTED, timing.takeFromQueue(holder.stateChanges));
        }
    }
    finally
    {
        holders.forEach(holder -> CloseableUtils.closeQuietly(holder.latch));
        CloseableUtils.closeQuietly(client);
        executorService.shutdownNow();
    }
}
/**
 * Asks a never-started latch for its leader while the latch path does not exist yet; the call
 * must simply return instead of throwing.
 */
@Test
public void testUncreatedPathGetLeader() throws Exception
{
    final String connectString = server.getConnectString();
    try ( CuratorFramework curator = CuratorFrameworkFactory.newClient(connectString, new RetryOneTime(1)) )
    {
        curator.start();
        // CURATOR-436 - was throwing NoNodeException
        new LeaderLatch(curator, "/foo/bar").getLeader();
    }
}
/**
 * Closes the leading latch while a second latch is held just before it reads the path in
 * checkLeadership(), resets the second latch in that window, then lets it continue. The second
 * latch must become leader and exactly one child node must remain under the latch path.
 */
@Test
public void testWatchedNodeDeletedOnReconnect() throws Exception
{
    final String latchPath = "/foo/bar";
    Timing2 timing = new Timing2();
    try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) )
    {
        client.start();
        LeaderLatch latch1 = new LeaderLatch(client, latchPath, "1");
        try ( LeaderLatch latch2 = new LeaderLatch(client, latchPath, "2") )
        {
            latch1.start();
            assertTrue(latch1.await(timing.milliseconds(), TimeUnit.MILLISECONDS));
            latch2.start(); // will get a watcher on latch1's node
            timing.sleepABit();
            latch2.debugCheckLeaderShipLatch = new CountDownLatch(1);
            latch1.close(); // simulate the leader's path getting deleted
            latch1 = null; // already closed - keeps the finally block from closing it again
            timing.sleepABit(); // after this, latch2 should be blocked just before getting the path in checkLeadership()
            latch2.reset(); // force the internal "ourPath" to get reset
            latch2.debugCheckLeaderShipLatch.countDown(); // allow checkLeadership() to continue
            assertTrue(latch2.await(timing.forSessionSleep().forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
            timing.sleepABit();
            assertEquals(1, client.getChildren().forPath(latchPath).size());
        }
        finally
        {
            CloseableUtils.closeQuietly(latch1);
        }
    }
}
/**
 * Runs the same stop/restart scenario twice: first with {@link SessionConnectionStateErrorPolicy},
 * then with {@link StandardConnectionStateErrorPolicy}. Connection states and leadership
 * callbacks are recorded into one queue ("true"/"false" for isLeader()/notLeader()).
 * <ul>
 * <li>Session policy: stopping the server yields SUSPENDED, restarting yields RECONNECTED, and
 * no further event follows.</li>
 * <li>Standard policy: stopping the server yields "false" and SUSPENDED (either order),
 * restarting yields RECONNECTED followed by "true".</li>
 * </ul>
 */
@Test
public void testSessionErrorPolicy() throws Exception
{
    Timing timing = new Timing();
    for ( int i = 0; i < 2; ++i )
    {
        boolean isSessionIteration = (i == 0);
        // scoped per iteration so the finally block never re-closes resources of a previous iteration
        LeaderLatch latch = null;
        CuratorFramework client = null;
        try
        {
            client = CuratorFrameworkFactory.builder()
                .connectString(server.getConnectString())
                .connectionTimeoutMs(10000)
                .sessionTimeoutMs(60000)
                .retryPolicy(new RetryOneTime(1))
                .connectionStateErrorPolicy(isSessionIteration ? new SessionConnectionStateErrorPolicy() : new StandardConnectionStateErrorPolicy())
                .build();
            final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
            ConnectionStateListener stateListener = new ConnectionStateListener()
            {
                @Override
                public void stateChanged(CuratorFramework client, ConnectionState newState)
                {
                    states.add(newState.name());
                }
            };
            client.getConnectionStateListenable().addListener(stateListener);
            client.start();
            latch = new LeaderLatch(client, "/test");
            LeaderLatchListener listener = new LeaderLatchListener()
            {
                @Override
                public void isLeader()
                {
                    states.add("true");
                }

                @Override
                public void notLeader()
                {
                    states.add("false");
                }
            };
            latch.addListener(listener);
            latch.start();
            assertEquals(ConnectionState.CONNECTED.name(), states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
            assertEquals("true", states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
            server.stop();
            if ( isSessionIteration )
            {
                assertEquals(ConnectionState.SUSPENDED.name(), states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
                server.restart();
                assertEquals(ConnectionState.RECONNECTED.name(), states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
                // nothing else (in particular no leadership change) may show up
                assertNull(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS));
            }
            else
            {
                // "false" and SUSPENDED may arrive in either order
                String s = states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
                assertTrue("false".equals(s) || ConnectionState.SUSPENDED.name().equals(s));
                s = states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
                assertTrue("false".equals(s) || ConnectionState.SUSPENDED.name().equals(s));
                server.restart();
                assertEquals(ConnectionState.RECONNECTED.name(), states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
                assertEquals("true", states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
            }
        }
        finally
        {
            CloseableUtils.closeQuietly(latch);
            CloseableUtils.closeQuietly(client);
        }
    }
}
/**
 * Checks the events seen when the server goes away for good, under both error policies.
 * Connection states and leadership callbacks ("true"/"false") are recorded into one queue.
 * <ul>
 * <li>Standard policy: after closing the server, SUSPENDED and "false" arrive (either order),
 * followed by LOST.</li>
 * <li>Session policy (fresh server and client): after closing the server, SUSPENDED arrives
 * first, then LOST and "false" (either order).</li>
 * </ul>
 */
@Test
public void testErrorPolicies() throws Exception
{
    Timing2 timing = new Timing2();
    LeaderLatch latch = null;
    CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString(server.getConnectString())
        .connectionTimeoutMs(1000)
        .sessionTimeoutMs(timing.session())
        .retryPolicy(new RetryOneTime(1))
        .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy())
        .build();
    try
    {
        final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
        ConnectionStateListener stateListener = new ConnectionStateListener()
        {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState)
            {
                states.add(newState.name());
            }
        };
        client.getConnectionStateListenable().addListener(stateListener);
        client.start();
        latch = new LeaderLatch(client, "/test");
        LeaderLatchListener listener = new LeaderLatchListener()
        {
            @Override
            public void isLeader()
            {
                states.add("true");
            }

            @Override
            public void notLeader()
            {
                states.add("false");
            }
        };
        latch.addListener(listener);
        latch.start();
        assertEquals(ConnectionState.CONNECTED.name(), states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
        assertEquals("true", states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
        server.close();
        List<String> next = Lists.newArrayList();
        next.add(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
        next.add(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
        assertTrue(next.equals(Arrays.asList(ConnectionState.SUSPENDED.name(), "false")) || next.equals(Arrays.asList("false", ConnectionState.SUSPENDED.name())), next.toString());
        assertEquals(ConnectionState.LOST.name(), states.poll(timing.forSessionSleep().milliseconds(), TimeUnit.MILLISECONDS));
        latch.close();
        client.close();
        timing.sleepABit();
        states.clear();

        // second phase: brand new server and client, this time with the session based policy
        server = new TestingServer();
        client = CuratorFrameworkFactory.builder()
            .connectString(server.getConnectString())
            .connectionTimeoutMs(1000)
            .sessionTimeoutMs(timing.session())
            .retryPolicy(new RetryOneTime(1))
            .connectionStateErrorPolicy(new SessionConnectionStateErrorPolicy())
            .build();
        client.getConnectionStateListenable().addListener(stateListener);
        client.start();
        latch = new LeaderLatch(client, "/test");
        latch.addListener(listener);
        latch.start();
        assertEquals(ConnectionState.CONNECTED.name(), states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
        assertEquals("true", states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
        server.close();
        assertEquals(ConnectionState.SUSPENDED.name(), states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
        next = Lists.newArrayList();
        next.add(states.poll(timing.forSessionSleep().milliseconds(), TimeUnit.MILLISECONDS));
        next.add(states.poll(timing.forSessionSleep().milliseconds(), TimeUnit.MILLISECONDS));
        assertTrue(next.equals(Arrays.asList(ConnectionState.LOST.name(), "false")) || next.equals(Arrays.asList("false", ConnectionState.LOST.name())), next.toString());
    }
    finally
    {
        CloseableUtils.closeQuietly(latch);
        CloseableUtils.closeQuietly(client);
    }
}
/**
 * With the server stopped, starts a latch and closes it right away. The pending start task has
 * to be cancelled successfully and reset() must never have been invoked.
 */
@Test
public void testProperCloseWithoutConnectionEstablished() throws Exception
{
    server.stop();

    final Timing timing = new Timing();
    LeaderLatch leaderLatch = null;
    final CuratorFramework curator = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
    try
    {
        curator.start();

        final AtomicBoolean resetInvoked = new AtomicBoolean(false);
        final CountDownLatch startTaskCancelled = new CountDownLatch(1);
        leaderLatch = new LeaderLatch(curator, PATH_NAME)
        {
            @Override
            void reset() throws Exception
            {
                // remember that we got here, then behave as usual
                resetInvoked.set(true);
                super.reset();
            }

            @Override
            protected boolean cancelStartTask()
            {
                final boolean cancelled = super.cancelStartTask();
                if ( cancelled )
                {
                    startTaskCancelled.countDown();
                }
                return cancelled;
            }
        };

        leaderLatch.start();
        leaderLatch.close();
        // closed already - nothing left for the finally block to close
        leaderLatch = null;

        assertTrue(timing.awaitLatch(startTaskCancelled));
        assertFalse(resetInvoked.get());
    }
    finally
    {
        CloseableUtils.closeQuietly(leaderLatch);
        TestCleanState.closeAndTestClean(curator);
    }
}
/**
 * Calls reset() a second time while the reset() triggered by start() is still held back by the
 * debug latch, and checks that only a single child node exists under the latch path afterwards.
 */
@Test
public void testResetRace() throws Exception
{
    Timing timing = new Timing();
    LeaderLatch latch = null;
    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
    try
    {
        client.start();
        latch = new LeaderLatch(client, PATH_NAME);
        latch.debugResetWaitLatch = new CountDownLatch(1);
        latch.start(); // will call reset()
        latch.reset(); // should not result in two nodes
        timing.sleepABit();
        latch.debugResetWaitLatch.countDown();
        timing.sleepABit();
        assertEquals(1, client.getChildren().forPath(PATH_NAME).size());
    }
    finally
    {
        CloseableUtils.closeQuietly(latch);
        TestCleanState.closeAndTestClean(client);
    }
}
/**
 * Closes a latch while its reset() is still held back by the debug latch, then releases the
 * debug latch and checks that no child node is left behind under the latch path.
 */
@Test
public void testCreateDeleteRace() throws Exception
{
    Timing timing = new Timing();
    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
    try
    {
        client.start();
        // the parent path exists up front so getChildren() below works even if no node is ever created
        client.create().creatingParentsIfNeeded().forPath(PATH_NAME);
        LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
        latch.debugResetWaitLatch = new CountDownLatch(1);
        latch.start();
        latch.close();
        timing.sleepABit();
        latch.debugResetWaitLatch.countDown();
        timing.sleepABit();
        assertEquals(0, client.getChildren().forPath(PATH_NAME).size());
    }
    finally
    {
        TestCleanState.closeAndTestClean(client);
    }
}
/**
 * Starts several latches on one client, stops the server until the connection is reported LOST
 * and checks that no latch claims leadership any more. After the server is restarted exactly one
 * latch must become leader again.
 */
@Test
public void testLostConnection() throws Exception
{
    final int PARTICIPANT_QTY = 10;
    List<LeaderLatch> latches = Lists.newArrayList();
    final Timing timing = new Timing();
    final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
    try
    {
        client.start();
        // released as soon as the client reports LOST
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        client.getConnectionStateListenable().addListener(new ConnectionStateListener()
        {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState)
            {
                if ( newState == ConnectionState.LOST )
                {
                    countDownLatch.countDown();
                }
            }
        });
        for ( int i = 0; i < PARTICIPANT_QTY; ++i )
        {
            LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
            latch.start();
            latches.add(latch);
        }
        waitForALeader(latches, timing);
        server.stop();
        assertTrue(timing.awaitLatch(countDownLatch));
        timing.forWaiting().sleepABit();
        assertEquals(0, getLeaders(latches).size());
        server.restart();
        assertEquals(1, waitForALeader(latches, timing).size()); // should reconnect
    }
    finally
    {
        for ( LeaderLatch latch : latches )
        {
            CloseableUtils.closeQuietly(latch);
        }
        TestCleanState.closeAndTestClean(client);
    }
}
/**
 * Closes a participant that is neither the first one started nor the last one and checks that
 * the participant started right before it does not report leadership as a result.
 */
@Test
public void testCorrectWatching() throws Exception
{
    final int PARTICIPANT_QTY = 10;
    final int PARTICIPANT_ID = 2;
    List<LeaderLatch> latches = Lists.newArrayList();
    final Timing timing = new Timing();
    final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
    try
    {
        client.start();
        for ( int i = 0; i < PARTICIPANT_QTY; ++i )
        {
            LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
            latch.start();
            latches.add(latch);
            waitForALeader(latches, timing);
        }
        // we need to close a participant that is neither the actual leader (first participant) nor the last one
        latches.get(PARTICIPANT_ID).close();
        // As the previous algorithm assumed that a latch gets the leadership when its watched node is deleted,
        // we need to ensure that participant PARTICIPANT_ID-1 is not (wrongly) elected as leader.
        assertFalse(latches.get(PARTICIPANT_ID - 1).hasLeadership());
    }
    finally
    {
        // Skip latches that are already closed instead of removing a fixed index from the list:
        // the list may hold fewer than PARTICIPANT_ID + 1 entries if setup failed, and an
        // exception thrown from here would hide the original failure.
        for ( LeaderLatch latch : latches )
        {
            if ( latch.getState() != LeaderLatch.State.CLOSED )
            {
                CloseableUtils.closeQuietly(latch);
            }
        }
        TestCleanState.closeAndTestClean(client);
    }
}
/**
 * Repeats {@link #internalTestWaitingOnce()} a fixed number of times with a short pause after
 * each run.
 */
@Test
public void testWaiting() throws Exception
{
    final int LOOPS = 10;
    int attempt = 0;
    while ( attempt < LOOPS )
    {
        System.out.println("TRY #" + attempt);
        internalTestWaitingOnce();
        Thread.sleep(10);
        ++attempt;
    }
}
/**
 * Lets several participants compete for the same latch path in parallel. Each one waits for
 * leadership, verifies through a shared flag that nobody else is leader at the same moment,
 * holds leadership for a few milliseconds and then closes its latch.
 */
private void internalTestWaitingOnce() throws Exception
{
    final int PARTICIPANT_QTY = 10;

    ExecutorService executorService = Executors.newFixedThreadPool(PARTICIPANT_QTY);
    ExecutorCompletionService<Void> completions = new ExecutorCompletionService<Void>(executorService);

    final Timing timing = new Timing();
    final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
    try
    {
        client.start();

        // true while some participant believes it is the leader
        final AtomicBoolean leaderPresent = new AtomicBoolean(false);

        final Callable<Void> participant = () -> {
            LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
            try
            {
                latch.start();
                assertTrue(latch.await(timing.forWaiting().seconds(), TimeUnit.SECONDS));
                // must flip false -> true, otherwise two leaders overlap
                assertTrue(leaderPresent.compareAndSet(false, true));
                Thread.sleep((int)(10 * Math.random()));
                leaderPresent.set(false);
            }
            finally
            {
                latch.close();
            }
            return null;
        };

        int submitted = 0;
        while ( submitted < PARTICIPANT_QTY )
        {
            completions.submit(participant);
            ++submitted;
        }

        // get() rethrows whatever went wrong inside a participant
        for ( int remaining = PARTICIPANT_QTY; remaining > 0; --remaining )
        {
            completions.take().get();
        }
    }
    finally
    {
        executorService.shutdownNow();
        TestCleanState.closeAndTestClean(client);
    }
}
/**
 * Runs the shared {@code basic} scenario with every latch started right after it is created.
 */
@Test
public void testBasic() throws Exception
{
    final Mode startMode = Mode.START_IMMEDIATELY;
    basic(startMode);
}
/**
 * Runs the shared {@code basic} scenario with the latches started from worker threads.
 */
@Test
public void testBasicAlt() throws Exception
{
    final Mode startMode = Mode.START_IN_THREADS;
    basic(startMode);
}
/**
 * Every participant's listener resets its latch the first time it is told it is leader and
 * closes the latch the second time. Once all participants went through both steps the test
 * expects two isLeader() calls and one notLeader() call per participant and every latch in
 * state CLOSED.
 */
@Test
public void testCallbackSanity() throws Exception
{
    final int PARTICIPANT_QTY = 10;
    // counted down once per participant, when it closes its latch
    final CountDownLatch timesSquare = new CountDownLatch(PARTICIPANT_QTY);
    final AtomicLong masterCounter = new AtomicLong(0);
    final AtomicLong notLeaderCounter = new AtomicLong(0);
    Timing timing = new Timing();
    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
    // all listener callbacks are dispatched on this single thread
    ExecutorService exec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackSanity-%s").build());
    List<LeaderLatch> latches = Lists.newArrayList();
    for ( int i = 0; i < PARTICIPANT_QTY; ++i )
    {
        final LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
        latch.addListener(new LeaderLatchListener()
        {
            boolean beenLeader = false;

            @Override
            public void isLeader()
            {
                if ( !beenLeader )
                {
                    // first election: give leadership up again
                    masterCounter.incrementAndGet();
                    beenLeader = true;
                    try
                    {
                        latch.reset();
                    }
                    catch ( Exception e )
                    {
                        throw Throwables.propagate(e);
                    }
                }
                else
                {
                    // second election: this participant is done
                    masterCounter.incrementAndGet();
                    CloseableUtils.closeQuietly(latch);
                    timesSquare.countDown();
                }
            }

            @Override
            public void notLeader()
            {
                notLeaderCounter.incrementAndGet();
            }
        }, exec);
        latches.add(latch);
    }
    try
    {
        client.start();
        for ( LeaderLatch latch : latches )
        {
            latch.start();
        }
        timesSquare.await();
        assertEquals(PARTICIPANT_QTY * 2, masterCounter.get());
        assertEquals(PARTICIPANT_QTY, notLeaderCounter.get());
        for ( LeaderLatch latch : latches )
        {
            assertEquals(LeaderLatch.State.CLOSED, latch.getState());
        }
    }
    finally
    {
        try
        {
            for ( LeaderLatch latch : latches )
            {
                if ( latch.getState() != LeaderLatch.State.CLOSED )
                {
                    CloseableUtils.closeQuietly(latch);
                }
            }
            TestCleanState.closeAndTestClean(client);
        }
        finally
        {
            // only after the latches are closed: closing may still dispatch callbacks to this executor
            exec.shutdownNow();
        }
    }
}
/**
 * Same flow as the callback sanity test, but the first SILENT_QTY latches are created with
 * {@code CloseMode.SILENT} and the remaining ones with {@code CloseMode.NOTIFY_LEADER}. Expects
 * two isLeader() calls per participant and {@code PARTICIPANT_QTY * 2 - SILENT_QTY} notLeader()
 * calls in total, with every latch in state CLOSED at the end.
 */
@Test
public void testCallbackNotifyLeader() throws Exception
{
    final int PARTICIPANT_QTY = 10;
    final int SILENT_QTY = 3;
    // counted down once per participant, when it closes its latch
    final CountDownLatch timesSquare = new CountDownLatch(PARTICIPANT_QTY);
    final AtomicLong masterCounter = new AtomicLong(0);
    final AtomicLong notLeaderCounter = new AtomicLong(0);
    Timing timing = new Timing();
    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
    // all listener callbacks are dispatched on this single thread
    ExecutorService exec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackNotifyLeader-%s").build());
    List<LeaderLatch> latches = Lists.newArrayList();
    for ( int i = 0; i < PARTICIPANT_QTY; ++i )
    {
        LeaderLatch.CloseMode closeMode = i < SILENT_QTY ? LeaderLatch.CloseMode.SILENT : LeaderLatch.CloseMode.NOTIFY_LEADER;
        final LeaderLatch latch = new LeaderLatch(client, PATH_NAME, "", closeMode);
        latch.addListener(new LeaderLatchListener()
        {
            boolean beenLeader = false;

            @Override
            public void isLeader()
            {
                if ( !beenLeader )
                {
                    // first election: give leadership up again
                    masterCounter.incrementAndGet();
                    beenLeader = true;
                    try
                    {
                        latch.reset();
                    }
                    catch ( Exception e )
                    {
                        throw Throwables.propagate(e);
                    }
                }
                else
                {
                    // second election: this participant is done
                    masterCounter.incrementAndGet();
                    CloseableUtils.closeQuietly(latch);
                    timesSquare.countDown();
                }
            }

            @Override
            public void notLeader()
            {
                notLeaderCounter.incrementAndGet();
            }
        }, exec);
        latches.add(latch);
    }
    try
    {
        client.start();
        for ( LeaderLatch latch : latches )
        {
            latch.start();
        }
        timesSquare.await();
        // Each count-down happens after the corresponding latch was closed, so all latches are
        // closed at this point. A callback queued by the last close may still be waiting on the
        // executor though - let the queue drain before reading the counters.
        exec.shutdown();
        assertTrue(exec.awaitTermination(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "Listener callbacks did not finish in time");
        assertEquals(PARTICIPANT_QTY * 2, masterCounter.get());
        assertEquals(PARTICIPANT_QTY * 2 - SILENT_QTY, notLeaderCounter.get());
        for ( LeaderLatch latch : latches )
        {
            assertEquals(LeaderLatch.State.CLOSED, latch.getState());
        }
    }
    finally
    {
        try
        {
            for ( LeaderLatch latch : latches )
            {
                if ( latch.getState() != LeaderLatch.State.CLOSED )
                {
                    CloseableUtils.closeQuietly(latch);
                }
            }
            TestCleanState.closeAndTestClean(client);
        }
        finally
        {
            // no-op on the success path (already terminated); releases the thread on failures
            exec.shutdownNow();
        }
    }
}
/**
 * Uses two latches on the same path: {@code leader} (default close mode, started first) and
 * {@code notifiedLeader} (created with {@code CloseMode.NOTIFY_LEADER}, started second).
 * Closing {@code notifiedLeader} must not produce a notLeader() callback on it, while closing
 * {@code leader} with an explicit {@code CloseMode.NOTIFY_LEADER} must produce exactly one.
 */
@Test
public void testCallbackDontNotify() throws Exception
{
    // despite its name this counts notLeader() callbacks received by "leader"
    final AtomicLong masterCounter = new AtomicLong(0);
    // counts notLeader() callbacks received by "notifiedLeader"
    final AtomicLong notLeaderCounter = new AtomicLong(0);
    Timing timing = new Timing();
    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
    final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
    final LeaderLatch notifiedLeader = new LeaderLatch(client, PATH_NAME, "", LeaderLatch.CloseMode.NOTIFY_LEADER);
    leader.addListener(new LeaderLatchListener()
    {
        @Override
        public void isLeader()
        {
        }

        @Override
        public void notLeader()
        {
            masterCounter.incrementAndGet();
        }
    });
    notifiedLeader.addListener(new LeaderLatchListener()
    {
        @Override
        public void isLeader()
        {
        }

        @Override
        public void notLeader()
        {
            notLeaderCounter.incrementAndGet();
        }
    });
    try
    {
        client.start();
        leader.start();
        timing.sleepABit();
        notifiedLeader.start();
        timing.sleepABit();
        notifiedLeader.close();
        timing.sleepABit();
        // Test the close override
        leader.close(LeaderLatch.CloseMode.NOTIFY_LEADER);
        assertEquals(LeaderLatch.State.CLOSED, leader.getState());
        assertEquals(LeaderLatch.State.CLOSED, notifiedLeader.getState());
        assertEquals(1, masterCounter.get());
        assertEquals(0, notLeaderCounter.get());
    }
    finally
    {
        if ( leader.getState() != LeaderLatch.State.CLOSED )
        {
            CloseableUtils.closeQuietly(leader);
        }
        if ( notifiedLeader.getState() != LeaderLatch.State.CLOSED )
        {
            CloseableUtils.closeQuietly(notifiedLeader);
        }
        TestCleanState.closeAndTestClean(client);
    }
}
/**
 * Starts the client and a latch while the server is closed, then brings up a new server on the
 * same port and temp directory. The latch must be elected leader exactly once and never be told
 * that it lost leadership.
 */
@Test
public void testNoServerAtStart()
{
    CloseableUtils.closeQuietly(server);
    Timing timing = new Timing();
    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryNTimes(5, 1000));
    client.start();
    final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
    // released on the first isLeader() callback
    final CountDownLatch leaderCounter = new CountDownLatch(1);
    final AtomicInteger leaderCount = new AtomicInteger(0);
    final AtomicInteger notLeaderCount = new AtomicInteger(0);
    leader.addListener(new LeaderLatchListener()
    {
        @Override
        public void isLeader()
        {
            leaderCounter.countDown();
            leaderCount.incrementAndGet();
        }

        @Override
        public void notLeader()
        {
            notLeaderCount.incrementAndGet();
        }
    });
    try
    {
        leader.start();
        timing.sleepABit();
        // Start the new server
        server = new TestingServer(server.getPort(), server.getTempDirectory());
        assertTrue(timing.awaitLatch(leaderCounter), "Not elected leader");
        assertEquals(1, leaderCount.get(), "Elected too many times");
        assertEquals(0, notLeaderCount.get(), "Unelected too many times");
    }
    catch ( Exception e )
    {
        fail("Unexpected exception", e);
    }
    finally
    {
        CloseableUtils.closeQuietly(leader);
        TestCleanState.closeAndTestClean(client);
        CloseableUtils.closeQuietly(server);
    }
}
// Selects how basic() starts its latches.
private enum Mode
{
// each latch is started on the test thread right after it has been created
START_IMMEDIATELY,
// latches are created first and then started from executor threads after a random short delay
START_IN_THREADS
}
/**
 * Shared scenario for the basic tests: creates the participants, starts them according to
 * {@code mode}, and then repeatedly waits for a leader, checks that there is exactly one, closes
 * it and removes it, until no participant is left.
 *
 * @param mode whether latches are started inline on creation or later from worker threads
 * @throws Exception on any failure talking to the server or closing a latch
 */
private void basic(Mode mode) throws Exception
{
    // NOTE(review): the trailing "//0" suggests this was 10 at some point and got reduced;
    // with a single participant the ordering assertion below is trivially true - confirm the intent.
    final int PARTICIPANT_QTY = 1;//0;
    List<LeaderLatch> latches = Lists.newArrayList();
    Timing timing = new Timing();
    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
    try
    {
        client.start();
        for ( int i = 0; i < PARTICIPANT_QTY; ++i )
        {
            LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
            if ( mode == Mode.START_IMMEDIATELY )
            {
                latch.start();
            }
            latches.add(latch);
        }
        if ( mode == Mode.START_IN_THREADS )
        {
            ExecutorService service = Executors.newFixedThreadPool(latches.size());
            for ( final LeaderLatch latch : latches )
            {
                service.submit(() -> {
                    // stagger the starts a little
                    Thread.sleep((int)(100 * Math.random()));
                    latch.start();
                    return null;
                });
            }
            // already submitted tasks still run; the pool just accepts no new ones
            service.shutdown();
        }
        while ( !latches.isEmpty() )
        {
            List<LeaderLatch> leaders = waitForALeader(latches, timing);
            assertEquals(1, leaders.size()); // there can only be one leader
            LeaderLatch theLeader = leaders.get(0);
            if ( mode == Mode.START_IMMEDIATELY )
            {
                assertEquals(0, latches.indexOf(theLeader)); // assert ordering - leadership should advance in start order
            }
            theLeader.close();
            latches.remove(theLeader);
        }
    }
    finally
    {
        for ( LeaderLatch latch : latches )
        {
            CloseableUtils.closeQuietly(latch);
        }
        TestCleanState.closeAndTestClean(client);
    }
}
/**
 * Polls the given latches until at least one of them reports leadership, sleeping a bit between
 * attempts and giving up after {@code MAX_LOOPS} attempts.
 *
 * @param latches the latches to inspect
 * @param timing supplies the pause between two attempts
 * @return the latches that currently have leadership, or an empty list if none showed up in time
 * @throws InterruptedException if the pause between attempts is interrupted
 */
private List<LeaderLatch> waitForALeader(List<LeaderLatch> latches, Timing timing) throws InterruptedException
{
    int attempt = 0;
    while ( attempt < MAX_LOOPS )
    {
        final List<LeaderLatch> current = getLeaders(latches);
        if ( !current.isEmpty() )
        {
            return current;
        }
        timing.sleepABit();
        ++attempt;
    }
    // nobody became leader in time
    return Lists.newArrayList();
}
/**
 * Filters the given latches down to those that currently report leadership.
 *
 * @param latches the latches to inspect
 * @return a new list with the leading latches, in iteration order; empty if there is none
 */
private List<LeaderLatch> getLeaders(Collection<LeaderLatch> latches)
{
    return latches.stream()
        .filter(LeaderLatch::hasLeadership)
        .collect(Collectors.toList());
}
/**
 * Constructing a latch with a path that does not start with a slash must be rejected with an
 * {@link IllegalArgumentException}.
 */
@Test
public void testRelativePath()
{
    assertThrows(IllegalArgumentException.class, () -> {
        final Timing timing = new Timing();
        final String connectString = server.getConnectString();
        final CuratorFramework curator = CuratorFrameworkFactory.newClient(connectString, timing.session(), timing.connection(), new RetryOneTime(1));
        // the constructor itself is expected to throw
        new LeaderLatch(curator, "parent");
    });
}
}