blob: 5e20994e597080640e335b30fafd9e92ee4a9743 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.CollectionStatePredicate;
import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrEventListener;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.TestInjection;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Tests related to SOLR-6086: a replica must not be reported as active in the cluster state
 * before it has registered a searcher.
 * <p>
 * {@link SleepingSolrEventListener} is installed as a newSearcher/firstSearcher listener to
 * widen the warming window, and a {@link CollectionStateWatcher} inspects each state change for a
 * replica that is active but has no registered searcher, or one that does not see the expected
 * number of documents.
 */
@LogLevel("org.apache.solr.cloud.overseer.*=DEBUG,org.apache.solr.cloud.Overseer=DEBUG,org.apache.solr.cloud.ZkController=DEBUG")
public class TestCloudSearcherWarming extends SolrCloudTestCase {
  /**
   * Core node name and core name of the last core whose searcher event was delayed by
   * {@link SleepingSolrEventListener}; read by the state watcher to know which replica to check.
   */
  public static final AtomicReference<String> coreNodeNameRef = new AtomicReference<>(null),
      coreNameRef = new AtomicReference<>(null);
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
  /** Milliseconds {@link SleepingSolrEventListener} sleeps per event; values &lt;= 0 disable it. */
  private static final AtomicInteger sleepTime = new AtomicInteger(-1);

  @BeforeClass
  public static void setupCluster() throws Exception {
    useFactory("solr.StandardDirectoryFactory"); // necessary to find the index+tlog intact after restart
  }

  @Before
  @Override
  public void setUp() throws Exception {
    super.setUp();
    configureCluster(1).addConfig("conf", configset("cloud-minimal")).configure();
  }

  @After
  @Override
  public void tearDown() throws Exception {
    coreNameRef.set(null);
    coreNodeNameRef.set(null);
    sleepTime.set(-1);
    // reset every injection the tests may have enabled so none leaks into the next test method
    TestInjection.wrongIndexFingerprint = null;
    TestInjection.failIndexFingerprintRequests = null;
    try {
      if (null != cluster) {
        cluster.deleteAllCollections();
        cluster.deleteAllConfigSets();
        cluster.shutdown();
        cluster = null;
      }
    } finally {
      // always run the parent teardown, even if shutting the cluster down failed
      super.tearDown();
    }
  }

  /**
   * Builds the Config API command registering {@link SleepingSolrEventListener} for both the
   * newSearcher and the firstSearcher events.
   */
  private static String addSleepingListenersCommand() {
    String listenerClass = SleepingSolrEventListener.class.getName();
    return "{" +
        "'add-listener' : {'name':'newSearcherListener','event':'newSearcher', 'class':'" + listenerClass + "'}," +
        "'add-listener' : {'name':'firstSearcherListener','event':'firstSearcher', 'class':'" + listenerClass + "'}" +
        "}";
  }

  /**
   * Restarts the only node hosting a 1x1 collection while searcher events are delayed, and
   * asserts the watcher never saw the replica active without a registered searcher.
   */
  @Test
  public void testRepFactor1LeaderStartup() throws Exception {
    CloudSolrClient solrClient = cluster.getSolrClient();

    String collectionName = "testRepFactor1LeaderStartup";
    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, 1, 1)
        .setCreateNodeSet(cluster.getJettySolrRunner(0).getNodeName());
    create.process(solrClient);
    cluster.waitForActiveCollection(collectionName, 1, 1);

    solrClient.setDefaultCollection(collectionName);
    ConfigRequest request = new ConfigRequest(addSleepingListenersCommand());
    solrClient.request(request);

    solrClient.add(new SolrInputDocument("id", "1"));
    solrClient.commit();

    AtomicInteger expectedDocs = new AtomicInteger(1);
    AtomicReference<String> failingCoreNodeName = new AtomicReference<>();
    CollectionStateWatcher stateWatcher = createActiveReplicaSearcherWatcher(expectedDocs, failingCoreNodeName);

    JettySolrRunner runner = cluster.getJettySolrRunner(0);
    runner.stop();
    cluster.waitForJettyToStop(runner);
    // check waitForState only after we are sure the node has shutdown and have forced an update to liveNodes
    // ie: workaround SOLR-13490
    cluster.getSolrClient().getZkStateReader().updateLiveNodes();
    waitForState("jetty count:" + cluster.getJettySolrRunners().size(), collectionName, clusterShape(1, 0));

    // restart, with the listener delaying each searcher event by 1s
    sleepTime.set(1000);
    runner.start();
    cluster.waitForAllNodes(30);
    cluster.getSolrClient().getZkStateReader().registerCollectionStateWatcher(collectionName, stateWatcher);
    cluster.waitForActiveCollection(collectionName, 1, 1);
    assertNull("No replica should have been active without registering a searcher, found: " + failingCoreNodeName.get(), failingCoreNodeName.get());
    cluster.getSolrClient().getZkStateReader().removeCollectionStateWatcher(collectionName, stateWatcher);
  }

  /**
   * Adds a replica on a new node, then restarts the original node twice: first with
   * {@code TestInjection.wrongIndexFingerprint} enabled and then with
   * {@code TestInjection.failIndexFingerprintRequests} enabled (presumably forcing PeerSync to fail
   * so recovery falls back to replication). After each step it asserts that no replica was seen
   * active without a registered searcher.
   */
  @Test
  public void testPeersyncFailureReplicationSuccess() throws Exception {
    CloudSolrClient solrClient = cluster.getSolrClient();

    String collectionName = "testPeersyncFailureReplicationSuccess";
    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, 1, 1)
        .setCreateNodeSet(cluster.getJettySolrRunner(0).getNodeName()).setMaxShardsPerNode(2);
    create.process(solrClient);

    waitForState("The collection should have 1 shard and 1 replica", collectionName, clusterShape(1, 1));

    solrClient.setDefaultCollection(collectionName);
    ConfigRequest request = new ConfigRequest(addSleepingListenersCommand());
    solrClient.request(request);

    solrClient.add(new SolrInputDocument("id", "1"));
    solrClient.commit();

    AtomicInteger expectedDocs = new AtomicInteger(1);
    AtomicReference<String> failingCoreNodeName = new AtomicReference<>();

    QueryResponse response = solrClient.query(new SolrQuery("*:*"));
    assertEquals(1, response.getResults().getNumFound());

    // reset
    coreNameRef.set(null);
    coreNodeNameRef.set(null);
    failingCoreNodeName.set(null);
    sleepTime.set(5000);

    CollectionStateWatcher stateWatcher = createActiveReplicaSearcherWatcher(expectedDocs, failingCoreNodeName);
    cluster.getSolrClient().getZkStateReader().registerCollectionStateWatcher(collectionName, stateWatcher);

    JettySolrRunner newNode = cluster.startJettySolrRunner();
    cluster.waitForAllNodes(30);
    CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
        .setNode(newNode.getNodeName())
        .process(solrClient);

    waitForState("The collection should have 1 shard and 2 replica", collectionName, clusterShape(1, 2));
    assertNull("No replica should have been active without registering a searcher, found: " + failingCoreNodeName.get(), failingCoreNodeName.get());

    // stop the old node
    log.info("Stopping old node 1");
    AtomicReference<String> oldNodeName = new AtomicReference<>(cluster.getJettySolrRunner(0).getNodeName());
    JettySolrRunner oldNode = cluster.stopJettySolrRunner(0);
    cluster.waitForJettyToStop(oldNode);
    // the newly created replica should become leader
    waitForState("The collection should have 1 shard and 1 replica", collectionName, clusterShape(1, 1));
    // the above call is not enough because we want to assert that the down'ed replica is not active
    // but clusterShape will also return true if replica is not live -- which we don't want
    CollectionStatePredicate collectionStatePredicate = (liveNodes, collectionState) -> {
      for (Replica r : collectionState.getReplicas()) {
        if (r.getNodeName().equals(oldNodeName.get())) {
          return r.getState() == Replica.State.DOWN;
        }
      }
      return false;
    };
    waitForState("", collectionName, collectionStatePredicate);
    assertNotNull(solrClient.getZkStateReader().getLeaderRetry(collectionName, "shard1"));

    // reset
    coreNameRef.set(null);
    coreNodeNameRef.set(null);
    failingCoreNodeName.set(null);
    sleepTime.set(5000);

    // inject wrong signature output
    TestInjection.wrongIndexFingerprint = "true:100";
    // now lets restart the old node
    log.info("Starting old node 1");
    cluster.startJettySolrRunner(oldNode);
    waitForState("", collectionName, clusterShape(1, 2));
    // invoke statewatcher explicitly to avoid race condition where the assert happens before the state watcher is invoked by ZkStateReader
    cluster.getSolrClient().getZkStateReader().registerCollectionStateWatcher(collectionName, stateWatcher);
    assertNull("No replica should have been active without registering a searcher, found: " + failingCoreNodeName.get(), failingCoreNodeName.get());

    oldNodeName.set(cluster.getJettySolrRunner(1).getNodeName());
    log.info("Stopping old node 2");
    assertSame(oldNode, cluster.stopJettySolrRunner(1)); // old node is now at 1
    cluster.waitForJettyToStop(oldNode);
    waitForState("", collectionName, clusterShape(1, 1));
    waitForState("", collectionName, collectionStatePredicate);

    // reset
    coreNameRef.set(null);
    coreNodeNameRef.set(null);
    failingCoreNodeName.set(null);
    sleepTime.set(14000); // has to be higher than twice the recovery wait pause between attempts plus some margin

    // inject failure
    TestInjection.failIndexFingerprintRequests = "true:100";

    // now lets restart the old node again
    log.info("Starting old node 2");
    cluster.startJettySolrRunner(oldNode);
    waitForState("", collectionName, clusterShape(1, 2));
    // invoke statewatcher explicitly to avoid race condition where the assert happens before the state watcher is invoked by ZkStateReader
    cluster.getSolrClient().getZkStateReader().registerCollectionStateWatcher(collectionName, stateWatcher);
    assertNull("No replica should have been active without registering a searcher, found: " + failingCoreNodeName.get(), failingCoreNodeName.get());
    cluster.getSolrClient().getZkStateReader().removeCollectionStateWatcher(collectionName, stateWatcher);
  }

  /**
   * Creates a watcher that, whenever the replica recorded in {@link #coreNodeNameRef} is active,
   * checks the hosting core's registered searcher and stores the replica's core node name in
   * {@code failingCoreNodeName} if it is missing or sees a different doc count than
   * {@code expectedDocs}.
   * The watcher always returns {@code false}, so it is never removed automatically.
   */
  private CollectionStateWatcher createActiveReplicaSearcherWatcher(AtomicInteger expectedDocs, AtomicReference<String> failingCoreNodeName) {
    return new CollectionStateWatcher() {
      @Override
      public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
        try {
          String coreNodeName = coreNodeNameRef.get();
          String coreName = coreNameRef.get();
          // the sleeping listener has not recorded a core yet: nothing to check
          if (coreNodeName == null || coreName == null) return false;
          Replica replica = collectionState.getReplica(coreNodeName);
          if (replica == null) return false;
          log.info("Collection state: {}", collectionState);
          if (!replica.isActive(liveNodes)) return false;
          log.info("Active replica: {}", coreNodeName);
          // index loop (size re-read each time) because runners may be added while we iterate
          for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
            JettySolrRunner jettySolrRunner = cluster.getJettySolrRunner(i);
            if (log.isInfoEnabled()) {
              log.info("Checking node: {}", jettySolrRunner.getNodeName());
            }
            if (jettySolrRunner.getNodeName().equals(replica.getNodeName())) {
              SolrDispatchFilter solrDispatchFilter = jettySolrRunner.getSolrDispatchFilter();
              try (SolrCore core = solrDispatchFilter.getCores().getCore(coreName)) {
                if (core == null) {
                  log.warn("Core {} not found on node {}", coreName, replica.getNodeName());
                } else {
                  checkRegisteredSearcher(core, coreNodeName, expectedDocs, failingCoreNodeName);
                }
              }
              break; // the replica's node has been found, no other runner can host it
            }
          }
        } catch (Exception e) {
          log.error("Unexpected exception in state watcher", e);
        }
        return false;
      }
    };
  }

  /**
   * Records {@code coreNodeName} in {@code failingCoreNodeName} when the (active) {@code core} has
   * no registered searcher, or its registered searcher's maxDoc differs from {@code expectedDocs}.
   * Every searcher reference obtained here is released before returning.
   */
  private static void checkRegisteredSearcher(SolrCore core, String coreNodeName,
                                              AtomicInteger expectedDocs, AtomicReference<String> failingCoreNodeName) {
    if (core.getSolrConfig().useColdSearcher) {
      log.error("useColdSearcher is enabled! It should not be enabled for this test!");
      assert false;
      return;
    }
    if (log.isInfoEnabled()) {
      log.info("Found SolrCore: {}, id: {}", core.getName(), core);
    }
    RefCounted<SolrIndexSearcher> registeredSearcher = core.getRegisteredSearcher();
    if (registeredSearcher != null) {
      try {
        int maxDoc = registeredSearcher.get().maxDoc();
        log.info("registered searcher not null, maxdocs = {}", maxDoc);
        if (maxDoc != expectedDocs.get()) {
          failingCoreNodeName.set(coreNodeName);
        }
      } finally {
        registeredSearcher.decref();
      }
      return;
    }

    log.error("registered searcher was null!");
    RefCounted<SolrIndexSearcher> newestSearcher = core.getNewestSearcher(false);
    if (newestSearcher != null) {
      try {
        log.warn("newest searcher was: {}", newestSearcher.get());
      } finally {
        newestSearcher.decref();
      }
    } else {
      log.error("newest searcher was also null!");
    }
    // no registered searcher but replica is active!
    failingCoreNodeName.set(coreNodeName);
  }

  /**
   * Event listener that, while {@code sleepTime} is positive, records the core it was invoked for
   * in {@link #coreNodeNameRef}/{@link #coreNameRef} and then sleeps, delaying searcher warming.
   */
  public static class SleepingSolrEventListener implements SolrEventListener {
    @Override
    public void init(@SuppressWarnings({"rawtypes"})NamedList args) {
      // No-Op
    }

    @Override
    public void postCommit() {
    }

    @Override
    public void postSoftCommit() {
    }

    @Override
    public void newSearcher(SolrIndexSearcher newSearcher, SolrIndexSearcher currentSearcher) {
      // read once: tearDown may concurrently reset sleepTime to -1, and Thread.sleep rejects negative values
      final int sleepMillis = sleepTime.get();
      if (sleepMillis <= 0) {
        return;
      }
      SolrCore core = newSearcher.getCore();
      TestCloudSearcherWarming.coreNodeNameRef.set(core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
      TestCloudSearcherWarming.coreNameRef.set(core.getName());
      if (log.isInfoEnabled()) {
        log.info("Sleeping for {} on newSearcher: {}, currentSearcher: {} belonging to (newest) core: {}, id: {}"
            , sleepMillis, newSearcher, currentSearcher, core.getName(), core);
      }
      try {
        Thread.sleep(sleepMillis);
      } catch (InterruptedException e) {
        // preserve the interrupt for whoever owns this thread
        Thread.currentThread().interrupt();
        log.warn("newSearcher was interrupted", e);
      }
      if (log.isInfoEnabled()) {
        log.info("Finished sleeping for {} on newSearcher: {}, currentSearcher: {} belonging to (newest) core: {}, id: {}"
            , sleepMillis, newSearcher, currentSearcher, core.getName(), core);
      }
    }
  }
}