/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.admin;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.SolrStream;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.handler.TestSQLHandler;
import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
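/**
 * Exercises the daemon admin actions on the /stream handler (list, start, stop, kill) against a
 * small set of daemons created on a single-node MiniSolrCloudCluster, checking both the success
 * responses and the "not found" / "already open" error responses.
 */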
public class DaemonStreamApiTest extends SolrTestCaseJ4 {
private MiniSolrCloudCluster cluster;
private static final String SOURCE_COLL = "sourceColl";
private static final String TARGET_COLL = "targetColl";
private static final String CHECKPOINT_COLL = "checkpointColl";
private static final String DAEMON_ROOT = "daemon";
private static final String CONF_NAME = "conf";
private static final String DAEMON_OP = "DaemonOp";
// We want 2-4 daemons. Choose one of them to start/stop/kill to catch any off-by-one or other bookkeeping errors.
final int numDaemons = random().nextInt(3) + 2;
String daemonOfInterest;
List<String> daemonNames = new ArrayList<>();
private String url;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
cluster = new MiniSolrCloudCluster(1, createTempDir(), buildJettyConfig("/solr"));
url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + CHECKPOINT_COLL;
cluster.uploadConfigSet(configset("cloud-minimal"), CONF_NAME);
// Create single-shard, single-replica collections. This is necessary until SOLR-13245 is fixed,
// since the commands don't look in all replicas.
CollectionAdminRequest.createCollection(SOURCE_COLL, CONF_NAME, 1, 1)
.setMaxShardsPerNode(1)
.process(cluster.getSolrClient());
CollectionAdminRequest.createCollection(TARGET_COLL, CONF_NAME, 1, 1)
.setMaxShardsPerNode(1)
.process(cluster.getSolrClient());
CollectionAdminRequest.createCollection(CHECKPOINT_COLL, CONF_NAME, 1, 1)
.setMaxShardsPerNode(1)
.process(cluster.getSolrClient());
for (int idx = 0; idx < numDaemons; ++idx) {
String name = DAEMON_ROOT + idx;
daemonNames.add(name);
}
daemonOfInterest = daemonNames.get(random().nextInt(numDaemons));
}
@Override
@After
public void tearDown() throws Exception {
if (null != cluster) {
cluster.shutdown();
cluster = null;
}
super.tearDown();
}
@Test
public void testAPIs() throws IOException, SolrServerException, InterruptedException {
checkCmdsNoDaemon(daemonOfInterest); // test no daemon defined
// Now create all our daemons.
for (String name : daemonNames) {
createDaemon(DAEMON_DEF.replace("DAEMON_NAME", name), name);
}
List<Tuple> tuples = getTuples(TestSQLHandler.mapParams("qt", "/stream", "action", "list"));
assertEquals("Should have all daemons listed", numDaemons, tuples.size());
for (int idx = 0; idx < numDaemons; ++idx) {
assertEquals("Daemon should be running ", tuples.get(idx).getString("id"), daemonNames.get(idx));
}
// Are all the daemons in a good state?
for (String daemon : daemonNames) {
checkAlive(daemon);
}
// We shouldn't be able to open a daemon twice without closing it first; that leads to thread leaks.
Tuple tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "start", "id", daemonOfInterest)
, DAEMON_OP);
assertTrue("Should not open twice without closing",
tupleOfInterest.getString(DAEMON_OP).contains("There is already an open daemon named"));
// Try stopping and check return.
tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "stop", "id", daemonOfInterest),
DAEMON_OP);
assertTrue("Should have been able to stop the daemon",
tupleOfInterest.getString(DAEMON_OP).contains(daemonOfInterest + " stopped"));
checkStopped();
// Are all the other daemons still alive? NOTE: a stopped daemon is still listed, but in a TERMINATED state
for (String daemon : daemonNames) {
if (daemon.equals(daemonOfInterest) == false) {
checkAlive(daemon);
}
}
// Try starting and check return.
tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "start", "id", daemonOfInterest),
DAEMON_OP);
assertTrue("Should have been able to start the daemon",
tupleOfInterest.getString(DAEMON_OP).contains(daemonOfInterest + " started"));
// Are all the daemons alive?
for (String daemon : daemonNames) {
checkAlive(daemon);
}
// Try killing a daemon, it should be removed from lists.
tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "kill", "id", daemonOfInterest),
DAEMON_OP);
assertTrue("Daemon should have been killed",
tupleOfInterest.getString(DAEMON_OP).contains(daemonOfInterest + " killed"));
// Loop for a bit, waiting for the daemon to be removed from the list of possible entries.
checkDaemonKilled(daemonOfInterest);
// Should not be able to start a killed daemon
tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "start", "id", daemonOfInterest),
DAEMON_OP);
assertTrue("Daemon should not be found",
tupleOfInterest.getString(DAEMON_OP).contains(daemonOfInterest + " not found"));
// Should not be able to stop a killed daemon
tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "stop", "id", daemonOfInterest),
DAEMON_OP);
assertTrue("Daemon should not be found",
tupleOfInterest.getString(DAEMON_OP).contains(daemonOfInterest + " not found"));
// Should not be able to kill a killed daemon
tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "kill", "id", daemonOfInterest),
DAEMON_OP);
assertTrue("Daemon should not be found",
tupleOfInterest.getString(DAEMON_OP).contains(daemonOfInterest + " not found"));
// Let's bring the killed daemon back and see if it returns in our lists. Use the method that loops a bit to check
// in case there's a delay.
createDaemon(DAEMON_DEF.replace("DAEMON_NAME", daemonOfInterest), daemonOfInterest);
checkAlive(daemonOfInterest);
// Now kill them all so the threads disappear.
for (String daemon : daemonNames) {
getTuples(TestSQLHandler.mapParams("qt", "/stream", "action", "kill", "id", daemon));
checkDaemonKilled(daemon);
}
}
// There can be some delay while threads stabilize, so we need to loop.
private void checkAlive(String daemonName) throws InterruptedException, IOException {
TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (timeout.hasTimedOut() == false) {
Tuple tuple = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "list"),
daemonName);
String state = tuple.getString("state");
if (state.equals("RUNNABLE") || state.equals("WAITING") || state.equals("TIMED_WAITING")) {
return;
}
TimeUnit.MILLISECONDS.sleep(100);
}
fail("State for daemon '" + daemonName + "' did not become RUNNABLE, WAITING or TIMED_WAITING in 10 seconds");
}
// There can be some delay while threads stabilize, so we need to loop. Eventually, the status of a stopped
// thread should be "TERMINATED".
private void checkStopped() throws InterruptedException, IOException {
TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (timeout.hasTimedOut() == false) {
Tuple tuple = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "list"),
daemonOfInterest);
if (tuple.getString("state").equals("TERMINATED")) {
return;
}
TimeUnit.MILLISECONDS.sleep(100);
}
fail("State for daemon '" + daemonOfInterest + "' did not become TERMINATED in 10 seconds");
}
private void checkDaemonKilled(String daemon) throws IOException, InterruptedException {
TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (timeout.hasTimedOut() == false) {
List<Tuple> tuples = getTuples(TestSQLHandler.mapParams("qt", "/stream", "action", "list"));
boolean foundIt = false;
for (Tuple tuple : tuples) {
if (tuple.get("id").equals(daemon)) {
foundIt = true;
}
}
if (foundIt == false) return;
TimeUnit.MILLISECONDS.sleep(100);
}
fail("'" + daemonOfInterest + "' did not disappear in 10 seconds");
}
private void createDaemon(String daemonDef, String errMsg) throws IOException, SolrServerException {
SolrClient client = cluster.getSolrClient();
// create a daemon
QueryResponse resp = client.query(CHECKPOINT_COLL, TestSQLHandler.mapParams("expr", daemonDef, "qt", "/stream"));
assertEquals(errMsg, 0, resp.getStatus());
// This should close and replace the current daemon and NOT leak threads.
resp = client.query(CHECKPOINT_COLL, TestSQLHandler.mapParams("expr", daemonDef, "qt", "/stream"));
assertEquals(errMsg, 0, resp.getStatus());
}
private void checkCmdsNoDaemon(String daemonName) throws IOException {
List<Tuple> tuples = getTuples(TestSQLHandler.mapParams("qt", "/stream", "action", "list"));
assertEquals("List should be empty", 0, tuples.size());
Tuple tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "start", "id", daemonName),
"DaemonOp");
assertTrue("Start for daemon should not be found", tupleOfInterest.getString("DaemonOp").contains("not found on"));
tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "stop", "id", daemonName),
"DaemonOp");
assertTrue("Stop for daemon should not be found", tupleOfInterest.getString("DaemonOp").contains("not found on"));
tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "kill", "id", daemonName),
"DaemonOp");
assertTrue("Kill for daemon should not be found", tupleOfInterest.getString("DaemonOp").contains("not found on"));
}
// It's _really_ useful to have the tuples sorted....
private List<Tuple> getTuples(final SolrParams params) throws IOException {
return getTuples(params, null);
}
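// Read tuples from the /stream handler at 'url'. When 'ofInterest' is non-null, keep only tuples whose
// "id" matches it, or tuples that carry a non-null value for that key (error responses from the daemon
// actions come back with a single DaemonOp key rather than an id).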
private List<Tuple> getTuples(final SolrParams params, String ofInterest) throws IOException {
//log.info("Tuples from params: {}", params);
TupleStream tupleStream = new SolrStream(url, params);
tupleStream.open();
List<Tuple> tuples = new ArrayList<>();
for (; ; ) {
Tuple t = tupleStream.read();
//log.info(" ... {}", t.fields);
if (t.EOF) {
break;
} else if (ofInterest == null || t.getString("id").equals(ofInterest) || t.getString(ofInterest).equals("null") == false) {
// a failed return is a bit different, the only key is DaemonOp
tuples.add(t);
}
}
tupleStream.close();
Collections.sort(tuples, (o1, o2) -> (o1.getString("id").compareTo(o2.getString("id"))));
return tuples;
}
private Tuple getTupleOfInterest(final SolrParams params, String ofInterest) throws IOException {
List<Tuple> tuples = getTuples(params, ofInterest);
if (tuples.size() != 1) {
fail("Should have found a tuple for tuple of interest: " + ofInterest);
}
return tuples.get(0);
}
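// Template for the daemon streaming expression. DAEMON_NAME is replaced with one of the generated
// names (daemon0, daemon1, ...) before the expression is sent to the /stream handler, e.g.:
//   daemon(id="daemon0", runInterval="1000", terminate="false",
//          update(targetColl, batchSize=100,
//                 topic(checkpointColl, sourceColl, q="*:*", fl="id", id="topic1", initialCheckpoint=0)))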
private static final String DAEMON_DEF =
" daemon(id=\"DAEMON_NAME\"," +
" runInterval=\"1000\"," +
" terminate=\"false\"," +
" update(targetColl," +
" batchSize=100," +
" topic(checkpointColl," +
" sourceColl," +
" q=\"*:*\"," +
" fl=\"id\"," +
" id=\"topic1\"," +
" initialCheckpoint=0)" +
"))";
}