blob: dc48ab47c74df653428672a26ba92da585245ec7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* Test sync phase that occurs when Leader goes down and a new Leader is
* elected.
*/
@Slow
// MRM TODO: - bridge does not yet have some consistency checks here, finish and back in control client
@LuceneTestCase.Nightly
public class SyncSliceTest extends SolrCloudBridgeTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private boolean success = false;
// @Override
public void distribTearDown() throws Exception {
if (!success) {
// printLayoutOnTearDown = true;
}
/// super.distribTearDown();
}
public SyncSliceTest() throws Exception {
super();
useFactory(null);
numJettys = TEST_NIGHTLY ? 7 : 4;
sliceCount = 1;
replicationFactor = numJettys;
// createControl = true;
}
@Test
public void test() throws Exception {
handle.clear();
handle.put("timestamp", SKIPVAL);
cluster.waitForActiveCollection(cluster.getSolrClient().getHttpClient(), COLLECTION, 5, TimeUnit.SECONDS, false, 1, numJettys, true, true);
List<JettySolrRunner> skipServers = new ArrayList<>();
int docId = 0;
indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1, "to come to the aid of their country.");
indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1, "old haven was blue.");
List<Replica> replicas = new ArrayList<>();
replicas.addAll(cloudClient.getZkStateReader().getClusterState().getCollection(COLLECTION).getSlice("s1").getReplicas());
skipServers.add(getJettyOnPort(getReplicaPort(replicas.get(0))));
indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1, "but the song was fancy.");
skipServers.add(getJettyOnPort(getReplicaPort(replicas.get(1))));
indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1, "under the moon and over the lake");
commit();
//d waitForRecoveriesToFinish(false);
// shard should be inconsistent
// String shardFailMessage = checkShardConsistency("shard1", true, false);
// assertNotNull(shardFailMessage);
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionAction.SYNCSHARD.toString());
params.set("collection", COLLECTION);
params.set("shard", "s1");
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
String baseUrl = replicas.get(1).getBaseUrl();
request.setBasePath(baseUrl);
baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
// we only set the connect timeout, not so timeout
try (SolrClient baseClient = getClient(baseUrl)) {
baseClient.request(request);
}
// waitForThingsToLevelOut(15, TimeUnit.SECONDS);
// checkShardConsistency(false, true);
long cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
assertEquals(4, cloudClientDocs);
// kill the leader - new leader could have all the docs or be missing one
JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(COLLECTION, "s1", 10000)));
log.info("Stopping leader jetty {}", leaderJetty.getBaseUrl());
skipServers = getRandomOtherJetty(leaderJetty, null); // but not the leader
// this doc won't be on one node
indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1, "to come to the aid of their country.");
commit();
leaderJetty.stop();
cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
assertEquals(5, cloudClientDocs);
JettySolrRunner deadJetty = leaderJetty;
// let's get the latest leader
int cnt = 0;
while (deadJetty == leaderJetty) {
// updateMappingsFromZk(this.jettys, this.clients);
try {
leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(COLLECTION, "s1", 1000)));
} catch (SolrException e) {
log.info("did not get leader", e);
}
if (deadJetty == leaderJetty) {
Thread.sleep(500);
}
if (cnt++ >= 30) {
fail("don't expect leader to be on the jetty we stopped deadJetty=" + deadJetty.getNodeName() + " leaderJetty=" + leaderJetty.getNodeName());
}
}
// bring back dead node
deadJetty.start(); // he is not the leader anymore
log.info("numJettys=" + numJettys);
cluster.waitForActiveCollection(cluster.getSolrClient().getHttpClient(), COLLECTION, 10, TimeUnit.SECONDS, false, 1, numJettys, true, true);
skipServers = getRandomOtherJetty(leaderJetty, deadJetty);
skipServers.addAll(getRandomOtherJetty(leaderJetty, deadJetty));
// skip list should be
// System.out.println("leader:" + leaderJetty.url);
// System.out.println("dead:" + deadJetty.url);
// System.out.println("skip list:" + skipServers);
// we are skipping 2 nodes
assertEquals(2, skipServers.size());
// more docs than can peer sync
for (int i = 0; i < 300; i++) {
indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1, "to come to the aid of their country.");
}
commit();
// waitForRecoveriesToFinish(false);
// shard should be inconsistent
// String shardFailMessage = waitTillInconsistent();
// assertNotNull(
// "Test Setup Failure: shard1 should have just been set up to be inconsistent - but it's still consistent. Leader:"
// + leaderJetty.getBaseUrl() + " Dead Guy:" + deadJetty.getBaseUrl() + "skip list:" + skipServers, shardFailMessage);
//
// good place to test compareResults
if (controlClient != null) {
boolean shouldFail = CloudInspectUtil.compareResults(controlClient, cloudClient);
assertTrue("A test that compareResults is working correctly failed", shouldFail);
}
// kill the current leader
leaderJetty.stop();
// MRM TODO:
//waitForNoShardInconsistency();
//checkShardConsistency(true, true);
//cluster.getSolrClient().getZkStateReader().checkShardConsistency(COLLECTION);
success = true;
}
private String waitTillInconsistent() throws Exception, InterruptedException {
String shardFailMessage = null;
shardFailMessage = pollConsistency(shardFailMessage, 0);
shardFailMessage = pollConsistency(shardFailMessage, 3000);
shardFailMessage = pollConsistency(shardFailMessage, 5000);
shardFailMessage = pollConsistency(shardFailMessage, 15000);
return shardFailMessage;
}
private String pollConsistency(String shardFailMessage, int sleep)
throws InterruptedException, Exception {
try {
commit();
} catch (Throwable t) {
log.error("", t);
}
if (shardFailMessage == null) {
// try again
Thread.sleep(sleep);
// shardFailMessage = checkShardConsistency("shard1", true, false);
}
return shardFailMessage;
}
private List<JettySolrRunner> getRandomOtherJetty(JettySolrRunner leader, JettySolrRunner down) {
List<JettySolrRunner> skipServers = new ArrayList<>();
List<JettySolrRunner> candidates = new ArrayList<>();
candidates.addAll(cluster.getJettySolrRunners());
if (leader != null) {
candidates.remove(leader);
}
if (down != null) {
candidates.remove(down);
}
JettySolrRunner cjetty = candidates.get(random().nextInt(candidates.size()));
skipServers.add(cjetty);
return skipServers;
}
protected void indexDoc(List<JettySolrRunner> skipServers, Object... fields) throws IOException,
SolrServerException {
SolrInputDocument doc = new SolrInputDocument();
addFields(doc, fields);
addFields(doc, "rnd_b", true);
if (controlClient != null) controlClient.add(doc);
UpdateRequest ureq = new UpdateRequest();
ureq.add(doc);
ModifiableSolrParams params = new ModifiableSolrParams();
for (JettySolrRunner skip : skipServers) {
params.add("test.distrib.skip.servers", skip.getBaseUrl() + "/");
}
ureq.setParams(params);
ureq.process(cloudClient);
}
}