| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.solr.cloud.autoscaling.sim; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.File; |
| import java.io.IOException; |
| import java.lang.invoke.MethodHandles; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentSkipListMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import com.codahale.metrics.jvm.ClassLoadingGaugeSet; |
| import com.codahale.metrics.jvm.GarbageCollectorMetricSet; |
| import com.codahale.metrics.jvm.MemoryUsageGaugeSet; |
| import com.codahale.metrics.jvm.ThreadStatesGaugeSet; |
| import org.apache.solr.client.solrj.SolrClient; |
| import org.apache.solr.client.solrj.SolrRequest; |
| import org.apache.solr.client.solrj.SolrResponse; |
| import org.apache.solr.client.solrj.SolrServerException; |
| import org.apache.solr.client.solrj.cloud.DistribStateManager; |
| import org.apache.solr.client.solrj.cloud.DistributedQueueFactory; |
| import org.apache.solr.client.solrj.cloud.NodeStateProvider; |
| import org.apache.solr.client.solrj.cloud.SolrCloudManager; |
| import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; |
| import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo; |
| import org.apache.solr.client.solrj.cloud.autoscaling.Variable; |
| import org.apache.solr.client.solrj.impl.ClusterStateProvider; |
| import org.apache.solr.client.solrj.request.AbstractUpdateRequest; |
| import org.apache.solr.client.solrj.request.CollectionAdminRequest; |
| import org.apache.solr.client.solrj.request.QueryRequest; |
| import org.apache.solr.client.solrj.request.RequestWriter; |
| import org.apache.solr.client.solrj.request.UpdateRequest; |
| import org.apache.solr.client.solrj.request.V2Request; |
| import org.apache.solr.client.solrj.response.RequestStatusState; |
| import org.apache.solr.client.solrj.response.SolrResponseBase; |
| import org.apache.solr.client.solrj.response.UpdateResponse; |
| import org.apache.solr.cloud.Overseer; |
| import org.apache.solr.cloud.autoscaling.AutoScalingHandler; |
| import org.apache.solr.cloud.autoscaling.OverseerTriggerThread; |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.SolrInputDocument; |
| import org.apache.solr.common.cloud.ClusterState; |
| import org.apache.solr.common.cloud.Replica; |
| import org.apache.solr.common.cloud.ZkNodeProps; |
| import org.apache.solr.common.cloud.ZkStateReader; |
| import org.apache.solr.common.cloud.rule.ImplicitSnitch; |
| import org.apache.solr.common.params.CollectionAdminParams; |
| import org.apache.solr.common.params.CollectionParams; |
| import org.apache.solr.common.params.CommonAdminParams; |
| import org.apache.solr.common.params.CommonParams; |
| import org.apache.solr.common.params.CoreAdminParams; |
| import org.apache.solr.common.params.ModifiableSolrParams; |
| import org.apache.solr.common.params.SolrParams; |
| import org.apache.solr.common.util.ContentStreamBase; |
| import org.apache.solr.common.util.ExecutorUtil; |
| import org.apache.solr.common.util.IOUtils; |
| import org.apache.solr.common.util.NamedList; |
| import org.apache.solr.common.util.ObjectCache; |
| import org.apache.solr.common.util.SimpleOrderedMap; |
| import org.apache.solr.common.util.TimeSource; |
| import org.apache.solr.core.CloudConfig; |
| import org.apache.solr.core.SolrInfoBean; |
| import org.apache.solr.core.SolrResourceLoader; |
| import org.apache.solr.handler.admin.MetricsHandler; |
| import org.apache.solr.handler.admin.MetricsHistoryHandler; |
| import org.apache.solr.metrics.AltBufferPoolMetricSet; |
| import org.apache.solr.metrics.MetricsMap; |
| import org.apache.solr.metrics.OperatingSystemMetricSet; |
| import org.apache.solr.metrics.SolrMetricManager; |
| import org.apache.solr.request.LocalSolrQueryRequest; |
| import org.apache.solr.response.SolrQueryResponse; |
| import org.apache.solr.common.util.SolrNamedThreadFactory; |
| import org.apache.solr.util.MockSearchableSolrClient; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.REQUESTID; |
| |
| /** |
| * Simulated {@link SolrCloudManager}. |
| * |
| * @deprecated to be removed in Solr 9.0 (see SOLR-14656) |
| */ |
| public class SimCloudManager implements SolrCloudManager { |
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  // Shared randomness source; seeded from the "tests.seed" system property
  // (when set by the test framework) so simulated runs are reproducible.
  private static final Random random;

  static {
    String seed = System.getProperty("tests.seed");
    if (seed == null) {
      random = new Random();
    } else {
      random = new Random(seed.hashCode());
    }
  }

  // Core simulated components: distributed state, cluster state and node state.
  private final SimDistribStateManager stateManager;
  private final SimClusterStateProvider clusterStateProvider;
  private final SimNodeStateProvider nodeStateProvider;
  // Handles autoscaling API requests forwarded from request(...).
  private final AutoScalingHandler autoScalingHandler;
  // Registry of currently live node ids, shared with the providers above.
  private final LiveNodesSet liveNodesSet = new LiveNodesSet();
  private final DistributedQueueFactory queueFactory;
  private final ObjectCache objectCache = new ObjectCache();
  private final SolrMetricManager metricManager = new SolrMetricManager();
  // Unique per-instance tag (hex of hashCode) used when registering gauges.
  private final String metricTag;

  // Simulated contents of the .system collection, in insertion (chronological) order.
  private final List<SolrInputDocument> systemColl = Collections.synchronizedList(new ArrayList<>());
  // Autoscaling event counts: trigger name -> stage/event name -> count.
  private final Map<String, Map<String, AtomicInteger>> eventCounts = new ConcurrentHashMap<>();
  private final MockSearchableSolrClient solrClient;
  // Processed operation counts keyed by operation name (eg. MOVEREPLICA).
  private final Map<String, AtomicLong> opCounts = new ConcurrentSkipListMap<>();
  /**
   * Count of background task failures (any throwable other than InterruptedException).
   * @see #submit
   * @see #getBackgroundTaskFailureCount
   * @see LoggingCallable
   */
  private final AtomicLong backgroundTaskFailureCounter = new AtomicLong(0);

  private ExecutorService simCloudManagerPool;
  private Overseer.OverseerThread triggerThread;
  private ThreadGroup triggerThreadGroup;
  private SolrResourceLoader loader;
  private MetricsHandler metricsHandler;
  // Lazily created when the first node is added; closed when the last node is removed.
  private MetricsHistoryHandler metricsHistoryHandler;
  private TimeSource timeSource;
  private boolean useSystemCollection = true;

  // NOTE(review): plain static increment — assumes node creation is effectively
  // single-threaded in tests; not safe under concurrent createNodeValues(null) calls.
  private static int nodeIdPort = 10000;
  public static int DEFAULT_FREE_DISK = 10240; // 10 TiB
  public static int DEFAULT_TOTAL_DISK = 10240; // 10 TiB
  public static long DEFAULT_IDX_SIZE_BYTES = 10240; // 10 kiB
| |
| /** |
| * Create a simulated cluster. This cluster uses the following components: |
| * <ul> |
| * <li>{@link SimDistribStateManager} with non-shared root node.</li> |
| * <li>{@link SimClusterStateProvider}</li> |
| * <li>{@link SimNodeStateProvider}, where node values are automatically initialized when using |
| * {@link #simAddNode()} method.</li> |
| * <li>{@link GenericDistributedQueueFactory} that uses {@link SimDistribStateManager} as its storage.</li> |
| * <li>an instance of {@link AutoScalingHandler} for managing AutoScalingConfig.</li> |
| * <li>an instance of {@link OverseerTriggerThread} for managing triggers and processing events.</li> |
| * </ul> |
| * @param timeSource time source to use. |
| */ |
| public SimCloudManager(TimeSource timeSource) throws Exception { |
| this(timeSource, null); |
| } |
| |
  /**
   * Main constructor: wires up all simulated components.
   * @param timeSource time source to use; when null, {@link TimeSource#NANO_TIME} is used
   * @param distribStateManager existing state manager to reuse, or null to create a
   *                            new one with the common ZK-like paths pre-created
   */
  SimCloudManager(TimeSource timeSource, SimDistribStateManager distribStateManager) throws Exception {
    this.loader = new SolrResourceLoader();
    if (distribStateManager == null) {
      this.stateManager = new SimDistribStateManager(SimDistribStateManager.createNewRootNode());
      // init common paths
      stateManager.makePath(ZkStateReader.CLUSTER_STATE);
      stateManager.makePath(ZkStateReader.CLUSTER_PROPS);
      stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH);
      stateManager.makePath(ZkStateReader.LIVE_NODES_ZKNODE);
      stateManager.makePath(ZkStateReader.ROLES);
      stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
      stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
      stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
      stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
      stateManager.makePath(Overseer.OVERSEER_ELECT);
    } else {
      this.stateManager = distribStateManager;
    }

    // register common metrics
    metricTag = Integer.toHexString(hashCode());
    String registryName = SolrMetricManager.getRegistryName(SolrInfoBean.Group.jvm);
    metricManager.registerAll(registryName, new AltBufferPoolMetricSet(), SolrMetricManager.ResolutionStrategy.REPLACE, "buffers");
    metricManager.registerAll(registryName, new ClassLoadingGaugeSet(), SolrMetricManager.ResolutionStrategy.REPLACE, "classes");
    metricManager.registerAll(registryName, new OperatingSystemMetricSet(), SolrMetricManager.ResolutionStrategy.REPLACE, "os");
    metricManager.registerAll(registryName, new GarbageCollectorMetricSet(), SolrMetricManager.ResolutionStrategy.REPLACE, "gc");
    metricManager.registerAll(registryName, new MemoryUsageGaugeSet(), SolrMetricManager.ResolutionStrategy.REPLACE, "memory");
    metricManager.registerAll(registryName, new ThreadStatesGaugeSet(), SolrMetricManager.ResolutionStrategy.REPLACE, "threads"); // todo should we use CachedThreadStatesGaugeSet instead?
    // expose all JVM system properties as a gauge map
    MetricsMap sysprops = new MetricsMap((detailed, map) -> {
      System.getProperties().forEach((k, v) -> {
        map.put(String.valueOf(k), v);
      });
    });
    metricManager.registerGauge(null, registryName, sysprops, metricTag, true, "properties", "system");

    // node-level gauge: usable disk space of the root filesystem
    registryName = SolrMetricManager.getRegistryName(SolrInfoBean.Group.node);
    metricManager.registerGauge(null, registryName, () -> new File("/").getUsableSpace(),
        metricTag, true, "usableSpace", SolrInfoBean.Category.CONTAINER.toString(), "fs", "coreRoot");

    // client that routes all calls back into this simulated cluster via request(...)
    solrClient = new MockSearchableSolrClient() {
      @Override
      @SuppressWarnings({"rawtypes"})
      public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
        if (collection != null) {
          if (request instanceof AbstractUpdateRequest) {
            ((AbstractUpdateRequest)request).setParam("collection", collection);
          } else if (request instanceof QueryRequest) {
            // autoscaling / metrics-history queries are forwarded to the simulated
            // handlers; plain searches against .system use the mock's local storage
            if (request.getPath() != null && (
                request.getPath().startsWith("/admin/autoscaling") ||
                request.getPath().startsWith("/cluster/autoscaling") ||
                request.getPath().startsWith("/admin/metrics/history") ||
                request.getPath().startsWith("/cluster/metrics/history")
                )) {
              // forward it
              ModifiableSolrParams params = new ModifiableSolrParams(request.getParams());
              params.set("collection", collection);
              request = new QueryRequest(params);
            } else {
              // search request
              if (collection.equals(CollectionAdminParams.SYSTEM_COLL)) {
                return super.request(request, collection);
              } else {
                // forward it
                ModifiableSolrParams params = new ModifiableSolrParams(request.getParams());
                params.set("collection", collection);
                request = new QueryRequest(params);
              }
            }
          } else {
            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "when collection != null only UpdateRequest and QueryRequest are supported: request=" + request + ", collection=" + collection);
          }
        }
        try {
          SolrResponse rsp = SimCloudManager.this.request(request);
          return rsp.getResponse();
        } catch (UnsupportedOperationException e) {
          throw new SolrServerException(e);
        }
      }
    };


    this.timeSource = timeSource != null ? timeSource : TimeSource.NANO_TIME;
    this.clusterStateProvider = new SimClusterStateProvider(liveNodesSet, this);
    this.nodeStateProvider = new SimNodeStateProvider(liveNodesSet, this.stateManager, this.clusterStateProvider, null);
    this.queueFactory = new GenericDistributedQueueFactory(stateManager);
    this.simCloudManagerPool = ExecutorUtil.newMDCAwareFixedThreadPool(200, new SolrNamedThreadFactory("simCloudManagerPool"));

    this.autoScalingHandler = new AutoScalingHandler(this, loader);


    // start the simulated Overseer autoscaling trigger thread
    triggerThreadGroup = new ThreadGroup("Simulated Overseer autoscaling triggers");
    OverseerTriggerThread trigger = new OverseerTriggerThread(loader, this,
        new CloudConfig.CloudConfigBuilder("nonexistent", 0, "sim").build());
    triggerThread = new Overseer.OverseerThread(triggerThreadGroup, trigger, "Simulated OverseerAutoScalingTriggerThread");
    triggerThread.start();
  }
| |
| // ---------- simulator setup methods ----------- |
| |
| /** |
| * Create a cluster with the specified number of nodes. Node metrics are pre-populated. |
| * @param numNodes number of nodes to create |
| * @param timeSource time source |
| * @return instance of simulated cluster |
| */ |
| public static SimCloudManager createCluster(int numNodes, TimeSource timeSource) throws Exception { |
| SimCloudManager cloudManager = new SimCloudManager(timeSource); |
| for (int i = 1; i <= numNodes; i++) { |
| cloudManager.simAddNode(); |
| } |
| return cloudManager; |
| } |
| |
| /** |
| * Create a cluster initialized from the provided cluster state. |
| * @param initialState existing cluster state |
| * @param timeSource time source |
| * @return instance of simulated cluster with the same layout as the provided cluster state. |
| */ |
| public static SimCloudManager createCluster(ClusterState initialState, TimeSource timeSource) throws Exception { |
| SimCloudManager cloudManager = new SimCloudManager(timeSource); |
| cloudManager.getSimClusterStateProvider().simSetClusterState(initialState); |
| for (String node : cloudManager.getClusterStateProvider().getLiveNodes()) { |
| cloudManager.getSimNodeStateProvider().simSetNodeValues(node, createNodeValues(node)); |
| } |
| return cloudManager; |
| } |
| |
  /**
   * Create a simulated cluster that mirrors an existing (usually real) cluster:
   * distributed state, cluster layout, node values and replica values are copied
   * from {@code other}, then consistency of the copy is verified.
   * @param other existing cluster manager to copy from
   * @param config autoscaling config to install; when null the config already
   *               present in the copied state is used
   * @param timeSource time source for the new simulated cluster
   * @return simulated cluster mirroring {@code other}
   */
  public static SimCloudManager createCluster(SolrCloudManager other, AutoScalingConfig config, TimeSource timeSource) throws Exception {
    SimDistribStateManager distribStateManager = new SimDistribStateManager(SimDistribStateManager.createNewRootNode());
    distribStateManager.copyFrom(other.getDistribStateManager(), false);
    SimCloudManager cloudManager = new SimCloudManager(timeSource, distribStateManager);
    if (config != null) {
      cloudManager.getSimDistribStateManager().simSetAutoScalingConfig(config);
    } else {
      config = cloudManager.getDistribStateManager().getAutoScalingConfig();
    }
    // collect the tags needed by the policy in addition to the common ones
    Set<String> nodeTags = new HashSet<>(SimUtils.COMMON_NODE_TAGS);
    nodeTags.addAll(config.getPolicy().getParamNames());
    Set<String> replicaTags = new HashSet<>(SimUtils.COMMON_REPLICA_TAGS);
    replicaTags.addAll(config.getPolicy().getPerReplicaAttributes());
    cloudManager.getSimClusterStateProvider().copyFrom(other.getClusterStateProvider());
    for (String node : other.getClusterStateProvider().getLiveNodes()) {
      SimClusterStateProvider simClusterStateProvider = cloudManager.getSimClusterStateProvider();
      cloudManager.getSimNodeStateProvider().simSetNodeValues(node, other.getNodeStateProvider().getNodeValues(node, nodeTags));
      Map<String, Map<String, List<ReplicaInfo>>> infos = other.getNodeStateProvider().getReplicaInfo(node, replicaTags);
      simClusterStateProvider.simSetReplicaValues(node, infos, true);
    }
    SimUtils.checkConsistency(cloudManager, config);
    return cloudManager;
  }
| |
| /** |
| * Create simulated node values (metrics) for a node. |
| * @param nodeName node name (eg. '127.0.0.1:10000_solr'). If null then a new node name will be |
| * created using sequentially increasing port number. |
| * @return node values |
| */ |
| public static Map<String, Object> createNodeValues(String nodeName) { |
| Map<String, Object> values = new HashMap<>(); |
| String host, nodeId; |
| int port; |
| if (nodeName == null) { |
| host = "127.0.0.1"; |
| port = nodeIdPort++; |
| nodeId = host + ":" + port + "_solr"; |
| values.put("ip_1", "127"); |
| values.put("ip_2", "0"); |
| values.put("ip_3", "0"); |
| values.put("ip_4", "1"); |
| } else { |
| String[] hostPortCtx = nodeName.split(":"); |
| if (hostPortCtx.length != 2) { |
| throw new RuntimeException("Invalid nodeName " + nodeName); |
| } |
| host = hostPortCtx[0]; |
| String[] portCtx = hostPortCtx[1].split("_"); |
| if (portCtx.length != 2) { |
| throw new RuntimeException("Invalid port_context in nodeName " + nodeName); |
| } |
| port = Integer.parseInt(portCtx[0]); |
| nodeId = host + ":" + port + "_" + portCtx[1]; |
| String[] ip = host.split("\\."); |
| if (ip.length == 4) { |
| values.put("ip_1", ip[0]); |
| values.put("ip_2", ip[1]); |
| values.put("ip_3", ip[2]); |
| values.put("ip_4", ip[3]); |
| } |
| } |
| values.put(ImplicitSnitch.HOST, host); |
| values.put(ImplicitSnitch.PORT, port); |
| values.put(ImplicitSnitch.NODE, nodeId); |
| values.put(ImplicitSnitch.CORES, 0); |
| values.put(ImplicitSnitch.DISK, DEFAULT_FREE_DISK); |
| values.put(Variable.Type.TOTALDISK.tagName, DEFAULT_TOTAL_DISK); |
| values.put(ImplicitSnitch.SYSLOADAVG, 1.0); |
| values.put(ImplicitSnitch.HEAPUSAGE, 123450000); |
| values.put("sysprop.java.version", System.getProperty("java.version")); |
| values.put("sysprop.java.vendor", System.getProperty("java.vendor")); |
| // fake some metrics expected in tests |
| values.put("metrics:solr.node:ADMIN./admin/authorization.clientErrors:count", 0); |
| values.put("metrics:solr.jvm:buffers.direct.Count", 0); |
| return values; |
| } |
| |
| public void disableMetricsHistory() { |
| metricsHistoryHandler.close(); |
| } |
| |
  /**
   * Render a human-readable summary of the simulated cluster: live/empty/dead
   * nodes, per-collection stats, replica counts per state, op counts and
   * autoscaling event counts.
   * @param withCollections when true, also dump full collection states
   * @return formatted multi-line report
   */
  public String dumpClusterState(boolean withCollections) throws Exception {
    StringBuilder sb = new StringBuilder();
    sb.append("#######################################\n");
    sb.append("############ CLUSTER STATE ############\n");
    sb.append("#######################################\n");
    sb.append("## Live nodes:\t\t").append(getLiveNodesSet().size()).append("\n");
    int emptyNodes = 0;
    int maxReplicas = 0;
    int minReplicas = Integer.MAX_VALUE;
    // collection -> replica state -> count
    Map<String, Map<Replica.State, AtomicInteger>> replicaStates = new TreeMap<>();
    int numReplicas = 0;
    for (String node : getLiveNodesSet().get()) {
      List<ReplicaInfo> replicas = getSimClusterStateProvider().simGetReplicaInfos(node);
      numReplicas += replicas.size();
      if (replicas.size() > maxReplicas) {
        maxReplicas = replicas.size();
      }
      if (minReplicas > replicas.size()) {
        minReplicas = replicas.size();
      }
      for (ReplicaInfo ri : replicas) {
        replicaStates.computeIfAbsent(ri.getCollection(), c -> new TreeMap<>())
            .computeIfAbsent(ri.getState(), s -> new AtomicInteger())
            .incrementAndGet();
      }
      if (replicas.isEmpty()) {
        emptyNodes++;
      }
    }
    if (minReplicas == Integer.MAX_VALUE) {
      // no live nodes at all - report 0 instead of the sentinel
      minReplicas = 0;
    }
    sb.append("## Empty nodes:\t").append(emptyNodes).append("\n");
    Set<String> deadNodes = getSimNodeStateProvider().simGetDeadNodes();
    sb.append("## Dead nodes:\t\t").append(deadNodes.size()).append("\n");
    deadNodes.forEach(n -> sb.append("##\t\t").append(n).append("\n"));
    sb.append("## Collections:\n");
    clusterStateProvider.simGetCollectionStats().forEach((coll, stats) -> {
      sb.append("## * ").append(coll).append('\n');
      stats.forEach((k, v) -> {
        sb.append("## ").append(k).append("\t").append(v).append("\n");
      });
    });
    if (withCollections) {
      ClusterState state = clusterStateProvider.getClusterState();
      state.forEachCollection(coll -> sb.append(coll.toString()).append("\n"));
    }
    sb.append("## Max replicas per node:\t").append(maxReplicas).append("\n");
    sb.append("## Min replicas per node:\t").append(minReplicas).append("\n");
    sb.append("## Total replicas:\t\t").append(numReplicas).append("\n");
    // per-collection totals plus a breakdown per replica state
    replicaStates.forEach((c, map) -> {
      AtomicInteger repCnt = new AtomicInteger();
      map.forEach((s, cnt) -> repCnt.addAndGet(cnt.get()));
      sb.append("## * ").append(c).append("\t\t").append(repCnt.get()).append("\n");
      map.forEach((s, cnt) -> sb.append("##\t\t- ").append(String.format(Locale.ROOT, "%-12s %4d", s, cnt.get())).append("\n"));
    });
    sb.append("######### Solr op counts ##########\n");
    simGetOpCounts().forEach((k, cnt) -> sb.append("##\t\t- ").append(String.format(Locale.ROOT, "%-14s %4d", k, cnt.get())).append("\n"));
    sb.append("######### Autoscaling event counts ###########\n");
    Map<String, Map<String, AtomicInteger>> counts = simGetEventCounts();
    counts.forEach((trigger, map) -> {
      sb.append("## * Trigger: ").append(trigger).append("\n");
      map.forEach((s, cnt) -> sb.append("##\t\t- ").append(String.format(Locale.ROOT, "%-11s %4d", s, cnt.get())).append("\n"));
    });
    return sb.toString();
  }
| |
| /** |
| * Get the instance of {@link SolrResourceLoader} that is used by the cluster components. |
| */ |
| public SolrResourceLoader getLoader() { |
| return loader; |
| } |
| |
| /** |
| * Get the source of randomness (usually initialized by the test suite). |
| */ |
| public Random getRandom() { |
| return random; |
| } |
| |
| /** |
| * Add a new node and initialize its node values (metrics). The |
| * /live_nodes list is updated with the new node id. |
| * @return new node id |
| */ |
| public String simAddNode() throws Exception { |
| Map<String, Object> values = createNodeValues(null); |
| String nodeId = (String)values.get(ImplicitSnitch.NODE); |
| nodeStateProvider.simSetNodeValues(nodeId, values); |
| clusterStateProvider.simAddNode(nodeId); |
| log.trace("-- added node {}", nodeId); |
| // initialize history handler if this is the first node |
| if (metricsHistoryHandler == null && liveNodesSet.size() == 1) { |
| metricsHandler = new MetricsHandler(metricManager); |
| metricsHistoryHandler = new MetricsHistoryHandler(nodeId, metricsHandler, solrClient, this, new HashMap<>()); |
| metricsHistoryHandler.initializeMetrics(metricManager, SolrMetricManager.getRegistryName(SolrInfoBean.Group.node), metricTag, CommonParams.METRICS_HISTORY_PATH); |
| } |
| return nodeId; |
| } |
| |
| /** |
| * Remove a node from the cluster. This simulates a node lost scenario. |
| * Node id is removed from the /live_nodes list. |
| * @param nodeId node id |
| * @param withValues when true, remove also simulated node values. If false |
| * then node values are retained to later simulate |
| * a node that comes back up |
| */ |
| public void simRemoveNode(String nodeId, boolean withValues) throws Exception { |
| clusterStateProvider.simRemoveNode(nodeId); |
| if (withValues) { |
| nodeStateProvider.simRemoveNodeValues(nodeId); |
| } |
| if (liveNodesSet.isEmpty()) { |
| // remove handlers |
| if (metricsHistoryHandler != null) { |
| IOUtils.closeQuietly(metricsHistoryHandler); |
| metricsHistoryHandler = null; |
| } |
| if (metricsHandler != null) { |
| metricsHandler = null; |
| } |
| } |
| log.trace("-- removed node {}", nodeId); |
| } |
| |
| /** |
| * Remove a number of randomly selected nodes |
| * @param number number of nodes to remove |
| * @param withValues when true, remove also simulated node values. If false |
| * then node values are retained to later simulate |
| * a node that comes back up |
| * @param random random |
| */ |
| public void simRemoveRandomNodes(int number, boolean withValues, Random random) throws Exception { |
| List<String> nodes = new ArrayList<>(liveNodesSet.get()); |
| Collections.shuffle(nodes, random); |
| int count = Math.min(number, nodes.size()); |
| for (int i = 0; i < count; i++) { |
| simRemoveNode(nodes.get(i), withValues); |
| } |
| } |
| |
  /**
   * Toggle use of the simulated .system collection (enabled by default).
   * NOTE(review): presumably gates whether operations are recorded into the
   * simulated .system collection — confirm against the field's usage sites.
   */
  public void simSetUseSystemCollection(boolean useSystemCollection) {
    this.useSystemCollection = useSystemCollection;
  }

  /**
   * Clear the (simulated) .system collection.
   */
  public void simClearSystemCollection() {
    systemColl.clear();
  }

  /**
   * Get the content of (simulated) .system collection.
   * @return live (synchronized, mutable) list of documents in the collection,
   *         in chronological order starting from the oldest.
   */
  public List<SolrInputDocument> simGetSystemCollection() {
    return systemColl;
  }
| |
| public Map<String, Map<String, AtomicInteger>> simGetEventCounts() { |
| TreeMap<String, Map<String, AtomicInteger>> counts = new TreeMap<>(eventCounts); |
| return counts; |
| } |
| |
| /** |
| * Get a {@link SolrClient} implementation where calls are forwarded to this |
| * instance of the cluster. |
| * @return simulated SolrClient. |
| */ |
| public SolrClient simGetSolrClient() { |
| return solrClient; |
| // return new SolrClient() { |
| // @Override |
| // public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException { |
| // if (collection != null) { |
| // if (request instanceof AbstractUpdateRequest) { |
| // ((AbstractUpdateRequest)request).setParam("collection", collection); |
| // } else if (request instanceof QueryRequest) { |
| // ModifiableSolrParams params = new ModifiableSolrParams(request.getParams()); |
| // params.set("collection", collection); |
| // request = new QueryRequest(params); |
| // } else { |
| // throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "when collection != null only UpdateRequest and QueryRequest are supported: request=" + request + ", collection=" + collection); |
| // } |
| // } |
| // SolrResponse rsp = SimCloudManager.this.request(request); |
| // return rsp.getResponse(); |
| // } |
| // |
| // @Override |
| // public void close() throws IOException { |
| // |
| // } |
| // }; |
| } |
| |
| /** |
| * Simulate the effect of restarting Overseer leader - in this case this means closing the current |
| * {@link OverseerTriggerThread} (and optionally killing a node) then starting a new |
| * {@link OverseerTriggerThread}. |
| * All background tasks currently in progress will be interrupted. |
| * @param killNodeId optional nodeId to kill. If null then don't kill any node, just restart the thread |
| * @see #getOverseerTriggerThread |
| */ |
| public void simRestartOverseer(String killNodeId) throws Exception { |
| log.info("=== Restarting OverseerTriggerThread and clearing object cache..."); |
| triggerThread.interrupt(); |
| IOUtils.closeQuietly(triggerThread); |
| if (killNodeId != null) { |
| log.info(" = killing node {}", killNodeId); |
| simRemoveNode(killNodeId, false); |
| } |
| objectCache.clear(); |
| |
| try { |
| simCloudManagerPool.shutdownNow(); |
| } catch (Exception e) { |
| // ignore |
| } |
| simCloudManagerPool = ExecutorUtil.newMDCAwareFixedThreadPool(200, new SolrNamedThreadFactory("simCloudManagerPool")); |
| |
| OverseerTriggerThread trigger = new OverseerTriggerThread(loader, this, |
| new CloudConfig.CloudConfigBuilder("nonexistent", 0, "sim").build()); |
| triggerThread = new Overseer.OverseerThread(triggerThreadGroup, trigger, "Simulated OverseerAutoScalingTriggerThread"); |
| triggerThread.start(); |
| |
| } |
| |
| /** |
| * Submit a task to execute in a thread pool. |
| * Every callable submitted will be wrapped such that errors not handled w/in the callable |
| * will be logged and counted for later assertions. |
| * |
| * @param callable task to execute |
| * @return future to obtain results |
| * @see #getBackgroundTaskFailureCount |
| */ |
| @SuppressWarnings({"unchecked", "rawtypes"}) |
| public <T> Future<T> submit(Callable<T> callable) { |
| return simCloudManagerPool.submit(new LoggingCallable(backgroundTaskFailureCounter, callable)); |
| } |
| /** |
| * Returns a total count of the number of tasks submitted to {@link #submit} that have failed |
| * with any throwable other then <code>InteruptedException</code> |
| * |
| * @see #submit |
| */ |
| public long getBackgroundTaskFailureCount() { |
| return backgroundTaskFailureCounter.get(); |
| } |
| |
  // ---------- type-safe methods to obtain simulator components ----------

  /** Get the concrete simulated cluster state provider. */
  public SimClusterStateProvider getSimClusterStateProvider() {
    return clusterStateProvider;
  }

  /** Get the concrete simulated node state provider. */
  public SimNodeStateProvider getSimNodeStateProvider() {
    return nodeStateProvider;
  }

  /** Get the concrete simulated distributed state manager. */
  public SimDistribStateManager getSimDistribStateManager() {
    return stateManager;
  }

  /** Get the shared set of live node ids. */
  public LiveNodesSet getLiveNodesSet() {
    return liveNodesSet;
  }

  /**
   * Get the number and type of operations processed by this cluster.
   * Returns the live map - mutations by the cluster are visible to the caller.
   */
  public Map<String, AtomicLong> simGetOpCounts() {
    return opCounts;
  }

  /** Reset all operation counters to an empty state. */
  public void simResetOpCounts() {
    opCounts.clear();
  }

  /**
   * Get the number of processed operations of a specified type.
   * @param op operation name, eg. MOVEREPLICA
   * @return number of operations (0 when the operation was never seen)
   */
  public long simGetOpCount(String op) {
    AtomicLong count = opCounts.get(op);
    return count != null ? count.get() : 0L;
  }

  /** Get the metric manager holding all simulated JVM/node registries. */
  public SolrMetricManager getMetricManager() {
    return metricManager;
  }
| |
  // --------- interface methods -----------


  /** {@inheritDoc} */
  @Override
  public ObjectCache getObjectCache() {
    return objectCache;
  }

  /** {@inheritDoc} */
  @Override
  public TimeSource getTimeSource() {
    return timeSource;
  }

  /** {@inheritDoc} Returns the simulated provider (see {@link #getSimClusterStateProvider()}). */
  @Override
  public ClusterStateProvider getClusterStateProvider() {
    return clusterStateProvider;
  }

  /** {@inheritDoc} Returns the simulated provider (see {@link #getSimNodeStateProvider()}). */
  @Override
  public NodeStateProvider getNodeStateProvider() {
    return nodeStateProvider;
  }

  /** {@inheritDoc} Returns the simulated manager (see {@link #getSimDistribStateManager()}). */
  @Override
  public DistribStateManager getDistribStateManager() {
    return stateManager;
  }

  /** {@inheritDoc} */
  @Override
  public DistributedQueueFactory getDistributedQueueFactory() {
    return queueFactory;
  }
| |
  /**
   * Handle a Solr request by delegating to {@link #simHandleSolrRequest} on the
   * internal thread pool and blocking (up to 120s) for the result.
   * @param req request to process
   * @return simulated response
   * @throws IOException wrapping any failure, timeout or interruption
   */
  @Override
  @SuppressWarnings({"rawtypes"})
  public SolrResponse request(SolrRequest req) throws IOException {
    try {
      // NOTE: we're doing 2 odd things here:
      // 1) rather then calling simHandleSolrRequest directly, we're submitting it to the
      //    executor service and immediately waiting on the Future.
      //    - This can introduce a delays if there are a lot of existing background tasks submitted
      // 2) we use simCloudManagerPool directly, instead of using the public submit() method
      //    - this is because there may be "user level" errors (ie: bad input) deliberately generated
      //      by the testcase. we're going to immediately catch & re-throw any exceptions, so we don't
      //      need/want to be wrapped in a LoggingCallable w/getBackgroundTaskFailureCount() tracking
      Future<SolrResponse> rsp = simCloudManagerPool.submit(() -> simHandleSolrRequest(req));
      return rsp.get(120, TimeUnit.SECONDS); // longer then this and something is seriously wrong
    } catch (Exception e) {
      throw new IOException(e);
    }
  }
| |
| private void incrementCount(String op) { |
| AtomicLong count = opCounts.computeIfAbsent(op, o -> new AtomicLong()); |
| count.incrementAndGet(); |
| } |
| |
| /** |
| * Handler method for autoscaling requests. NOTE: only a specific subset of autoscaling requests is |
| * supported! |
| * @param req autoscaling request |
| * @return results |
| */ |
| |
| @SuppressWarnings({"unchecked", "rawtypes"}) |
| public SolrResponse simHandleSolrRequest(SolrRequest req) throws IOException, InterruptedException { |
| // pay the penalty for remote request, at least 5 ms |
| timeSource.sleep(5); |
| |
| if (log.isTraceEnabled()) { |
| log.trace("--- got SolrRequest: {} {} {}", req.getMethod(), req.getPath(), |
| (req.getParams() != null ? " " + req.getParams() : "")); // logOk |
| } |
| if (req.getPath() != null) { |
| if (req.getPath().startsWith("/admin/autoscaling") || |
| req.getPath().startsWith("/cluster/autoscaling") || |
| req.getPath().startsWith("/admin/metrics") || |
| req.getPath().startsWith("/cluster/metrics") |
| ) { |
| metricManager.registry("solr.node").counter("ADMIN." + req.getPath() + ".requests").inc(); |
| boolean autoscaling = req.getPath().contains("autoscaling"); |
| boolean history = req.getPath().contains("history"); |
| if (autoscaling) { |
| incrementCount("autoscaling"); |
| } else if (history) { |
| incrementCount("metricsHistory"); |
| } else { |
| incrementCount("metrics"); |
| } |
| ModifiableSolrParams params = new ModifiableSolrParams(req.getParams()); |
| params.set(CommonParams.PATH, req.getPath()); |
| LocalSolrQueryRequest queryRequest = new LocalSolrQueryRequest(null, params); |
| if (autoscaling) { |
| RequestWriter.ContentWriter cw = req.getContentWriter("application/json"); |
| if (null != cw) { |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| cw.write(baos); |
| String payload = baos.toString("UTF-8"); |
| log.trace("-- payload: {}", payload); |
| queryRequest.setContentStreams(Collections.singletonList(new ContentStreamBase.StringStream(payload))); |
| } |
| } |
| queryRequest.getContext().put("httpMethod", req.getMethod().toString()); |
| SolrQueryResponse queryResponse = new SolrQueryResponse(); |
| queryResponse.addResponseHeader(new SimpleOrderedMap<>()); |
| if (autoscaling) { |
| autoScalingHandler.handleRequest(queryRequest, queryResponse); |
| } else { |
| if (history) { |
| if (metricsHistoryHandler != null) { |
| metricsHistoryHandler.handleRequest(queryRequest, queryResponse); |
| } else { |
| queryRequest.close(); |
| throw new UnsupportedOperationException("must add at least 1 node first"); |
| } |
| } else { |
| if (metricsHandler != null) { |
| metricsHandler.handleRequest(queryRequest, queryResponse); |
| } else { |
| queryRequest.close(); |
| throw new UnsupportedOperationException("must add at least 1 node first"); |
| } |
| } |
| } |
| if (queryResponse.getException() != null) { |
| if (log.isDebugEnabled()) { |
| log.debug("-- exception handling request", queryResponse.getException()); |
| } |
| throw new IOException(queryResponse.getException()); |
| } |
| SolrResponse rsp = new SolrResponseBase(); |
| rsp.setResponse(queryResponse.getValues()); |
| log.trace("-- response: {}", rsp); |
| return rsp; |
| } else if (req instanceof QueryRequest) { |
| incrementCount("query"); |
| return clusterStateProvider.simQuery((QueryRequest)req); |
| } |
| } |
| if (req instanceof UpdateRequest) { |
| incrementCount("update"); |
| UpdateRequest ureq = (UpdateRequest)req; |
| String collection = ureq.getCollection(); |
| UpdateResponse rsp = clusterStateProvider.simUpdate(ureq); |
| if (collection == null || collection.equals(CollectionAdminParams.SYSTEM_COLL)) { |
| List<SolrInputDocument> docs = ureq.getDocuments(); |
| if (docs != null) { |
| if (useSystemCollection) { |
| systemColl.addAll(docs); |
| } |
| for (SolrInputDocument d : docs) { |
| if (!"autoscaling_event".equals(d.getFieldValue("type"))) { |
| continue; |
| } |
| eventCounts.computeIfAbsent((String)d.getFieldValue("event.source_s"), s -> new ConcurrentHashMap<>()) |
| .computeIfAbsent((String)d.getFieldValue("stage_s"), s -> new AtomicInteger()) |
| .incrementAndGet(); |
| } |
| } |
| return new UpdateResponse(); |
| } else { |
| return rsp; |
| } |
| } |
| // support only a specific subset of collection admin ops |
| SolrParams params = req.getParams(); |
| String a = params != null ? params.get(CoreAdminParams.ACTION) : null; |
| SolrResponse rsp = new SolrResponseBase(); |
| rsp.setResponse(new NamedList<>()); |
| String path = params != null ? params.get("path") : null; |
| if (!(req instanceof CollectionAdminRequest)) { |
| // maybe a V2Request? |
| if (req instanceof V2Request) { |
| params = SimUtils.v2AdminRequestToV1Params((V2Request)req); |
| a = params.get(CoreAdminParams.ACTION); |
| } else if (path != null && (path.startsWith("/admin/") || path.startsWith("/cluster/"))) { |
| // pass it through, it's likely a generic request containing admin params |
| } else { |
| throw new UnsupportedOperationException("Only some CollectionAdminRequest-s are supported: " + req.getClass().getName() + ": " + req.getPath() + " " + req.getParams()); |
| } |
| } |
| metricManager.registry("solr.node").counter("ADMIN." + req.getPath() + ".requests").inc(); |
| if (a != null) { |
| CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(a); |
| if (action == null) { |
| throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown action: " + a); |
| } |
| if (log.isTraceEnabled()) { |
| log.trace("Invoking Collection Action :{} with params {}", action.toLower(), params.toQueryString()); |
| } |
| @SuppressWarnings({"rawtypes"}) |
| NamedList results = new NamedList(); |
| rsp.setResponse(results); |
| incrementCount(action.name()); |
| switch (action) { |
| case REQUESTSTATUS: |
| // we complete all async ops immediately |
| String requestId = params.get(REQUESTID); |
| SimpleOrderedMap<String> status = new SimpleOrderedMap<>(); |
| status.add("state", RequestStatusState.COMPLETED.getKey()); |
| status.add("msg", "found [" + requestId + "] in completed tasks"); |
| results.add("status", status); |
| results.add("success", ""); |
| // ExecutePlanAction expects a specific response class |
| rsp = new CollectionAdminRequest.RequestStatusResponse(); |
| rsp.setResponse(results); |
| break; |
| case DELETESTATUS: |
| requestId = params.get(REQUESTID); |
| results.add("status", "successfully removed stored response for [" + requestId + "]"); |
| results.add("success", ""); |
| break; |
| case CREATE: |
| try { |
| clusterStateProvider.simCreateCollection(new ZkNodeProps(params.toNamedList().asMap(10)), results); |
| } catch (Exception e) { |
| throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e); |
| } |
| break; |
| case DELETE: |
| try { |
| clusterStateProvider.simDeleteCollection(params.get(CommonParams.NAME), |
| params.get(CommonAdminParams.ASYNC), results); |
| } catch (Exception e) { |
| throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e); |
| } |
| break; |
| case LIST: |
| results.add("collections", clusterStateProvider.simListCollections()); |
| break; |
| case ADDREPLICA: |
| try { |
| clusterStateProvider.simAddReplica(new ZkNodeProps(params.toNamedList().asMap(10)), results); |
| } catch (Exception e) { |
| throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e); |
| } |
| break; |
| case MOVEREPLICA: |
| try { |
| clusterStateProvider.simMoveReplica(new ZkNodeProps(params.toNamedList().asMap(10)), results); |
| } catch (Exception e) { |
| throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e); |
| } |
| break; |
| case OVERSEERSTATUS: |
| if (params.get(CommonAdminParams.ASYNC) != null) { |
| results.add(REQUESTID, params.get(CommonAdminParams.ASYNC)); |
| } |
| if (!liveNodesSet.get().isEmpty()) { |
| results.add("leader", liveNodesSet.get().iterator().next()); |
| } |
| results.add("overseer_queue_size", 0); |
| results.add("overseer_work_queue_size", 0); |
| results.add("overseer_collection_queue_size", 0); |
| results.add("success", ""); |
| break; |
| case ADDROLE: |
| nodeStateProvider.simSetNodeValue(params.get("node"), "nodeRole", params.get("role")); |
| break; |
| case CREATESHARD: |
| try { |
| clusterStateProvider.simCreateShard(new ZkNodeProps(params.toNamedList().asMap(10)), results); |
| } catch (Exception e) { |
| throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e); |
| } |
| break; |
| case SPLITSHARD: |
| try { |
| clusterStateProvider.simSplitShard(new ZkNodeProps(params.toNamedList().asMap(10)), results); |
| } catch (Exception e) { |
| throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e); |
| } |
| break; |
| case DELETESHARD: |
| try { |
| clusterStateProvider.simDeleteShard(new ZkNodeProps(params.toNamedList().asMap(10)), results); |
| } catch (Exception e) { |
| throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e); |
| } |
| break; |
| default: |
| throw new UnsupportedOperationException("Unsupported collection admin action=" + action + " in request: " + params); |
| } |
| } else { |
| throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "action is a required param in request: " + params); |
| } |
| return rsp; |
| |
| } |
| |
| /** |
| * HTTP requests are not supported by this implementation. |
| */ |
| @Override |
| public byte[] httpRequest(String url, SolrRequest.METHOD method, Map<String, String> headers, String payload, int timeout, boolean followRedirects) throws IOException { |
| throw new UnsupportedOperationException("general HTTP requests are not supported yet"); |
| } |
| |
  /**
   * Shuts down the simulated cluster: background pool first, then handlers/providers,
   * and finally the trigger thread (interrupted, closed, and joined).
   */
  @Override
  public void close() throws IOException {
    // make sure we shutdown the pool first, so any active background tasks get interrupted
    // before we start closing resources they may be using.
    simCloudManagerPool.shutdownNow();

    if (metricsHistoryHandler != null) {
      IOUtils.closeQuietly(metricsHistoryHandler);
    }
    IOUtils.closeQuietly(clusterStateProvider);
    IOUtils.closeQuietly(nodeStateProvider);
    IOUtils.closeQuietly(stateManager);
    triggerThread.interrupt();
    IOUtils.closeQuietly(triggerThread);
    // NOTE(review): interrupt is issued again after close() -- presumably because close()
    // may consume/clear the first interrupt before the thread exits; confirm before changing
    triggerThread.interrupt();
    try {
      triggerThread.join();
    } catch (InterruptedException e) {
      // re-assert interrupt status so our caller can observe the interruption
      Thread.currentThread().interrupt();
    }
    IOUtils.closeQuietly(objectCache);
  }
| |
| /** |
| * Direct access to the current {@link OverseerTriggerThread} |
| * @see #simRestartOverseer |
| */ |
| public OverseerTriggerThread getOverseerTriggerThread() { |
| return ((OverseerTriggerThread) triggerThread.getThread()); |
| } |
| |
| /** |
| * Wrapper for any Callable that will log a warn/error in the event of InterruptException/Throwable. |
| * Also increments the passed in counter so the CloudManger can later report total errors programatically. |
| * |
| * @see #submit |
| * @see #getBackgroundTaskFailureCount |
| */ |
| private static final class LoggingCallable<T> implements Callable<T> { |
| |
| final AtomicLong failCounter; |
| final Callable<T> inner; |
| |
| public LoggingCallable(final AtomicLong failCounter, final Callable<T> inner) { |
| assert null != failCounter; |
| assert null != inner; |
| this.failCounter = failCounter; |
| this.inner = inner; |
| } |
| |
| public T call() throws Exception { |
| try { |
| return inner.call(); |
| } catch (InterruptedException ignored) { |
| log.warn("Callable interupted", ignored); |
| throw ignored; |
| } catch (Throwable t) { |
| // be forgiving of errors that occured as a result of interuption, even if |
| // the inner Callable didn't realize it... |
| if (Thread.currentThread().isInterrupted()) { |
| log.warn("Callable interrupted w/o noticing", t); |
| throw t; |
| } |
| Throwable cause = t; |
| while ((cause = cause.getCause()) != null) { |
| if (cause instanceof InterruptedException) { |
| log.warn("Callable threw wrapped InterruptedException", t); |
| throw t; |
| } |
| } |
| |
| // in all other situations, this is a problem that should be tracked in the failCounter |
| failCounter.incrementAndGet(); |
| log.error("Callable failed", t); |
| throw t; |
| } |
| } |
| } |
| } |