blob: 8558bc567a20b817fed4c96fb39dc47c15c8b038 [file] [log] [blame]
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.util.LuceneTestCase.BadApple;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrServer;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.Diagnostics;
import org.apache.solr.update.SolrCmdDistributor;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Slow
public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase {
public static Logger log = LoggerFactory.getLogger(ChaosMonkeyNothingIsSafeTest.class);
private static final Integer RUN_LENGTH = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.runlength", "-1"));
@BeforeClass
public static void beforeSuperClass() {
SolrCmdDistributor.testing_errorHook = new Diagnostics.Callable() {
@Override
public void call(Object... data) {
SolrCmdDistributor.Request sreq = (SolrCmdDistributor.Request)data[1];
if (sreq.exception == null) return;
if (sreq.exception.getMessage().contains("Timeout")) {
Diagnostics.logThreadDumps("REQUESTING THREAD DUMP DUE TO TIMEOUT: " + sreq.exception.getMessage());
}
}
};
}
@AfterClass
public static void afterSuperClass() {
SolrCmdDistributor.testing_errorHook = null;
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
// can help to hide this when testing and looking at logs
//ignoreException("shard update error");
System.setProperty("numShards", Integer.toString(sliceCount));
useFactory("solr.StandardDirectoryFactory");
}
@Override
@After
public void tearDown() throws Exception {
System.clearProperty("numShards");
super.tearDown();
resetExceptionIgnores();
}
public ChaosMonkeyNothingIsSafeTest() {
super();
sliceCount = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.slicecount", "2"));
shardCount = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.shardcount", "7"));
}
@Override
public void doTest() throws Exception {
boolean testsSuccesful = false;
try {
handle.clear();
handle.put("QTime", SKIPVAL);
handle.put("timestamp", SKIPVAL);
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
// make sure we have leaders for each shard
for (int j = 1; j < sliceCount; j++) {
zkStateReader.getLeaderRetry(DEFAULT_COLLECTION, "shard" + j, 10000);
} // make sure we again have leaders for each shard
waitForRecoveriesToFinish(false);
// we cannot do delete by query
// as it's not supported for recovery
del("*:*");
List<StopableThread> threads = new ArrayList<StopableThread>();
int threadCount = 1;
int i = 0;
for (i = 0; i < threadCount; i++) {
StopableIndexingThread indexThread = new StopableIndexingThread(
(i+1) * 50000, true);
threads.add(indexThread);
indexThread.start();
}
threadCount = 1;
i = 0;
for (i = 0; i < threadCount; i++) {
StopableSearchThread searchThread = new StopableSearchThread();
threads.add(searchThread);
searchThread.start();
}
// TODO: only do this sometimes so that we can sometimes compare against control
boolean runFullThrottle = random().nextBoolean();
if (runFullThrottle) {
FullThrottleStopableIndexingThread ftIndexThread = new FullThrottleStopableIndexingThread(
clients, (i+1) * 50000, true);
threads.add(ftIndexThread);
ftIndexThread.start();
}
chaosMonkey.startTheMonkey(true, 10000);
long runLength;
if (RUN_LENGTH != -1) {
runLength = RUN_LENGTH;
} else {
int[] runTimes = new int[] {5000,6000,10000,15000,15000,30000,30000,45000,90000,120000};
runLength = runTimes[random().nextInt(runTimes.length - 1)];
}
try {
Thread.sleep(runLength);
} finally {
chaosMonkey.stopTheMonkey();
}
for (StopableThread indexThread : threads) {
indexThread.safeStop();
}
// wait for stop...
for (StopableThread indexThread : threads) {
indexThread.join();
}
// we expect full throttle fails, but cloud client should not easily fail
// but it's allowed to fail and sometimes does, so commented out for now
// for (StopableThread indexThread : threads) {
// if (indexThread instanceof StopableIndexingThread && !(indexThread instanceof FullThrottleStopableIndexingThread)) {
// assertEquals(0, ((StopableIndexingThread) indexThread).getFails());
// }
// }
// try and wait for any replications and what not to finish...
Thread.sleep(2000);
// wait until there are no recoveries...
waitForThingsToLevelOut(Integer.MAX_VALUE);//Math.round((runLength / 1000.0f / 3.0f)));
// make sure we again have leaders for each shard
for (int j = 1; j < sliceCount; j++) {
zkStateReader.getLeaderRetry(DEFAULT_COLLECTION, "shard" + j, 30000);
}
commit();
// TODO: assert we didnt kill everyone
zkStateReader.updateClusterState(true);
assertTrue(zkStateReader.getClusterState().getLiveNodes().size() > 0);
// full throttle thread can
// have request fails
checkShardConsistency(!runFullThrottle, true);
long ctrlDocs = controlClient.query(new SolrQuery("*:*")).getResults()
.getNumFound();
// ensure we have added more than 0 docs
long cloudClientDocs = cloudClient.query(new SolrQuery("*:*"))
.getResults().getNumFound();
assertTrue("Found " + ctrlDocs + " control docs", cloudClientDocs > 0);
if (VERBOSE) System.out.println("control docs:"
+ controlClient.query(new SolrQuery("*:*")).getResults()
.getNumFound() + "\n\n");
testsSuccesful = true;
} finally {
if (!testsSuccesful) {
printLayout();
}
}
}
class FullThrottleStopableIndexingThread extends StopableIndexingThread {
private HttpClient httpClient = HttpClientUtil.createClient(null);
private volatile boolean stop = false;
int clientIndex = 0;
private ConcurrentUpdateSolrServer suss;
private List<SolrServer> clients;
public FullThrottleStopableIndexingThread(List<SolrServer> clients,
int startI, boolean doDeletes) {
super(startI, doDeletes);
setName("FullThrottleStopableIndexingThread");
setDaemon(true);
this.clients = clients;
HttpClientUtil.setConnectionTimeout(httpClient, 15000);
HttpClientUtil.setSoTimeout(httpClient, 15000);
suss = new ConcurrentUpdateSolrServer(
((HttpSolrServer) clients.get(0)).getBaseURL(), httpClient, 8,
2) {
@Override
public void handleError(Throwable ex) {
log.warn("suss error", ex);
}
};
}
@Override
public void run() {
int i = startI;
int numDeletes = 0;
int numAdds = 0;
while (true && !stop) {
++i;
if (doDeletes && random().nextBoolean() && deletes.size() > 0) {
Integer delete = deletes.remove(0);
try {
numDeletes++;
suss.deleteById(Integer.toString(delete));
} catch (Exception e) {
changeUrlOnError(e);
//System.err.println("REQUEST FAILED:");
//e.printStackTrace();
fails.incrementAndGet();
}
}
try {
numAdds++;
if (numAdds > 4000)
continue;
SolrInputDocument doc = getDoc(
id,
i,
i1,
50,
tlong,
50,
t1,
"Saxon heptarchies that used to rip around so in old times and raise Cain. My, you ought to seen old Henry the Eight when he was in bloom. He WAS a blossom. He used to marry a new wife every day, and chop off her head next morning. And he would do it just as indifferent as if ");
suss.add(doc);
} catch (Exception e) {
changeUrlOnError(e);
//System.err.println("REQUEST FAILED:");
//e.printStackTrace();
fails.incrementAndGet();
}
if (doDeletes && random().nextBoolean()) {
deletes.add(i);
}
}
System.err.println("FT added docs:" + numAdds + " with " + fails + " fails" + " deletes:" + numDeletes);
}
private void changeUrlOnError(Exception e) {
if (e instanceof ConnectException) {
clientIndex++;
if (clientIndex > clients.size() - 1) {
clientIndex = 0;
}
suss.shutdownNow();
suss = new ConcurrentUpdateSolrServer(
((HttpSolrServer) clients.get(clientIndex)).getBaseURL(),
httpClient, 30, 3) {
@Override
public void handleError(Throwable ex) {
log.warn("suss error", ex);
}
};
}
}
@Override
public void safeStop() {
stop = true;
suss.shutdownNow();
httpClient.getConnectionManager().shutdown();
}
@Override
public int getFails() {
return fails.get();
}
};
// skip the randoms - they can deadlock...
@Override
protected void indexr(Object... fields) throws Exception {
SolrInputDocument doc = getDoc(fields);
indexDoc(doc);
}
}