blob: acaa0c7257ef16ad7cab0fd943e88d9011ce4864 [file] [log] [blame]
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
import javax.xml.parsers.ParserConfigurationException;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@Slow
public class LeaderElectionIntegrationTest extends SolrTestCaseJ4 {
protected static Logger log = LoggerFactory
.getLogger(AbstractZkTestCase.class);
private final static int NUM_SHARD_REPLICAS = 5;
private static final boolean VERBOSE = false;
private static final Pattern HOST = Pattern
.compile(".*?\\:(\\d\\d\\d\\d)_.*");
protected ZkTestServer zkServer;
protected String zkDir;
private Map<Integer,CoreContainer> containerMap = new HashMap<Integer,CoreContainer>();
private Map<String,Set<Integer>> shardPorts = new HashMap<String,Set<Integer>>();
private SolrZkClient zkClient;
private ZkStateReader reader;
@BeforeClass
public static void beforeClass() {
System.setProperty("solrcloud.skip.autorecovery", "true");
}
@Override
public void setUp() throws Exception {
super.setUp();
createTempDir();
ignoreException("No UpdateLog found - cannot sync");
ignoreException("No UpdateLog found - cannot recover");
System.setProperty("zkClientTimeout", "8000");
zkDir = dataDir.getAbsolutePath() + File.separator
+ "zookeeper" + System.currentTimeMillis() + "/server1/data";
zkServer = new ZkTestServer(zkDir);
zkServer.run();
System.setProperty("zkHost", zkServer.getZkAddress());
AbstractZkTestCase.buildZooKeeper(zkServer.getZkHost(),
zkServer.getZkAddress(), "solrconfig.xml", "schema.xml");
log.info("####SETUP_START " + getTestName());
// set some system properties for use by tests
System.setProperty("solr.test.sys.prop1", "propone");
System.setProperty("solr.test.sys.prop2", "proptwo");
for (int i = 7000; i < 7000 + NUM_SHARD_REPLICAS; i++) {
try {
setupContainer(i, "shard1");
} catch (Throwable t) {
log.error("!!!Could not start container:" + i + " The exception thrown was: " + t.getClass() + " " + t.getMessage());
fail("Could not start container:" + i + ". Reason:" + t.getClass() + " " + t.getMessage());
}
}
try {
setupContainer(3333, "shard2");
} catch (Throwable t) {
log.error("!!!Could not start container 3333. The exception thrown was: " + t.getClass() + " " + t.getMessage());
fail("Could not start container: 3333");
}
zkClient = new SolrZkClient(zkServer.getZkAddress(),
AbstractZkTestCase.TIMEOUT);
reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
boolean initSuccessful = false;
for (int i = 0; i < 30; i++) {
List<String> liveNodes = zkClient.getChildren("/live_nodes", null, true);
if (liveNodes.size() == NUM_SHARD_REPLICAS + 1) {
// all nodes up
initSuccessful = true;
break;
}
Thread.sleep(1000);
log.info("Waiting for more nodes to come up, now: " + liveNodes.size()
+ "/" + (NUM_SHARD_REPLICAS + 1));
}
if (!initSuccessful) {
fail("Init was not successful!");
}
log.info("####SETUP_END " + getTestName());
}
private void setupContainer(int port, String shard) throws IOException,
ParserConfigurationException, SAXException {
File data = new File(dataDir + File.separator + "data_" + port);
data.mkdirs();
System.setProperty("hostPort", Integer.toString(port));
System.setProperty("shard", shard);
System.setProperty("solr.data.dir", data.getAbsolutePath());
System.setProperty("solr.solr.home", TEST_HOME());
Set<Integer> ports = shardPorts.get(shard);
if (ports == null) {
ports = new HashSet<Integer>();
shardPorts.put(shard, ports);
}
ports.add(port);
CoreContainer container = new CoreContainer();
container.load();
assertTrue("Container " + port + " has no cores!", container.getCores()
.size() > 0);
containerMap.put(port, container);
System.clearProperty("solr.solr.home");
System.clearProperty("hostPort");
}
@Test
public void testSimpleSliceLeaderElection() throws Exception {
//printLayout(zkServer.getZkAddress());
for (int i = 0; i < 4; i++) {
// who is the leader?
String leader = getLeader();
Set<Integer> shard1Ports = shardPorts.get("shard1");
int leaderPort = getLeaderPort(leader);
assertTrue(shard1Ports.toString(), shard1Ports.contains(leaderPort));
shard1Ports.remove(leaderPort);
// kill the leader
if (VERBOSE) System.out.println("Killing " + leaderPort);
containerMap.get(leaderPort).shutdown();
//printLayout(zkServer.getZkAddress());
// poll until leader change is visible
for (int j = 0; j < 90; j++) {
String currentLeader = getLeader();
if(!leader.equals(currentLeader)) {
break;
}
Thread.sleep(500);
}
leader = getLeader();
int newLeaderPort = getLeaderPort(leader);
int retry = 0;
while (leaderPort == newLeaderPort) {
if (retry++ == 60) {
break;
}
Thread.sleep(1000);
}
if (leaderPort == newLeaderPort) {
zkClient.printLayoutToStdOut();
fail("We didn't find a new leader! " + leaderPort + " was shutdown, but it's still showing as the leader");
}
assertTrue("Could not find leader " + newLeaderPort + " in " + shard1Ports, shard1Ports.contains(newLeaderPort));
}
}
@Test
public void testLeaderElectionAfterClientTimeout() throws Exception {
// TODO: work out the best timing here...
System.setProperty("zkClientTimeout", Integer.toString(ZkTestServer.TICK_TIME * 2 + 100));
// timeout the leader
String leader = getLeader();
int leaderPort = getLeaderPort(leader);
ZkController zkController = containerMap.get(leaderPort).getZkController();
zkController.getZkClient().getSolrZooKeeper().pauseCnxn(zkController.getClientTimeout() + 100);
for (int i = 0; i < 60; i++) { // wait till leader is changed
if (leaderPort != getLeaderPort(getLeader())) {
break;
}
Thread.sleep(100);
}
// make sure we have waited long enough for the first leader to have come back
Thread.sleep(ZkTestServer.TICK_TIME * 2 + 100);
if (VERBOSE) System.out.println("kill everyone");
// kill everyone but the first leader that should have reconnected by now
for (Map.Entry<Integer,CoreContainer> entry : containerMap.entrySet()) {
if (entry.getKey() != leaderPort) {
entry.getValue().shutdown();
}
}
for (int i = 0; i < 320; i++) { // wait till leader is changed
try {
if (leaderPort == getLeaderPort(getLeader())) {
break;
}
Thread.sleep(100);
} catch (Exception e) {
continue;
}
}
// the original leader should be leader again now - everyone else is down
// TODO: I saw this fail once...expected:<7000> but was:<7004>
assertEquals(leaderPort, getLeaderPort(getLeader()));
//printLayout(zkServer.getZkAddress());
//Thread.sleep(100000);
}
private String getLeader() throws InterruptedException {
ZkNodeProps props = reader.getLeaderRetry("collection1", "shard1", 30000);
String leader = props.getStr(ZkStateReader.NODE_NAME_PROP);
return leader;
}
private int getLeaderPort(String leader) {
Matcher m = HOST.matcher(leader);
int leaderPort = 0;
if (m.matches()) {
leaderPort = Integer.parseInt(m.group(1));
if (VERBOSE) System.out.println("The leader is:" + Integer.parseInt(m.group(1)));
} else {
throw new IllegalStateException();
}
return leaderPort;
}
@Override
public void tearDown() throws Exception {
if (VERBOSE) {
printLayout(zkServer.getZkHost());
}
if (zkClient != null) {
zkClient.close();
}
if (reader != null) {
reader.close();
}
for (CoreContainer cc : containerMap.values()) {
if (!cc.isShutDown()) {
cc.shutdown();
}
}
zkServer.shutdown();
super.tearDown();
System.clearProperty("zkClientTimeout");
System.clearProperty("zkHost");
System.clearProperty("hostPort");
System.clearProperty("shard");
System.clearProperty("solrcloud.update.delay");
}
private void printLayout(String zkHost) throws Exception {
SolrZkClient zkClient = new SolrZkClient(zkHost, AbstractZkTestCase.TIMEOUT);
zkClient.printLayoutToStdOut();
zkClient.close();
}
@AfterClass
public static void afterClass() throws InterruptedException {
System.clearProperty("solrcloud.skip.autorecovery");
System.clearProperty("zkClientTimeout");
System.clearProperty("zkHost");
System.clearProperty("shard");
System.clearProperty("solr.data.dir");
System.clearProperty("solr.solr.home");
resetExceptionIgnores();
// wait just a bit for any zk client threads to outlast timeout
Thread.sleep(2000);
}
}