blob: 3a280e4726ef9bc2dc432e14aaf07c6e7f30048c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.codahale.metrics.MetricRegistry;
import com.google.common.util.concurrent.AtomicDouble;
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
import org.apache.solr.client.solrj.impl.SolrClientNodeStateProvider;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.CloudUtil;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.cloud.ZkDistributedQueueFactory;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.metrics.SolrMetricManager;
import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
*
*/
public class SearchRateTriggerTest extends SolrCloudTestCase {
private static final String PREFIX = SearchRateTriggerTest.class.getSimpleName() + "-";
private static final String COLL1 = PREFIX + "collection1";
private static final String COLL2 = PREFIX + "collection2";
private AutoScaling.TriggerEventProcessor noFirstRunProcessor = event -> {
fail("Did not expect the listener to fire on first run!");
return true;
};
@BeforeClass
public static void setupCluster() throws Exception {
System.setProperty("metricsEnabled", "true");
}
@Before
public void removeCollections() throws Exception {
configureCluster(4)
.addConfig("conf", configset("cloud-minimal"))
.configure();
}
@After
public void after() throws Exception {
shutdownCluster();
}
@Test
@AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
@SuppressWarnings({"unchecked"})
public void testTrigger() throws Exception {
JettySolrRunner targetNode = cluster.getJettySolrRunner(0);
SolrZkClient zkClient = cluster.getSolrClient().getZkStateReader().getZkClient();
SolrResourceLoader loader = targetNode.getCoreContainer().getResourceLoader();
CoreContainer container = targetNode.getCoreContainer();
SolrCloudManager cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cluster.getSolrClient());
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
"conf", 2, 2);
CloudSolrClient solrClient = cluster.getSolrClient();
create.setMaxShardsPerNode(1);
create.process(solrClient);
create = CollectionAdminRequest.createCollection(COLL2,
"conf", 2, 2);
create.setMaxShardsPerNode(1);
create.process(solrClient);
CloudUtil.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS, clusterShape(2, 2));
CloudUtil.waitForState(cloudManager, COLL2, 60, TimeUnit.SECONDS, clusterShape(2, 2));
double rate = 1.0;
URL baseUrl = targetNode.getBaseUrl();
long waitForSeconds = 5 + random().nextInt(5);
Map<String, Object> props = createTriggerProps(Arrays.asList(COLL1, COLL2), waitForSeconds, rate, -1);
final List<TriggerEvent> events = new ArrayList<>();
try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger")) {
trigger.configure(loader, cloudManager, props);
trigger.init();
trigger.setProcessor(noFirstRunProcessor);
trigger.run();
trigger.setProcessor(event -> events.add(event));
// generate replica traffic
String coreName = container.getLoadedCoreNames().iterator().next();
String url = baseUrl.toString() + "/" + coreName;
try (HttpSolrClient simpleClient = new HttpSolrClient.Builder(url).build()) {
SolrParams query = params(CommonParams.Q, "*:*", CommonParams.DISTRIB, "false");
for (int i = 0; i < 130; i++) {
simpleClient.query(query);
}
String registryCoreName = coreName.replaceFirst("_", ".").replaceFirst("_", ".");
SolrMetricManager manager = targetNode.getCoreContainer().getMetricManager();
MetricRegistry registry = manager.registry("solr.core."+registryCoreName);
TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
// If we getting the rate too early, it will return 0
timeOut.waitFor("Timeout waiting for rate is not zero",
() -> registry.timer("QUERY./select.requestTimes").getOneMinuteRate()!=0.0);
trigger.run();
// waitFor delay
assertEquals(0, events.size());
Thread.sleep(waitForSeconds * 1000);
// should generate replica event
trigger.run();
assertEquals(1, events.size());
TriggerEvent event = events.get(0);
assertEquals(TriggerEventType.SEARCHRATE, event.eventType);
List<ReplicaInfo> infos = (List<ReplicaInfo>)event.getProperty(SearchRateTrigger.HOT_REPLICAS);
assertEquals(1, infos.size());
ReplicaInfo info = infos.get(0);
assertEquals(coreName, info.getCore());
assertTrue((Double)info.getVariable(AutoScalingParams.RATE) > rate);
}
// close that jetty to remove the violation - alternatively wait for 1 min...
JettySolrRunner j = cluster.stopJettySolrRunner(1);
cluster.waitForJettyToStop(j);
events.clear();
SolrParams query = params(CommonParams.Q, "*:*");
for (int i = 0; i < 130; i++) {
solrClient.query(COLL1, query);
}
Thread.sleep(waitForSeconds * 1000);
trigger.run();
// should generate collection event
assertEquals(1, events.size());
TriggerEvent event = events.get(0);
Map<String, Double> hotCollections = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_COLLECTIONS);
assertEquals(1, hotCollections.size());
Double Rate = hotCollections.get(COLL1);
assertNotNull(Rate);
assertTrue(Rate > rate);
events.clear();
for (int i = 0; i < 150; i++) {
solrClient.query(COLL2, query);
solrClient.query(COLL1, query);
}
Thread.sleep(waitForSeconds * 1000);
trigger.run();
// should generate collection event but not for COLL2 because of waitFor
assertEquals(1, events.size());
event = events.get(0);
Map<String, Double> hotNodes = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_NODES);
assertTrue("hotNodes", hotNodes.isEmpty());
hotNodes.forEach((n, r) -> assertTrue(n, r > rate));
hotCollections = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_COLLECTIONS);
assertEquals(1, hotCollections.size());
Rate = hotCollections.get(COLL1);
assertNotNull(Rate);
events.clear();
// assert that waitFor prevents new events from being generated
trigger.run();
// should not generate any events
assertEquals(0, events.size());
Thread.sleep(waitForSeconds * 1000 * 2);
trigger.run();
// should generate collection event
assertEquals(1, events.size());
event = events.get(0);
hotCollections = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_COLLECTIONS);
assertEquals(2, hotCollections.size());
Rate = hotCollections.get(COLL1);
assertNotNull(Rate);
Rate = hotCollections.get(COLL2);
assertNotNull(Rate);
hotNodes = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_NODES);
assertTrue("hotNodes", hotNodes.isEmpty());
}
}
private static final AtomicDouble mockRate = new AtomicDouble();
@Test
@SuppressWarnings({"unchecked"})
public void testWaitForElapsed() throws Exception {
SolrResourceLoader loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
CloudSolrClient solrClient = cluster.getSolrClient();
SolrZkClient zkClient = solrClient.getZkStateReader().getZkClient();
SolrCloudManager cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), solrClient) {
@Override
public NodeStateProvider getNodeStateProvider() {
return new SolrClientNodeStateProvider(solrClient) {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
Map<String, Object> values = super.getNodeValues(node, tags);
values.keySet().forEach(k -> {
values.replace(k, mockRate.get());
});
return values;
}
};
}
};
TimeSource timeSource = cloudManager.getTimeSource();
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
"conf", 2, 2);
create.setMaxShardsPerNode(1);
create.process(solrClient);
CloudUtil.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS, clusterShape(2, 4));
long waitForSeconds = 5 + random().nextInt(5);
Map<String, Object> props = createTriggerProps(Arrays.asList(COLL1, COLL2), waitForSeconds, 1.0, 0.1);
final List<TriggerEvent> events = new ArrayList<>();
try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger1")) {
trigger.configure(loader, cloudManager, props);
trigger.init();
trigger.setProcessor(noFirstRunProcessor);
trigger.run();
trigger.setProcessor(event -> events.add(event));
// set mock rates
mockRate.set(2.0);
TimeOut timeOut = new TimeOut(waitForSeconds + 2, TimeUnit.SECONDS, timeSource);
// simulate ScheduledTriggers
while (!timeOut.hasTimedOut()) {
trigger.run();
timeSource.sleep(1000);
}
// violation persisted longer than waitFor - there should be events
assertTrue(events.toString(), events.size() > 0);
TriggerEvent event = events.get(0);
assertEquals(event.toString(), TriggerEventType.SEARCHRATE, event.eventType);
Map<String, Object> hotNodes, hotCollections, hotShards;
List<ReplicaInfo> hotReplicas;
hotNodes = (Map<String, Object>)event.properties.get(SearchRateTrigger.HOT_NODES);
hotCollections = (Map<String, Object>)event.properties.get(SearchRateTrigger.HOT_COLLECTIONS);
hotShards = (Map<String, Object>)event.properties.get(SearchRateTrigger.HOT_SHARDS);
hotReplicas = (List<ReplicaInfo>)event.properties.get(SearchRateTrigger.HOT_REPLICAS);
assertTrue("no hot nodes?", hotNodes.isEmpty());
assertFalse("no hot collections?", hotCollections.isEmpty());
assertFalse("no hot shards?", hotShards.isEmpty());
assertFalse("no hot replicas?", hotReplicas.isEmpty());
}
mockRate.set(0.0);
events.clear();
try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger2")) {
trigger.configure(loader, cloudManager, props);
trigger.init();
trigger.setProcessor(noFirstRunProcessor);
trigger.run();
trigger.setProcessor(event -> events.add(event));
mockRate.set(2.0);
trigger.run();
// waitFor not elapsed
assertTrue(events.toString(), events.isEmpty());
Thread.sleep(1000);
trigger.run();
assertTrue(events.toString(), events.isEmpty());
Thread.sleep(1000);
mockRate.set(0.0);
trigger.run();
Thread.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds - 2, TimeUnit.SECONDS));
trigger.run();
// violations persisted shorter than waitFor - there should be no events
assertTrue(events.toString(), events.isEmpty());
}
}
@Test
public void testDefaultsAndBackcompat() throws Exception {
Map<String, Object> props = new HashMap<>();
props.put("rate", 1.0);
props.put("collection", "test");
SolrResourceLoader loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
SolrZkClient zkClient = cluster.getSolrClient().getZkStateReader().getZkClient();
SolrCloudManager cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cluster.getSolrClient());
try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger2")) {
trigger.configure(loader, cloudManager, props);
Map<String, Object> config = trigger.getConfig();
@SuppressWarnings({"unchecked"})
Set<String> collections = (Set<String>)config.get(SearchRateTrigger.COLLECTIONS_PROP);
assertEquals(collections.toString(), 1, collections.size());
assertEquals("test", collections.iterator().next());
assertEquals("#ANY", config.get(AutoScalingParams.SHARD));
assertEquals("#ANY", config.get(AutoScalingParams.NODE));
assertEquals(1.0, config.get(SearchRateTrigger.ABOVE_RATE_PROP));
assertEquals(-1.0, config.get(SearchRateTrigger.BELOW_RATE_PROP));
assertEquals(SearchRateTrigger.DEFAULT_METRIC, config.get(SearchRateTrigger.METRIC_PROP));
assertEquals(SearchRateTrigger.DEFAULT_MAX_OPS, config.get(SearchRateTrigger.MAX_OPS_PROP));
assertNull(config.get(SearchRateTrigger.MIN_REPLICAS_PROP));
assertEquals(CollectionParams.CollectionAction.ADDREPLICA, config.get(SearchRateTrigger.ABOVE_OP_PROP));
assertEquals(CollectionParams.CollectionAction.MOVEREPLICA, config.get(SearchRateTrigger.ABOVE_NODE_OP_PROP));
assertEquals(CollectionParams.CollectionAction.DELETEREPLICA, config.get(SearchRateTrigger.BELOW_OP_PROP));
assertNull(config.get(SearchRateTrigger.BELOW_NODE_OP_PROP));
}
}
private Map<String, Object> createTriggerProps(List<String> collections, long waitForSeconds, double aboveRate, double belowRate) {
Map<String, Object> props = new HashMap<>();
props.put("aboveRate", aboveRate);
props.put("belowRate", belowRate);
props.put("event", "searchRate");
props.put("waitFor", waitForSeconds);
props.put("enabled", true);
if (collections != null && !collections.isEmpty()) {
props.put("collections", String.join(",", collections));
}
List<Map<String, String>> actions = new ArrayList<>(3);
Map<String, String> map = new HashMap<>(2);
map.put("name", "compute_plan");
map.put("class", "solr.ComputePlanAction");
actions.add(map);
map = new HashMap<>(2);
map.put("name", "execute_plan");
map.put("class", "solr.ExecutePlanAction");
actions.add(map);
props.put("actions", actions);
return props;
}
}