blob: d133489ec76670af11797dd8150ec76431aa43a0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableMap;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.util.CommonTestInjection;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.handler.component.TrackingShardHandlerFactory;
import org.apache.solr.util.TestInjection;
import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.cloud.rule.ImplicitSnitch.SYSPROP;
public class RoutingToNodesWithPropertiesTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String PROP_NAME = SYSPROP + "zone";
final static String COLLECTION = "coll";
private final List<String> zone1Nodes = new ArrayList<>();
private final List<String> zone2Nodes = new ArrayList<>();
private final LinkedList<TrackingShardHandlerFactory.ShardRequestAndParams> zone1Queue = new LinkedList<>();
private final LinkedList<TrackingShardHandlerFactory.ShardRequestAndParams> zone2Queue = new LinkedList<>();
@Before
public void setupCluster() throws Exception {
System.setProperty("metricsEnabled", "true");
CommonTestInjection.setAdditionalProps(ImmutableMap.of("zone", "us-west1"));
configureCluster(2)
.withSolrXml(TEST_PATH().resolve("solr-trackingshardhandler.xml"))
.addConfig("config", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.configure();
zone1Nodes.addAll(cluster.getJettySolrRunners().stream().map(JettySolrRunner::getNodeName).collect(Collectors.toSet()));
CommonTestInjection.setAdditionalProps(ImmutableMap.of("zone", "us-west2"));
zone2Nodes.add(cluster.startJettySolrRunner().getNodeName());
zone2Nodes.add(cluster.startJettySolrRunner().getNodeName());
String commands = "{set-cluster-policy :[{" +
" 'replica':'#EQUAL'," +
" 'shard':'#EACH'," +
" 'sysprop.zone':'#EACH'}]}";
@SuppressWarnings({"rawtypes"})
SolrRequest req = CloudTestUtils.AutoScalingRequest.create(SolrRequest.METHOD.POST, commands);
cluster.getSolrClient().request(req);
CollectionAdminRequest.createCollection(COLLECTION, 2, 2)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(COLLECTION, 2, 4);
// Checking putting replicas
for (Slice slice : getCollectionState(COLLECTION).getSlices()) {
int numReplicaInZone1 = 0;
int numReplicaInZone2 = 0;
for (Replica replica : slice.getReplicas()) {
if (zone1Nodes.contains(replica.getNodeName()))
numReplicaInZone1++;
if (zone2Nodes.contains(replica.getNodeName()))
numReplicaInZone2++;
}
assertEquals(1, numReplicaInZone1);
assertEquals(1, numReplicaInZone2);
}
// check inject props
try (SolrCloudManager cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(cluster.getZkClient()),
cluster.getSolrClient())) {
for (String zone1Node: zone1Nodes) {
NodeStateProvider nodeStateProvider = cloudManager.getNodeStateProvider();
Map<String, Object> map = nodeStateProvider.getNodeValues(zone1Node, Collections.singletonList(PROP_NAME));
assertEquals("us-west1", map.get(PROP_NAME));
}
for (String zone2Node: zone2Nodes) {
NodeStateProvider nodeStateProvider = cloudManager.getNodeStateProvider();
Map<String, Object> map = nodeStateProvider.getNodeValues(zone2Node, Collections.singletonList(PROP_NAME));
assertEquals("us-west2", map.get(PROP_NAME));
}
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
if (zone1Nodes.contains(jetty.getNodeName())) {
((TrackingShardHandlerFactory)jetty.getCoreContainer().getShardHandlerFactory()).setTrackingQueue(zone1Queue);
} else {
((TrackingShardHandlerFactory)jetty.getCoreContainer().getShardHandlerFactory()).setTrackingQueue(zone2Queue);
}
}
for (int i = 0; i < 20; i++) {
new UpdateRequest()
.add("id", String.valueOf(i))
.process(cluster.getSolrClient(), COLLECTION);
}
new UpdateRequest()
.commit(cluster.getSolrClient(), COLLECTION);
}
}
@After
public void after() {
TestInjection.reset();
}
@Test
public void test() throws Exception {
final int NUM_TRY = 10;
CollectionAdminRequest
.setClusterProperty(ZkStateReader.DEFAULT_SHARD_PREFERENCES, ShardParams.SHARDS_PREFERENCE_NODE_WITH_SAME_SYSPROP +":"+PROP_NAME)
.process(cluster.getSolrClient());
{
TimeOut timeOut = new TimeOut(20, TimeUnit.SECONDS, TimeSource.NANO_TIME);
timeOut.waitFor("Timeout waiting for sysprops are cached in all nodes", () -> {
int total = 0;
for (JettySolrRunner runner : cluster.getJettySolrRunners()) {
total += runner.getCoreContainer().getZkController().getSysPropsCacher().getCacheSize();
}
return total == cluster.getJettySolrRunners().size() * cluster.getJettySolrRunners().size();
});
}
for (int i = 0; i < NUM_TRY; i++) {
SolrQuery qRequest = new SolrQuery("*:*");
ModifiableSolrParams qParams = new ModifiableSolrParams();
qParams.add(ShardParams.SHARDS_INFO, "true");
qRequest.add(qParams);
QueryResponse qResponse = cluster.getSolrClient().query(COLLECTION, qRequest);
Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);
SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
String firstReplicaAddr = ((SimpleOrderedMap) shardsInfoMap.getVal(0)).get("shardAddress").toString();
String secondReplicaAddr = ((SimpleOrderedMap) shardsInfoMap.getVal(1)).get("shardAddress").toString();
boolean firstReplicaInZone1 = false;
boolean secondReplicaInZone1 = false;
for (String zone1Node : zone1Nodes) {
zone1Node = zone1Node.replace("_solr", "");
firstReplicaInZone1 = firstReplicaInZone1 || firstReplicaAddr.contains(zone1Node);
secondReplicaInZone1 = secondReplicaInZone1 || secondReplicaAddr.contains(zone1Node);
}
assertEquals(firstReplicaInZone1, secondReplicaInZone1);
}
// intense asserting using TrackingShardHandlerFactory
assertRoutingToSameZone();
// Cachers should be stop running
CollectionAdminRequest
.setClusterProperty(ZkStateReader.DEFAULT_SHARD_PREFERENCES, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE+":PULL")
.process(cluster.getSolrClient());
{
TimeOut timeOut = new TimeOut(20, TimeUnit.SECONDS, TimeSource.NANO_TIME);
timeOut.waitFor("Timeout waiting for sysPropsCache stop", () -> {
int numNodeStillRunningCache = 0;
for (JettySolrRunner runner: cluster.getJettySolrRunners()) {
if (runner.getCoreContainer().getZkController().getSysPropsCacher().isRunning()) {
numNodeStillRunningCache++;
}
}
return numNodeStillRunningCache == 0;
});
}
// Testing disable default shard preferences
CollectionAdminRequest
.setClusterProperty(ZkStateReader.DEFAULT_SHARD_PREFERENCES, null)
.process(cluster.getSolrClient());
{
TimeOut timeOut = new TimeOut(20, TimeUnit.SECONDS, TimeSource.NANO_TIME);
timeOut.waitFor("Timeout waiting cluster properties get updated", () -> {
int numNodeGetUpdatedPref = 0;
int numNodeStillRunningCache = 0;
for (JettySolrRunner runner: cluster.getJettySolrRunners()) {
if (runner.getCoreContainer().getZkController()
.getZkStateReader().getClusterProperties().containsKey(ZkStateReader.DEFAULT_SHARD_PREFERENCES)) {
numNodeGetUpdatedPref++;
}
if (runner.getCoreContainer().getZkController().getSysPropsCacher().isRunning()) {
numNodeStillRunningCache++;
}
}
return numNodeGetUpdatedPref == 0 && numNodeStillRunningCache == 0;
});
}
}
private void assertRoutingToSameZone() {
for (TrackingShardHandlerFactory.ShardRequestAndParams sreq: zone1Queue) {
String firstNode = sreq.shard.split("\\|")[0];
assertTrue(zone1Nodes.stream().anyMatch(s -> firstNode.contains(s.replace('_','/'))));
}
for (TrackingShardHandlerFactory.ShardRequestAndParams sreq: zone2Queue) {
String firstNode = sreq.shard.split("\\|")[0];
assertTrue(zone2Nodes.stream().anyMatch(s -> firstNode.contains(s.replace('_','/'))));
}
}
}