blob: 72888e3ea028ba6d47cf285f8ec7f5e71abf36dc [file] [log] [blame]
package org.apache.solr.cloud;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.cloud.CloudState;
import org.apache.solr.common.cloud.CoreState;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreDescriptor;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.data.Stat;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class OverseerTest extends SolrTestCaseJ4 {
static final int TIMEOUT = 10000;
private static final boolean DEBUG = false;
public static class MockZKController{
private final SolrZkClient zkClient;
private final ZkStateReader zkStateReader;
private final String nodeName;
private final String collection;
private final LeaderElector elector;
private final Map<String, CoreState> coreStates = Collections.synchronizedMap(new HashMap<String, CoreState>());
private final Map<String, ElectionContext> electionContext = Collections.synchronizedMap(new HashMap<String, ElectionContext>());
public MockZKController(String zkAddress, String nodeName, String collection) throws InterruptedException, TimeoutException, IOException, KeeperException {
this.nodeName = nodeName;
this.collection = collection;
zkClient = new SolrZkClient(zkAddress, TIMEOUT);
zkStateReader = new ZkStateReader(zkClient);
zkStateReader.createClusterStateWatchersAndUpdate();
Overseer.createClientNodes(zkClient, nodeName);
// live node
final String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
zkClient.makePath(nodePath, CreateMode.EPHEMERAL, true);
elector = new LeaderElector(zkClient);
}
private void deleteNode(final String path) {
try {
Stat stat = zkClient.exists(path, null, false);
if (stat != null) {
zkClient.delete(path, stat.getVersion(), false);
}
} catch (KeeperException e) {
fail("Unexpected KeeperException!" + e);
} catch (InterruptedException e) {
fail("Unexpected InterruptedException!" + e);
}
}
public void close(){
try {
deleteNode(ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1");
zkClient.close();
} catch (InterruptedException e) {
//e.printStackTrace();
}
}
public void publishState(String coreName, String stateName, int numShards)
throws KeeperException, InterruptedException, IOException {
if (stateName == null) {
coreStates.remove(coreName);
ElectionContext ec = electionContext.remove(coreName);
if (ec != null) {
ec.cancelElection();
}
} else {
HashMap<String,String> coreProps = new HashMap<String,String>();
coreProps.put(ZkStateReader.STATE_PROP, stateName);
coreProps.put(ZkStateReader.NODE_NAME_PROP, nodeName);
coreProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
coreProps.put(ZkStateReader.COLLECTION_PROP, collection);
coreProps.put(ZkStateReader.BASE_URL_PROP, "http://" + nodeName
+ "/solr/");
CoreState state = new CoreState(coreName, collection, coreProps,
numShards);
coreStates.remove(coreName);
coreStates.put(coreName, state);
}
final String statePath = Overseer.STATES_NODE + "/" + nodeName;
zkClient.setData(
statePath,
ZkStateReader.toJSON(coreStates.values().toArray(
new CoreState[coreStates.size()])), true);
for (int i = 0; i < 10; i++) {
String shardId = getShardId(coreName);
if (shardId != null) {
try {
zkClient.makePath("/collections/" + collection + "/leader_elect/"
+ shardId + "/election", true);
} catch (NodeExistsException nee) {}
ZkNodeProps props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
"http://" + nodeName + "/solr/", ZkStateReader.NODE_NAME_PROP,
nodeName, ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.SHARD_ID_PROP, shardId,
ZkStateReader.COLLECTION_PROP, collection);
ShardLeaderElectionContextBase ctx = new ShardLeaderElectionContextBase(
elector, shardId, collection, nodeName + "_" + coreName, props,
zkStateReader);
elector.joinElection(ctx);
break;
}
Thread.sleep(200);
}
}
private String getShardId(final String coreName) {
Map<String,Slice> slices = zkStateReader.getCloudState().getSlices(
collection);
if (slices != null) {
for (Slice slice : slices.values()) {
if (slice.getShards().containsKey(nodeName + "_" + coreName))
;
return slice.getName();
}
}
return null;
}
}
@BeforeClass
public static void beforeClass() throws Exception {
System.setProperty("solrcloud.skip.autorecovery", "true");
initCore();
}
@AfterClass
public static void afterClass() throws Exception {
System.clearProperty("solrcloud.skip.autorecovery");
initCore();
}
@Test
public void testShardAssignment() throws Exception {
String zkDir = dataDir.getAbsolutePath() + File.separator
+ "zookeeper/server1/data";
ZkTestServer server = new ZkTestServer(zkDir);
ZkController zkController = null;
SolrZkClient zkClient = null;
try {
server.run();
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
ZkStateReader reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
System.setProperty(ZkStateReader.NUM_SHARDS_PROP, "3");
zkController = new ZkController(null, server.getZkAddress(), TIMEOUT, 10000,
"localhost", "8983", "solr", new CurrentCoreDescriptorProvider() {
@Override
public List<CoreDescriptor> getCurrentDescriptors() {
// do nothing
return null;
}
});
System.setProperty("bootstrap_confdir", getFile("solr/conf")
.getAbsolutePath());
final int numShards=6;
final String[] ids = new String[numShards];
for (int i = 0; i < numShards; i++) {
CloudDescriptor collection1Desc = new CloudDescriptor();
collection1Desc.setCollectionName("collection1");
CoreDescriptor desc1 = new CoreDescriptor(null, "core" + (i + 1), "");
desc1.setCloudDescriptor(collection1Desc);
zkController.preRegister(desc1);
ids[i] = zkController.register("core" + (i + 1), desc1);
}
assertEquals("shard1", ids[0]);
assertEquals("shard2", ids[1]);
assertEquals("shard3", ids[2]);
assertEquals("shard1", ids[3]);
assertEquals("shard2", ids[4]);
assertEquals("shard3", ids[5]);
waitForCollections(reader, "collection1");
//make sure leaders are in cloud state
assertNotNull(reader.getLeaderUrl("collection1", "shard1", 15000));
assertNotNull(reader.getLeaderUrl("collection1", "shard2", 15000));
assertNotNull(reader.getLeaderUrl("collection1", "shard3", 15000));
} finally {
System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
System.clearProperty("bootstrap_confdir");
if (DEBUG) {
if (zkController != null) {
zkClient.printLayoutToStdOut();
}
}
if (zkClient != null) {
zkClient.close();
}
if (zkController != null) {
zkController.close();
}
server.shutdown();
}
}
@Test
public void testShardAssignmentBigger() throws Exception {
String zkDir = dataDir.getAbsolutePath() + File.separator
+ "zookeeper/server1/data";
final int nodeCount = random.nextInt(50)+50; //how many simulated nodes (num of threads)
final int coreCount = random.nextInt(100)+100; //how many cores to register
final int sliceCount = random.nextInt(20)+1; //how many slices
ZkTestServer server = new ZkTestServer(zkDir);
SolrZkClient zkClient = null;
ZkStateReader reader = null;
final ZkController[] controllers = new ZkController[nodeCount];
final ExecutorService[] nodeExecutors = new ExecutorService[nodeCount];
try {
server.run();
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
System.setProperty(ZkStateReader.NUM_SHARDS_PROP, Integer.valueOf(sliceCount).toString());
for (int i = 0; i < nodeCount; i++) {
controllers[i] = new ZkController(null, server.getZkAddress(), TIMEOUT, 10000,
"localhost", "898" + i, "solr", new CurrentCoreDescriptorProvider() {
@Override
public List<CoreDescriptor> getCurrentDescriptors() {
// do nothing
return null;
}
});
}
System.setProperty("bootstrap_confdir", getFile("solr/conf")
.getAbsolutePath());
for (int i = 0; i < nodeCount; i++) {
nodeExecutors[i] = Executors.newFixedThreadPool(1);
}
final String[] ids = new String[coreCount];
//register total of coreCount cores
for (int i = 0; i < coreCount; i++) {
final int slot = i;
Runnable coreStarter = new Runnable() {
@Override
public void run() {
final CloudDescriptor collection1Desc = new CloudDescriptor();
collection1Desc.setCollectionName("collection1");
final String coreName = "core" + slot;
final CoreDescriptor desc = new CoreDescriptor(null, coreName, "");
desc.setCloudDescriptor(collection1Desc);
try {
controllers[slot % nodeCount].preRegister(desc);
ids[slot] = controllers[slot % nodeCount]
.register(coreName, desc);
} catch (Throwable e) {
e.printStackTrace();
fail("register threw exception:" + e.getClass());
}
}
};
nodeExecutors[i % nodeCount].submit(coreStarter);
}
for (int i = 0; i < nodeCount; i++) {
nodeExecutors[i].shutdown();
}
for (int i = 0; i < nodeCount; i++) {
while (!nodeExecutors[i].awaitTermination(100, TimeUnit.MILLISECONDS));
}
// make sure all cores have been assigned a id in cloudstate
for (int i = 0; i < 40; i++) {
reader.updateCloudState(true);
CloudState state = reader.getCloudState();
Map<String,Slice> slices = state.getSlices("collection1");
int count = 0;
for (String name : slices.keySet()) {
count += slices.get(name).getShards().size();
}
if (coreCount == count) break;
Thread.sleep(200);
}
// make sure all cores have been returned a id
for (int i = 0; i < 90; i++) {
int assignedCount = 0;
for (int j = 0; j < coreCount; j++) {
if (ids[j] != null) {
assignedCount++;
}
}
if (coreCount == assignedCount) {
break;
}
Thread.sleep(500);
}
final HashMap<String, AtomicInteger> counters = new HashMap<String,AtomicInteger>();
for (int i = 1; i < sliceCount+1; i++) {
counters.put("shard" + i, new AtomicInteger());
}
for (int i = 0; i < coreCount; i++) {
final AtomicInteger ai = counters.get(ids[i]);
assertNotNull("could not find counter for shard:" + ids[i], ai);
ai.incrementAndGet();
}
for (String counter: counters.keySet()) {
int count = counters.get(counter).intValue();
int expectedCount = coreCount / sliceCount;
int min = expectedCount - 1;
int max = expectedCount + 1;
if (count < min || count > max) {
fail("Unevenly assigned shard ids, " + counter + " had " + count
+ ", expected: " + min + "-" + max);
}
}
//make sure leaders are in cloud state
for (int i = 0; i < sliceCount; i++) {
assertNotNull(reader.getLeaderUrl("collection1", "shard" + (i + 1)), 15000);
}
} finally {
System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
System.clearProperty("bootstrap_confdir");
if (DEBUG) {
if (controllers[0] != null) {
zkClient.printLayoutToStdOut();
}
}
if (zkClient != null) {
zkClient.close();
}
if (reader != null) {
reader.close();
}
for (int i = 0; i < controllers.length; i++)
if (controllers[i] != null) {
controllers[i].close();
}
server.shutdown();
for (int i = 0; i < nodeCount; i++) {
nodeExecutors[i].shutdownNow();
}
}
}
//wait until collections are available
private void waitForCollections(ZkStateReader stateReader, String... collections) throws InterruptedException, KeeperException {
int maxIterations = 100;
while (0 < maxIterations--) {
stateReader.updateCloudState(true);
final CloudState state = stateReader.getCloudState();
Set<String> availableCollections = state.getCollections();
int availableCount = 0;
for(String requiredCollection: collections) {
if(availableCollections.contains(requiredCollection)) {
availableCount++;
}
if(availableCount == collections.length) return;
Thread.sleep(50);
}
}
log.warn("Timeout waiting for collections: " + Arrays.asList(collections) + " state:" + stateReader.getCloudState());
}
@Test
public void testStateChange() throws Exception {
String zkDir = dataDir.getAbsolutePath() + File.separator
+ "zookeeper/server1/data";
ZkTestServer server = new ZkTestServer(zkDir);
SolrZkClient zkClient = null;
ZkStateReader reader = null;
SolrZkClient overseerClient = null;
try {
server.run();
zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
zkClient.makePath("/live_nodes", true);
//live node
String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1";
zkClient.makePath(nodePath,CreateMode.EPHEMERAL, true);
reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
Overseer.createClientNodes(zkClient, "node1");
overseerClient = electNewOverseer(server.getZkAddress());
HashMap<String, String> coreProps = new HashMap<String,String>();
coreProps.put(ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr");
coreProps.put(ZkStateReader.NODE_NAME_PROP, "node1");
coreProps.put(ZkStateReader.CORE_NAME_PROP, "core1");
coreProps.put(ZkStateReader.ROLES_PROP, "");
coreProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
CoreState state = new CoreState("core1", "collection1", coreProps, 2);
nodePath = "/node_states/node1";
try {
zkClient.makePath(nodePath, CreateMode.EPHEMERAL, true);
} catch (KeeperException ke) {
if(ke.code()!=Code.NODEEXISTS) {
throw ke;
}
}
zkClient.setData(nodePath, ZkStateReader.toJSON(new CoreState[]{state}), true);
waitForCollections(reader, "collection1");
assertEquals(reader.getCloudState().toString(), ZkStateReader.RECOVERING,
reader.getCloudState().getSlice("collection1", "shard1").getShards()
.get("node1_core1").get(ZkStateReader.STATE_PROP));
//publish node state (active)
coreProps.put(ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
coreProps.put(ZkStateReader.SHARD_ID_PROP, "shard1");
state = new CoreState("core1", "collection1", coreProps, 2);
zkClient.setData(nodePath, ZkStateReader.toJSON(new CoreState[]{state}), true);
verifyStatus(reader, ZkStateReader.ACTIVE);
} finally {
if (zkClient != null) {
zkClient.close();
}
if (overseerClient != null) {
overseerClient.close();
}
if (reader != null) {
reader.close();
}
server.shutdown();
}
}
private void verifyStatus(ZkStateReader reader, String expectedState) throws InterruptedException {
int maxIterations = 100;
String coreState = null;
while(maxIterations-->0) {
Slice slice = reader.getCloudState().getSlice("collection1", "shard1");
if(slice!=null) {
coreState = slice.getShards().get("node1_core1").get(ZkStateReader.STATE_PROP);
if(coreState.equals(expectedState)) {
return;
}
}
Thread.sleep(50);
}
fail("Illegal state, was:" + coreState + " expected:" + expectedState + "cloudState:" + reader.getCloudState());
}
private void verifyShardLeader(ZkStateReader reader, String collection, String shard, String expectedCore) throws InterruptedException, KeeperException {
int maxIterations = 100;
while(maxIterations-->0) {
ZkNodeProps props = reader.getCloudState().getLeader(collection, shard);
if(props!=null) {
if(expectedCore.equals(props.get(ZkStateReader.CORE_NAME_PROP))) {
return;
}
}
Thread.sleep(100);
}
assertEquals("Unexpected shard leader coll:" + collection + " shard:" + shard, expectedCore, (reader.getCloudState().getLeader(collection, shard)!=null)?reader.getCloudState().getLeader(collection, shard).get(ZkStateReader.CORE_NAME_PROP):null);
}
@Test
public void testOverseerFailure() throws Exception {
String zkDir = dataDir.getAbsolutePath() + File.separator
+ "zookeeper/server1/data";
ZkTestServer server = new ZkTestServer(zkDir);
SolrZkClient controllerClient = null;
SolrZkClient overseerClient = null;
ZkStateReader reader = null;
MockZKController mockController = null;
try {
server.run();
controllerClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
controllerClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
reader = new ZkStateReader(controllerClient);
reader.createClusterStateWatchersAndUpdate();
mockController = new MockZKController(server.getZkAddress(), "node1", "collection1");
overseerClient = electNewOverseer(server.getZkAddress());
Thread.sleep(1000);
mockController.publishState("core1", ZkStateReader.RECOVERING, 1);
waitForCollections(reader, "collection1");
verifyStatus(reader, ZkStateReader.RECOVERING);
int version = getCloudStateVersion(controllerClient);
mockController.publishState("core1", ZkStateReader.ACTIVE, 1);
while(version == getCloudStateVersion(controllerClient));
verifyStatus(reader, ZkStateReader.ACTIVE);
version = getCloudStateVersion(controllerClient);
overseerClient.close();
Thread.sleep(1000); //wait for overseer to get killed
mockController.publishState("core1", ZkStateReader.RECOVERING, 1);
version = getCloudStateVersion(controllerClient);
overseerClient = electNewOverseer(server.getZkAddress());
while(version == getCloudStateVersion(controllerClient));
verifyStatus(reader, ZkStateReader.RECOVERING);
assertEquals("Live nodes count does not match", 1, reader.getCloudState()
.getLiveNodes().size());
assertEquals("Shard count does not match", 1, reader.getCloudState()
.getSlice("collection1", "shard1").getShards().size());
version = getCloudStateVersion(controllerClient);
mockController.publishState("core1", null,1);
while(version == getCloudStateVersion(controllerClient));
Thread.sleep(100);
assertEquals("Shard count does not match", 0, reader.getCloudState()
.getSlice("collection1", "shard1").getShards().size());
} finally {
if (mockController != null) {
mockController.close();
}
if (overseerClient != null) {
overseerClient.close();
}
if (controllerClient != null) {
controllerClient.close();
}
if (reader != null) {
reader.close();
}
server.shutdown();
}
}
private class OverseerRestarter implements Runnable{
SolrZkClient overseerClient = null;
public volatile boolean run = true;
private final String zkAddress;
public OverseerRestarter(String zkAddress) {
this.zkAddress = zkAddress;
}
@Override
public void run() {
try {
overseerClient = electNewOverseer(zkAddress);
} catch (Throwable t) {
//t.printStackTrace();
}
while (run) {
if(random.nextInt(20)==1){
try {
overseerClient.close();
overseerClient = electNewOverseer(zkAddress);
} catch (Throwable e) {
//e.printStackTrace();
}
}
try {
Thread.sleep(100);
} catch (Throwable e) {
//e.printStackTrace();
}
}
try {
overseerClient.close();
} catch (Throwable e) {
//e.printStackTrace();
}
}
}
@Test
public void testShardLeaderChange() throws Exception {
String zkDir = dataDir.getAbsolutePath() + File.separator
+ "zookeeper/server1/data";
final ZkTestServer server = new ZkTestServer(zkDir);
SolrZkClient controllerClient = null;
ZkStateReader reader = null;
MockZKController mockController = null;
MockZKController mockController2 = null;
OverseerRestarter killer = null;
Thread killerThread = null;
try {
server.run();
controllerClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
controllerClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
killer = new OverseerRestarter(server.getZkAddress());
killerThread = new Thread(killer);
killerThread.start();
reader = new ZkStateReader(controllerClient);
reader.createClusterStateWatchersAndUpdate();
for (int i = 0; i < 20; i++) {
mockController = new MockZKController(server.getZkAddress(), "node1", "collection1");
mockController.publishState("core1", "state1",1);
if(mockController2!=null) {
mockController2.close();
mockController2 = null;
}
mockController.publishState("core1", "state2",1);
mockController2 = new MockZKController(server.getZkAddress(), "node2", "collection1");
mockController.publishState("core1", "state1",1);
verifyShardLeader(reader, "collection1", "shard1", "core1");
mockController2.publishState("core4", "state2" ,1);
mockController.close();
mockController = null;
verifyShardLeader(reader, "collection1", "shard1", "core4");
}
} finally {
if (killer != null) {
killer.run = false;
if (killerThread != null) {
killerThread.join();
}
}
if (mockController != null) {
mockController.close();
}
if (mockController2 != null) {
mockController2.close();
}
if (controllerClient != null) {
controllerClient.close();
}
if (reader != null) {
reader.close();
}
server.shutdown();
}
}
@Test
public void testDoubleAssignment() throws Exception {
String zkDir = dataDir.getAbsolutePath() + File.separator
+ "zookeeper/server1/data";
ZkTestServer server = new ZkTestServer(zkDir);
SolrZkClient controllerClient = null;
SolrZkClient overseerClient = null;
ZkStateReader reader = null;
MockZKController mockController = null;
try {
server.run();
controllerClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
controllerClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
reader = new ZkStateReader(controllerClient);
reader.createClusterStateWatchersAndUpdate();
mockController = new MockZKController(server.getZkAddress(), "node1", "collection1");
overseerClient = electNewOverseer(server.getZkAddress());
mockController.publishState("core1", ZkStateReader.RECOVERING, 1);
waitForCollections(reader, "collection1");
verifyStatus(reader, ZkStateReader.RECOVERING);
mockController.close();
int version = getCloudStateVersion(controllerClient);
mockController = new MockZKController(server.getZkAddress(), "node1", "collection1");
mockController.publishState("core1", ZkStateReader.RECOVERING, 1);
while (version == getCloudStateVersion(controllerClient));
reader.updateCloudState(true);
CloudState state = reader.getCloudState();
int numFound = 0;
for (Map<String,Slice> collection : state.getCollectionStates().values()) {
for (Slice slice : collection.values()) {
if (slice.getShards().get("node1_core1") != null) {
numFound++;
}
}
}
assertEquals("Shard was found in more than 1 times in CloudState", 1,
numFound);
} finally {
if (overseerClient != null) {
overseerClient.close();
}
if (mockController != null) {
mockController.close();
}
if (controllerClient != null) {
controllerClient.close();
}
if (reader != null) {
reader.close();
}
server.shutdown();
}
}
@Test
public void testPlaceholders() throws Exception {
String zkDir = dataDir.getAbsolutePath() + File.separator
+ "zookeeper/server1/data";
ZkTestServer server = new ZkTestServer(zkDir);
SolrZkClient controllerClient = null;
SolrZkClient overseerClient = null;
ZkStateReader reader = null;
MockZKController mockController = null;
try {
server.run();
controllerClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
controllerClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
reader = new ZkStateReader(controllerClient);
reader.createClusterStateWatchersAndUpdate();
mockController = new MockZKController(server.getZkAddress(), "node1", "collection1");
overseerClient = electNewOverseer(server.getZkAddress());
mockController.publishState("core1", ZkStateReader.RECOVERING, 12);
waitForCollections(reader, "collection1");
assertEquals("Slicecount does not match", 12, reader.getCloudState().getSlices("collection1").size());
} finally {
if (overseerClient != null) {
overseerClient.close();
}
if (mockController != null) {
mockController.close();
}
if (controllerClient != null) {
controllerClient.close();
}
if (reader != null) {
reader.close();
}
server.shutdown();
}
}
private int getCloudStateVersion(SolrZkClient controllerClient)
throws KeeperException, InterruptedException {
return controllerClient.exists(ZkStateReader.CLUSTER_STATE, null, false).getVersion();
}
private SolrZkClient electNewOverseer(String address) throws InterruptedException,
TimeoutException, IOException, KeeperException {
SolrZkClient zkClient = new SolrZkClient(address, TIMEOUT);
ZkStateReader reader = new ZkStateReader(zkClient);
LeaderElector overseerElector = new LeaderElector(zkClient);
ElectionContext ec = new OverseerElectionContext(address.replaceAll("/", "_"), zkClient, reader);
overseerElector.setup(ec);
overseerElector.joinElection(ec);
return zkClient;
}
}