blob: 1e0514f529e7668649e013bd30ef6ce9e7a43b73 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
 * Test sync phase that occurs when Leader goes down and a new Leader is
 * elected.
 *
 * <p>The test repeatedly makes the replicas of {@code shard1} diverge by indexing documents
 * while passing a {@code test.distrib.skip.servers} list, and then checks that the shard
 * becomes consistent again: first after an explicit SYNCSHARD request, then after the current
 * leader is stopped (twice).
 */
@Slow
public class SyncSliceTest extends AbstractFullDistribZkTestBase {

  /** Maximum number of ZooKeeper mapping refreshes while waiting for a leader change. */
  private static final int LEADER_WAIT_ATTEMPTS = 120;

  /** Pause between two leader-change checks, in milliseconds. */
  private static final long LEADER_WAIT_SLEEP_MS = 500L;

  /** Sleep applied before each successive consistency check in {@link #waitTillInconsistent()}. */
  private static final int[] INCONSISTENCY_POLL_DELAYS_MS = {0, 3000, 5000, 15000};

  /** Set to {@code true} only when {@link #test()} runs to completion. */
  private boolean success = false;

  /**
   * Requests a layout dump during tear down when the test did not complete successfully,
   * then delegates to the base class.
   */
  @Override
  public void distribTearDown() throws Exception {
    if (!success) {
      printLayoutOnTearDown = true;
    }
    super.distribTearDown();
  }

  /** Configures a single slice with 7 shards on nightly runs and 4 otherwise. */
  public SyncSliceTest() {
    super();
    sliceCount = 1;
    fixShardCount(TEST_NIGHTLY ? 7 : 4);
  }

  /**
   * Runs the whole scenario: diverge replicas, SYNCSHARD, stop the leader, restart it,
   * diverge again beyond what this test expects peer sync to cover, stop the new leader.
   *
   * @throws Exception on any failure of the cluster interaction or of an assertion helper
   */
  @Test
  public void test() throws Exception {
    handle.clear();
    handle.put("timestamp", SKIPVAL);

    waitForThingsToLevelOut(30);

    del("*:*");

    // Phase 1: index four docs, the last two with a growing skip list.
    List<CloudJettyRunner> skipServers = new ArrayList<>();
    int docId = 0;
    indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1,
        "to come to the aid of their country.");
    indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1,
        "old haven was blue.");

    skipServers.add(shardToJetty.get("shard1").get(1));
    indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1,
        "but the song was fancy.");

    skipServers.add(shardToJetty.get("shard1").get(2));
    indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1,
        "under the moon and over the lake");

    commit();
    waitForRecoveriesToFinish(false);

    // shard should be inconsistent
    String shardFailMessage = checkShardConsistency("shard1", true, false);
    assertNotNull(shardFailMessage);

    // Phase 2: an explicit SYNCSHARD must bring the replicas back in line.
    requestSyncShard();

    waitForThingsToLevelOut(15);
    checkShardConsistency(false, true);

    long cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
    assertEquals(4, cloudClientDocs);

    // Phase 3: kill the leader - new leader could have all the docs or be missing one
    CloudJettyRunner leaderJetty = shardToLeaderJetty.get("shard1");

    skipServers = getRandomOtherJetty(leaderJetty, null); // but not the leader

    // this doc won't be on one node
    indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1,
        "to come to the aid of their country.");
    commit();

    assertOtherReplicasMapped(leaderJetty);

    leaderJetty.jetty.stop();

    Thread.sleep(3000);
    waitForNoShardInconsistency();
    Thread.sleep(1000);

    checkShardConsistency(false, true);

    cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
    assertEquals(5, cloudClientDocs);

    // let's get the latest leader
    CloudJettyRunner deadJetty = leaderJetty;
    leaderJetty = waitForLeaderOtherThan(deadJetty);

    // Phase 4: bring back dead node
    deadJetty.jetty.start(); // he is not the leader anymore

    waitTillAllNodesActive();

    // NOTE(review): the two picks below are independent, so the same replica may be chosen
    // twice; the size assertion only checks the list length, not that the entries differ.
    skipServers = getRandomOtherJetty(leaderJetty, deadJetty);
    skipServers.addAll(getRandomOtherJetty(leaderJetty, deadJetty));

    // we are skipping 2 nodes
    assertEquals(2, skipServers.size());

    // more docs than can peer sync
    for (int i = 0; i < 300; i++) {
      indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1,
          "to come to the aid of their country.");
    }

    commit();

    Thread.sleep(1000);

    waitForRecoveriesToFinish(false);

    // shard should be inconsistent
    shardFailMessage = waitTillInconsistent();
    assertNotNull(
        "Test Setup Failure: shard1 should have just been set up to be inconsistent - but it's still consistent. Leader:"
            + leaderJetty.url + " Dead Guy:" + deadJetty.url + " skip list:" + skipServers,
        shardFailMessage);

    // good place to test compareResults
    boolean shouldFail = CloudInspectUtil.compareResults(controlClient, cloudClient);
    assertTrue("A test that compareResults is working correctly failed", shouldFail);

    assertOtherReplicasMapped(leaderJetty);

    // Phase 5: kill the current leader
    leaderJetty.jetty.stop();

    waitForNoShardInconsistency();

    checkShardConsistency(true, true);

    success = true;
  }

  /**
   * Sends a Collections API SYNCSHARD request for {@code collection1}/{@code shard1} to the
   * node hosting the replica at index 2 of {@code shard1}.
   *
   * @throws Exception if the client cannot be created or the request fails
   */
  private void requestSyncShard() throws Exception {
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("action", CollectionAction.SYNCSHARD.toString());
    params.set("collection", "collection1");
    params.set("shard", "shard1");

    SolrRequest<?> request = new QueryRequest(params);
    request.setPath("/admin/collections");

    String baseUrl = ((HttpSolrClient) shardToJetty.get("shard1").get(2).client.solrClient)
        .getBaseURL();
    // Strip the trailing collection name so the request targets the node, not the core.
    baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());

    // we only set the connect timeout, not the socket (so) timeout
    try (HttpSolrClient baseClient = getHttpSolrClient(baseUrl, 30000)) {
      baseClient.request(request);
    }
  }

  /**
   * Asserts that, once {@code leader} is removed, the jetties mapped to {@code shard1} number
   * exactly {@code getShardCount() - 1}.
   *
   * @param leader the runner to exclude from the count
   */
  private void assertOtherReplicasMapped(CloudJettyRunner leader) {
    Set<CloudJettyRunner> others = new HashSet<>(shardToJetty.get("shard1"));
    others.remove(leader);
    assertEquals(getShardCount() - 1, others.size());
  }

  /**
   * Refreshes the jetty mappings from ZooKeeper until the leader mapped for {@code shard1} is
   * no longer {@code previousLeader}, giving up after {@link #LEADER_WAIT_ATTEMPTS} attempts.
   *
   * <p>The comparison is by identity: the wait ends as soon as the refreshed mapping returns a
   * different runner instance.
   *
   * @param previousLeader the runner that must no longer be reported as leader
   * @return the runner currently mapped as leader of {@code shard1}
   * @throws Exception if refreshing the mappings fails or the thread is interrupted
   */
  private CloudJettyRunner waitForLeaderOtherThan(CloudJettyRunner previousLeader)
      throws Exception {
    for (int attempt = 0; attempt < LEADER_WAIT_ATTEMPTS; attempt++) {
      updateMappingsFromZk(this.jettys, this.clients);
      CloudJettyRunner currentLeader = shardToLeaderJetty.get("shard1");
      if (currentLeader != previousLeader) {
        return currentLeader;
      }
      // Pause only after an unsuccessful check so the common case is not delayed.
      Thread.sleep(LEADER_WAIT_SLEEP_MS);
    }
    printLayout();
    fail("timeout waiting to see a new leader for shard1");
    return null; // not reached: fail() throws
  }

  /**
   * Polls the cluster state every 3 seconds, up to 60 times, until every replica of
   * {@code shard1} is on a live node and in the ACTIVE state; fails the test otherwise.
   *
   * @throws Exception if reading the cluster state fails or the thread is interrupted
   */
  private void waitTillAllNodesActive() throws Exception {
    for (int i = 0; i < 60; i++) {
      Thread.sleep(3000);
      ZkStateReader zkStateReader = cloudClient.getZkStateReader();
      ClusterState clusterState = zkStateReader.getClusterState();
      DocCollection collection1 = clusterState.getCollection("collection1");
      Slice slice = collection1.getSlice("shard1");
      Collection<Replica> replicas = slice.getReplicas();
      boolean allActive = true;
      for (Replica replica : replicas) {
        if (!clusterState.liveNodesContain(replica.getNodeName())
            || replica.getState() != Replica.State.ACTIVE) {
          allActive = false;
          break;
        }
      }
      if (allActive) {
        return;
      }
    }
    printLayout();
    fail("timeout waiting to see all nodes active");
  }

  /**
   * Checks {@code shard1} for inconsistency up to four times, with increasing pauses.
   *
   * @return the failure message of the first check that reported an inconsistency, or
   *     {@code null} if every check found the shard consistent
   * @throws Exception if a consistency check fails or the thread is interrupted
   */
  private String waitTillInconsistent() throws Exception {
    String shardFailMessage = null;
    for (int sleepMs : INCONSISTENCY_POLL_DELAYS_MS) {
      shardFailMessage = pollConsistency(shardFailMessage, sleepMs);
    }
    return shardFailMessage;
  }

  /**
   * Commits (best effort) and, if no inconsistency has been seen yet, sleeps and re-checks
   * {@code shard1}.
   *
   * @param shardFailMessage the message found so far, or {@code null} if none
   * @param sleep pause in milliseconds before re-checking; only applied when
   *     {@code shardFailMessage} is {@code null}
   * @return {@code shardFailMessage} if it was non-null, otherwise the result of the new check
   * @throws Exception if the consistency check fails or the thread is interrupted
   */
  private String pollConsistency(String shardFailMessage, int sleep) throws Exception {
    try {
      commit();
    } catch (Exception e) {
      // Best effort: a failed commit must not abort the polling, but Errors still propagate.
      e.printStackTrace();
    }
    if (shardFailMessage == null) {
      // try again
      Thread.sleep(sleep);
      shardFailMessage = checkShardConsistency("shard1", true, false);
    }
    return shardFailMessage;
  }

  /**
   * Picks one random jetty of {@code shard1} that is neither {@code leader} nor {@code down}.
   *
   * @param leader runner to exclude, or {@code null} to exclude none
   * @param down second runner to exclude, or {@code null} to exclude none
   * @return a new mutable list containing exactly the chosen runner
   */
  private List<CloudJettyRunner> getRandomOtherJetty(CloudJettyRunner leader, CloudJettyRunner down) {
    List<CloudJettyRunner> candidates = new ArrayList<>(shardToJetty.get("shard1"));
    if (leader != null) {
      candidates.remove(leader);
    }
    if (down != null) {
      candidates.remove(down);
    }
    assertTrue("no jetty left in shard1 to pick after excluding leader and down node",
        !candidates.isEmpty());

    // Callers append to the result, so it must stay a mutable list.
    List<CloudJettyRunner> skipServers = new ArrayList<>();
    skipServers.add(candidates.get(random().nextInt(candidates.size())));
    return skipServers;
  }

  /**
   * Adds a document built from {@code fields} (plus {@code rnd_b=true}) to the control client
   * and to the cloud client, passing the URL of each entry of {@code skipServers} as a
   * {@code test.distrib.skip.servers} parameter on the cloud update.
   *
   * @param skipServers runners whose URLs are sent in the skip parameter; may be empty
   * @param fields alternating field names and values for the document
   * @throws IOException on a communication error with a server
   * @throws SolrServerException if a server reports an error
   */
  protected void indexDoc(List<CloudJettyRunner> skipServers, Object... fields) throws IOException,
      SolrServerException {
    SolrInputDocument doc = new SolrInputDocument();

    addFields(doc, fields);
    addFields(doc, "rnd_b", true);

    controlClient.add(doc);

    UpdateRequest ureq = new UpdateRequest();
    ureq.add(doc);
    ModifiableSolrParams params = new ModifiableSolrParams();
    for (CloudJettyRunner skip : skipServers) {
      params.add("test.distrib.skip.servers", skip.url + "/");
    }
    ureq.setParams(params);
    ureq.process(cloudClient);
  }

  // skip the randoms - they can deadlock...
  /**
   * Indexes a document containing only {@code fields} and {@code rnd_b=true}.
   *
   * @param fields alternating field names and values for the document
   */
  @Override
  protected void indexr(Object... fields) throws Exception {
    SolrInputDocument doc = new SolrInputDocument();
    addFields(doc, fields);
    addFields(doc, "rnd_b", true);
    indexDoc(doc);
  }
}