blob: f8343c24ac2e80d0dfffc42c8899ed9ef7ff1bfd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.SolrTestUtil;
import org.apache.solr.common.cloud.OnReconnect;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.SolrZooKeeper;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.core.CoreDescriptor;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Slow
@LuceneTestCase.Nightly
@Ignore // MRM-TEST TODO:
public class LeaderElectionTest extends SolrTestCaseJ4 {
// Shared logger; the nested helper classes in this file log through it as well.
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// Not referenced anywhere in the visible part of this file (presumably milliseconds — TODO confirm).
static final int TIMEOUT = 30000;
// ZooKeeper test server; created and started in setUp(), shut down in tearDown().
private ZkTestServer server;
// Client obtained from the test server in setUp(); closed in tearDown().
private SolrZkClient zkClient;
// State reader built on top of zkClient in setUp(); closed in tearDown().
private ZkStateReader zkStateReader;
// Maps an election sequence number to the ClientThread that registered it (filled by ClientThread.setupOnConnect()).
private Map<Integer,Thread> seqToThread;
// Set to true by testStressElection() to make its kill / connection-loss loops exit.
private volatile boolean stopStress = false;
// Class-level setup hook; intentionally empty at the moment.
@BeforeClass
public static void beforeClass() {
}
// Class-level teardown hook; intentionally empty at the moment.
@AfterClass
public static void afterClass() {
}
/**
 * Starts a ZooKeeper test server backed by a fresh temp directory, wires up the
 * client and state reader used by the tests, and creates the collection and
 * election nodes for {@code collection1} and {@code collection2}.
 */
@Override
public void setUp() throws Exception {
  super.setUp();

  // Bring up ZooKeeper with a 1000 tick time.
  server = new ZkTestServer(SolrTestUtil.createTempDir("zkData"));
  server.setTheTickTime(1000);
  server.run();

  zkClient = server.getZkClient();
  zkStateReader = new ZkStateReader(zkClient);
  seqToThread = new ConcurrentHashMap<>();

  // Node layout the election code works against.
  final String collection1 = "/collections/collection1";
  final String collection2 = "/collections/collection2";
  zkClient.mkdirs(collection1);
  zkClient.mkdir(collection2);
  zkClient.mkdir(collection1 + "/election");
  zkClient.mkdir(collection2 + "/election");
}
/**
 * Election context for tests that can stall after the regular leader process
 * has run, to simulate a slow leadership takeover.
 */
static class TestLeaderElectionContext extends ShardLeaderElectionContextBase {
  /** Milliseconds to sleep after the leader process ran; values &lt;= 0 disable the pause. */
  private final long runLeaderDelay;

  /**
   * @param leaderElector  accepted for signature compatibility; not used by this constructor
   * @param shardId        accepted for signature compatibility; not used by this constructor
   * @param collection     collection name; election nodes live under {@code /collections/<collection>}
   * @param coreNodeName   passed through to the base context
   * @param props          replica passed through to the base context
   * @param zkController   source of the ZooKeeper client handed to the base context
   * @param runLeaderDelay pause in milliseconds applied in {@link #runLeaderProcess}
   * @param cd             core descriptor passed through to the base context (callers in this file pass null)
   */
  public TestLeaderElectionContext(LeaderElector leaderElector,
      String shardId, String collection, String coreNodeName, Replica props,
      ZkController zkController, long runLeaderDelay, CoreDescriptor cd) {
    super(coreNodeName,
        "/collections/" + collection,
        "/collections/" + collection + "/leader",
        props,
        cd,
        zkController.getZkClient());
    this.runLeaderDelay = runLeaderDelay;
  }

  /**
   * Runs the base leader process, then optionally sleeps for the configured delay.
   *
   * @return always {@code true}; the value returned by the base implementation is ignored
   */
  @Override
  boolean runLeaderProcess(ElectionContext context, boolean weAreReplacement, int pauseBeforeStartMs)
      throws KeeperException, InterruptedException, IOException {
    super.runLeaderProcess(context, weAreReplacement, pauseBeforeStartMs);
    if (runLeaderDelay <= 0) {
      return true;
    }
    log.info("Sleeping for {}ms to simulate leadership takeover delay", runLeaderDelay);
    Thread.sleep(runLeaderDelay);
    return true;
  }
}
/**
 * Bundles the ZooKeeper client, state reader, mock controller and leader
 * elector that one or more {@link ClientThread}s use to take part in an election.
 */
class ElectorSetup {
  SolrZkClient zkClient;
  ZkStateReader zkStateReader;
  ZkController zkController;
  LeaderElector elector;

  /**
   * @param onReconnect currently unused; kept so existing callers keep compiling
   */
  public ElectorSetup(OnReconnect onReconnect) {
    zkClient = server.getZkClient();
    zkStateReader = new ZkStateReader(zkClient);
    // The controller must exist before the elector is built; the previous order
    // handed the still-null field to the LeaderElector constructor.
    zkController = MockSolrSource.makeSimpleMock(null, zkStateReader, null);
    elector = new LeaderElector(zkController);
  }

  /** Closes the ZooKeeper client (if it is still open) and then the state reader. */
  public void close() {
    if (!zkClient.isClosed()) {
      zkClient.close();
    }
    zkStateReader.close();
  }
}
/**
 * One simulated election participant. When run it sets up its election context
 * and then idles, polling every 100ms, until {@link #close()} is called or it
 * is interrupted.
 */
class ClientThread extends Thread {
  ElectorSetup es;
  private final String shard;
  private final String nodeName;
  private final long runLeaderDelay;
  // Stays at -1 for now: the joinElection call that used to provide it is commented out below.
  private volatile int seq = -1;
  private volatile boolean stop;
  private volatile boolean electionDone = false;
  private final Replica replica;

  /** Creates a participant with its own {@link ElectorSetup} and no takeover delay. */
  public ClientThread(String shard, int nodeNumber) throws Exception {
    this(null, shard, nodeNumber, 0);
  }

  /**
   * @param es             elector setup to share, or {@code null} to create a private one
   * @param shard          shard name; also used as prefix of the node and thread name
   * @param nodeNumber     numeric id, stored as the replica's node name property
   * @param runLeaderDelay milliseconds the leader process should stall (0 for none)
   */
  public ClientThread(ElectorSetup es, String shard, int nodeNumber, long runLeaderDelay) throws Exception {
    super("Thread-" + shard + nodeNumber);
    this.shard = shard;
    this.nodeName = shard + nodeNumber;
    this.runLeaderDelay = runLeaderDelay;
    Map<String,Object> props = new HashMap<>();
    props.put(ZkStateReader.NODE_NAME_PROP, Integer.toString(nodeNumber));
    props.put(ZkStateReader.CORE_NAME_PROP, "");
    replica = new Replica("", props, "", -1L, shard, zkStateReader);
    this.es = es;
    if (this.es == null) {
      this.es = new ElectorSetup(new OnReconnect() {
        @Override
        public void command() throws SessionExpiredException {
          try {
            setupOnConnect();
          } catch (Throwable t) {
            // Best effort: a failed re-join must not break the reconnect path,
            // but it should at least be visible in the test log.
            log.warn("Re-joining the election after reconnect failed for {}", nodeName, t);
          }
        }

        @Override
        public String getName() {
          return "test";
        }
      });
    }
  }

  /** Builds the election context, registers it with the elector and records this thread as done. */
  private void setupOnConnect() throws InterruptedException, KeeperException,
      IOException {
    assertNotNull(es);
    TestLeaderElectionContext context = new TestLeaderElectionContext(es.elector, shard, "collection1", nodeName, replica, es.zkController, runLeaderDelay, null);
    es.elector.setup(context);
    // MRM TODO: - we have to get the seq another way, now returns if become leader first try
    //seq = es.elector.joinElection(context, false);
    electionDone = true;
    seqToThread.put(seq, this);
  }

  @Override
  public void run() {
    try {
      setupOnConnect();
    } catch (Throwable e) {
      log.error("setup failed", e);
      es.close();
      return;
    }
    while (!stop) {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        // Keep the interrupt visible to whoever runs this Runnable.
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  /** Asks the run loop to stop and closes the elector setup. */
  public void close() {
    // Raise the flag first so the run loop ends even if closing the setup throws.
    this.stop = true;
    es.close();
  }

  public int getSeq() {
    return seq;
  }
}
// MRM TODO:
// @Test
// public void testBasic() throws Exception {
// LeaderElector elector = new LeaderElector(zkClient);
// ZkNodeProps props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
// "http://127.0.0.1/solr/", ZkStateReader.CORE_NAME_PROP, "");
// ZkController zkController = MockSolrSource.makeSimpleMock(null, null, zkClient);
// ElectionContext context = new ShardLeaderElectionContextBase(elector,
// "s2", "collection1", "dummynode1", props, zkController);
// elector.setup(context);
// elector.joinElection(context, false);
// assertEquals("http://127.0.0.1/solr/",
// getLeaderUrl("collection1", "s2"));
// }
// MRM TODO:
// @Test
// public void testCancelElection() throws Exception {
// LeaderElector first = new LeaderElector(zkClient);
// ZkNodeProps props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
// "http://127.0.0.1/solr/", ZkStateReader.CORE_NAME_PROP, "1");
// ZkController zkController = MockSolrSource.makeSimpleMock(null, null, zkClient);
// ElectionContext firstContext = new ShardLeaderElectionContextBase(first,
// "slice1", "collection2", "dummynode1", props, zkController);
// first.setup(firstContext);
// first.joinElection(firstContext, false);
//
// Thread.sleep(1000);
// assertEquals("original leader was not registered", "http://127.0.0.1/solr/1/", getLeaderUrl("collection2", "slice1"));
//
// LeaderElector second = new LeaderElector(zkClient);
// props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
// "http://127.0.0.1/solr/", ZkStateReader.CORE_NAME_PROP, "2");
// zkController = MockSolrSource.makeSimpleMock(null, null, zkClient);
// ElectionContext context = new ShardLeaderElectionContextBase(second,
// "slice1", "collection2", "dummynode2", props, zkController);
// second.setup(context);
// second.joinElection(context, false);
// Thread.sleep(1000);
// assertEquals("original leader should have stayed leader", "http://127.0.0.1/solr/1/", getLeaderUrl("collection2", "slice1"));
// firstContext.cancelElection();
// Thread.sleep(1000);
// assertEquals("new leader was not registered", "http://127.0.0.1/solr/2/", getLeaderUrl("collection2", "slice1"));
// }
/**
 * Reads the registered leader of a shard from ZooKeeper and returns its core URL.
 * The lookup is attempted up to 60 times, sleeping 500ms after each attempt that
 * fails because the node is missing or the session expired.
 *
 * @throws RuntimeException if no leader data could be read within those attempts
 */
private String getLeaderUrl(final String collection, final String slice)
    throws KeeperException, InterruptedException {
  for (int attemptsLeft = 60; attemptsLeft > 0; attemptsLeft--) {
    try {
      final String leaderPath = ZkStateReader.getShardLeadersPath(collection, slice);
      final byte[] raw = zkClient.getData(leaderPath, null, null);
      final ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(ZkNodeProps.load(raw));
      // MRM TODO:
      final Replica leader = new Replica("", leaderProps.getNodeProps().getProperties(),
          collection, -1L, slice, zkStateReader);
      return leader.getCoreUrl();
    } catch (NoNodeException | SessionExpiredException e) {
      // Leader not registered (yet) or session gone: wait and retry.
      Thread.sleep(500);
    }
  }
  throw new RuntimeException("Could not get leader props for " + collection + " " + slice);
}
/**
 * Starts every given thread and polls (every 250ms, at most 100 polls) until all
 * of them report that their election setup is done; fails the test otherwise.
 */
private static void startAndJoinElection (List<ClientThread> threads) throws InterruptedException {
  for (ClientThread participant : threads) {
    participant.start();
  }
  for (int polls = 0; ; polls++) { // wait for election to complete
    if (polls == 100) {
      fail("Timeout starting and joining election");
    }
    boolean everyoneDone = true;
    for (ClientThread participant : threads) {
      if (!participant.electionDone) {
        everyoneDone = false;
        break;
      }
    }
    if (everyoneDone) {
      return;
    }
    Thread.sleep(250);
  }
}
/**
 * Starts 15 participants for shard1, then repeatedly closes participants by
 * sequence number and checks that the expected sequence number holds leadership.
 */
@Test
public void testElection() throws Exception {
  List<ClientThread> threads = new ArrayList<>();
  for (int node = 0; node < 15; node++) {
    threads.add(new ClientThread("shard1", node));
  }
  try {
    startAndJoinElection(threads);

    // whoever the leader is, should be the n_0 seq
    int leaderThread = getLeaderThread();
    assertEquals(0, threads.get(leaderThread).seq);

    // kill n_0; n_1 is expected to take over
    ((ClientThread) seqToThread.get(0)).close();
    waitForLeader(threads, 1);
    leaderThread = getLeaderThread();
    assertEquals(1, threads.get(leaderThread).seq);

    // kill n_4, n_1 and n_3; n_2 is expected to take over
    for (int victim : new int[] {4, 1, 3}) {
      ((ClientThread) seqToThread.get(victim)).close();
    }
    waitForLeader(threads, 2);
    leaderThread = getLeaderThread();
    assertEquals(2, threads.get(leaderThread).seq);

    // kill n_5, 2, 6, 7, and 8; n_9 is expected to take over
    for (int victim : new int[] {5, 2, 6, 7, 8}) {
      ((ClientThread) seqToThread.get(victim)).close();
    }
    waitForLeader(threads, 9);
    leaderThread = getLeaderThread();
    assertEquals(9, threads.get(leaderThread).seq);
  } finally {
    // cleanup any threads still running
    for (ClientThread participant : threads) {
      participant.close();
      participant.interrupt();
    }
    for (Thread participant : threads) {
      participant.join();
    }
  }
}
/**
 * Brings up a first and then a second replica for a random number of shards
 * (2 to 19), closes the first replicas' shared setup, and expects replica 2 to
 * be registered as leader of every shard.
 */
@Test
public void testParallelElection() throws Exception {
  final int numShards = 2 + random().nextInt(18);
  log.info("Testing parallel election across {} shards", numShards);
  List<ClientThread> threads = new ArrayList<>();
  try {
    // first replica of every shard shares one elector setup
    ElectorSetup es1 = new ElectorSetup(null);
    List<ClientThread> replica1s = new ArrayList<>();
    for (int shard = 1; shard <= numShards; shard++) {
      ClientThread first = new ClientThread(es1, "parshard" + shard, 1, 0 /* don't delay */);
      threads.add(first);
      replica1s.add(first);
    }
    startAndJoinElection(replica1s);
    log.info("First replicas brought up and registered");

    // bring up second in line
    ElectorSetup es2 = new ElectorSetup(null);
    List<ClientThread> replica2s = new ArrayList<>();
    for (int shard = 1; shard <= numShards; shard++) {
      ClientThread second = new ClientThread(es2, "parshard" + shard, 2,
          40000 / (numShards - 1) /* delay enough to timeout or expire */);
      threads.add(second);
      replica2s.add(second);
    }
    startAndJoinElection(replica2s);
    log.info("Second replicas brought up and registered");

    // disconnect the leaders
    es1.close();

    for (int shard = 1; shard <= numShards; shard++) {
      // if this test fails, getLeaderUrl will more likely throw an exception and fail the test,
      // but add an assertEquals as well for good measure
      assertEquals("2/", getLeaderUrl("collection1", "parshard" + shard));
    }
  } finally {
    // cleanup any threads still running
    for (ClientThread participant : threads) {
      participant.close();
      participant.interrupt();
    }
    for (Thread participant : threads) {
      participant.join();
    }
  }
}
/**
 * Polls the shard1 leader until the leading thread's sequence number is at least
 * {@code seq}. Gives up silently after a bounded number of 200ms polls; callers
 * assert on the leader afterwards.
 */
private void waitForLeader(List<ClientThread> threads, int seq)
    throws KeeperException, InterruptedException {
  int leaderIdx = getLeaderThread();
  for (int attempt = 0; threads.get(leaderIdx).seq < seq; attempt++) {
    leaderIdx = getLeaderThread();
    if (attempt > 50) {
      return;
    }
    Thread.sleep(200);
  }
}
/**
 * Returns the shard1 leader of collection1 as an int, obtained by stripping all
 * slashes from the leader URL and parsing the remainder.
 */
private int getLeaderThread() throws KeeperException, InterruptedException {
  final String withoutSlashes = getLeaderUrl("collection1", "shard1").replace("/", "");
  return Integer.parseInt(withoutSlashes);
}
/**
 * Stress test: for four seconds clients are launched at random delays while one
 * helper thread keeps closing random clients and another keeps dropping
 * connections / expiring the session. Afterwards a leader must still be
 * resolvable. The scheduler, helper threads and clients are released even when
 * the test fails part-way.
 */
@Test
public void testStressElection() throws Exception {
  final ScheduledExecutorService scheduler = Executors
      .newScheduledThreadPool(3, new SolrNamedThreadFactory("stressElection"));
  final List<ClientThread> threads = Collections
      .synchronizedList(new ArrayList<ClientThread>());

  // Launches additional clients at random delays of up to 500ms.
  final Thread scheduleThread = new Thread() {
    @Override
    public void run() {
      int count = SolrTestUtil.atLeast(5);
      for (int i = 1; i < count; i++) {
        int launchIn = random().nextInt(500);
        ClientThread thread = null;
        try {
          thread = new ClientThread("shard1", i);
        } catch (Exception e) {
          log.warn("Could not create stress client {}", i, e);
        }
        if (thread != null) {
          threads.add(thread);
          scheduler.schedule(thread, launchIn, TimeUnit.MILLISECONDS);
        }
      }
    }
  };

  // Keeps closing random clients, never picking from the last two in the list.
  final Thread killThread = new Thread() {
    @Override
    public void run() {
      while (!stopStress) {
        try {
          // always 1 we won't kill...
          int killable = threads.size() - 2;
          if (killable <= 0) {
            // Too few clients so far: back off instead of spinning on a failing nextInt().
            Thread.sleep(10);
            continue;
          }
          int j = random().nextInt(killable);
          try {
            threads.get(j).close();
          } catch (Exception e) {
            log.debug("Closing stress client {} failed", j, e);
          }
          Thread.sleep(10);
        } catch (InterruptedException e) {
          // Interrupted by the test to stop us.
          return;
        } catch (Exception e) {
          log.debug("Ignoring failure in kill loop", e);
        }
      }
    }
  };

  // Keeps dropping a random client's connection and sometimes expires the session.
  final Thread connLossThread = new Thread() {
    @Override
    public void run() {
      while (!stopStress) {
        try {
          Thread.sleep(50);
          int j = random().nextInt(threads.size());
          try {
            ((SolrZooKeeper)threads.get(j).es.zkClient.getConnectionManager().getKeeper()).closeCnxn();
            if (random().nextBoolean()) {
              long sessionId = zkClient.getSessionId();
              server.expire(sessionId);
            }
          } catch (Exception e) {
            log.error("", e);
          }
          Thread.sleep(500);
        } catch (InterruptedException e) {
          // Interrupted by the test to stop us.
          return;
        } catch (Exception e) {
          log.debug("Ignoring failure in connection-loss loop", e);
        }
      }
    }
  };

  try {
    // start with a leader
    ClientThread thread1 = new ClientThread("shard1", 0);
    threads.add(thread1);
    scheduler.schedule(thread1, 0, TimeUnit.MILLISECONDS);

    try {
      scheduleThread.start();
      connLossThread.start();
      killThread.start();
      Thread.sleep(4000);
    } finally {
      stopStress = true;
      scheduleThread.interrupt();
      connLossThread.interrupt();
      killThread.interrupt();
      // Only shut the scheduler down once nothing can submit to it any more.
      scheduleThread.join();
      scheduler.shutdown();
      connLossThread.join();
      killThread.join();
    }

    // we have a leader we know, TODO: lets check some other things
    // (getLeaderThread() throws when no leader can be resolved)
    threads.get(getLeaderThread()).getSeq();
  } finally {
    scheduler.shutdown();
    // cleanup any clients still running; the helpers are stopped, so a snapshot is safe
    for (ClientThread thread : new ArrayList<>(threads)) {
      try {
        thread.close();
      } catch (Exception e) {
        log.warn("Closing stress client failed during cleanup", e);
      }
    }
    // The clients ran as scheduler tasks and were never start()ed, so joining them
    // would return immediately; wait for the pool that actually ran them instead.
    if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
      log.warn("stressElection scheduler did not terminate within 10 seconds");
    }
  }
}
/**
 * Closes the ZooKeeper client and state reader, prints the final node layout and
 * shuts the test server down. The server shutdown and the superclass teardown
 * run even if an earlier step throws, and fields left null by a partially
 * failed {@code setUp()} are skipped.
 */
@Override
public void tearDown() throws Exception {
  try {
    if (zkClient != null) {
      zkClient.close();
    }
    if (zkStateReader != null) {
      zkStateReader.close();
    }
    if (server != null) {
      server.printLayout();
    }
  } finally {
    try {
      if (server != null) {
        server.shutdown();
      }
    } finally {
      super.tearDown();
    }
  }
}
/** Debug helper: dumps the current ZooKeeper node layout to standard output. */
private void printLayout() throws Exception {
  final java.io.PrintStream stdout = System.out;
  zkClient.printLayoutToStream(stdout);
}
}