/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
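/**
 * Tests the REBALANCELEADERS collection command together with the replica-property commands it
 * depends on (ADDREPLICAPROP, DELETEREPLICAPROP and BALANCESHARDUNIQUE): a shard-unique property
 * such as preferredLeader must end up on exactly one replica per slice, and rebalancing must make
 * the preferredLeader replicas the actual shard leaders.
 */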
@LuceneTestCase.Slow
public class TestRebalanceLeaders extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String COLLECTION_NAME = "TestColl";
private static int numNodes;
private static int numShards;
private static int numReplicas;
private static boolean useAdminToSetProps = false;
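// The cluster and collection are sized randomly so different shard/replica layouts are exercised from run to run;
// useAdminToSetProps randomly picks, once per run, whether replica properties are set through the SolrJ admin API
// or through a hand-built request to /admin/collections.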
@BeforeClass
public static void setupCluster() throws Exception {
numNodes = random().nextInt(4) + 4;
numShards = random().nextInt(3) + 3;
numReplicas = random().nextInt(2) + 2;
useAdminToSetProps = random().nextBoolean();
configureCluster(numNodes)
.addConfig(COLLECTION_NAME, configset("cloud-minimal"))
.configure();
CollectionAdminResponse resp = CollectionAdminRequest.createCollection(COLLECTION_NAME, COLLECTION_NAME,
numShards, numReplicas, 0, 0)
.setMaxShardsPerNode((numShards * numReplicas) / numNodes + 1)
.process(cluster.getSolrClient());
assertEquals("Admin request failed; ", 0, resp.getStatus());
cluster.waitForActiveCollection(COLLECTION_NAME, numShards, numShards * numReplicas);
}
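// Start each test from a clean slate: remove every "property.*" entry from every replica in the collection.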
@Before
public void removeAllProperties() throws KeeperException, InterruptedException {
forceUpdateCollectionStatus();
DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
for (Slice slice : docCollection.getSlices()) {
for (Replica rep : slice.getReplicas()) {
rep.getProperties().forEach((key, value) -> {
if (key.startsWith("property.")) {
try {
delProp(slice, rep, key);
} catch (IOException | SolrServerException e) {
fail("Caught unexpected exception in @Before " + e.getMessage());
}
}
});
}
}
}
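// How long the polling loops below wait for the cluster state to catch up before failing the test.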
int timeoutMs = 60000;
// Test that setting an arbitrary "slice unique" property un-sets the property if it's already on another replica in
// the slice. This test sets the property on an _individual_ replica, whereas testBalancePropertySliceUnique relies
// on the BALANCESHARDUNIQUE command to distribute the property across each slice.
//
// NOTE: There were significant problems because at one point the code implicitly defined
// shardUnique=true for the special property preferredLeader. That was removed at one point, so we're explicitly
// testing that case as well.
@Test
public void testSetArbitraryPropertySliceUnique() throws IOException, SolrServerException, InterruptedException, KeeperException {
// Check both special (preferredLeader) and something arbitrary.
doTestSetArbitraryPropertySliceUnique("foo" + random().nextInt(1_000_000));
removeAllProperties();
doTestSetArbitraryPropertySliceUnique("preferredleader");
}
// Test that automatically distributing a slice-unique property un-sets that property on any other replica
// in that slice.
// This is different from the test above: the test above sets individual properties on individual replicas, while this
// one relies on Solr to pick which replicas to set the property on.
@Test
public void testBalancePropertySliceUnique() throws KeeperException, InterruptedException, IOException, SolrServerException {
// Check both an arbitrary property and the "special" property preferredLeader.
doTestBalancePropertySliceUnique("foo" + random().nextInt(1_000_000));
removeAllProperties();
doTestBalancePropertySliceUnique("preferredleader");
}
// Now that setting properties has been tested, we need to check that rebalancing the leaders actually changes the
// leaders appropriately.
@Test
public void testRebalanceLeaders() throws Exception {
// First let's unbalance the preferredLeader property, do all the leaders get reassigned properly?
concentrateProp("preferredLeader");
sendRebalanceCommand();
checkPreferredsAreLeaders();
// Now follow up by evenly distributing the property as well as possible.
doTestBalancePropertySliceUnique("preferredLeader");
sendRebalanceCommand();
checkPreferredsAreLeaders();
// Now check the condition we saw "in the wild" where you could not rebalance properly when Jetty was restarted.
concentratePropByRestartingJettys();
sendRebalanceCommand();
checkPreferredsAreLeaders();
}
// Ensure that the property is set on only one replica per slice when changing a unique property on an individual
// replica.
private void doTestSetArbitraryPropertySliceUnique(String propIn) throws InterruptedException, KeeperException, IOException, SolrServerException {
final String prop = (random().nextBoolean()) ? propIn : propIn.toUpperCase(Locale.ROOT);
// First set the property in some replica in some slice
forceUpdateCollectionStatus();
DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
Slice[] slices = docCollection.getSlices().toArray(new Slice[0]);
Slice slice = slices[random().nextInt(slices.length)];
// Bounce around a bit setting this property and ensure it's only set on one replica.
Replica[] reps = slice.getReplicas().toArray(new Replica[0]);
for (int idx = 0; idx < 4; ++idx) {
Replica rep = reps[random().nextInt(reps.length)];
// Set the property on a particular replica
setProp(slice, rep, prop);
TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
long count = 0;
boolean rightRep = false;
Slice modSlice;
DocCollection modColl = null; // keeps IDE happy
// Ensure that no other replica in that slice has the property when we return.
while (timeout.hasTimedOut() == false) {
forceUpdateCollectionStatus();
modColl = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
modSlice = modColl.getSlice(slice.getName());
rightRep = modSlice.getReplica(rep.getName()).getBool("property." + prop.toLowerCase(Locale.ROOT), false);
count = modSlice.getReplicas().stream().filter(thisRep -> thisRep.getBool("property." + prop.toLowerCase(Locale.ROOT), false)).count();
if (count == 1 && rightRep) {
break;
}
TimeUnit.MILLISECONDS.sleep(100);
}
if (count != 1 || rightRep == false) {
fail("The property " + prop + " was not uniquely distributed in slice " + slice.getName()
+ " " + modColl.toString());
}
}
}
// Fail if the replicas with the preferredLeader property are _not_ also the leaders.
private void checkPreferredsAreLeaders() throws InterruptedException, KeeperException {
// Make sure the replicas with the preferredLeader property have actually become the leaders.
TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
while (timeout.hasTimedOut() == false) {
if (checkPreferredsAreLeaders(false)) {
// Ok, all preferreds are leaders. Let's also get the election queue and guarantee that every
// live replica is in the queue and none are repeated.
checkElectionQueues();
return;
}
TimeUnit.MILLISECONDS.sleep(100);
}
log.error("Leaders are not all preferres {}", cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME));
// Show the errors
checkPreferredsAreLeaders(true);
}
// Do all active replicas in each slice appear exactly once in the slice's leader election queue?
// Since we assert that the number of live replicas is the same size as the leader election queue, we only
// have to compare one way.
private void checkElectionQueues() throws KeeperException, InterruptedException {
DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
Set<String> liveNodes = cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes();
for (Slice slice : docCollection.getSlices()) {
Set<Replica> liveReplicas = new HashSet<>();
slice.getReplicas().forEach(replica -> {
if (replica.isActive(liveNodes)) {
liveReplicas.add(replica);
}
});
checkOneQueue(docCollection, slice, liveReplicas);
}
}
// Helper method to check one leader election queue's consistency.
private void checkOneQueue(DocCollection coll, Slice slice, Set<Replica> liveReplicas) throws KeeperException, InterruptedException {
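// The election queue for a shard lives at /collections/<collection>/leader_elect/<shard>/election;
// each child znode's name encodes the replica name, which LeaderElector.getNodeName() extracts below.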
List<String> leaderQueue = cluster.getSolrClient().getZkStateReader().getZkClient().getChildren("/collections/" + COLLECTION_NAME +
"/leader_elect/" + slice.getName() + "/election", null, true);
if (leaderQueue.size() != liveReplicas.size()) {
log.error("One or more replicas is missing from the leader election queue! Slice {}, election queue: {}, collection: {}"
, slice.getName(), leaderQueue, coll);
fail("One or more replicas is missing from the leader election queue");
}
// Check that each election node corresponds to a replica in the slice.
for (String electionNode : leaderQueue) {
String replica = LeaderElector.getNodeName(electionNode);
if (slice.getReplica(replica) == null) {
log.error("Replica {} is not in the election queue: {}", replica, leaderQueue);
fail("Replica is not in the election queue!");
}
}
}
// Just an encapsulation for checkPreferredsAreLeaders to make returning easier.
// The doAsserts parameter controls whether we actually print the problem and fail the test when the condition is not met.
private boolean checkPreferredsAreLeaders(boolean doAsserts) throws KeeperException, InterruptedException {
forceUpdateCollectionStatus();
DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
for (Slice slice : docCollection.getSlices()) {
for (Replica rep : slice.getReplicas()) {
if (rep.getBool("property.preferredleader", false)) {
boolean isLeader = rep.getBool("leader", false);
if (doAsserts) {
assertTrue("PreferredLeader should be the leader: ", isLeader);
} else if (isLeader == false) {
return false;
}
}
}
}
return true;
}
// Arbitrarily send the rebalance command either with the SolrJ interface or with an HTTP request.
private void sendRebalanceCommand() throws SolrServerException, InterruptedException, IOException {
if (random().nextBoolean()) {
rebalanceLeaderUsingSolrJAPI();
} else {
rebalanceLeaderUsingStandardRequest();
}
}
// Helper method to make sure the property is _unbalanced_ first, then it gets properly re-assigned with the
// BALANCESHARDUNIQUE command.
private void doTestBalancePropertySliceUnique(String propIn) throws InterruptedException, IOException, KeeperException, SolrServerException {
final String prop = (random().nextBoolean()) ? propIn : propIn.toUpperCase(Locale.ROOT);
// Concentrate the properties on as few replicas as possible
concentrateProp(prop);
// issue the BALANCESHARDUNIQUE command
rebalancePropAndCheck(prop);
// Verify that each shard has exactly one replica with the property.
verifyPropUniquePerShard(prop);
// Verify that the property is reasonably evenly distributed
verifyPropCorrectlyDistributed(prop);
}
private void verifyPropCorrectlyDistributed(String prop) throws KeeperException, InterruptedException {
TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
String propLC = prop.toLowerCase(Locale.ROOT);
DocCollection docCollection = null;
while (timeout.hasTimedOut() == false) {
forceUpdateCollectionStatus();
docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
int maxPropCount = Integer.MIN_VALUE;
int minPropCount = Integer.MAX_VALUE;
for (Slice slice : docCollection.getSlices()) {
int repCount = 0;
for (Replica rep : slice.getReplicas()) {
if (rep.getBool("property." + propLC, false)) {
repCount++;
}
}
maxPropCount = Math.max(maxPropCount, repCount);
minPropCount = Math.min(minPropCount, repCount);
}
if (Math.abs(maxPropCount - minPropCount) < 2) return;
TimeUnit.MILLISECONDS.sleep(100);
}
log.error("Property {} is not distributed evenly. {}", prop, docCollection);
fail("Property is not distributed evenly " + prop);
}
// Used when we concentrate the property on a few nodes.
private void verifyPropDistributedAsExpected(Map<String, String> expectedShardReplicaMap, String prop) throws InterruptedException, KeeperException {
// Make sure the shard-unique property is on the replicas we expect.
TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
String propLC = prop.toLowerCase(Locale.ROOT);
boolean failure = false;
DocCollection docCollection = null;
while (timeout.hasTimedOut() == false) {
forceUpdateCollectionStatus();
docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
failure = false;
for (Map.Entry<String, String> ent : expectedShardReplicaMap.entrySet()) {
Replica rep = docCollection.getSlice(ent.getKey()).getReplica(ent.getValue());
if (rep.getBool("property." + propLC, false) == false) {
failure = true;
}
}
if (failure == false) {
return;
}
TimeUnit.MILLISECONDS.sleep(100);
}
fail(prop + " properties are not on the expected replicas: " + docCollection.toString()
+ System.lineSeparator() + "Expected " + expectedShardReplicaMap.toString());
}
// Arbitrarily issue the BALANCESHARDUNIQUE command either with the SolrJ interface or with an HTTP request. This does _not_ rebalance the leaders.
private void rebalancePropAndCheck(String prop) throws IOException, SolrServerException, InterruptedException, KeeperException {
if (random().nextBoolean()) {
rebalancePropUsingSolrJAPI(prop);
} else {
rebalancePropUsingStandardRequest(prop);
}
}
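// Issue REBALANCELEADERS through the SolrJ CollectionAdminRequest API and check that the response's
// "Summary" entry reports success.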
private void rebalanceLeaderUsingSolrJAPI() throws IOException, SolrServerException, InterruptedException {
CollectionAdminResponse resp = CollectionAdminRequest
.rebalanceLeaders(COLLECTION_NAME)
.process(cluster.getSolrClient());
assertTrue("All leaders should have been verified", resp.getResponse().get("Summary").toString().contains("Success"));
assertEquals("Admin request failed; ", 0, resp.getStatus());
}
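// Issue the same REBALANCELEADERS command, but as a hand-built request against /admin/collections.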
private void rebalanceLeaderUsingStandardRequest() throws IOException, SolrServerException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.REBALANCELEADERS.toString());
params.set("collection", COLLECTION_NAME);
QueryRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
QueryResponse resp = request.process(cluster.getSolrClient());
assertTrue("All leaders should have been verified", resp.getResponse().get("Summary").toString().contains("Success"));
assertEquals("Call to rebalanceLeaders failed ", 0, resp.getStatus());
}
private void rebalancePropUsingSolrJAPI(String prop) throws IOException, SolrServerException, InterruptedException {
// Don't set the property value; that should be done automatically.
CollectionAdminResponse resp;
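// preferredLeader is implicitly shard-unique; any other property has to be flagged shardUnique=true explicitly.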
if (prop.toLowerCase(Locale.ROOT).contains("preferredleader")) {
resp = CollectionAdminRequest
.balanceReplicaProperty(COLLECTION_NAME, prop)
.process(cluster.getSolrClient());
} else {
resp = CollectionAdminRequest
.balanceReplicaProperty(COLLECTION_NAME, prop)
.setShardUnique(true)
.process(cluster.getSolrClient());
}
assertEquals("Admin request failed; ", 0, resp.getStatus());
}
private void rebalancePropUsingStandardRequest(String prop) throws IOException, SolrServerException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.BALANCESHARDUNIQUE.toString());
params.set("property", prop);
params.set("collection", COLLECTION_NAME);
if (prop.toLowerCase(Locale.ROOT).contains("preferredleader") == false) {
params.set("shardUnique", true);
}
QueryRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
QueryResponse resp = request.process(cluster.getSolrClient());
assertEquals("Call to rebalanceLeaders failed ", 0, resp.getStatus());
}
// This is important. I've (Erick Erickson) run across a situation where the "standard request" causes failures, but
// never the Admin request. So let's test both all the time for a given test run.
//
// This sets the property on an _individual_ replica, not collection-wide.
private void setProp(Slice slice, Replica rep, String prop) throws IOException, SolrServerException {
if (useAdminToSetProps) {
setPropWithAdminRequest(slice, rep, prop);
} else {
setPropWithStandardRequest(slice, rep, prop);
}
}
void setPropWithStandardRequest(Slice slice, Replica rep, String prop) throws IOException, SolrServerException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.ADDREPLICAPROP.toString());
params.set("collection", COLLECTION_NAME);
params.set("shard", slice.getName());
params.set("replica", rep.getName());
params.set("property", prop);
params.set("property.value", "true");
// Omit shardUnique for preferredLeader to test that the implicit shardUnique is applied; other properties must set it explicitly.
if (prop.toLowerCase(Locale.ROOT).equals("preferredleader") == false) {
params.set("shardUnique", "true");
}
@SuppressWarnings({"rawtypes"})
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
cluster.getSolrClient().request(request);
String propLC = prop.toLowerCase(Locale.ROOT);
waitForState("Expecting property '" + prop + "'to appear on replica " + rep.getName(), COLLECTION_NAME,
(n, c) -> "true".equals(c.getReplica(rep.getName()).getProperty(propLC)));
}
void setPropWithAdminRequest(Slice slice, Replica rep, String prop) throws IOException, SolrServerException {
boolean setUnique = (prop.toLowerCase(Locale.ROOT).equals("preferredleader") == false);
CollectionAdminRequest.AddReplicaProp addProp =
CollectionAdminRequest.addReplicaProperty(COLLECTION_NAME, slice.getName(), rep.getName(), prop, "true");
if (setUnique) {
addProp.setShardUnique(true);
}
CollectionAdminResponse resp = addProp.process(cluster.getSolrClient());
assertEquals(0, resp.getStatus());
String propLC = prop.toLowerCase(Locale.ROOT);
waitForState("Expecting property '" + prop + "'to appear on replica " + rep.getName(), COLLECTION_NAME,
(n, c) -> "true".equals(c.getReplica(rep.getName()).getProperty(propLC)));
}
private void delProp(Slice slice, Replica rep, String prop) throws IOException, SolrServerException {
String propLC = prop.toLowerCase(Locale.ROOT);
CollectionAdminResponse resp = CollectionAdminRequest.deleteReplicaProperty(COLLECTION_NAME, slice.getName(), rep.getName(), propLC)
.process(cluster.getSolrClient());
assertEquals("Admin request failed; ", 0, resp.getStatus());
waitForState("Expecting property '" + prop + "' to be removed from replica " + rep.getName(), COLLECTION_NAME,
(n, c) -> c.getReplica(rep.getName()).getProperty(prop) == null);
}
// Intentionally un-balance the property to ensure that BALANCESHARDUNIQUE does its job. There was an odd case
// where rebalancing didn't work very well if the Solr nodes were stopped and restarted, yet worked perfectly
// when the nodes were _not_ restarted in the test. So we have to test that too.
private void concentratePropByRestartingJettys() throws Exception {
List<JettySolrRunner> jettys = new ArrayList<>(cluster.getJettySolrRunners());
Collections.shuffle(jettys, random());
jettys.remove(random().nextInt(jettys.size()));
// Now we have a list of jettys, and there is one missing. Stop all of the remaining jettys, then start them again
// to concentrate the leaders. It's not necessary that all shards have a leader.
for (JettySolrRunner jetty : jettys) {
cluster.stopJettySolrRunner(jetty);
cluster.waitForJettyToStop(jetty);
}
checkReplicasInactive(jettys);
for (int idx = 0; idx < jettys.size(); ++idx) {
cluster.startJettySolrRunner(jettys.get(idx));
}
cluster.waitForAllNodes(60);
// The nodes are present, but are all replicas active?
checkAllReplicasActive();
}
// While banging my head against a wall, I put a lot of force-refresh statements in. I want to leave them in
// but have this be a no-op, so if we start to get failures we can re-enable them with minimal effort.
private void forceUpdateCollectionStatus() throws KeeperException, InterruptedException {
// cluster.getSolrClient().getZkStateReader().forceUpdateCollection(COLLECTION_NAME);
}
// Since we have to restart jettys, we don't want to try rebalancing etc. until we're sure all jettys that should
// be up are up and all replicas are active.
private void checkReplicasInactive(List<JettySolrRunner> downJettys) throws KeeperException, InterruptedException {
TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
DocCollection docCollection = null;
Set<String> liveNodes = null;
Set<String> downJettyNodes = new TreeSet<>();
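// Node names in the cluster state have the form host:port_solr, so build the same form for each stopped Jetty.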
for (JettySolrRunner jetty : downJettys) {
downJettyNodes.add(jetty.getBaseUrl().getHost() + ":" + jetty.getBaseUrl().getPort() + "_solr");
}
while (timeout.hasTimedOut() == false) {
forceUpdateCollectionStatus();
docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
liveNodes = cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes();
boolean expectedInactive = true;
for (Slice slice : docCollection.getSlices()) {
for (Replica rep : slice.getReplicas()) {
if (downJettyNodes.contains(rep.getNodeName()) == false) {
continue; // We are on a live node
}
// A replica on an allegedly-down node is still reported as active, so keep waiting.
if (rep.isActive(liveNodes)) {
expectedInactive = false;
}
}
}
if (expectedInactive) {
return;
}
TimeUnit.MILLISECONDS.sleep(100);
}
fail("timed out waiting for all replicas to become inactive: livenodes: " + liveNodes +
" Collection state: " + docCollection.toString());
}
// We need to wait around until all replicas are active before expecting rebalancing or distributing shard-unique
// properties to work.
private void checkAllReplicasActive() throws KeeperException, InterruptedException {
TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
while (timeout.hasTimedOut() == false) {
forceUpdateCollectionStatus();
DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
Set<String> liveNodes = cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes();
boolean allActive = true;
for (Slice slice : docCollection.getSlices()) {
for (Replica rep : slice.getReplicas()) {
if (rep.isActive(liveNodes) == false) {
allActive = false;
}
}
}
if (allActive) {
return;
}
TimeUnit.MILLISECONDS.sleep(100);
}
fail("timed out waiting for all replicas to become active");
}
// Use a simple heuristic to put as many replicas with the property on as few nodes as possible. The point is that
// we can then execute BALANCESHARDUNIQUE and be sure it worked correctly.
private void concentrateProp(String prop) throws KeeperException, InterruptedException, IOException, SolrServerException {
// Find all the live nodes.
// For each slice, set the property on the replica whose node appears earliest in the (shuffled) live_nodes list.
List<String> liveNodes = new ArrayList<>(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes());
Collections.shuffle(liveNodes, random());
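// Shuffle so the node that ends up hosting all the properties differs from run to run.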
Map<String, String> uniquePropMap = new TreeMap<>();
forceUpdateCollectionStatus();
DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
for (Slice slice : docCollection.getSlices()) {
Replica changedRep = null;
int livePos = Integer.MAX_VALUE;
for (Replica rep : slice.getReplicas()) {
int pos = liveNodes.indexOf(rep.getNodeName());
if (pos >= 0 && pos < livePos) {
livePos = pos;
changedRep = rep;
}
}
if (livePos == Integer.MAX_VALUE) {
fail("Invalid state! We should have a replica to add the property to! " + docCollection.toString());
}
uniquePropMap.put(slice.getName(), changedRep.getName());
// Now set the property on the "lowest" node in live_nodes.
setProp(slice, changedRep, prop);
}
verifyPropDistributedAsExpected(uniquePropMap, prop);
}
// make sure that the property in question is unique per shard.
private Map<String, String> verifyPropUniquePerShard(String prop) throws InterruptedException, KeeperException {
Map<String, String> uniquePropMaps = new TreeMap<>();
TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
while (timeout.hasTimedOut() == false) {
uniquePropMaps.clear();
if (checkUniquePropPerShard(uniquePropMaps, prop)) {
return uniquePropMaps;
}
TimeUnit.MILLISECONDS.sleep(100);
}
fail("There should be exactly one replica with value " + prop + " set to true per shard: "
+ cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME).toString());
return null; // keeps IDE happy.
}
// return true if every shard has exactly one replica with the unique property set to "true"
private boolean checkUniquePropPerShard(Map<String, String> uniques, String prop) throws KeeperException, InterruptedException {
forceUpdateCollectionStatus();
DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
for (Slice slice : docCollection.getSlices()) {
int propCount = 0;
for (Replica rep : slice.getReplicas()) {
if (rep.getBool("property." + prop.toLowerCase(Locale.ROOT), false)) {
propCount++;
uniques.put(slice.getName(), rep.getName());
}
}
if (1 != propCount) {
return false;
}
}
return true;
}
}