blob: c972bacba53b4a2205522895bcdf623ac70569fd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling.sim;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.Reader;
import java.lang.invoke.MethodHandles;
import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.Clause;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
import org.apache.solr.client.solrj.cloud.autoscaling.Variable;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.cloud.CloudUtil;
import org.apache.solr.cloud.autoscaling.ActionContext;
import org.apache.solr.cloud.autoscaling.AutoScaling;
import org.apache.solr.cloud.autoscaling.AutoScalingHandler;
import org.apache.solr.cloud.autoscaling.TriggerEvent;
import org.apache.solr.cloud.autoscaling.TriggerListener;
import org.apache.solr.cloud.autoscaling.TriggerListenerBase;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.common.util.PropertiesUtil;
import org.apache.solr.util.RedactionUtils;
import org.apache.solr.util.TimeOut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class represents an autoscaling scenario consisting of a series of autoscaling
* operations on a simulated cluster.
*
* @deprecated to be removed in Solr 9.0 (see SOLR-14656)
*/
public class SimScenario implements AutoCloseable {
// Shared logger; the nested operation classes below log through it as well.
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/** Context variable: Random live node name. */
public static final String RANDOM_NODE_CTX_PROP = "_random_node_";
/** Context variable: Node name of the current Overseer leader. */
public static final String OVERSEER_LEADER_CTX_PROP = "_overseer_leader_";
/** Context variable: List of live nodes. */
public static final String LIVE_NODES_CTX_PROP = "_live_nodes_";
/** Context variable: List of collections. */
public static final String COLLECTIONS_CTX_PROP = "_collections_";
/** Context variable: List of calculated suggestions (written by CALCULATE_SUGGESTIONS, read by APPLY_SUGGESTIONS). */
public static final String SUGGESTIONS_CTX_PROP = "_suggestions_";
/** Context variable: List of SolrResponses of SOLR_REQUEST operations. */
public static final String RESPONSES_CTX_PROP = "_responses_";
/** Context variable: Current loop iteration or none if outside of loop. */
public static final String LOOP_ITER_PROP = "_loop_iter_";
/**
 * Prefix of the context variable that holds the last trigger event captured by WAIT_EVENT.
 * The full key is this prefix followed by the trigger name.
 */
public static final String TRIGGER_EVENT_PREFIX = "_trigger_event_";
/** Simulated cluster that the operations act on; assigned by CREATE_CLUSTER and LOAD_SNAPSHOT. */
public SimCloudManager cluster;
/** Autoscaling config last read from the cluster (CREATE_CLUSTER, LOAD_SNAPSHOT) or loaded by LOAD_AUTOSCALING. */
public AutoScalingConfig config;
/** Operations of this scenario. NOTE(review): presumably filled by the DSL parser and executed in order - confirm. */
public List<SimOp> ops = new ArrayList<>();
/** Scenario variables; read and written by operations and substituted into operation parameters before each execution. */
public Map<String, Object> context = new HashMap<>();
/** Stream that DUMP prints to; defaults to System.err. */
public PrintStream console = System.err;
/** When true, APPLY_SUGGESTIONS gathers the whole ReplicaInfo per suggestion instead of only its CORE_IDX variable. */
public boolean verbose;
/**
 * Request to stop the currently running loop. Set by CALCULATE_SUGGESTIONS when there are no suggestions and by
 * APPLY_SUGGESTIONS when none of the suggestions carries an operation; checked by LOOP_START.
 */
public boolean abortLoop;
/** NOTE(review): not referenced by the operations in this part of the file; presumably stops the whole scenario - confirm. */
public boolean abortScenario;
/** Base class for implementation of scenario DSL actions. */
public static abstract class SimOp {
  /** Parameters exactly as passed to {@link #init(SolrParams)}, before any property substitution. */
  ModifiableSolrParams initParams;
  /** Parameters with property references resolved; replaced by every {@link #prepareCurrentParams(SimScenario)} call. */
  ModifiableSolrParams params;

  /**
   * Store a private copy of the operation's initial parameters.
   * @param params initial parameters, possibly containing property references
   */
  public void init(SolrParams params) {
    this.initParams = new ModifiableSolrParams(params);
  }

  /**
   * This method prepares a copy of initial params (and sets the value of {@link #params})
   * with all property references resolved against the current {@link SimScenario#context}
   * and system properties. This method should always be called before invoking
   * {@link #execute(SimScenario)}.
   * <p>Only context values that are strings, numbers, string arrays, or collections consisting
   * solely of strings and numbers take part in the substitution; arrays and collections are
   * joined with commas. All other context values are ignored.</p>
   * @param scenario current scenario
   */
  public void prepareCurrentParams(SimScenario scenario) {
    Properties props = new Properties();
    scenario.context.forEach((k, v) -> {
      String strVal = toPropertyValue(v);
      if (strVal != null) {
        props.setProperty(k, strVal);
      }
    });
    ModifiableSolrParams currentParams = new ModifiableSolrParams();
    initParams.forEach(e -> {
      String newKey = PropertiesUtil.substituteProperty(e.getKey(), props);
      if (newKey == null) {
        newKey = e.getKey();
      }
      String[] values = e.getValue();
      String[] newValues = values;
      if (values != null && values.length > 0) {
        newValues = new String[values.length];
        for (int i = 0; i < values.length; i++) {
          String newVal = PropertiesUtil.substituteProperty(values[i], props);
          // keep the raw value when substitution yields nothing
          newValues[i] = newVal != null ? newVal : values[i];
        }
      }
      currentParams.add(newKey, newValues);
    });
    params = currentParams;
  }

  /**
   * Convert a context value to the string form used for property substitution.
   * Values are always returned as strings because {@link Properties#getProperty(String)}
   * ignores entries whose value is not a {@code String}, which would make numbers unresolvable.
   * @param v context value, may be null
   * @return string form, or null if the value (or any collection element) is of an unsupported type
   */
  private static String toPropertyValue(Object v) {
    if (v instanceof String[]) {
      return String.join(",", (String[]) v);
    }
    if (v instanceof Collection) {
      List<String> parts = new ArrayList<>();
      for (Object o : (Collection<?>) v) {
        if (!(o instanceof String) && !(o instanceof Number)) {
          // a single unsupported element disqualifies the whole collection
          return null;
        }
        parts.add(String.valueOf(o));
      }
      // joining a list keeps separators correct even when an element is an empty string
      return String.join(",", parts);
    }
    if ((v instanceof String) || (v instanceof Number)) {
      return String.valueOf(v);
    }
    return null;
  }

  /**
   * Execute the operation.
   * @param scenario current scenario.
   */
  public abstract void execute (SimScenario scenario) throws Exception;
}
/**
 * The set of actions that a scenario may contain.
 */
public enum SimAction {
  /** Builds a fresh simulated cluster. */
  CREATE_CLUSTER,
  /** Builds a simulated cluster out of an autoscaling snapshot. */
  LOAD_SNAPSHOT,
  /** Writes an autoscaling snapshot of the current simulated cluster. */
  SAVE_SNAPSHOT,
  /** Computes autoscaling suggestions and stores them in the scenario's context. */
  CALCULATE_SUGGESTIONS,
  /** Applies the suggestions computed earlier. */
  APPLY_SUGGESTIONS,
  /** Kills the named nodes, or a given number of randomly chosen ones. */
  KILL_NODES,
  /** Adds new nodes. */
  ADD_NODES,
  /** Loads an autoscaling.json configuration from a file. */
  LOAD_AUTOSCALING,
  /** Opens a loop. */
  LOOP_START,
  /** Closes a loop. */
  LOOP_END,
  /** Sets operation delays in order to simulate long-running actions. */
  SET_OP_DELAYS,
  /** Executes a SolrRequest (must be supported by {@link SimCloudManager}). */
  SOLR_REQUEST,
  /** Waits until a collection has the indicated number of shards and replicas. */
  WAIT_COLLECTION,
  /** Prepares a listener for an autoscaling event. */
  EVENT_LISTENER,
  /** Waits for an autoscaling event using a previously prepared listener. */
  WAIT_EVENT,
  /** Lets the simulation run for a while so that background tasks can execute. */
  RUN,
  /** Dumps the internal state of the simulator to the console. */
  DUMP,
  /** Sets a context variable. */
  CTX_SET,
  /** Removes a context variable. */
  CTX_REMOVE,
  /** Sets metrics of a node. */
  SET_NODE_METRICS,
  /** Sets metrics of every replica of a collection's shard(s). */
  SET_SHARD_METRICS,
  /** Bulk-indexes a number of simulated documents. */
  INDEX_DOCS,
  /** Asserts a condition. */
  ASSERT;

  /**
   * Case-insensitive lookup of an action by name.
   * @param str action name, may be null
   * @return the matching action, or null when the name is null or unknown
   */
  public static SimAction get(String str) {
    if (str == null) {
      return null;
    }
    final String normalized = str.toUpperCase(Locale.ROOT);
    try {
      return SimAction.valueOf(normalized);
    } catch (IllegalArgumentException ignored) {
      // not one of the known actions
      return null;
    }
  }

  /**
   * @return the action name in lower case
   */
  public String toLower() {
    return name().toLowerCase(Locale.ROOT);
  }
}
/**
 * Registry that maps every DSL action to the {@link SimOp} class implementing it.
 * {@link SimAction#LOOP_END} is registered with a null class.
 */
public static Map<SimAction, Class<? extends SimOp>> simOps = new HashMap<>();
static {
  // cluster life cycle, snapshots and configuration
  simOps.put(SimAction.CREATE_CLUSTER, CreateCluster.class);
  simOps.put(SimAction.LOAD_SNAPSHOT, LoadSnapshot.class);
  simOps.put(SimAction.SAVE_SNAPSHOT, SaveSnapshot.class);
  simOps.put(SimAction.LOAD_AUTOSCALING, LoadAutoscaling.class);
  simOps.put(SimAction.SET_OP_DELAYS, SetOpDelays.class);

  // suggestions
  simOps.put(SimAction.CALCULATE_SUGGESTIONS, CalculateSuggestions.class);
  simOps.put(SimAction.APPLY_SUGGESTIONS, ApplySuggestions.class);

  // node membership and metrics
  simOps.put(SimAction.KILL_NODES, KillNodes.class);
  simOps.put(SimAction.ADD_NODES, AddNodes.class);
  simOps.put(SimAction.SET_NODE_METRICS, SetNodeMetrics.class);
  simOps.put(SimAction.SET_SHARD_METRICS, SetShardMetrics.class);

  // loops
  simOps.put(SimAction.LOOP_START, LoopOp.class);
  simOps.put(SimAction.LOOP_END, null);

  // requests, indexing and running the simulation
  simOps.put(SimAction.SOLR_REQUEST, RunSolrRequest.class);
  simOps.put(SimAction.INDEX_DOCS, IndexDocs.class);
  simOps.put(SimAction.RUN, RunSimulator.class);

  // waiting for state and events
  simOps.put(SimAction.WAIT_COLLECTION, WaitCollection.class);
  simOps.put(SimAction.EVENT_LISTENER, SetEventListener.class);
  simOps.put(SimAction.WAIT_EVENT, WaitEvent.class);

  // context handling, diagnostics and assertions
  simOps.put(SimAction.CTX_SET, CtxSet.class);
  simOps.put(SimAction.CTX_REMOVE, CtxRemove.class);
  simOps.put(SimAction.DUMP, Dump.class);
  simOps.put(SimAction.ASSERT, Assert.class);
}
/**
 * Loop action: runs its nested operations repeatedly, up to the number of times given
 * by the {@code iterations} parameter (10 when absent), or until the scenario's
 * {@code abortLoop} flag is raised.
 */
public static class LoopOp extends SimOp {
  // populated by the DSL parser
  List<SimOp> ops = new ArrayList<>();
  int iterations;

  @Override
  public void execute(SimScenario scenario) throws Exception {
    iterations = Integer.parseInt(params.get("iterations", "10"));
    int iter = 0;
    while (iter < iterations) {
      if (abortRequested(scenario, iter)) {
        return;
      }
      // expose the zero-based iteration number to the nested operations
      scenario.context.put(LOOP_ITER_PROP, String.valueOf(iter));
      log.info(" * iter {} :", iter + 1); // logOK
      for (SimOp nested : ops) {
        // parameters are re-resolved on every pass because the context may have changed
        nested.prepareCurrentParams(scenario);
        if (log.isInfoEnabled()) {
          log.info(" - {}\t{})", nested.getClass().getSimpleName(), nested.params);
        }
        nested.execute(scenario);
        if (abortRequested(scenario, iter)) {
          return;
        }
      }
      iter++;
    }
  }

  /**
   * Check the scenario's abort flag and log when it is set.
   * @param scenario current scenario
   * @param iter current zero-based iteration, reported in the log message
   * @return true when the loop must stop
   */
  private static boolean abortRequested(SimScenario scenario, int iter) {
    if (!scenario.abortLoop) {
      return false;
    }
    log.info(" -- abortLoop requested, aborting after {} iterations.", iter);
    return true;
  }
}
/**
 * Store a context property. Requires the {@code key} and {@code value} parameters; all
 * {@code value} occurrences are stored together as a list under {@code key}.
 */
public static class CtxSet extends SimOp {
  @Override
  public void execute(SimScenario scenario) throws Exception {
    final String name = params.required().get("key");
    final String[] vals = params.required().getParams("value");
    if (vals == null) {
      scenario.context.remove(name);
      return;
    }
    scenario.context.put(name, Arrays.asList(vals));
  }
}
/**
 * Drop a context property. Requires the {@code key} parameter.
 */
public static class CtxRemove extends SimOp {
  @Override
  public void execute(SimScenario scenario) throws Exception {
    scenario.context.remove(params.required().get("key"));
  }
}
/**
 * Create a simulated cluster. Optional parameters: {@code numNodes} (default 5),
 * {@code disableMetricsHistory} (default false) and {@code timeSource}
 * (default {@code simTime:50}). An already existing cluster is closed and the
 * scenario context is cleared before the new cluster is created.
 */
public static class CreateCluster extends SimOp {
  @Override
  public void execute(SimScenario scenario) throws Exception {
    final int nodeCount = Integer.parseInt(params.get("numNodes", "5"));
    final boolean noMetricsHistory = Boolean.parseBoolean(params.get("disableMetricsHistory", "false"));
    final String timeSourceSpec = params.get("timeSource", "simTime:50");

    SimCloudManager previous = scenario.cluster;
    if (previous != null) {
      // close & reset
      IOUtils.closeQuietly(previous);
      scenario.context.clear();
    }

    SimCloudManager created = SimCloudManager.createCluster(nodeCount, TimeSource.get(timeSourceSpec));
    scenario.cluster = created;
    if (noMetricsHistory) {
      created.disableMetricsHistory();
    }
    scenario.config = created.getDistribStateManager().getAutoScalingConfig();
  }
}
/**
 * Create a simulated cluster from an autoscaling snapshot. The snapshot is read from the
 * file given by the {@code path} parameter or, when {@code path} is absent, taken from the
 * cluster reachable through the {@code zkHost} parameter. One of the two is mandatory.
 * <p>A cluster that already exists in the scenario is closed once the snapshot has been
 * obtained, so that it is not leaked when it is replaced. The scenario context is left
 * untouched.</p>
 */
public static class LoadSnapshot extends SimOp {
  @Override
  public void execute(SimScenario scenario) throws Exception {
    String path = params.get("path");
    SnapshotCloudManager snapshotCloudManager;
    if (path != null) {
      snapshotCloudManager = SnapshotCloudManager.readSnapshot(new File(path));
    } else {
      String zkHost = params.get("zkHost");
      if (zkHost == null) {
        throw new IOException(SimAction.LOAD_SNAPSHOT + " must specify 'path' or 'zkHost'");
      }
      snapshotCloudManager = snapshotFromZk(zkHost);
    }
    if (scenario.cluster != null) {
      // same as CREATE_CLUSTER: release the cluster that is about to be replaced;
      // done only after the snapshot was obtained, so a failed load keeps the old cluster usable
      IOUtils.closeQuietly(scenario.cluster);
    }
    scenario.cluster = SimCloudManager.createCluster(snapshotCloudManager, null, snapshotCloudManager.getTimeSource());
    scenario.config = scenario.cluster.getDistribStateManager().getAutoScalingConfig();
  }

  /**
   * Take a snapshot of the cluster behind the given ZooKeeper address. The client and the
   * cloud manager used for this are closed before returning.
   * @param zkHost ZooKeeper connection string
   * @return snapshot of that cluster
   */
  private static SnapshotCloudManager snapshotFromZk(String zkHost) throws Exception {
    try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(zkHost), Optional.empty()).build()) {
      cloudSolrClient.connect();
      try (SolrClientCloudManager realCloudManager = new SolrClientCloudManager(NoopDistributedQueueFactory.INSTANCE, cloudSolrClient)) {
        return new SnapshotCloudManager(realCloudManager, null);
      }
    }
  }
}
/**
 * Save an autoscaling snapshot of the current cluster to the location given by the
 * mandatory {@code path} parameter. The optional {@code redact} parameter (default false)
 * is passed on to the snapshot writer.
 */
public static class SaveSnapshot extends SimOp {
  @Override
  public void execute(SimScenario scenario) throws Exception {
    final String target = params.get("path");
    if (target == null) {
      throw new IOException(SimAction.SAVE_SNAPSHOT + " must specify 'path'");
    }
    final boolean redactNames = Boolean.parseBoolean(params.get("redact", "false"));
    try (SnapshotCloudManager snapshot = new SnapshotCloudManager(scenario.cluster, null)) {
      File destination = new File(target);
      snapshot.saveSnapshot(destination, true, redactNames);
    }
  }
}
/**
 * Load autoscaling.json configuration, either from the file or classpath resource named by
 * the {@code path} parameter or from the inline {@code json} parameter, and set it on the
 * simulated cluster.
 * <p>Optional parameters: {@code withDefaultTriggers} (default true) adds the auto-add-replicas
 * and scheduled-maintenance triggers when the loaded config does not define them;
 * {@code defaultWaitFor} (default 120) is used as the {@code waitFor} of the added
 * auto-add-replicas trigger.</p>
 */
public static class LoadAutoscaling extends SimOp {
  @Override
  public void execute(SimScenario scenario) throws Exception {
    boolean addDefaults = Boolean.parseBoolean(params.get("withDefaultTriggers", "true"));
    int defaultWaitFor = Integer.parseInt(params.get("defaultWaitFor", "120"));
    AutoScalingConfig config = new AutoScalingConfig(readConfigMap());
    if (addDefaults) {
      // add default triggers
      if (!config.getTriggerConfigs().containsKey(AutoScaling.AUTO_ADD_REPLICAS_TRIGGER_NAME)) {
        Map<String, Object> props = new HashMap<>(AutoScaling.AUTO_ADD_REPLICAS_TRIGGER_PROPS);
        props.put("waitFor", defaultWaitFor);
        AutoScalingConfig.TriggerConfig trigger = new AutoScalingConfig.TriggerConfig(AutoScaling.AUTO_ADD_REPLICAS_TRIGGER_NAME, props);
        config = config.withTriggerConfig(trigger);
        config = AutoScalingHandler.withSystemLogListener(config, AutoScaling.AUTO_ADD_REPLICAS_TRIGGER_NAME);
      }
      if (!config.getTriggerConfigs().containsKey(AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_NAME)) {
        AutoScalingConfig.TriggerConfig trigger = new AutoScalingConfig.TriggerConfig(AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_NAME, AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_PROPS);
        config = config.withTriggerConfig(trigger);
        config = AutoScalingHandler.withSystemLogListener(config, AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_NAME);
      }
    }
    scenario.config = config;
    // set this config on the simulator
    scenario.cluster.getSimDistribStateManager().simSetAutoScalingConfig(config);
    // wait until it finished processing the config
    (new TimeOut(30, TimeUnit.SECONDS, scenario.cluster.getTimeSource()))
        .waitFor("OverseerTriggerThread never caught up to the latest znodeVersion", () -> {
          try {
            AutoScalingConfig autoscalingConfig = scenario.cluster.getDistribStateManager().getAutoScalingConfig();
            return autoscalingConfig.getZkVersion() == scenario.cluster.getOverseerTriggerThread().getProcessedZnodeVersion();
          } catch (Exception e) {
            throw new RuntimeException("FAILED", e);
          }
        });
  }

  /**
   * Parse the configuration from the source selected by the parameters.
   * @return parsed configuration map
   * @throws IOException when neither 'path' nor 'json' is given, or the path cannot be opened
   */
  @SuppressWarnings({"unchecked"})
  private Map<String, Object> readConfigMap() throws IOException {
    String path = params.get("path");
    if (path == null) {
      String json = params.get("json");
      if (json == null) {
        throw new IOException(SimAction.LOAD_AUTOSCALING + " must specify either 'path' or 'json'");
      }
      return (Map<String, Object>) Utils.fromJSONString(json);
    }
    // the reader (and the stream below it) is closed as soon as parsing is done
    try (Reader r = openReader(path)) {
      return (Map<String, Object>) Utils.fromJSON(r);
    }
  }

  /**
   * Open the given path as UTF-8 text: as a file when it exists, otherwise as a resource
   * resolved relative to this class.
   * @param path file system path or resource name
   * @return reader that the caller must close
   * @throws IOException when the path is neither an existing file nor a resource
   */
  private Reader openReader(String path) throws IOException {
    File f = new File(path);
    if (f.exists()) {
      return new InputStreamReader(new FileInputStream(f), Charset.forName("UTF-8"));
    }
    InputStream is = getClass().getResourceAsStream(path);
    if (is == null) {
      throw new IOException("path " + path + " does not exist and it's not a resource");
    }
    return new InputStreamReader(is, Charset.forName("UTF-8"));
  }
}
/**
 * Kill one or more nodes. With the {@code numNodes} parameter that many randomly selected
 * nodes are removed. Otherwise the nodes named by the {@code nodes} parameter(s)
 * (comma-separated lists) and the {@code node} parameter(s) (single names) are removed.
 * Without any of these parameters nothing happens.
 */
public static class KillNodes extends SimOp {
  @Override
  public void execute(SimScenario scenario) throws Exception {
    final String randomCount = params.get("numNodes");
    if (randomCount != null) {
      int count = Integer.parseInt(randomCount);
      scenario.cluster.simRemoveRandomNodes(count, false, scenario.cluster.getRandom());
      return;
    }
    if (params.get("nodes") == null && params.get("node") == null) {
      return;
    }
    Set<String> targets = new HashSet<>();
    String[] lists = params.getParams("nodes");
    if (lists != null) {
      for (String list : lists) {
        Collections.addAll(targets, list.split(","));
      }
    }
    String[] singles = params.getParams("node");
    if (singles != null) {
      Collections.addAll(targets, singles);
    }
    for (String target : targets) {
      scenario.cluster.simRemoveNode(target, false);
    }
  }
}
/**
 * Add one or more nodes; the count is taken from the {@code numNodes} parameter.
 */
public static class AddNodes extends SimOp {
  @Override
  public void execute(SimScenario scenario) throws Exception {
    int remaining = Integer.parseInt(params.get("numNodes"));
    while (remaining > 0) {
      scenario.cluster.simAddNode();
      remaining--;
    }
  }
}
/**
 * Calculate autoscaling suggestions for the current config and cluster and publish them in
 * the context under {@link #SUGGESTIONS_CTX_PROP}. When there are no suggestions the
 * scenario's {@code abortLoop} flag is raised.
 */
public static class CalculateSuggestions extends SimOp {
  @Override
  public void execute(SimScenario scenario) throws Exception {
    final List<Suggester.SuggestionInfo> computed = PolicyHelper.getSuggestions(scenario.config, scenario.cluster);
    scenario.context.put(SUGGESTIONS_CTX_PROP, computed);
    if (log.isInfoEnabled()) {
      log.info(" - {} suggestions", computed.size());
    }
    // nothing left to do for an enclosing loop
    scenario.abortLoop = scenario.abortLoop || computed.isEmpty();
  }
}
/**
* Apply autoscaling suggestions.
*/
public static class ApplySuggestions extends SimOp {
@Override
public void execute(SimScenario scenario) throws Exception {
@SuppressWarnings({"unchecked"})
List<Suggester.SuggestionInfo> suggestions = (List<Suggester.SuggestionInfo>) scenario.context.getOrDefault(SUGGESTIONS_CTX_PROP, Collections.emptyList());
int unresolvedCount = 0;
for (Suggester.SuggestionInfo suggestion : suggestions) {
@SuppressWarnings({"rawtypes"})
SolrRequest operation = suggestion.getOperation();
if (operation == null) {
unresolvedCount++;
if (suggestion.getViolation() == null) {
log.error(" -- ignoring suggestion without violation and without operation: {}", suggestion);
}
continue;
}
SolrParams params = operation.getParams();
if (operation instanceof V2Request) {
params = SimUtils.v2AdminRequestToV1Params((V2Request)operation);
}
Map<String, Object> paramsMap = new LinkedHashMap<>();
params.toMap(paramsMap);
ReplicaInfo info = scenario.cluster.getSimClusterStateProvider().simGetReplicaInfo(
params.get(CollectionAdminParams.COLLECTION), params.get("replica"));
if (info == null) {
log.error("Could not find ReplicaInfo for params: {}", params);
} else if (scenario.verbose) {
paramsMap.put("replicaInfo", info);
} else if (info.getVariable(Variable.Type.CORE_IDX.tagName) != null) {
paramsMap.put(Variable.Type.CORE_IDX.tagName, info.getVariable(Variable.Type.CORE_IDX.tagName));
}
try {
scenario.cluster.request(operation);
} catch (Exception e) {
log.error("Aborting - error executing suggestion {}", suggestion, e);
break;
}
}
if (suggestions.size() > 0 && unresolvedCount == suggestions.size()) {
log.info(" -- aborting simulation, only {} unresolved violations remain.", unresolvedCount);
scenario.abortLoop = true;
}
}
}
/**
 * Execute a SolrRequest supported by {@link SimCloudManager}. The request path comes from
 * the {@code path} parameter (default {@code /}) and the method from {@code httpMethod}
 * (default GET). An optional {@code stream.body} parameter is sent as a JSON payload.
 * {@code httpMethod} and {@code stream.body} are removed from the parameters before the
 * request is built. The response is appended to the list kept in the context under
 * {@link #RESPONSES_CTX_PROP}.
 */
public static class RunSolrRequest extends SimOp {
  @Override
  public void execute(SimScenario scenario) throws Exception {
    final String requestPath = params.get("path", "/");
    final SolrRequest.METHOD method = SolrRequest.METHOD.valueOf(params.get("httpMethod", "GET"));
    params.remove("httpMethod");
    final String body = params.get("stream.body");
    params.remove("stream.body");

    GenericSolrRequest request = new GenericSolrRequest(method, requestPath, params);
    if (body != null) {
      request.setContentWriter(new RequestWriter.StringPayloadContentWriter(body, "application/json"));
    }
    SolrResponse response = scenario.cluster.request(request);

    @SuppressWarnings("unchecked")
    List<SolrResponse> collected = (List<SolrResponse>) scenario.context.computeIfAbsent(RESPONSES_CTX_PROP, o -> new ArrayList<SolrResponse>());
    collected.add(response);
  }
}
/**
 * Set delays for specified collection operations in order to simulate slow execution.
 * The mandatory {@code collection} parameter(s) name the affected collections; every other
 * parameter is read as {@code <collection action>=<delay>}. A missing or blank delay is
 * recorded as null, and parameters that are not valid collection actions are skipped with
 * a warning.
 */
public static class SetOpDelays extends SimOp {
  @Override
  public void execute(SimScenario scenario) throws Exception {
    final String[] targets = params.remove("collection");
    if (targets == null || targets.length == 0) {
      throw new IOException("'collection' param is required but missing: " + params);
    }
    Map<String, Long> delays = new HashMap<>();
    for (Map.Entry<String, String[]> entry : params) {
      final String actionName = entry.getKey();
      CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(actionName);
      if (action == null) {
        log.warn("Invalid collection action {}, skipping...", actionName);
        continue;
      }
      final String[] raw = entry.getValue();
      Long delay = null;
      if (raw != null && !raw[0].trim().isEmpty()) {
        delay = Long.parseLong(raw[0]);
      }
      delays.put(action.name(), delay);
    }
    for (String target : targets) {
      scenario.cluster.getSimClusterStateProvider().simSetOpDelays(target, delays);
    }
  }
}
/**
 * Run the simulator for a while by sleeping on the cluster's time source for the number
 * of milliseconds given by the {@code time} parameter (default 60000).
 */
public static class RunSimulator extends SimOp {
  @Override
  public void execute(SimScenario scenario) throws Exception {
    final String requested = params.get("time", "60000");
    final int durationMs = Integer.parseInt(requested);
    scenario.cluster.getTimeSource().sleep(durationMs);
  }
}
/**
 * Wait for a specific collection shape. Mandatory parameters: {@code collection},
 * {@code shards} and {@code replicas}. Optional parameters: {@code withInactive}
 * (default false), {@code requireLeaders} (default true) and {@code wait}, the timeout in
 * seconds (default {@link CloudUtil#DEFAULT_TIMEOUT}).
 */
public static class WaitCollection extends SimOp {
  @Override
  public void execute(SimScenario scenario) throws Exception {
    final String name = params.required().get("collection");
    final int expectedShards = Integer.parseInt(params.required().get("shards"));
    final int expectedReplicas = Integer.parseInt(params.required().get("replicas"));
    final boolean countInactive = Boolean.parseBoolean(params.get("withInactive", "false"));
    final boolean leadersNeeded = Boolean.parseBoolean(params.get("requireLeaders", "true"));
    final int timeoutSec = Integer.parseInt(params.required().get("wait", String.valueOf(CloudUtil.DEFAULT_TIMEOUT)));
    CloudUtil.waitForState(
        scenario.cluster,
        name,
        timeoutSec,
        TimeUnit.SECONDS,
        CloudUtil.clusterShape(expectedShards, expectedReplicas, countInactive, leadersNeeded));
  }
}
/**
 * Trigger listener that remembers the most recent event it was notified about and lets a
 * caller block until the first notification arrives.
 */
private static class SimWaitListener extends TriggerListenerBase {
  private final TimeSource timeSource;
  private final AutoScalingConfig.TriggerListenerConfig config;
  /** Released by the first {@link #onEvent} call. */
  private final CountDownLatch triggerFired = new CountDownLatch(1);
  /** Written by the notifying thread and read by the waiting thread, hence volatile. */
  private volatile TriggerEvent event;

  SimWaitListener(TimeSource timeSource, AutoScalingConfig.TriggerListenerConfig config) {
    this.timeSource = timeSource;
    this.config = config;
  }

  @Override
  public AutoScalingConfig.TriggerListenerConfig getConfig() {
    return config;
  }

  @Override
  public boolean isEnabled() {
    return true;
  }

  @Override
  public void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) throws Exception {
    // store the event BEFORE releasing the latch, otherwise a waiter could wake up
    // and still observe the previous (null) value
    this.event = event;
    triggerFired.countDown();
  }

  /**
   * @return the most recently received event, or null if none has been received yet
   */
  public TriggerEvent getEvent() {
    return event;
  }

  /**
   * Block until the listener has been notified at least once.
   * @param waitSec maximum time to wait in simulated seconds; converted to real time
   *                using the time source
   * @throws IOException when no event arrived within the allowed time
   */
  public void wait(int waitSec) throws Exception {
    long waitTime = timeSource.convertDelay(TimeUnit.SECONDS, waitSec, TimeUnit.MILLISECONDS);
    boolean await = triggerFired.await(waitTime, TimeUnit.MILLISECONDS);
    if (!await) {
      throw new IOException("Timed out waiting for trigger " + config.trigger + " to fire after simulated " +
          waitSec + "s (real " + waitTime + "ms).");
    }
  }
}
/**
 * Set a temporary listener to wait for a specific trigger event processing. The trigger
 * name and at least one stage are mandatory; before/after action names are optional.
 * Only one such listener may exist per trigger at a time. The listener is kept in the
 * scenario context and registered as an additional listener of the scheduled triggers.
 */
public static class SetEventListener extends SimOp {
  @Override
  public void execute(SimScenario scenario) throws Exception {
    final String trigger = params.required().get(AutoScalingParams.TRIGGER);
    final String listenerName = ".sim_wait_event_" + trigger;
    final String contextKey = "_sim_waitListener_" + trigger;

    Map<String, Object> cfgMap = new HashMap<>();
    cfgMap.put(AutoScalingParams.NAME, listenerName);
    cfgMap.put(AutoScalingParams.TRIGGER, trigger);
    final String[] before = params.getParams(AutoScalingParams.BEFORE_ACTION);
    final String[] after = params.getParams(AutoScalingParams.AFTER_ACTION);
    putIfAny(cfgMap, AutoScalingParams.BEFORE_ACTION, before);
    putIfAny(cfgMap, AutoScalingParams.AFTER_ACTION, after);

    // every stage parameter may itself hold several names separated by commas or whitespace
    List<String> stageNames = new ArrayList<>();
    for (String stageParam : params.required().getParams(AutoScalingParams.STAGE)) {
      for (String candidate : stageParam.split("[,\\s]+")) {
        try {
          TriggerEventProcessorStage.valueOf(candidate);
        } catch (IllegalArgumentException e) {
          throw new IOException("Invalid stage name '" + candidate + "'");
        }
        stageNames.add(candidate);
      }
    }
    if (!stageNames.isEmpty()) {
      cfgMap.put(AutoScalingParams.STAGE, stageNames);
    }

    final AutoScalingConfig.TriggerListenerConfig listenerConfig = new AutoScalingConfig.TriggerListenerConfig(listenerName, cfgMap);
    if (scenario.context.containsKey(contextKey)) {
      throw new IOException("currently only one listener can be set per trigger. Trigger name: " + trigger);
    }
    TriggerListener listener = new SimWaitListener(scenario.cluster.getTimeSource(), listenerConfig);
    scenario.context.put(contextKey, listener);
    scenario.cluster.getOverseerTriggerThread().getScheduledTriggers().addAdditionalListener(listener);
  }

  /**
   * Put the given values into the map as a list, unless there are none.
   * @param target map to fill
   * @param key key to store the list under
   * @param values values to store, may be null or empty
   */
  private static void putIfAny(Map<String, Object> target, String key, String[] values) {
    if (values == null || values.length == 0) {
      return;
    }
    target.put(key, new ArrayList<>(Arrays.asList(values)));
  }
}
/**
 * Wait for the previously set listener to capture an event. The listener is taken out of
 * the context, waited on for up to {@code wait} simulated seconds (default
 * {@link CloudUtil#DEFAULT_TIMEOUT}), and finally unregistered whether or not the wait
 * succeeded. A captured event is stored in the context, in map form, under
 * {@link #TRIGGER_EVENT_PREFIX} followed by the trigger name.
 */
public static class WaitEvent extends SimOp {
  @Override
  public void execute(SimScenario scenario) throws Exception {
    final String trigger = params.required().get(AutoScalingParams.TRIGGER);
    final int timeoutSec = Integer.parseInt(params.get("wait", String.valueOf(CloudUtil.DEFAULT_TIMEOUT)));
    final SimWaitListener waiter = (SimWaitListener) scenario.context.remove("_sim_waitListener_" + trigger);
    if (waiter == null) {
      throw new IOException(SimAction.WAIT_EVENT + " must be preceded by " + SimAction.EVENT_LISTENER + " for trigger " + trigger);
    }
    final String eventKey = TRIGGER_EVENT_PREFIX + trigger;
    try {
      waiter.wait(timeoutSec);
      // forget the event of an earlier wait before publishing the new one
      scenario.context.remove(eventKey);
      TriggerEvent captured = waiter.getEvent();
      if (captured != null) {
        @SuppressWarnings({"unchecked"})
        Map<String, Object> asMap = captured.toMap(new LinkedHashMap<>());
        scenario.context.put(eventKey, asMap);
      }
    } finally {
      scenario.cluster.getOverseerTriggerThread().getScheduledTriggers().removeAdditionalListener(waiter);
    }
  }
}
/**
 * Set metrics for a set of nodes. The mandatory node set parameter is either the
 * "any" marker, meaning all live nodes, or a list of node names separated by commas or
 * whitespace. Every other parameter is a metric to set; values that parse as a long or a
 * double are stored as numbers, anything else as a string. The new values are merged into
 * the node's existing values.
 */
public static class SetNodeMetrics extends SimOp {
  @Override
  public void execute(SimScenario scenario) throws Exception {
    final String nodeset = params.required().get(Clause.NODESET);
    Set<String> targets = new HashSet<>();
    if (!nodeset.equals(Policy.ANY)) {
      for (String name : nodeset.split("[,\\s]+")) {
        if (!name.trim().isEmpty()) {
          targets.add(name);
        }
      }
    } else {
      targets.addAll(scenario.cluster.getLiveNodesSet().get());
    }

    params.remove(Clause.NODESET);
    Map<String, Object> metrics = new HashMap<>();
    for (String metric : params.getParameterNames()) {
      metrics.put(metric, convert(params.get(metric)));
    }

    for (String target : targets) {
      Map<String, Object> merged = new HashMap<>(scenario.cluster.getSimNodeStateProvider().simGetNodeValues(target));
      merged.putAll(metrics);
      scenario.cluster.getSimNodeStateProvider().simSetNodeValues(target, merged);
    }
  }

  /**
   * Try auto-converting a textual value to a number.
   * @param text textual value
   * @return a Long, else a Double, else the text itself
   */
  private static Object convert(String text) {
    try {
      return Long.parseLong(text);
    } catch (NumberFormatException notLong) {
      try {
        return Double.parseDouble(text);
      } catch (NumberFormatException notDouble) {
        return text;
      }
    }
  }
}
/**
 * Set shard-level values of a collection. {@code collection} is mandatory; {@code shard},
 * {@code delta} (default false) and {@code divide} (default false) are optional and passed
 * through to the cluster state provider together with each value. Every remaining
 * parameter is a value to set; it is stored as an Integer or a Double when it parses as
 * one, otherwise as a string.
 */
public static class SetShardMetrics extends SimOp {
  @Override
  public void execute(SimScenario scenario) throws Exception {
    final String collection = params.required().get("collection");
    final String shard = params.get("shard");
    final boolean delta = params.getBool("delta", false);
    final boolean divide = params.getBool("divide", false);
    for (String control : new String[] {"collection", "shard", "delta", "divide"}) {
      params.remove(control);
    }

    Map<String, Object> metrics = new HashMap<>();
    for (String name : params.getParameterNames()) {
      metrics.put(name, guessType(params.get(name)));
    }

    for (Map.Entry<String, Object> metric : metrics.entrySet()) {
      try {
        scenario.cluster.getSimClusterStateProvider().simSetShardValue(collection, shard, metric.getKey(), metric.getValue(), delta, divide);
      } catch (Exception e) {
        throw new RuntimeException("Error setting shard value", e);
      }
    }
  }

  /**
   * Try guessing if the text is a number.
   * @param text textual value
   * @return an Integer, else a Double, else the text itself
   */
  private static Object guessType(String text) {
    try {
      return Integer.valueOf(text);
    } catch (NumberFormatException notInt) {
      try {
        return Double.valueOf(text);
      } catch (NumberFormatException notDouble) {
        // not a number
        return text;
      }
    }
  }
}
/**
 * Bulk index simulated documents. Sends a single update request for the mandatory
 * {@code collection}; the documents come from a {@code FakeDocIterator} built from the
 * optional {@code start} parameter (default 0) and the mandatory {@code numDocs} parameter.
 */
public static class IndexDocs extends SimOp {
  @Override
  public void execute(SimScenario scenario) throws Exception {
    final String target = params.required().get("collection");
    final long count = params.required().getLong("numDocs");
    final long first = params.getLong("start", 0L);

    UpdateRequest update = new UpdateRequest();
    update.setParam("collection", target);
    update.setDocIterator(new FakeDocIterator(first, count));
    scenario.cluster.simGetSolrClient().request(update);
  }
}
/**
 * Conditions that can be checked by the ASSERT action.
 */
public enum Condition {
  EQUALS,
  NOT_EQUALS,
  NULL,
  NOT_NULL;

  /**
   * Case-insensitive lookup of a condition by name.
   * @param p condition name, may be null
   * @return the matching condition, or null when the name is null or unknown
   */
  public static Condition get(String p) {
    if (p != null) {
      final String wanted = p.toUpperCase(Locale.ROOT);
      for (Condition candidate : values()) {
        if (candidate.name().equals(wanted)) {
          return candidate;
        }
      }
    }
    return null;
  }
}
/**
 * Assert a condition. The checked value is looked up in the scenario context when the
 * {@code key} parameter is given (a key containing '/' is resolved as a path into the
 * context), otherwise it is taken from the mandatory {@code value} parameter. The
 * {@code condition} parameter selects the check; {@code expected} is mandatory for the
 * equality checks, which compare against the string form of the value. A failed check is
 * reported as an {@link IOException}.
 */
public static class Assert extends SimOp {
  @Override
  public void execute(SimScenario scenario) throws Exception {
    final String key = params.get("key");
    final Condition condition = Condition.get(params.required().get("condition"));
    if (condition == null) {
      throw new IOException("Invalid 'condition' in params: " + params);
    }
    final String expected = params.get("expected");
    final boolean nullCheck = condition == Condition.NOT_NULL || condition == Condition.NULL;
    if (!nullCheck && expected == null) {
      throw new IOException("'expected' param is required when condition is " + condition);
    }

    final Object actual;
    if (key == null) {
      actual = params.required().get("value");
    } else if (key.contains("/")) {
      actual = Utils.getObjectByPath(scenario.context, true, key);
    } else {
      actual = scenario.context.get(key);
    }

    if (condition == Condition.NULL) {
      if (actual != null) {
        throw new IOException("expected value should be null but was '" + actual + "'");
      }
    } else if (condition == Condition.NOT_NULL) {
      if (actual == null) {
        throw new IOException("expected value should not be null");
      }
    } else if (condition == Condition.EQUALS) {
      if (!expected.equals(String.valueOf(actual))) {
        throw new IOException("expected value is '" + expected + "' but actual value is '" + actual + "'");
      }
    } else if (condition == Condition.NOT_EQUALS) {
      if (expected.equals(String.valueOf(actual))) {
        throw new IOException("expected value is '" + expected + "' and actual value is the same while it should be different");
      }
    }
  }
}
/**
 * Dump the simulator state to the console as JSON. The dump is built from a snapshot of the
 * current cluster. Boolean parameters select the included sections: {@code withData},
 * {@code withStats}, {@code withDiagnostics}, {@code withNodeState},
 * {@code withClusterState} and {@code withManagerState} default to false,
 * {@code withSuggestions} defaults to true. With {@code redact} (default false) names in
 * the output are redacted.
 */
public static class Dump extends SimOp {
  @Override
  @SuppressWarnings({"unchecked"})
  public void execute(SimScenario scenario) throws Exception {
    boolean redact = Boolean.parseBoolean(params.get("redact", "false"));
    boolean withData = Boolean.parseBoolean(params.get("withData", "false"));
    boolean withStats = Boolean.parseBoolean(params.get("withStats", "false"));
    boolean withSuggestions = Boolean.parseBoolean(params.get("withSuggestions", "true"));
    boolean withDiagnostics = Boolean.parseBoolean(params.get("withDiagnostics", "false"));
    boolean withNodeState = Boolean.parseBoolean(params.get("withNodeState", "false"));
    boolean withClusterState = Boolean.parseBoolean(params.get("withClusterState", "false"));
    boolean withManagerState = Boolean.parseBoolean(params.get("withManagerState", "false"));
    String data;
    // try-with-resources: the snapshot manager is closed even when building the dump fails
    try (SnapshotCloudManager snapshotCloudManager = new SnapshotCloudManager(scenario.cluster, null)) {
      Map<String, Object> snapshot = snapshotCloudManager.getSnapshot(true, redact);
      if (!withData) {
        snapshot.remove(SnapshotCloudManager.DISTRIB_STATE_KEY);
      }
      if (!withNodeState) {
        snapshot.remove(SnapshotCloudManager.NODE_STATE_KEY);
      }
      if (!withClusterState) {
        snapshot.remove(SnapshotCloudManager.CLUSTER_STATE_KEY);
      }
      if (!withStats) {
        snapshot.remove(SnapshotCloudManager.STATISTICS_STATE_KEY);
      }
      if (!withManagerState) {
        snapshot.remove(SnapshotCloudManager.MANAGER_STATE_KEY);
      }
      // guard against a snapshot without an autoscaling section instead of failing with an NPE
      Object autoscalingState = snapshot.get(SnapshotCloudManager.AUTOSCALING_STATE_KEY);
      if (autoscalingState instanceof Map) {
        Map<String, Object> autoscaling = (Map<String, Object>) autoscalingState;
        if (!withDiagnostics) {
          autoscaling.remove("diagnostics");
        }
        if (!withSuggestions) {
          autoscaling.remove("suggestions");
        }
      }
      data = Utils.toJSONString(snapshot);
      if (redact) {
        RedactionUtils.RedactionContext ctx = SimUtils.getRedactionContext(snapshotCloudManager.getClusterStateProvider().getClusterState());
        data = RedactionUtils.redactNames(ctx.getRedactions(), data);
      }
    }
    scenario.console.println(data);
  }
}
/**
 * Parse a DSL string and create a scenario ready to run.
 * <p>The input is processed line by line. Blank lines and lines starting with {@code #} or a
 * double slash are ignored, and anything after a double slash on a line is dropped as a comment.
 * The remaining text is split on whitespace into an action name and an optional parameter
 * string; lines with more than two parts or with an unknown action are logged and skipped.</p>
 * <p>The parameter string is a sequence of URL-encoded {@code key=value} pairs separated by
 * {@code &}. When it contains {@code ?}, the text before the first {@code ?} is stored as the
 * {@code path} parameter and the rest is parsed as pairs. Only the first {@code =} in a pair
 * separates key from value; a pair without a value yields a null value, and empty pairs are
 * ignored.</p>
 * <p>Operations between a loop start and loop end action are attached to the loop operation;
 * only one loop level is supported.</p>
 * @param data DSL string with commands and parameters
 * @return configured scenario
 * @throws Exception on syntax errors: unbalanced or nested loops, or mismatched
 * event listener / wait event operations
 */
public static SimScenario load(String data) throws Exception {
  @SuppressWarnings("resource")
  SimScenario scenario = new SimScenario();
  String[] lines = data.split("\\r?\\n");
  for (String rawLine : lines) {
    String line = rawLine.trim();
    if (line.isEmpty() || line.startsWith("#") || line.startsWith("//")) {
      continue;
    }
    // remove trailing comments introduced by a double slash
    String[] comments = line.split("//");
    String expr = comments[0];
    // split on blank
    String[] parts = expr.split("\\s+");
    if (parts.length > 2) {
      log.warn("Invalid line - wrong number of parts {}, skipping: {}", parts.length, line);
      continue;
    }
    SimAction action = SimAction.get(parts[0]);
    if (action == null) {
      log.warn("Invalid scenario action {}, skipping...", parts[0]);
      continue;
    }
    if (action == SimAction.LOOP_END) {
      if (!scenario.context.containsKey("loop")) {
        throw new IOException("LOOP_END without start!");
      }
      scenario.context.remove("loop");
      continue;
    }
    Class<? extends SimOp> opClass = simOps.get(action);
    if (opClass == null) {
      // fail with a descriptive message instead of a bare NullPointerException
      throw new IOException("No implementation registered for scenario action " + action);
    }
    SimOp op = opClass.getConstructor().newInstance();
    ModifiableSolrParams params = new ModifiableSolrParams();
    if (parts.length > 1) {
      String paramsString = parts[1];
      // indexOf/substring instead of split(): split() drops trailing empty strings, so a
      // bare "?" or "=" produced an empty array and an ArrayIndexOutOfBoundsException
      int queryStart = paramsString.indexOf('?');
      if (queryStart >= 0) { // url-like with path?params...
        params.set("path", paramsString.substring(0, queryStart));
        paramsString = paramsString.substring(queryStart + 1);
      }
      for (String paramPair : paramsString.split("&")) {
        if (paramPair.isEmpty()) {
          // e.g. "path?" or "a=1&&b=2" - nothing to register
          continue;
        }
        // only the first '=' separates key from value, so values may contain '='
        int eq = paramPair.indexOf('=');
        String rawKey = eq < 0 ? paramPair : paramPair.substring(0, eq);
        String rawValue = eq < 0 ? "" : paramPair.substring(eq + 1);
        String k = URLDecoder.decode(rawKey, "UTF-8");
        // a missing or empty value is registered as null
        String v = rawValue.isEmpty() ? null : URLDecoder.decode(rawValue, "UTF-8");
        params.add(k, v);
      }
    }
    op.init(params);
    // loop handling
    if (action == SimAction.LOOP_START) {
      if (scenario.context.containsKey("loop")) {
        throw new IOException("only one loop level is allowed");
      }
      // the open loop is tracked in the context until the matching loop end is seen
      scenario.context.put("loop", op);
      scenario.ops.add(op);
      continue;
    }
    LoopOp currentLoop = (LoopOp) scenario.context.get("loop");
    if (currentLoop != null) {
      currentLoop.ops.add(op);
    } else {
      scenario.ops.add(op);
    }
  }
  if (scenario.context.containsKey("loop")) {
    throw new IOException("Unterminated loop statement");
  }
  // sanity check set_listener / wait_listener
  int numSets = 0, numWaits = 0;
  for (SimOp op : scenario.ops) {
    if (op instanceof SetEventListener) {
      numSets++;
    } else if (op instanceof WaitEvent) {
      numWaits++;
    }
    // a wait must never come before the listener it waits on
    if (numWaits > numSets) {
      throw new Exception("Unexpected " + SimAction.WAIT_EVENT + " without previous " + SimAction.EVENT_LISTENER);
    }
  }
  if (numSets > numWaits) {
    throw new Exception(SimAction.EVENT_LISTENER + " count should be equal to " + SimAction.WAIT_EVENT + " count but was " +
        numSets + " > " + numWaits);
  }
  return scenario;
}
/**
 * Run the scenario.
 * <p>Operations are executed in order. Before each one the abort flag is checked, the
 * cluster-derived context properties are refreshed (or removed when there is no cluster or it
 * has no live nodes), and the operation's current parameters are prepared from the context.</p>
 *
 * @throws Exception when an operation fails
 */
public void run() throws Exception {
  int index = 0;
  while (index < ops.size()) {
    if (abortScenario) {
      log.info("-- abortScenario requested, aborting after {} ops.", index);
      return;
    }
    final SimOp op = ops.get(index);
    final String opName = op.getClass().getSimpleName();
    if (log.isInfoEnabled()) {
      log.info("{}.\t{}\t{}", index + 1, opName, op.initParams); // logOk
    }
    // substitute parameters based on the current context
    final boolean hasLiveNodes = cluster != null && cluster.getLiveNodesSet().size() > 0;
    if (!hasLiveNodes) {
      context.remove(LIVE_NODES_CTX_PROP);
      context.remove(COLLECTIONS_CTX_PROP);
      context.remove(RANDOM_NODE_CTX_PROP);
      context.remove(SUGGESTIONS_CTX_PROP);
      context.remove(OVERSEER_LEADER_CTX_PROP);
    } else {
      context.put(LIVE_NODES_CTX_PROP, new ArrayList<>(cluster.getLiveNodesSet().get()));
      context.put(RANDOM_NODE_CTX_PROP, cluster.getSimClusterStateProvider().simGetRandomNode());
      context.put(COLLECTIONS_CTX_PROP, cluster.getSimClusterStateProvider().simListCollections());
      context.put(OVERSEER_LEADER_CTX_PROP, cluster.getSimClusterStateProvider().simGetOverseerLeader());
    }
    op.prepareCurrentParams(this);
    if (log.isInfoEnabled()) {
      log.info("\t\t{}\t{}", opName, op.params);
    }
    op.execute(this);
    index++;
  }
}
/**
 * Close the simulated cluster, if one is attached, and drop the reference to it.
 * Calling this when no cluster is attached does nothing.
 */
@Override
public void close() throws IOException {
  if (cluster == null) {
    return;
  }
  cluster.close();
  cluster = null;
}
}