/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.core;
import org.apache.commons.io.FileUtils;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
* Incorporate the open/close stress tests into unit tests.
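* <p>
* Each run creates a set of cores on disk, starts an embedded Jetty instance, then indexes and
* queries those cores from many threads for a fixed number of seconds, finally verifying that each
* core contains exactly the documents that were sent to it.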
*/
public class OpenCloseCoreStressTest extends SolrTestCaseJ4 {
private final Object locker = new Object();
private int numCores = 20;
private Map<String, Long> coreCounts;
private List<String> coreNames;
static final int COMMIT_WITHIN = 5000;
final int indexingThreads = 15;
final int queryThreads = 15;
final int resetInterval = 30 * 60; // 30 minutes, in seconds: how long each cycle runs before reporting and deleting everything
long cumulativeDocs = 0;
String url;
JettySolrRunner jetty = null;
File solrHomeDirectory;
List<HttpSolrServer> indexingServers = new ArrayList<HttpSolrServer>(indexingThreads);
List<HttpSolrServer> queryServers = new ArrayList<HttpSolrServer>(queryThreads);
static String savedFactory;
// Pin the directory factory so the test framework doesn't pick a random one for these indexes.
@BeforeClass
public static void beforeClass() {
savedFactory = System.getProperty("solr.directoryFactory");
System.setProperty("solr.directoryFactory", "org.apache.solr.core.MockFSDirectoryFactory");
}
@AfterClass
public static void afterClass() {
if (savedFactory == null) {
System.clearProperty("solr.directoryFactory");
} else {
System.setProperty("solr.directoryFactory", savedFactory);
}
}
@Before
public void setupServer() throws Exception {
coreCounts = new TreeMap<String, Long>();
coreNames = new ArrayList<String>();
cumulativeDocs = 0;
solrHomeDirectory = new File(TEMP_DIR, "OpenCloseCoreStressTest_");
FileUtils.deleteDirectory(solrHomeDirectory); // Ensure that a failed test didn't leave something lying around.
jetty = new JettySolrRunner(solrHomeDirectory.getAbsolutePath(), "/solr", 0);
}
@After
public void tearDownServer() throws Exception {
if (jetty != null) jetty.stop();
FileUtils.deleteDirectory(solrHomeDirectory);
}
@Test
@Slow
public void test30SecondsOld() throws Exception {
doStress(30, true);
}
@Test
@Slow
public void test30SecondsNew() throws Exception {
doStress(30, false);
}
@Test
@Nightly
public void test10MinutesOld() throws Exception {
doStress(300, true);
}
@Test
@Nightly
public void test10MinutesNew() throws Exception {
doStress(300, false);
}
@Test
@Weekly
public void test1HourOld() throws Exception {
doStress(1800, true);
}
@Test
@Weekly
public void test1HourNew() throws Exception {
doStress(1800, false);
}
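// Start Jetty and build one HttpSolrServer client per indexing thread and per query thread.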
private void getServers() throws Exception {
jetty.start();
url = "http://127.0.0.1:" + jetty.getLocalPort() + "/solr/";
// Create one client per thread and reuse it; mostly to keep annoying logging messages from being sent out all the time.
for (int idx = 0; idx < indexingThreads; ++idx) {
HttpSolrServer server = new HttpSolrServer(url);
server.setDefaultMaxConnectionsPerHost(25);
server.setConnectionTimeout(30000);
server.setSoTimeout(30000);
indexingServers.add(server);
}
for (int idx = 0; idx < queryThreads; ++idx) {
HttpSolrServer server = new HttpSolrServer(url);
server.setDefaultMaxConnectionsPerHost(25);
server.setConnectionTimeout(30000);
server.setSoTimeout(30000);
queryServers.add(server);
}
}
// Drives one stress run: create the cores, start the server, then index and query in cycles until secondsToRun elapses.
private void doStress(int secondsToRun, boolean oldStyle) throws Exception {
makeCores(solrHomeDirectory, oldStyle);
//MUST start the server after the cores are made.
getServers();
try {
log.info("Starting indexing and querying");
int secondsRun = 0;
int secondsRemaining = secondsToRun;
do {
int cycleSeconds = Math.min(resetInterval, secondsRemaining);
log.info(String.format(Locale.ROOT, "\n\n\n\n\nStarting a %,d second cycle, seconds left: %,d. Seconds run so far: %,d.",
cycleSeconds, secondsRemaining, secondsRun));
Indexer idxer = new Indexer(this, url, indexingServers, indexingThreads, cycleSeconds, random());
Queries queries = new Queries(this, url, queryServers, queryThreads, random());
idxer.waitOnThreads();
queries.waitOnThreads();
secondsRemaining = Math.max(secondsRemaining - resetInterval, 0);
checkResults(queryServers.get(0), queries, idxer);
secondsRun += cycleSeconds;
if (secondsRemaining > 0) {
deleteAllDocuments(queryServers.get(0), queries);
}
} while (secondsRemaining > 0);
assertTrue("We didn't index any documents, somethings really messed up", cumulativeDocs > 0);
} catch (Exception e) {
e.printStackTrace();
fail("Caught unexpected exception");
}
}
private void makeCores(File home, boolean oldStyle) throws Exception {
File testSrcRoot = new File(SolrTestCaseJ4.TEST_HOME());
String srcSolrXml = "solr-stress-new.xml";
if (oldStyle) {
srcSolrXml = "solr-stress-old.xml";
}
FileUtils.copyFile(new File(testSrcRoot, srcSolrXml), new File(home, "solr.xml"));
// Create a directory, config, and bookkeeping entry for each core.
for (int idx = 0; idx < numCores; ++idx) {
String coreName = String.format(Locale.ROOT, "%05d_core", idx);
makeCore(new File(home, coreName), testSrcRoot, oldStyle);
coreCounts.put(coreName, 0L);
coreNames.add(coreName);
}
}
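// Copy the minimal schema and solrconfig into the core's conf dir; new-style cores also get a core.properties file.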
private void makeCore(File coreDir, File testSrcRoot, boolean oldStyle) throws IOException {
File conf = new File(coreDir, "conf");
if (!conf.mkdirs()) log.warn("mkdirs returned false in makeCore... ignoring");
File testConf = new File(testSrcRoot, "collection1/conf");
FileUtils.copyFile(new File(testConf, "schema-tiny.xml"), new File(conf, "schema-tiny.xml"));
FileUtils.copyFile(new File(testConf, "solrconfig-minimal.xml"), new File(conf, "solrconfig-minimal.xml"));
FileUtils.copyFile(new File(testConf, "solrconfig.snippet.randomindexconfig.xml"),
new File(conf, "solrconfig.snippet.randomindexconfig.xml"));
if (!oldStyle) {
FileUtils.copyFile(new File(testSrcRoot, "conf/core.properties"), new File(coreDir, "core.properties"));
}
}
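// Purge every core between cycles, verify they are all empty, and reset the per-core counters.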
void deleteAllDocuments(HttpSolrServer server, Queries queries) {
log.info("Deleting data from last cycle, this may take a few minutes.");
for (String core : coreNames) {
try {
server.setBaseURL(url + core);
server.deleteByQuery("*:*");
server.optimize(true, true); // should be close to a no-op.
} catch (Exception e) {
e.printStackTrace();
}
}
// We're testing, after all. Let's be really sure things are as we expect.
log.info("Insuring all cores empty");
long foundDocs = 0;
for (String core : coreNames) {
try {
long found = queries.getCount(server, core);
assertEquals("Cores should be empty", found, 0L);
foundDocs += found;
} catch (Exception e) {
e.printStackTrace();
}
}
if (foundDocs > 0) {
log.warn("Found docs after purging done, this is bad.");
}
// Reset counters for another go-round
coreCounts.clear();
for (String core : coreNames) {
coreCounts.put(core, 0L);
}
}
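// Commit each core (retrying a few times) and assert that the document count found matches the count we indexed.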
private void checkResults(HttpSolrServer server, Queries queries, Indexer idxer) throws InterruptedException {
log.info("Checking if indexes have all the documents they should...");
long totalDocsFound = 0;
for (Map.Entry<String, Long> ent : coreCounts.entrySet()) {
server.setBaseURL(url + ent.getKey());
for (int idx = 0; idx < 3; ++idx) {
try {
server.commit(true, true);
break; // retry loop
} catch (Exception e) {
log.warn("Exception when committing core " + ent.getKey() + " " + e.getMessage());
Thread.sleep(100L);
}
}
long numFound = queries.getCount(server, ent.getKey());
totalDocsFound += numFound;
assertEquals(String.format(Locale.ROOT, "Core %s bad!", ent.getKey()), (long) ent.getValue(), numFound);
}
log.info(String.format(Locale.ROOT, "\n\nDocs indexed (cumulative, all cycles): %,d, total docs: %,d: Cycle stats: updates: %,d: qtimes: %,d",
Indexer.idUnique.get(), totalDocsFound, idxer.getAccumUpdates(), idxer.getAccumQtimes()));
cumulativeDocs += totalDocsFound;
}
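// Pick a core at random; the indexing and query threads use this to spread load across all cores.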
String getRandomCore(Random random) {
return coreNames.get(random.nextInt(coreNames.size()));
}
void incrementCoreCount(String core) {
synchronized (locker) {
coreCounts.put(core, coreCounts.get(core) + 1);
}
}
}
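/**
* Spins up a set of OneIndexer threads that add documents to randomly-chosen cores until the
* allotted time expires, and accumulates update counts and QTimes for reporting.
*/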
class Indexer {
static volatile long stopTime;
static AtomicInteger idUnique = new AtomicInteger(0);
static AtomicInteger errors = new AtomicInteger(0);
static AtomicInteger docsThisCycle = new AtomicInteger(0);
static AtomicLong qTimesAccum = new AtomicLong(0);
static AtomicInteger updateCounts = new AtomicInteger(0);
static volatile int lastCount;
static volatile long nextTime;
ArrayList<OneIndexer> _threads = new ArrayList<OneIndexer>();
public Indexer(OpenCloseCoreStressTest OCCST, String url, List<HttpSolrServer> servers, int numThreads, int secondsToRun, Random random) {
stopTime = System.currentTimeMillis() + (secondsToRun * 1000);
nextTime = System.currentTimeMillis() + 60000;
docsThisCycle.set(0);
qTimesAccum.set(0);
updateCounts.set(0);
for (int idx = 0; idx < numThreads; ++idx) {
OneIndexer one = new OneIndexer(OCCST, url, servers.get(idx), random.nextLong());
_threads.add(one);
one.start();
}
}
public void waitOnThreads() {
for (Thread thread : _threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public long getAccumQtimes() {
return qTimesAccum.get();
}
public int getAccumUpdates() {
return updateCounts.get();
}
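// Log indexing progress roughly once a minute.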
synchronized static void progress(int myId, String core) {
if (nextTime - System.currentTimeMillis() <= 0) {
SolrTestCaseJ4.log.info(String.format(Locale.ROOT, " s indexed: [run %,8d] [cycle %,8d] [last minute %,8d] Last core updated: %s. Seconds left in cycle %,4d",
myId, docsThisCycle.get(), myId - lastCount, core, stopTime - (System.currentTimeMillis() / 1000)));
lastCount = myId;
nextTime += (System.currentTimeMillis() / 1000) * 60;
}
}
}
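/**
* A single indexing thread: repeatedly picks a random core and adds one small document with
* commitWithin, retrying up to three times on failure, until Indexer.stopTime is reached.
*/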
class OneIndexer extends Thread {
private final OpenCloseCoreStressTest OCCST;
private final HttpSolrServer server;
private final String baseUrl;
private final Random random;
OneIndexer(OpenCloseCoreStressTest OCCST, String url, HttpSolrServer server, long seed) {
this.OCCST = OCCST;
this.server = server;
this.baseUrl = url;
this.random = new Random(seed);
}
@Override
public void run() {
SolrTestCaseJ4.log.info(String.format(Locale.ROOT, "Starting indexing thread: " + getId()));
while (Indexer.stopTime > System.currentTimeMillis()) {
int myId = Indexer.idUnique.incrementAndGet();
Indexer.docsThisCycle.incrementAndGet();
String core = OCCST.getRandomCore(random);
OCCST.incrementCoreCount(core);
Indexer.progress(myId, core);
for (int idx = 0; idx < 3; ++idx) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "id" + Integer.toString(myId));
doc.addField("text", "text " + Integer.toString(myId));
try {
server.setBaseURL(baseUrl + core);
UpdateResponse response = server.add(doc, OpenCloseCoreStressTest.COMMIT_WITHIN);
if (response.getStatus() != 0) {
SolrTestCaseJ4.log.warn("Failed to index a document to core " + core + " with status " + response.getStatus());
} else {
Indexer.qTimesAccum.addAndGet(response.getQTime());
Indexer.updateCounts.incrementAndGet();
break; // retry loop.
}
Thread.sleep(100L); // Let's not go crazy here.
} catch (Exception e) {
if (e instanceof InterruptedException) return;
Indexer.errors.incrementAndGet();
if (idx == 2) {
SolrTestCaseJ4.log.warn("Could not reach server while indexing for three tries, quitting " + e.getMessage());
} else {
SolrTestCaseJ4.log.info("Indexing thread " + Thread.currentThread().getId() + " swallowed one exception " + e.getMessage());
try {
Thread.sleep(500);
} catch (InterruptedException tex) {
return;
}
}
}
}
}
SolrTestCaseJ4.log.info("Leaving indexing thread " + getId());
}
}
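/**
* Spins up a set of OneQuery threads that run *:* queries against randomly-chosen cores until
* waitOnThreads() stops them; also provides the per-core document count used for verification.
*/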
class Queries {
static AtomicBoolean _keepon = new AtomicBoolean(true);
List<Thread> _threads = new ArrayList<Thread>();
static AtomicInteger _errors = new AtomicInteger(0);
String baseUrl;
public Queries(OpenCloseCoreStressTest OCCST, String url, List<HttpSolrServer> servers, int numThreads, Random random) {
baseUrl = url;
_keepon.set(true); // reset for each cycle; waitOnThreads() flips this to false to stop the threads
for (int idx = 0; idx < numThreads; ++idx) {
Thread one = new OneQuery(OCCST, url, servers.get(idx), random.nextLong());
_threads.add(one);
one.start();
}
}
public void waitOnThreads() {
Queries._keepon.set(false);
for (Thread thread : _threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
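// Return the number of documents a single core reports for a *:* query.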
public long getCount(HttpSolrServer server, String core) {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("qt", "/select");
params.set("q", "*:*");
long numFound = 0;
server.setBaseURL(baseUrl + core);
try {
QueryResponse response = server.query(params);
numFound = response.getResults().getNumFound();
} catch (Exception e) {
e.printStackTrace();
}
return numFound;
}
}
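/**
* A single query thread: repeatedly picks a random core and issues a *:* query, retrying up to
* three times on failure, until Queries._keepon goes false.
*/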
class OneQuery extends Thread {
OpenCloseCoreStressTest OCCST;
private final HttpSolrServer server;
private final String baseUrl;
private final Random random;
OneQuery(OpenCloseCoreStressTest OCCST, String url, HttpSolrServer server, long seed) {
this.OCCST = OCCST;
this.server = server;
this.baseUrl = url;
this.random = new Random(seed);
}
@Override
public void run() {
SolrTestCaseJ4.log.info(String.format(Locale.ROOT, "Starting query thread: " + getId()));
while (Queries._keepon.get()) {
String core = OCCST.getRandomCore(random);
for (int idx = 0; idx < 3; ++idx) {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("qt", "/select");
params.set("q", "*:*");
try {
Thread.sleep(100L); // Brief pause between queries so we don't hammer the server.
server.setBaseURL(baseUrl + core);
QueryResponse response = server.query(params);
if (response.getStatus() != 0) {
SolrTestCaseJ4.log.warn("Failed to query core " + core + " with status " + response.getStatus());
}
// Perhaps collect some stats here in the future.
break; // retry loop
} catch (Exception e) {
if (e instanceof InterruptedException) return;
Queries._errors.incrementAndGet();
if (idx == 2) {
SolrTestCaseJ4.log.warn("Could not reach server while indexing for three tries, quitting " + e.getMessage());
} else {
SolrTestCaseJ4.log.info("Querying thread: " + Thread.currentThread().getId() + " swallowed exception: " + e.getMessage());
try {
Thread.sleep(500L);
} catch (InterruptedException tex) {
return;
}
}
}
}
}
SolrTestCaseJ4.log.info(String.format(Locale.ROOT, "Leaving query thread: " + getId()));
}
}