/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.solr.cluster.placement.impl;

import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.request.beans.PluginMeta;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.V2Response;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.cluster.Cluster;
import org.apache.solr.cluster.Node;
import org.apache.solr.cluster.SolrCollection;
import org.apache.solr.cluster.placement.AttributeFetcher;
import org.apache.solr.cluster.placement.AttributeValues;
import org.apache.solr.cluster.placement.CollectionMetrics;
import org.apache.solr.cluster.placement.NodeMetric;
import org.apache.solr.cluster.placement.PlacementPluginConfig;
import org.apache.solr.cluster.placement.PlacementPluginFactory;
import org.apache.solr.cluster.placement.ReplicaMetrics;
import org.apache.solr.cluster.placement.ShardMetrics;
import org.apache.solr.cluster.placement.plugins.AffinityPlacementConfig;
import org.apache.solr.cluster.placement.plugins.AffinityPlacementFactory;
import org.apache.solr.cluster.placement.plugins.MinimizeCoresPlacementFactory;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Collections.singletonMap;

/**
 * Integration tests for placement plugins ({@link MinimizeCoresPlacementFactory},
 * {@link AffinityPlacementFactory}) and the attribute fetcher, using a {@link MiniSolrCloudCluster}.
 */
@LogLevel("org.apache.solr.cluster.placement.impl=DEBUG")
public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String COLLECTION = PlacementPluginIntegrationTest.class.getSimpleName() + "_collection";
private static SolrCloudManager cloudManager;
private static CoreContainer cc;

@BeforeClass
public static void setupCluster() throws Exception {
// placement plugins need metrics
System.setProperty("metricsEnabled", "true");
configureCluster(3)
.addConfig("conf", configset("cloud-minimal"))
.configure();
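// keep direct handles to the CoreContainer and SolrCloudManager of the first node for inspection in tests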
cc = cluster.getJettySolrRunner(0).getCoreContainer();
cloudManager = cc.getZkController().getSolrCloudManager();
}

@After
public void cleanup() throws Exception {
cluster.deleteAllCollections();
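// if a test registered a cluster-level placement plugin, remove it so tests stay isolated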
V2Request req = new V2Request.Builder("/cluster/plugin")
.forceV2(true)
.GET()
.build();
V2Response rsp = req.process(cluster.getSolrClient());
if (rsp._get(Arrays.asList("plugin", PlacementPluginFactory.PLUGIN_NAME), null) != null) {
req = new V2Request.Builder("/cluster/plugin")
.forceV2(true)
.POST()
.withPayload("{remove: '" + PlacementPluginFactory.PLUGIN_NAME + "'}")
.build();
req.process(cluster.getSolrClient());
}
}

@Test
public void testMinimizeCores() throws Exception {
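// register MinimizeCoresPlacementFactory as the cluster-level placement plugin via the /cluster/plugin API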
PluginMeta plugin = new PluginMeta();
plugin.name = PlacementPluginFactory.PLUGIN_NAME;
plugin.klass = MinimizeCoresPlacementFactory.class.getName();
V2Request req = new V2Request.Builder("/cluster/plugin")
.forceV2(true)
.POST()
.withPayload(singletonMap("add", plugin))
.build();
req.process(cluster.getSolrClient());
CollectionAdminResponse rsp = CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 2)
.process(cluster.getSolrClient());
assertTrue(rsp.isSuccess());
cluster.waitForActiveCollection(COLLECTION, 2, 4);
// use Solr-specific API to verify the expected placements
ClusterState clusterState = cloudManager.getClusterStateProvider().getClusterState();
DocCollection collection = clusterState.getCollectionOrNull(COLLECTION);
assertNotNull(collection);
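// count cores per node: 2 shards x 2 replicas = 4 cores spread over 3 nodes, so minimizing
// cores should put 2 cores on one node and 1 core on each of the other two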
Map<String, AtomicInteger> coresByNode = new HashMap<>();
collection.forEachReplica((shard, replica) -> coresByNode.computeIfAbsent(replica.getNodeName(), n -> new AtomicInteger()).incrementAndGet());
int maxCores = 0;
int minCores = Integer.MAX_VALUE;
for (Map.Entry<String, AtomicInteger> entry : coresByNode.entrySet()) {
assertTrue("too few cores on node " + entry.getKey() + ": " + entry.getValue(),
entry.getValue().get() > 0);
if (entry.getValue().get() > maxCores) {
maxCores = entry.getValue().get();
}
if (entry.getValue().get() < minCores) {
minCores = entry.getValue().get();
}
}
assertEquals("max cores too high", 2, maxCores);
assertEquals("min cores too low", 1, minCores);
}

@Test
@SuppressWarnings("unchecked")
public void testDynamicReconfiguration() throws Exception {
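// the CoreContainer exposes a delegating factory; its delegate and version are updated
// whenever the cluster plugin configuration changes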
PlacementPluginFactory<? extends PlacementPluginConfig> pluginFactory = cc.getPlacementPluginFactory();
assertTrue("wrong type " + pluginFactory.getClass().getName(), pluginFactory instanceof DelegatingPlacementPluginFactory);
DelegatingPlacementPluginFactory wrapper = (DelegatingPlacementPluginFactory) pluginFactory;
int version = wrapper.getVersion();
log.debug("--initial version={}", version);
PluginMeta plugin = new PluginMeta();
plugin.name = PlacementPluginFactory.PLUGIN_NAME;
plugin.klass = MinimizeCoresPlacementFactory.class.getName();
V2Request req = new V2Request.Builder("/cluster/plugin")
.forceV2(true)
.POST()
.withPayload(singletonMap("add", plugin))
.build();
req.process(cluster.getSolrClient());
version = waitForVersionChange(version, wrapper, 10);
assertTrue("wrong version " + version, version > 0);
PlacementPluginFactory<? extends PlacementPluginConfig> factory = wrapper.getDelegate();
assertTrue("wrong type " + factory.getClass().getName(), factory instanceof MinimizeCoresPlacementFactory);
// reconfigure
plugin.klass = AffinityPlacementFactory.class.getName();
plugin.config = new AffinityPlacementConfig(1, 2);
req = new V2Request.Builder("/cluster/plugin")
.forceV2(true)
.POST()
.withPayload(singletonMap("update", plugin))
.build();
req.process(cluster.getSolrClient());
version = waitForVersionChange(version, wrapper, 10);
factory = wrapper.getDelegate();
assertTrue("wrong type " + factory.getClass().getName(), factory instanceof AffinityPlacementFactory);
AffinityPlacementConfig config = ((AffinityPlacementFactory) factory).getConfig();
assertEquals("minimalFreeDiskGB", 1, config.minimalFreeDiskGB);
assertEquals("prioritizedFreeDiskGB", 2, config.prioritizedFreeDiskGB);
// change plugin config
plugin.config = new AffinityPlacementConfig(3, 4);
req = new V2Request.Builder("/cluster/plugin")
.forceV2(true)
.POST()
.withPayload(singletonMap("update", plugin))
.build();
req.process(cluster.getSolrClient());
version = waitForVersionChange(version, wrapper, 10);
factory = wrapper.getDelegate();
assertTrue("wrong type " + factory.getClass().getName(), factory instanceof AffinityPlacementFactory);
config = ((AffinityPlacementFactory) factory).getConfig();
assertEquals("minimalFreeDiskGB", 3, config.minimalFreeDiskGB);
assertEquals("prioritizedFreeDiskGB", 4, config.prioritizedFreeDiskGB);
// add plugin of the right type but with the wrong name
plugin.name = "myPlugin";
req = new V2Request.Builder("/cluster/plugin")
.forceV2(true)
.POST()
.withPayload(singletonMap("add", plugin))
.build();
req.process(cluster.getSolrClient());
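// a plugin registered under a different name must not become the placement plugin,
// so the factory version should not change (a timeout here is the expected outcome)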
try {
int newVersion = waitForVersionChange(version, wrapper, 5);
if (newVersion != version) {
fail("factory configuration updated but plugin name was wrong: " + plugin);
}
} catch (TimeoutException te) {
// expected
}
// remove plugin
req = new V2Request.Builder("/cluster/plugin")
.forceV2(true)
.POST()
.withPayload("{remove: '" + PlacementPluginFactory.PLUGIN_NAME + "'}")
.build();
req.process(cluster.getSolrClient());
waitForVersionChange(version, wrapper, 10);
factory = wrapper.getDelegate();
assertNull("no factory should be present", factory);
}

@Test
public void testWithCollectionIntegration() throws Exception {
PlacementPluginFactory<? extends PlacementPluginConfig> pluginFactory = cc.getPlacementPluginFactory();
assertTrue("wrong type " + pluginFactory.getClass().getName(), pluginFactory instanceof DelegatingPlacementPluginFactory);
DelegatingPlacementPluginFactory wrapper = (DelegatingPlacementPluginFactory) pluginFactory;
int version = wrapper.getVersion();
log.debug("--initial version={}", version);
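// restrict the primary collection to two of the three live nodes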
Set<String> nodeSet = new HashSet<>();
for (String node : cloudManager.getClusterStateProvider().getLiveNodes()) {
if (nodeSet.size() > 1) {
break;
}
nodeSet.add(node);
}
String SECONDARY_COLLECTION = COLLECTION + "_secondary";
PluginMeta plugin = new PluginMeta();
plugin.name = PlacementPluginFactory.PLUGIN_NAME;
plugin.klass = AffinityPlacementFactory.class.getName();
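// AffinityPlacementConfig(minimalFreeDiskGB, prioritizedFreeDiskGB, withCollection):
// the map declares that COLLECTION must be colocated with SECONDARY_COLLECTION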
plugin.config = new AffinityPlacementConfig(1, 2, Map.of(COLLECTION, SECONDARY_COLLECTION));
V2Request req = new V2Request.Builder("/cluster/plugin")
.forceV2(true)
.POST()
.withPayload(singletonMap("add", plugin))
.build();
req.process(cluster.getSolrClient());
version = waitForVersionChange(version, wrapper, 10);
CollectionAdminResponse rsp = CollectionAdminRequest.createCollection(SECONDARY_COLLECTION, "conf", 1, 3)
.process(cluster.getSolrClient());
assertTrue(rsp.isSuccess());
cluster.waitForActiveCollection(SECONDARY_COLLECTION, 1, 3);
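// with 1 shard and 3 replicas the secondary collection ends up with a replica on each of the
// three nodes, since replicas of the same shard are placed on distinct nodes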
DocCollection secondary = cloudManager.getClusterStateProvider().getClusterState().getCollection(SECONDARY_COLLECTION);
Set<String> secondaryNodes = new HashSet<>();
secondary.forEachReplica((shard, replica) -> secondaryNodes.add(replica.getNodeName()));
rsp = CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 2)
.setCreateNodeSet(String.join(",", nodeSet))
.process(cluster.getSolrClient());
assertTrue(rsp.isSuccess());
cluster.waitForActiveCollection(COLLECTION, 2, 4);
// make sure the primary replicas were placed on the configured node set
DocCollection primary = cloudManager.getClusterStateProvider().getClusterState().getCollection(COLLECTION);
primary.forEachReplica((shard, replica) ->
assertTrue("primary replica not on secondary node!", nodeSet.contains(replica.getNodeName())));
// try deleting secondary replica from node without the primary replica
Optional<String> onlySecondaryReplica = secondary.getReplicas().stream()
.filter(replica -> !nodeSet.contains(replica.getNodeName()))
.map(replica -> replica.getName()).findFirst();
assertTrue("no secondary node without primary replica", onlySecondaryReplica.isPresent());
rsp = CollectionAdminRequest.deleteReplica(SECONDARY_COLLECTION, "shard1", onlySecondaryReplica.get())
.process(cluster.getSolrClient());
assertTrue("delete of a lone secondary replica should succeed", rsp.isSuccess());
// try deleting secondary replica from node WITH the primary replica - should fail
Optional<String> secondaryWithPrimaryReplica = secondary.getReplicas().stream()
.filter(replica -> nodeSet.contains(replica.getNodeName()))
.map(replica -> replica.getName()).findFirst();
assertTrue("no secondary node with primary replica", secondaryWithPrimaryReplica.isPresent());
try {
rsp = CollectionAdminRequest.deleteReplica(SECONDARY_COLLECTION, "shard1", secondaryWithPrimaryReplica.get())
.process(cluster.getSolrClient());
fail("should have failed: " + rsp);
} catch (Exception e) {
assertTrue(e.toString(), e.toString().contains("co-located with replicas"));
}
// try deleting secondary collection
try {
rsp = CollectionAdminRequest.deleteCollection(SECONDARY_COLLECTION)
.process(cluster.getSolrClient());
fail("should have failed: " + rsp);
} catch (Exception e) {
assertTrue(e.toString(), e.toString().contains("colocated collection"));
}
}

@Test
public void testAttributeFetcherImpl() throws Exception {
CollectionAdminResponse rsp = CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 2)
.process(cluster.getSolrClient());
assertTrue(rsp.isSuccess());
cluster.waitForActiveCollection(COLLECTION, 2, 4);
// placement-API view of the cluster (distinct from the inherited MiniSolrCloudCluster field)
Cluster clusterView = new SimpleClusterAbstractionsImpl.ClusterImpl(cloudManager);
SolrCollection collection = clusterView.getCollection(COLLECTION);
AttributeFetcher attributeFetcher = new AttributeFetcherImpl(cloudManager);
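// an arbitrary node metric addressed by its full registry key: the user.name system
// property exposed through the solr.jvm metrics registry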
NodeMetric<String> someMetricKey = new NodeMetricImpl<>("solr.jvm:system.properties:user.name");
String sysprop = "user.name";
String sysenv = "PWD";
attributeFetcher
.fetchFrom(clusterView.getLiveNodes())
.requestNodeMetric(NodeMetricImpl.HEAP_USAGE)
.requestNodeMetric(NodeMetricImpl.SYSLOAD_AVG)
.requestNodeMetric(NodeMetricImpl.NUM_CORES)
.requestNodeMetric(NodeMetricImpl.FREE_DISK_GB)
.requestNodeMetric(NodeMetricImpl.TOTAL_DISK_GB)
.requestNodeMetric(NodeMetricImpl.AVAILABLE_PROCESSORS)
.requestNodeMetric(someMetricKey)
.requestNodeSystemProperty(sysprop)
.requestNodeEnvironmentVariable(sysenv)
.requestCollectionMetrics(collection, Set.of(ReplicaMetricImpl.INDEX_SIZE_GB, ReplicaMetricImpl.QUERY_RATE_1MIN, ReplicaMetricImpl.UPDATE_RATE_1MIN));
AttributeValues attributeValues = attributeFetcher.fetchAttributes();
String userName = System.getProperty("user.name");
String pwd = System.getenv("PWD");
// node metrics
for (Node node : clusterView.getLiveNodes()) {
Optional<Double> doubleOpt = attributeValues.getNodeMetric(node, NodeMetricImpl.HEAP_USAGE);
assertTrue("heap usage", doubleOpt.isPresent());
assertTrue("heap usage should be 0 < heapUsage < 100 but was " + doubleOpt, doubleOpt.get() > 0 && doubleOpt.get() < 100);
doubleOpt = attributeValues.getNodeMetric(node, NodeMetricImpl.TOTAL_DISK_GB);
assertTrue("total disk", doubleOpt.isPresent());
assertTrue("total disk should be > 0 but was " + doubleOpt, doubleOpt.get() > 0);
doubleOpt = attributeValues.getNodeMetric(node, NodeMetricImpl.FREE_DISK_GB);
assertTrue("free disk", doubleOpt.isPresent());
assertTrue("free disk should be > 0 but was " + doubleOpt, doubleOpt.get() > 0);
Optional<Integer> intOpt = attributeValues.getNodeMetric(node, NodeMetricImpl.NUM_CORES);
assertTrue("cores", intOpt.isPresent());
assertTrue("cores should be > 0", intOpt.get() > 0);
assertTrue("systemLoadAverage 2", attributeValues.getNodeMetric(node, NodeMetricImpl.SYSLOAD_AVG).isPresent());
assertTrue("availableProcessors", attributeValues.getNodeMetric(node, NodeMetricImpl.AVAILABLE_PROCESSORS).isPresent());
Optional<String> userNameOpt = attributeValues.getNodeMetric(node, someMetricKey);
assertTrue("user.name", userNameOpt.isPresent());
assertEquals("userName", userName, userNameOpt.get());
Optional<String> syspropOpt = attributeValues.getSystemProperty(node, sysprop);
assertTrue("sysprop", syspropOpt.isPresent());
assertEquals("user.name sysprop", userName, syspropOpt.get());
Optional<String> sysenvOpt = attributeValues.getEnvironmentVariable(node, sysenv);
assertTrue("sysenv", sysenvOpt.isPresent());
assertEquals("PWD sysenv", pwd, sysenvOpt.get());
}
assertTrue(attributeValues.getCollectionMetrics(COLLECTION).isPresent());
CollectionMetrics collectionMetrics = attributeValues.getCollectionMetrics(COLLECTION).get();
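// every shard and every replica of the collection should report metrics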
collection.shards().forEach(shard -> {
Optional<ShardMetrics> shardMetricsOpt = collectionMetrics.getShardMetrics(shard.getShardName());
assertTrue("shard metrics", shardMetricsOpt.isPresent());
shard.replicas().forEach(replica -> {
Optional<ReplicaMetrics> replicaMetricsOpt = shardMetricsOpt.get().getReplicaMetrics(replica.getReplicaName());
assertTrue("replica metrics", replicaMetricsOpt.isPresent());
ReplicaMetrics replicaMetrics = replicaMetricsOpt.get();
Optional<Double> indexSizeOpt = replicaMetrics.getReplicaMetric(ReplicaMetricImpl.INDEX_SIZE_GB);
assertTrue("indexSize", indexSizeOpt.isPresent());
assertTrue("wrong type, expected Double but was " + indexSizeOpt.get().getClass(), indexSizeOpt.get() instanceof Double);
assertTrue("indexSize should be > 0 but was " + indexSizeOpt.get(), indexSizeOpt.get() > 0);
assertTrue("indexSize should be < 0.01 but was " + indexSizeOpt.get(), indexSizeOpt.get() < 0.01);
assertNotNull("queryRate", replicaMetrics.getReplicaMetric(ReplicaMetricImpl.QUERY_RATE_1MIN));
assertNotNull("updateRate", replicaMetrics.getReplicaMetric(ReplicaMetricImpl.UPDATE_RATE_1MIN));
});
});
}
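
/**
 * Waits until the factory version increases past {@code currentVersion} and returns the new
 * version; throws {@link TimeoutException} if it doesn't change within {@code timeoutSec} seconds.
 */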
private int waitForVersionChange(int currentVersion, DelegatingPlacementPluginFactory wrapper, int timeoutSec) throws Exception {
TimeOut timeout = new TimeOut(timeoutSec, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeout.hasTimedOut()) {
int newVersion = wrapper.getVersion();
if (newVersion < currentVersion) {
throw new Exception("Invalid version - went back! currentVersion=" + currentVersion +
" newVersion=" + newVersion);
} else if (currentVersion < newVersion) {
log.debug("--current version was {}, new version is {}", currentVersion, newVersion);
return newVersion;
}
timeout.sleep(200);
}
throw new TimeoutException("version didn't change in time, currentVersion=" + currentVersion);
}
}