/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling.sim;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.cloud.autoscaling.Variable;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.CloudTestUtils;
import org.apache.solr.cloud.CloudUtil;
import org.apache.solr.cloud.autoscaling.ActionContext;
import org.apache.solr.cloud.autoscaling.CapturedEvent;
import org.apache.solr.cloud.autoscaling.ComputePlanAction;
import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
import org.apache.solr.cloud.autoscaling.SearchRateTrigger;
import org.apache.solr.cloud.autoscaling.TriggerActionBase;
import org.apache.solr.cloud.autoscaling.TriggerEvent;
import org.apache.solr.cloud.autoscaling.TriggerListenerBase;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Autoscaling trigger and capacity tests run against a large (100 node) simulated cluster.
 */
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG")
public class TestSimLargeCluster extends SimSolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
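// multiplier for the simulated TimeSource: simulated time advances SPEED times faster than wall-clock time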
public static final int SPEED = 100;
public static final int NUM_NODES = 100;
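// coordination state shared with the simulated trigger and listener threads;
// the counters and latches are reset in setupTest() before each test method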
static final Map<String, List<CapturedEvent>> listenerEvents = new ConcurrentHashMap<>();
static final AtomicInteger triggerFinishedCount = new AtomicInteger();
static final AtomicInteger triggerStartedCount = new AtomicInteger();
static volatile CountDownLatch triggerStartedLatch;
static volatile CountDownLatch triggerFinishedLatch;
static volatile CountDownLatch listenerEventLatch;
static volatile int waitForSeconds;
@After
public void tearDownTest() throws Exception {
shutdownCluster();
}
@Before
public void setupTest() throws Exception {
configureCluster(NUM_NODES, TimeSource.get("simTime:" + SPEED));
// disable metrics history collection
cluster.disableMetricsHistory();
// disable .scheduled_maintenance (once it exists)
CloudTestUtils.waitForTriggerToBeScheduled(cluster, ".scheduled_maintenance");
CloudTestUtils.suspendTrigger(cluster, ".scheduled_maintenance");
// disable .auto_add_replicas (once it exists)
CloudTestUtils.waitForTriggerToBeScheduled(cluster, ".auto_add_replicas");
CloudTestUtils.suspendTrigger(cluster, ".auto_add_replicas");
cluster.getSimClusterStateProvider().createSystemCollection();
waitForSeconds = 5;
triggerStartedCount.set(0);
triggerFinishedCount.set(0);
triggerStartedLatch = new CountDownLatch(1);
triggerFinishedLatch = new CountDownLatch(1);
// by default assume we want to allow a (virtually) unbounded number of events;
// tests that expect a specific number can override
listenerEventLatch = new CountDownLatch(Integer.MAX_VALUE);
listenerEvents.clear();
}
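/**
 * Test listener that records every event it sees in the static listenerEvents map.
 * The shared listenerEventLatch bounds how many events are captured: once its count
 * reaches zero, further events are logged and dropped.
 */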
public static class TestTriggerListener extends TriggerListenerBase {
@Override
public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
ActionContext context, Throwable error, String message) {
CapturedEvent ev = new CapturedEvent(cluster.getTimeSource().getTimeNs(), context, config, stage, actionName, event, message);
final CountDownLatch latch = listenerEventLatch;
synchronized (latch) {
if (0 == latch.getCount()) {
log.warn("Ignoring captured event since latch is 'full': {}", ev);
} else {
List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
lst.add(ev);
latch.countDown();
}
}
}
}
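/** Marker action placed last in a trigger's action chain to signal that event processing finished. */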
public static class FinishTriggerAction extends TriggerActionBase {
@Override
public void process(TriggerEvent event, ActionContext context) throws Exception {
triggerFinishedCount.incrementAndGet();
triggerFinishedLatch.countDown();
}
}
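/** Marker action placed first in a trigger's action chain to signal that event processing started. */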
public static class StartTriggerAction extends TriggerActionBase {
@Override
public void process(TriggerEvent event, ActionContext context) throws Exception {
triggerStartedCount.incrementAndGet();
triggerStartedLatch.countDown();
}
}
@Test
@AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // this test hits a timeout easily
public void testBasic() throws Exception {
SolrClient solrClient = cluster.simGetSolrClient();
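// register a nodeLost trigger whose compute/execute actions are bracketed by the
// StartTriggerAction / FinishTriggerAction markers, so the test can count event processing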
assertAutoScalingRequest
( "{" +
"'set-trigger' : {" +
"'name' : 'node_lost_trigger1'," +
"'event' : 'nodeLost'," +
"'waitFor' : '" + waitForSeconds + "s'," +
"'enabled' : true," +
"'actions' : [" +
"{'name':'start','class':'" + StartTriggerAction.class.getName() + "'}," +
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
"{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
"{'name':'test','class':'" + FinishTriggerAction.class.getName() + "'}" +
"]" +
"}}");
assertAutoScalingRequest
( "{" +
"'set-listener' : " +
"{" +
"'name' : 'foo'," +
"'trigger' : 'node_lost_trigger1'," +
"'stage' : ['STARTED','ABORTED','SUCCEEDED', 'FAILED']," +
"'beforeAction' : ['compute', 'execute']," +
"'afterAction' : ['compute', 'execute']," +
"'class' : '" + TestTriggerListener.class.getName() + "'" +
"}" +
"}");
assertAutoscalingUpdateComplete();
// collect a subset of the live nodes (limit + 1 of them) and shuffle it
List<String> nodes = new ArrayList<>();
int limit = 75;
for (String node : cluster.getClusterStateProvider().getLiveNodes()) {
nodes.add(node);
if (nodes.size() > limit) {
break;
}
}
Collections.shuffle(nodes, random());
// create collection on these nodes
String collectionName = "testBasic";
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
"conf", 5, 5, 5, 5);
create.setMaxShardsPerNode(1);
create.setAutoAddReplicas(false);
create.setCreateNodeSet(String.join(",", nodes));
create.process(solrClient);
if (log.isInfoEnabled()) {
log.info("Ready after {} ms", CloudUtil.waitForState(cluster, collectionName, 30 * nodes.size(), TimeUnit.SECONDS,
CloudUtil.clusterShape(5, 15, false, true)));
}
int KILL_NODES = 8;
// kill off a number of nodes
for (int i = 0; i < KILL_NODES; i++) {
cluster.simRemoveNode(nodes.get(i), false);
}
// should fully recover
if (log.isInfoEnabled()) {
log.info("Ready after {} ms", CloudUtil.waitForState(cluster, collectionName, 90 * KILL_NODES, TimeUnit.SECONDS,
CloudUtil.clusterShape(5, 15, false, true)));
log.info("OP COUNTS: {}", cluster.simGetOpCounts()); // logOk
}
long moveReplicaOps = cluster.simGetOpCount(CollectionParams.CollectionAction.MOVEREPLICA.name());
// simulate a number of flaky nodes
int FLAKY_NODES = 10;
int flakyReplicas = 0;
for (int cnt = 0; cnt < 10; cnt++) {
for (int i = KILL_NODES; i < KILL_NODES + FLAKY_NODES; i++) {
flakyReplicas += cluster.getSimClusterStateProvider().simGetReplicaInfos(nodes.get(i))
.stream().filter(r -> r.getState().equals(Replica.State.ACTIVE)).count();
cluster.simRemoveNode(nodes.get(i), false);
}
cluster.getTimeSource().sleep(TimeUnit.SECONDS.toMillis(waitForSeconds) * 2);
for (int i = KILL_NODES; i < KILL_NODES + FLAKY_NODES; i++) {
final String nodeId = nodes.get(i);
cluster.submit(() -> cluster.getSimClusterStateProvider().simRestoreNode(nodeId));
}
}
// wait until started == finished
TimeOut timeOut = new TimeOut(20 * waitForSeconds * NUM_NODES, TimeUnit.SECONDS, cluster.getTimeSource());
while (!timeOut.hasTimedOut()) {
if (triggerStartedCount.get() == triggerFinishedCount.get()) {
break;
}
timeOut.sleep(1000);
}
if (timeOut.hasTimedOut()) {
fail("did not finish processing all events in time: started=" + triggerStartedCount.get() + ", finished=" + triggerFinishedCount.get());
}
if (log.isInfoEnabled()) {
log.info("Ready after {} ms", CloudUtil.waitForState(cluster, collectionName, 30 * nodes.size(), TimeUnit.SECONDS,
CloudUtil.clusterShape(5, 15, false, true)));
}
long newMoveReplicaOps = cluster.simGetOpCount(CollectionParams.CollectionAction.MOVEREPLICA.name());
if (log.isInfoEnabled()) {
log.info("==== Flaky replicas: {}. Additional MOVEREPLICA count: {}", flakyReplicas, (newMoveReplicaOps - moveReplicaOps));
}
// flaky nodes lead to a number of MOVEREPLICA that is non-zero but lower than the number of flaky replicas
assertTrue("there should be new MOVERPLICA ops", newMoveReplicaOps - moveReplicaOps > 0);
assertTrue("there should be less than flakyReplicas=" + flakyReplicas + " MOVEREPLICA ops",
newMoveReplicaOps - moveReplicaOps < flakyReplicas);
}
@Test
public void testCreateLargeSimCollections() throws Exception {
SolrClient solrClient = cluster.simGetSolrClient();
final int numCollections = atLeast(5);
for (int i = 0; i < numCollections; i++) {
// wide and shallow, or deep and narrow...
final int numShards = TestUtil.nextInt(random(), 5, 20);
final int nReps = TestUtil.nextInt(random(), 2, 25 - numShards);
final int tReps = TestUtil.nextInt(random(), 2, 25 - numShards);
final int pReps = TestUtil.nextInt(random(), 2, 25 - numShards);
final int repsPerShard = (nReps + tReps + pReps);
final int totalCores = repsPerShard * numShards;
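// headroom so that placement cannot fail purely on maxShardsPerNode:
// the average number of cores per node plus a small random cushion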
final int maxShardsPerNode = atLeast(2) + (totalCores / NUM_NODES);
final String name = "large_sim_collection" + i;
final CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection
(name, "conf", numShards, nReps, tReps, pReps);
create.setMaxShardsPerNode(maxShardsPerNode);
create.setAutoAddReplicas(false);
log.info("CREATE: {}", create);
create.process(solrClient);
// Since our current goal is to try and find situations where cores are just flat out missing
// no matter how long we wait, let's be excessive and generous in our timeout.
// (REMINDER: this uses the cluster's timesource, and ADDREPLICA has a hardcoded delay of 500ms)
CloudUtil.waitForState(cluster, name, 2 * totalCores, TimeUnit.SECONDS,
CloudUtil.clusterShape(numShards, repsPerShard, false, true));
final CollectionAdminRequest.Delete delete = CollectionAdminRequest.deleteCollection(name);
log.info("DELETE: {}", delete);
delete.process(solrClient);
}
}
@Test
public void testAddNode() throws Exception {
SolrClient solrClient = cluster.simGetSolrClient();
assertAutoScalingRequest
( "{" +
"'set-trigger' : {" +
"'name' : 'node_added_trigger2'," +
"'event' : 'nodeAdded'," +
"'waitFor' : '" + waitForSeconds + "s'," +
"'enabled' : true," +
"'actions' : [" +
"{'name':'start','class':'" + StartTriggerAction.class.getName() + "'}," +
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
"{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
"{'name':'test','class':'" + FinishTriggerAction.class.getName() + "'}" +
"]" +
"}}");
assertAutoscalingUpdateComplete();
// create a collection with more than 1 replica per node
String collectionName = "testNodeAdded";
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
"conf", NUM_NODES / 10, NUM_NODES / 8, NUM_NODES / 8, NUM_NODES / 8);
create.setMaxShardsPerNode(5);
create.setAutoAddReplicas(false);
create.process(solrClient);
if (log.isInfoEnabled()) {
log.info("Ready after {} ms", CloudUtil.waitForState(cluster, collectionName, 90 * NUM_NODES, TimeUnit.SECONDS,
CloudUtil.clusterShape(NUM_NODES / 10, NUM_NODES / 8 * 3, false, true)));
}
// start adding nodes
int numAddNode = NUM_NODES / 5;
List<String> addNodesList = new ArrayList<>(numAddNode);
for (int i = 0; i < numAddNode; i++) {
addNodesList.add(cluster.simAddNode());
}
// wait until at least one event is generated
assertTrue("Trigger did not start even after await()ing an excessive amount of time",
triggerStartedLatch.await(60, TimeUnit.SECONDS));
// wait until started == finished
TimeOut timeOut = new TimeOut(45 * waitForSeconds * NUM_NODES, TimeUnit.SECONDS, cluster.getTimeSource());
while (!timeOut.hasTimedOut()) {
final int started = triggerStartedCount.get();
final int finished = triggerFinishedCount.get();
log.info("started={} =?= finished={}", started, finished);
if (triggerStartedCount.get() == triggerFinishedCount.get()) {
log.info("started == finished: {} == {}", started, finished);
break;
}
timeOut.sleep(1000);
}
if (timeOut.hasTimedOut()) {
fail("did not finish processing all events in time: started=" + triggerStartedCount.get() + ", finished=" + triggerFinishedCount.get());
}
List<SolrInputDocument> systemColl = cluster.simGetSystemCollection();
int startedEventPos = -1;
for (int i = 0; i < systemColl.size(); i++) {
SolrInputDocument d = systemColl.get(i);
if (!"node_added_trigger2".equals(d.getFieldValue("event.source_s"))) {
continue;
}
if ("NODEADDED".equals(d.getFieldValue("event.type_s")) &&
"STARTED".equals(d.getFieldValue("stage_s"))) {
startedEventPos = i;
break;
}
}
assertTrue("no STARTED event", startedEventPos > -1);
SolrInputDocument startedEvent = systemColl.get(startedEventPos);
int lastIgnoredPos = startedEventPos;
// make sure some replicas have been moved
long lastNumOps = cluster.simGetOpCount("MOVEREPLICA");
log.info("1st check: lastNumOps (MOVEREPLICA) = {}", lastNumOps);
assertTrue("no MOVEREPLICA ops?", lastNumOps > 0);
if (log.isInfoEnabled()) {
log.info("Ready after {} ms", CloudUtil.waitForState(cluster, collectionName, 20 * NUM_NODES, TimeUnit.SECONDS,
CloudUtil.clusterShape(NUM_NODES / 10, NUM_NODES / 8 * 3, false, true)));
}
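// poll until MOVEREPLICA activity settles: when two consecutive samples of the op count are
// equal, scan the .system collection backwards (newest first) for the SUCCEEDED stage document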
int count = 1000;
SolrInputDocument finishedEvent = null;
lastNumOps = cluster.simGetOpCount("MOVEREPLICA");
log.info("2nd check: lastNumOps (MOVEREPLICA) = {}", lastNumOps);
while (count-- > 0) {
cluster.getTimeSource().sleep(10000);
if (cluster.simGetOpCount("MOVEREPLICA") < 2) {
log.info("MOVEREPLICA < 2");
continue;
}
long currentNumOps = cluster.simGetOpCount("MOVEREPLICA");
if (currentNumOps == lastNumOps) {
int size = systemColl.size() - 1;
for (int i = size; i > lastIgnoredPos; i--) {
SolrInputDocument d = systemColl.get(i);
if (!"node_added_trigger2".equals(d.getFieldValue("event.source_s"))) {
continue;
}
if ("SUCCEEDED".equals(d.getFieldValue("stage_s"))) {
finishedEvent = d;
log.info("finishedEvent = {}", finishedEvent);
break;
}
}
log.info("breaking because currentNumOps == lastNumOps == {}", currentNumOps);
break;
} else {
lastNumOps = currentNumOps;
}
}
assertNotNull("did not finish processing changes", finishedEvent);
long delta = (Long) finishedEvent.getFieldValue("event.time_l") - (Long) startedEvent.getFieldValue("event.time_l");
if (log.isInfoEnabled()) {
log.info("#### System stabilized after {} ms", TimeUnit.NANOSECONDS.toMillis(delta));
}
assertTrue("unexpected number of MOVEREPLICA ops: " + cluster.simGetOpCount("MOVEREPLICA"),
cluster.simGetOpCount("MOVEREPLICA") > 1);
}
@Test
public void testNodeLost() throws Exception {
doTestNodeLost(waitForSeconds, 5000, 0);
}
// Renard R5 preferred number series: five geometric steps per decade (step ratio 10^(1/5),
// about 1.58), so the values evenly cover a log10 range
private static final int[] renard5 = new int[] {
1, 2, 3, 4, 6,
10
};
private static final int[] renard5x = new int[] {
1, 2, 3, 4, 6,
10, 16, 25, 40, 63,
100
};
private static final int[] renard5xx = new int[] {
1, 2, 3, 4, 6,
10, 16, 25, 40, 63,
100, 158, 251, 398, 631,
1000, 1585, 2512, 3981, 6310,
10000
};
// Renard R10 series: ten geometric steps per decade (step ratio 10^(1/10), about 1.26)
private static final double[] renard10 = new double[] {
1, 1.3, 1.6, 2, 2.5, 3.2, 4, 5, 6.3, 7.9,
10
};
private static final double[] renard10x = new double[] {
1, 1.3, 1.6, 2, 2.5, 3.2, 4, 5, 6.3, 7.9,
10, 12.6, 15.8, 20, 25.1, 31.6, 39.8, 50.1, 63.1, 79.4,
100
};
private static final AtomicInteger ZERO = new AtomicInteger(0);
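// Deliberately not annotated with @Test: a manual benchmark that sweeps the trigger waitFor
// and the inter-kill delay over the renard5x series (5 runs per combination), recreating the
// simulated cluster for every run and reporting per-combination statistics.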
//@Test
public void benchmarkNodeLost() throws Exception {
List<String> results = new ArrayList<>();
for (int wait : renard5x) {
for (int delay : renard5x) {
SummaryStatistics totalTime = new SummaryStatistics();
SummaryStatistics ignoredOurEvents = new SummaryStatistics();
SummaryStatistics ignoredOtherEvents = new SummaryStatistics();
SummaryStatistics startedOurEvents = new SummaryStatistics();
SummaryStatistics startedOtherEvents = new SummaryStatistics();
for (int i = 0; i < 5; i++) {
if (cluster != null) {
cluster.close();
}
setUp();
setupTest();
long total = doTestNodeLost(wait, delay * 1000, 0);
totalTime.addValue(total);
// get event counts
Map<String, Map<String, AtomicInteger>> counts = cluster.simGetEventCounts();
Map<String, AtomicInteger> map = counts.remove("node_lost_trigger");
startedOurEvents.addValue(map.getOrDefault("STARTED", ZERO).get());
ignoredOurEvents.addValue(map.getOrDefault("IGNORED", ZERO).get());
int otherStarted = 0;
int otherIgnored = 0;
for (Map<String, AtomicInteger> m : counts.values()) {
otherStarted += m.getOrDefault("STARTED", ZERO).get();
otherIgnored += m.getOrDefault("IGNORED", ZERO).get();
}
startedOtherEvents.addValue(otherStarted);
ignoredOtherEvents.addValue(otherIgnored);
}
results.add(String.format(Locale.ROOT, "%d\t%d\t%4.0f\t%4.0f\t%4.0f\t%4.0f\t%6.0f\t%6.0f\t%6.0f\t%6.0f\t%6.0f",
wait, delay, startedOurEvents.getMean(), ignoredOurEvents.getMean(),
startedOtherEvents.getMean(), ignoredOtherEvents.getMean(),
totalTime.getMin(), totalTime.getMax(), totalTime.getMean(), totalTime.getStandardDeviation(), totalTime.getVariance()));
}
}
log.info("===== RESULTS ======");
log.info("waitFor\tdelay\tSTRT\tIGN\toSTRT\toIGN\tmin\tmax\tmean\tstdev\tvar");
if (log.isInfoEnabled()) {
results.forEach(s -> log.info(s));
}
}
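/**
 * Shared body of the nodeLost test and the (disabled) benchmark: registers a nodeLost trigger,
 * kills NUM_NODES / 5 nodes with killDelay ms of simulated time between kills, waits for the
 * cluster to stabilize, and returns how long stabilization took in ms (0 if it could not be
 * measured).
 */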
private long doTestNodeLost(int waitFor, long killDelay, int minIgnored) throws Exception {
SolrClient solrClient = cluster.simGetSolrClient();
assertAutoScalingRequest
( "{" +
"'set-trigger' : {" +
"'name' : 'node_lost_trigger3'," +
"'event' : 'nodeLost'," +
"'waitFor' : '" + waitFor + "s'," +
"'enabled' : true," +
"'actions' : [" +
"{'name':'start','class':'" + StartTriggerAction.class.getName() + "'}," +
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
"{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
"{'name':'test','class':'" + FinishTriggerAction.class.getName() + "'}" +
"]" +
"}}");
assertAutoScalingRequest
( "{" +
"'set-listener' : " +
"{" +
"'name' : 'failures'," +
"'trigger' : 'node_lost_trigger3'," +
"'stage' : ['FAILED']," +
"'class' : '" + TestTriggerListener.class.getName() + "'" +
"}" +
"}");
assertAutoscalingUpdateComplete();
// create a collection with (on average) 2 replicas per node
String collectionName = "testNodeLost";
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
"conf", NUM_NODES / 5, NUM_NODES / 10);
create.setMaxShardsPerNode(5);
create.setAutoAddReplicas(false);
create.process(solrClient);
if (log.isInfoEnabled()) {
log.info("Ready after {} ms", CloudUtil.waitForState(cluster, collectionName, 60 * NUM_NODES, TimeUnit.SECONDS,
CloudUtil.clusterShape(NUM_NODES / 5, NUM_NODES / 10, false, true)));
}
// start killing nodes
int numNodes = NUM_NODES / 5;
List<String> nodes = new ArrayList<>(cluster.getLiveNodesSet().get());
for (int i = 0; i < numNodes; i++) {
// this may also select a node where a replica is moved to, so the total number of
// MOVEREPLICA may vary
cluster.simRemoveNode(nodes.get(i), false);
cluster.getTimeSource().sleep(killDelay);
}
// wait for the trigger to fire and complete at least once
assertTrue("Trigger did not finish even after await()ing an excessive amount of time",
triggerFinishedLatch.await(60, TimeUnit.SECONDS));
List<SolrInputDocument> systemColl = cluster.simGetSystemCollection();
int startedEventPos = -1;
for (int i = 0; i < systemColl.size(); i++) {
SolrInputDocument d = systemColl.get(i);
if (!"node_lost_trigger3".equals(d.getFieldValue("event.source_s"))) {
continue;
}
if ("NODELOST".equals(d.getFieldValue("event.type_s")) &&
"STARTED".equals(d.getFieldValue("stage_s"))) {
startedEventPos = i;
break;
}
}
// TODO we may not even have a .system collection because the message about the node going down
// is interrupted on the executor by the OverseerTriggerThread executors being interrupted on
// Overseer restart
if (systemColl.size() == 0) {
return 0;
}
assertTrue("no STARTED event: " + systemColl + ", " +
"waitFor=" + waitFor + ", killDelay=" + killDelay + ", minIgnored=" + minIgnored,
startedEventPos > -1);
SolrInputDocument startedEvent = systemColl.get(startedEventPos);
// we can expect some failures when target node in MOVEREPLICA has been killed
// between when the event processing started and the actual moment of MOVEREPLICA execution
// wait until started == (finished + failed)
TimeOut timeOut = new TimeOut(20 * waitFor * NUM_NODES, TimeUnit.SECONDS, cluster.getTimeSource());
while (!timeOut.hasTimedOut()) {
if (triggerStartedCount.get() == triggerFinishedCount.get()) {
break;
}
if (log.isDebugEnabled()) {
log.debug("started={}, finished={}, failed={}", triggerStartedCount.get(), triggerFinishedCount.get()
, listenerEvents.size());
}
timeOut.sleep(1000);
}
if (timeOut.hasTimedOut()) {
if (triggerStartedCount.get() > triggerFinishedCount.get() + listenerEvents.size()) {
fail("did not finish processing all events in time: started=" + triggerStartedCount.get() + ", finished=" + triggerFinishedCount.get() +
", failed=" + listenerEvents.size());
}
}
int ignored = 0;
int lastIgnoredPos = startedEventPos;
for (int i = startedEventPos + 1; i < systemColl.size(); i++) {
SolrInputDocument d = systemColl.get(i);
if (!"node_lost_trigger3".equals(d.getFieldValue("event.source_s"))) {
continue;
}
if ("NODELOST".equals(d.getFieldValue("event.type_s"))) {
if ("IGNORED".equals(d.getFieldValue("stage_s"))) {
ignored++;
lastIgnoredPos = i;
}
}
}
assertTrue("should be at least " + minIgnored + " IGNORED events, " +
"waitFor=" + waitFor + ", killDelay=" + killDelay + ", minIgnored=" + minIgnored,
ignored >= minIgnored);
// make sure some replicas have been moved
assertTrue("no MOVEREPLICA ops? " +
"waitFor=" + waitFor + ", killDelay=" + killDelay + ", minIgnored=" + minIgnored,
cluster.simGetOpCount("MOVEREPLICA") > 0);
if (listenerEvents.isEmpty()) {
// no failed movements - verify collection shape
if (log.isInfoEnabled()) {
log.info("Ready after {} ms", CloudUtil.waitForState(cluster, collectionName, 20 * NUM_NODES, TimeUnit.SECONDS,
CloudUtil.clusterShape(NUM_NODES / 5, NUM_NODES / 10, false, true)));
}
} else {
cluster.getTimeSource().sleep(NUM_NODES * 100);
}
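// same settle-detection as in testAddNode: wait for the MOVEREPLICA op count to stop changing,
// then scan the .system collection backwards for the SUCCEEDED stage document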
int count = 50;
SolrInputDocument finishedEvent = null;
long lastNumOps = cluster.simGetOpCount("MOVEREPLICA");
while (count-- > 0) {
cluster.getTimeSource().sleep(waitFor * 10000);
long currentNumOps = cluster.simGetOpCount("MOVEREPLICA");
if (currentNumOps == lastNumOps) {
int size = systemColl.size() - 1;
for (int i = size; i > lastIgnoredPos; i--) {
SolrInputDocument d = systemColl.get(i);
if (!"node_lost_trigger3".equals(d.getFieldValue("event.source_s"))) {
continue;
}
if ("SUCCEEDED".equals(d.getFieldValue("stage_s"))) {
finishedEvent = d;
break;
}
}
break;
} else {
lastNumOps = currentNumOps;
}
}
assertTrue("did not finish processing changes, " +
"waitFor=" + waitFor + ", killDelay=" + killDelay + ", minIgnored=" + minIgnored,
finishedEvent != null);
long delta = 0L;
if (startedEvent != null) {
delta = (Long) finishedEvent.getFieldValue("event.time_l")
- (Long) startedEvent.getFieldValue("event.time_l");
delta = TimeUnit.NANOSECONDS.toMillis(delta);
log.info("#### System stabilized after {} ms", delta);
}
long ops = cluster.simGetOpCount("MOVEREPLICA");
long expectedMinOps = 40;
if (!listenerEvents.isEmpty()) {
expectedMinOps = 20;
}
assertTrue("unexpected number (" + expectedMinOps + ") of MOVEREPLICA ops: " + ops + ", " +
"waitFor=" + waitFor + ", killDelay=" + killDelay + ", minIgnored=" + minIgnored,
ops >= expectedMinOps);
return delta;
}
@Test
@SuppressWarnings({"unchecked"})
public void testSearchRate() throws Exception {
SolrClient solrClient = cluster.simGetSolrClient();
String collectionName = "testSearchRate";
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
"conf", 2, 10);
create.process(solrClient);
if (log.isInfoEnabled()) {
log.info("Ready after {} ms", CloudUtil.waitForState(cluster, collectionName, 300, TimeUnit.SECONDS,
CloudUtil.clusterShape(2, 10, false, true)));
}
// collect the node names for shard1
Set<String> nodes = new HashSet<>();
cluster.getSimClusterStateProvider().getClusterState().getCollection(collectionName)
.getSlice("shard1")
.getReplicas()
.forEach(r -> nodes.add(r.getNodeName()));
String metricName = "QUERY./select.requestTimes:1minRate";
// simulate search traffic
cluster.getSimClusterStateProvider().simSetShardValue(collectionName, "shard1", metricName, 40, false, true);
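// the trailing boolean spreads the value across shard1's replicas: 40 req/s over the 10
// replicas gives each a 1minRate of 4.0, which the per-node assertions below expect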
// now define the trigger. doing it earlier may cause partial events to be generated (where only some
// nodes / replicas exceeded the threshold).
assertAutoScalingRequest
( "{" +
"'set-trigger' : {" +
"'name' : 'search_rate_trigger'," +
"'event' : 'searchRate'," +
"'waitFor' : '" + waitForSeconds + "s'," +
"'aboveRate' : 1.0," +
"'aboveNodeRate' : 1.0," +
"'enabled' : true," +
"'actions' : [" +
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
"{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
"{'name':'test','class':'" + FinishTriggerAction.class.getName() + "'}" +
"]" +
"}}");
// we're going to expect our trigger listener to process exactly one captured event
listenerEventLatch = new CountDownLatch(1);
assertAutoScalingRequest
( "{" +
"'set-listener' : " +
"{" +
"'name' : 'srt'," +
"'trigger' : 'search_rate_trigger'," +
"'stage' : ['FAILED','SUCCEEDED']," +
"'class' : '" + TestTriggerListener.class.getName() + "'" +
"}" +
"}");
assertAutoscalingUpdateComplete();
assertTrue("Trigger did not finish even after await()ing an excessive amount of time",
triggerFinishedLatch.await(60, TimeUnit.SECONDS));
assertTrue("The listener didn't record the event even after await()ing an excessive amount of time",
listenerEventLatch.await(60, TimeUnit.SECONDS));
List<CapturedEvent> events = listenerEvents.get("srt");
assertNotNull("no srt events: " + listenerEvents.toString(), events);
assertEquals(events.toString(), 1, events.size());
CapturedEvent ev = events.get(0);
assertEquals(TriggerEventType.SEARCHRATE, ev.event.getEventType());
Map<String, Number> m = (Map<String, Number>)ev.event.getProperty(SearchRateTrigger.HOT_NODES);
assertNotNull(m);
assertEquals(nodes.size(), m.size());
assertEquals(nodes, m.keySet());
m.forEach((k, v) -> assertEquals(4.0, v.doubleValue(), 0.01));
List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>)ev.event.getProperty(TriggerEvent.REQUESTED_OPS);
assertNotNull(ops);
assertEquals(ops.toString(), 1, ops.size());
ops.forEach(op -> {
assertEquals(CollectionParams.CollectionAction.ADDREPLICA, op.getAction());
assertEquals(1, op.getHints().size());
Object o = op.getHints().get(Suggester.Hint.COLL_SHARD);
// this may be a pair or a HashSet of pairs with size 1
Pair<String, String> hint = null;
if (o instanceof Pair) {
hint = (Pair<String, String>)o;
} else if (o instanceof Set) {
assertEquals("unexpected number of hints: " + o, 1, ((Set)o).size());
o = ((Set)o).iterator().next();
assertTrue("unexpected hint: " + o, o instanceof Pair);
hint = (Pair<String, String>)o;
} else {
fail("unexpected hints: " + o);
}
assertNotNull(hint);
assertEquals(collectionName, hint.first());
assertEquals("shard1", hint.second());
});
}
@Test
public void testFreediskTracking() throws Exception {
int NUM_DOCS = 100000;
String collectionName = "testFreeDisk";
SolrClient solrClient = cluster.simGetSolrClient();
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
"conf",2, 2);
create.process(solrClient);
CloudUtil.waitForState(cluster, "Timed out waiting for replicas of new collection to be active",
collectionName, CloudUtil.clusterShape(2, 2, false, true));
ClusterState clusterState = cluster.getClusterStateProvider().getClusterState();
DocCollection coll = clusterState.getCollection(collectionName);
Set<String> nodes = coll.getReplicas().stream()
.map(r -> r.getNodeName())
.collect(Collectors.toSet());
Map<String, Number> initialFreedisk = getFreeDiskPerNode(nodes);
// test small updates
for (int i = 0; i < NUM_DOCS; i++) {
SolrInputDocument doc = new SolrInputDocument("id", "id-" + i);
solrClient.add(collectionName, doc);
}
Map<String, Number> updatedFreedisk = getFreeDiskPerNode(nodes);
double delta = getDeltaFreeDiskBytes(initialFreedisk, updatedFreedisk);
// 2 replicas - twice as much delta
assertEquals(SimClusterStateProvider.DEFAULT_DOC_SIZE_BYTES * NUM_DOCS * 2, delta, delta * 0.1);
// test small deletes - delete half of docs
for (int i = 0; i < NUM_DOCS / 2; i++) {
solrClient.deleteById(collectionName, "id-" + i);
}
Map<String, Number> updatedFreedisk1 = getFreeDiskPerNode(nodes);
double delta1 = getDeltaFreeDiskBytes(initialFreedisk, updatedFreedisk1);
// 2 replicas but half the docs
assertEquals(SimClusterStateProvider.DEFAULT_DOC_SIZE_BYTES * NUM_DOCS * 2 / 2, delta1, delta1 * 0.1);
// test bulk delete
solrClient.deleteByQuery(collectionName, "*:*");
Map<String, Number> updatedFreedisk2 = getFreeDiskPerNode(nodes);
double delta2 = getDeltaFreeDiskBytes(initialFreedisk, updatedFreedisk2);
// 0 docs - initial freedisk
if (log.isInfoEnabled()) {
log.info(cluster.dumpClusterState(true));
}
assertEquals(0.0, delta2, delta2 * 0.1);
// test bulk update
UpdateRequest ureq = new UpdateRequest();
ureq.setDocIterator(new FakeDocIterator(0, NUM_DOCS));
ureq.process(solrClient, collectionName);
Map<String, Number> updatedFreedisk3 = getFreeDiskPerNode(nodes);
double delta3 = getDeltaFreeDiskBytes(initialFreedisk, updatedFreedisk3);
assertEquals(SimClusterStateProvider.DEFAULT_DOC_SIZE_BYTES * NUM_DOCS * 2, delta3, delta3 * 0.1);
}
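/**
 * Sums the per-node decrease in FREEDISK between two snapshots. FREEDISK values are reported
 * in gigabytes, so the accumulated delta is converted to bytes (x 1024^3) before it is returned.
 */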
private double getDeltaFreeDiskBytes(Map<String, Number> initial, Map<String, Number> updated) {
double deltaGB = 0;
for (String node : initial.keySet()) {
double before = initial.get(node).doubleValue();
double after = updated.get(node).doubleValue();
assertTrue("freedisk after=" + after + " not smaller than before=" + before, after <= before);
deltaGB += before - after;
}
return deltaGB * 1024.0 * 1024.0 * 1024.0;
}
private Map<String, Number> getFreeDiskPerNode(Collection<String> nodes) throws Exception {
Map<String, Number> freediskPerNode = new HashMap<>();
for (String node : nodes) {
Map<String, Object> values = cluster.getNodeStateProvider().getNodeValues(node, Arrays.asList(Variable.Type.FREEDISK.tagName));
freediskPerNode.put(node, (Number) values.get(Variable.Type.FREEDISK.tagName));
}
if (log.isInfoEnabled()) {
log.info("- freeDiskPerNode: {}", Utils.toJSONString(freediskPerNode));
}
return freediskPerNode;
}
}