// blob: d4054da436aa63b9512b4ba4a5f31cb68b8cf409 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.codahale.metrics.Counter;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.BaseDistributedSearchTestCase;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.metrics.SolrMetricManager;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Checks how query requests sent to a single node are distributed across the replicas of a
 * collection: first that requests are spread over every replica, then that a replica which has
 * been marked 'down' does not serve queries.
 */
@SolrTestCaseJ4.SuppressSSL
public class TestRandomRequestDistribution extends AbstractFullDistribZkTestBase {

  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /** Node names of the jetties in this cluster; populated by {@link #test()} before the scenarios run. */
  List<String> nodeNames = new ArrayList<>(3);

  /**
   * Collects the node names of the three jetties and then runs both scenarios in sequence.
   */
  @Test
  @BaseDistributedSearchTestCase.ShardsFixed(num = 3)
  public void test() throws Exception {
    waitForThingsToLevelOut(30);

    for (CloudJettyRunner cloudJetty : cloudJettys) {
      nodeNames.add(cloudJetty.nodeName);
    }
    assertEquals(3, nodeNames.size());

    testRequestTracking();
    testQueryAgainstDownReplica();
  }

  /**
   * Asserts that requests aren't always sent to the same poor node. See SOLR-7493
   *
   * <p>Collection "a1x2" (1 shard, 2 replicas) is created on the first two nodes and "b1x1" on
   * the third. Queries for "a1x2" are then sent to the node hosting "b1x1", and the per-core
   * {@code QUERY./select} request counters are watched until every "a1x2" core has served at
   * least one request (giving up after 1000 requests).
   */
  private void testRequestTracking() throws Exception {
    CollectionAdminRequest.createCollection("a1x2", "conf1", 1, 2)
        .setCreateNodeSet(nodeNames.get(0) + ',' + nodeNames.get(1))
        .process(cloudClient);

    CollectionAdminRequest.createCollection("b1x1", "conf1", 1, 1)
        .setCreateNodeSet(nodeNames.get(2))
        .process(cloudClient);

    waitForRecoveriesToFinish("a1x2", true);
    waitForRecoveriesToFinish("b1x1", true);

    cloudClient.getZkStateReader().forceUpdateCollection("b1x1");

    // get direct access to the metrics counters for each core/replica we're interested to monitor them
    final Map<String,Counter> counters = new LinkedHashMap<>();
    for (JettySolrRunner runner : jettys) {
      CoreContainer container = runner.getCoreContainer();
      SolrMetricManager metricManager = container.getMetricManager();
      for (SolrCore core : container.getCores()) {
        if ("a1x2".equals(core.getCoreDescriptor().getCollectionName())) {
          String registry = core.getCoreMetricManager().getRegistryName();
          Counter cnt = metricManager.counter(null, registry, "requests", "QUERY./select");
          // sanity check
          assertEquals(core.getName() + " has already received some requests?",
              0, cnt.getCount());
          counters.put(core.getName(), cnt);
        }
      }
    }
    assertEquals("Sanity Check: we know there should be 2 replicas", 2, counters.size());

    // send queries to the node that doesn't host any core/replica and see where it routes them
    ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
    DocCollection b1x1 = clusterState.getCollection("b1x1");
    Collection<Replica> replicas = b1x1.getSlice("shard1").getReplicas();
    assertEquals(1, replicas.size());
    String baseUrl = replicas.iterator().next().getBaseUrl();
    if (!baseUrl.endsWith("/")) {
      baseUrl += "/";
    }

    try (HttpSolrClient client = getHttpSolrClient(baseUrl + "a1x2", 2000, 5000)) {
      long expectedTotalRequests = 0;
      Set<String> uniqueCoreNames = new LinkedHashSet<>();

      log.info("Making requests to {} a1x2", baseUrl);
      while (uniqueCoreNames.size() < counters.size() && expectedTotalRequests < 1000L) {
        expectedTotalRequests++;
        client.query(new SolrQuery("*:*"));

        // every request sent so far must show up on exactly one of the tracked cores
        long actualTotalRequests = 0;
        for (Map.Entry<String,Counter> e : counters.entrySet()) {
          final long coreCount = e.getValue().getCount();
          actualTotalRequests += coreCount;
          if (0 < coreCount) {
            uniqueCoreNames.add(e.getKey());
          }
        }
        assertEquals("Sanity Check: Num Queries So Far Doesn't Match Total????",
            expectedTotalRequests, actualTotalRequests);
      }
      log.info("Total requests: {}", expectedTotalRequests);
      // expected = number of tracked replicas, actual = number of replicas that served a request
      assertEquals("either request randomization code is broken or this test seed is really unlucky, " +
              "Gave up waiting for requests to hit every core at least once after " +
              expectedTotalRequests + " requests",
          counters.size(), uniqueCoreNames.size());
    }
  }

  /**
   * Asserts that requests against a collection are only served by a 'active' local replica
   *
   * <p>Collection "football" (1 shard, 2 replicas) is created, the non-leader replica is forced
   * into the 'down' state through the overseer state update queue, and queries are then sent to
   * the node hosting that down replica. The leader's {@code QUERY./select} counter is used to
   * verify which core served each query.
   */
  private void testQueryAgainstDownReplica() throws Exception {
    log.info("Creating collection 'football' with 1 shard and 2 replicas");
    CollectionAdminRequest.createCollection("football", "conf1", 1, 2)
        .setCreateNodeSet(nodeNames.get(0) + ',' + nodeNames.get(1))
        .process(cloudClient);

    waitForRecoveriesToFinish("football", true);

    cloudClient.getZkStateReader().forceUpdateCollection("football");

    Replica leader = null;
    Replica notLeader = null;

    Collection<Replica> replicas = cloudClient.getZkStateReader().getClusterState()
        .getCollection("football").getSlice("shard1").getReplicas();
    for (Replica replica : replicas) {
      if (replica.getStr(ZkStateReader.LEADER_PROP) != null) {
        leader = replica;
      } else {
        notLeader = replica;
      }
    }
    // fail with a readable message instead of a NullPointerException further down
    assertNotNull("No leader replica found for shard1 of 'football'", leader);
    assertNotNull("No non-leader replica found for shard1 of 'football'", notLeader);

    //Simulate a replica being in down state.
    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
        ZkStateReader.NODE_NAME_PROP, notLeader.getStr(ZkStateReader.NODE_NAME_PROP),
        ZkStateReader.COLLECTION_PROP, "football",
        ZkStateReader.SHARD_ID_PROP, "shard1",
        ZkStateReader.CORE_NAME_PROP, notLeader.getStr(ZkStateReader.CORE_NAME_PROP),
        ZkStateReader.ROLES_PROP, "",
        ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());

    if (log.isInfoEnabled()) {
      log.info("Forcing {} to go into 'down' state", notLeader.getStr(ZkStateReader.CORE_NAME_PROP));
    }
    ZkDistributedQueue q = jettys.get(0).getCoreContainer().getZkController().getOverseer().getStateUpdateQueue();
    q.offer(Utils.toJSON(m));

    verifyReplicaStatus(cloudClient.getZkStateReader(), "football", "shard1", notLeader.getName(), Replica.State.DOWN);

    //Query against the node which hosts the down replica
    String baseUrl = notLeader.getBaseUrl();
    if (!baseUrl.endsWith("/")) {
      baseUrl += "/";
    }
    String path = baseUrl + "football";
    log.info("Firing queries against path={}", path);

    try (HttpSolrClient client = getHttpSolrClient(path, 2000, 5000)) {
      SolrCore leaderCore = findCoreByName(leader.getStr(ZkStateReader.CORE_NAME_PROP));
      assertNotNull(leaderCore);

      SolrMetricManager leaderMetricManager = leaderCore.getCoreContainer().getMetricManager();
      String leaderRegistry = leaderCore.getCoreMetricManager().getRegistryName();
      Counter cnt = leaderMetricManager.counter(null, leaderRegistry, "requests", "QUERY./select");

      // All queries should be served by the active replica
      // To make sure that's true we keep querying the down replica
      // If queries are getting processed by the down replica then the cluster state hasn't updated for that replica
      // locally
      // So we keep trying till it has updated and then verify if ALL queries go to the active replica
      long count = 0;
      while (true) {
        count++;
        client.query(new SolrQuery("*:*"));

        long c = cnt.getCount();

        if (c == 1) {
          break; // cluster state has got update locally
        } else {
          Thread.sleep(100);
        }

        if (count > 10000) {
          fail("After 10k queries we still see all requests being processed by the down replica");
        }
      }

      // Now we fire a few additional queries and make sure ALL of them
      // are served by the active replica
      int moreQueries = TestUtil.nextInt(random(), 4, 10);
      count = 1; // Since 1 query has already hit the leader
      for (int i = 0; i < moreQueries; i++) {
        client.query(new SolrQuery("*:*"));
        count++;

        long c = cnt.getCount();
        assertEquals("Query wasn't served by leader", count, c);
      }
    }
  }

  /**
   * Looks through the cores of every jetty for one with the given name.
   *
   * @param coreName the core name to look for
   * @return the first matching core, or {@code null} if no jetty hosts a core with that name
   */
  private SolrCore findCoreByName(String coreName) {
    for (JettySolrRunner jetty : jettys) {
      CoreContainer container = jetty.getCoreContainer();
      for (SolrCore core : container.getCores()) {
        if (core.getName().equals(coreName)) {
          return core;
        }
      }
    }
    return null;
  }
}