| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.solr.cloud.autoscaling; |
| |
| import java.io.IOException; |
| import java.lang.invoke.MethodHandles; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.util.concurrent.AtomicDouble; |
| import org.apache.solr.client.solrj.cloud.autoscaling.Policy; |
| import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo; |
| import org.apache.solr.client.solrj.cloud.SolrCloudManager; |
| import org.apache.solr.client.solrj.cloud.autoscaling.Suggester; |
| import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType; |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.cloud.ClusterState; |
| import org.apache.solr.common.cloud.Replica; |
| import org.apache.solr.common.cloud.ZkStateReader; |
| import org.apache.solr.common.params.AutoScalingParams; |
| import org.apache.solr.common.params.CollectionParams; |
| import org.apache.solr.common.util.Pair; |
| import org.apache.solr.common.util.StrUtils; |
| import org.apache.solr.common.util.Utils; |
| import org.apache.solr.core.SolrResourceLoader; |
| import org.apache.solr.metrics.SolrCoreMetricManager; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Trigger for the {@link org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType#SEARCHRATE} event. |
| * |
| * @deprecated to be removed in Solr 9.0 (see SOLR-14656) |
| */ |
| public class SearchRateTrigger extends TriggerBase { |
| private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| |
| public static final String COLLECTIONS_PROP = "collections"; |
| public static final String METRIC_PROP = "metric"; |
| public static final String MAX_OPS_PROP = "maxOps"; |
| public static final String MIN_REPLICAS_PROP = "minReplicas"; |
| public static final String ABOVE_RATE_PROP = "aboveRate"; |
| public static final String BELOW_RATE_PROP = "belowRate"; |
| public static final String ABOVE_NODE_RATE_PROP = "aboveNodeRate"; |
| public static final String BELOW_NODE_RATE_PROP = "belowNodeRate"; |
| public static final String ABOVE_OP_PROP = "aboveOp"; |
| public static final String BELOW_OP_PROP = "belowOp"; |
| public static final String ABOVE_NODE_OP_PROP = "aboveNodeOp"; |
| public static final String BELOW_NODE_OP_PROP = "belowNodeOp"; |
| |
| // back-compat |
| public static final String BC_COLLECTION_PROP = "collection"; |
| public static final String BC_RATE_PROP = "rate"; |
| |
| |
| public static final String HOT_NODES = "hotNodes"; |
| public static final String HOT_COLLECTIONS = "hotCollections"; |
| public static final String HOT_SHARDS = "hotShards"; |
| public static final String HOT_REPLICAS = "hotReplicas"; |
| public static final String COLD_NODES = "coldNodes"; |
| public static final String COLD_COLLECTIONS = "coldCollections"; |
| public static final String COLD_SHARDS = "coldShards"; |
| public static final String COLD_REPLICAS = "coldReplicas"; |
| public static final String VIOLATION_PROP = "violationType"; |
| |
| public static final int DEFAULT_MAX_OPS = 3; |
| public static final String DEFAULT_METRIC = "QUERY./select.requestTimes:1minRate"; |
| |
| private String metric; |
| private int maxOps; |
| private Integer minReplicas = null; |
| private final Set<String> collections = new HashSet<>(); |
| private String shard; |
| private String node; |
| private double aboveRate; |
| private double belowRate; |
| private double aboveNodeRate; |
| private double belowNodeRate; |
| private CollectionParams.CollectionAction aboveOp, belowOp, aboveNodeOp, belowNodeOp; |
| private final Map<String, Long> lastCollectionEvent = new ConcurrentHashMap<>(); |
| private final Map<String, Long> lastNodeEvent = new ConcurrentHashMap<>(); |
| private final Map<String, Long> lastShardEvent = new ConcurrentHashMap<>(); |
| private final Map<String, Long> lastReplicaEvent = new ConcurrentHashMap<>(); |
| private final Map<String, Object> state = new HashMap<>(); |
| |
| public SearchRateTrigger(String name) { |
| super(TriggerEventType.SEARCHRATE, name); |
| this.state.put("lastCollectionEvent", lastCollectionEvent); |
| this.state.put("lastNodeEvent", lastNodeEvent); |
| this.state.put("lastShardEvent", lastShardEvent); |
| this.state.put("lastReplicaEvent", lastReplicaEvent); |
| TriggerUtils.validProperties(validProperties, |
| COLLECTIONS_PROP, AutoScalingParams.SHARD, AutoScalingParams.NODE, |
| METRIC_PROP, |
| MAX_OPS_PROP, |
| MIN_REPLICAS_PROP, |
| ABOVE_OP_PROP, |
| BELOW_OP_PROP, |
| ABOVE_NODE_OP_PROP, |
| BELOW_NODE_OP_PROP, |
| ABOVE_RATE_PROP, |
| BELOW_RATE_PROP, |
| ABOVE_NODE_RATE_PROP, |
| BELOW_NODE_RATE_PROP, |
| // back-compat props |
| BC_COLLECTION_PROP, |
| BC_RATE_PROP); |
| } |
| |
| @Override |
| public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException { |
| super.configure(loader, cloudManager, properties); |
| // parse config options |
| String collectionsStr = (String)properties.get(COLLECTIONS_PROP); |
| if (collectionsStr != null) { |
| collections.addAll(StrUtils.splitSmart(collectionsStr, ',')); |
| } |
| // check back-compat collection prop |
| collectionsStr = (String)properties.get(BC_COLLECTION_PROP); |
| if (collectionsStr != null) { |
| if (!collectionsStr.equals(Policy.ANY)) { |
| collections.add(collectionsStr); |
| } |
| } |
| shard = (String)properties.getOrDefault(AutoScalingParams.SHARD, Policy.ANY); |
| if (!shard.equals(Policy.ANY) && (collections.isEmpty() || collections.size() > 1)) { |
| throw new TriggerValidationException(name, AutoScalingParams.SHARD, "When 'shard' is other than #ANY then exactly one collection name must be set"); |
| } |
| node = (String)properties.getOrDefault(AutoScalingParams.NODE, Policy.ANY); |
| metric = (String)properties.getOrDefault(METRIC_PROP, DEFAULT_METRIC); |
| |
| String maxOpsStr = String.valueOf(properties.getOrDefault(MAX_OPS_PROP, DEFAULT_MAX_OPS)); |
| try { |
| maxOps = Integer.parseInt(maxOpsStr); |
| } catch (Exception e) { |
| throw new TriggerValidationException(name, MAX_OPS_PROP, "invalid value '" + maxOpsStr + "': " + e.toString()); |
| } |
| |
| Object o = properties.get(MIN_REPLICAS_PROP); |
| if (o != null) { |
| try { |
| minReplicas = Integer.parseInt(o.toString()); |
| if (minReplicas < 1) { |
| throw new Exception("must be at least 1, or not set to use 'replicationFactor'"); |
| } |
| } catch (Exception e) { |
| throw new TriggerValidationException(name, MIN_REPLICAS_PROP, "invalid value '" + o + "': " + e.toString()); |
| } |
| } |
| |
| Object above = properties.get(ABOVE_RATE_PROP); |
| Object below = properties.get(BELOW_RATE_PROP); |
| // back-compat rate prop |
| if (properties.containsKey(BC_RATE_PROP)) { |
| above = properties.get(BC_RATE_PROP); |
| } |
| if (above == null && below == null) { |
| throw new TriggerValidationException(name, ABOVE_RATE_PROP, "at least one of '" + |
| ABOVE_RATE_PROP + "' or '" + BELOW_RATE_PROP + "' must be set"); |
| } |
| if (above != null) { |
| try { |
| aboveRate = Double.parseDouble(String.valueOf(above)); |
| } catch (Exception e) { |
| throw new TriggerValidationException(name, ABOVE_RATE_PROP, "Invalid configuration value: '" + above + "': " + e.toString()); |
| } |
| } else { |
| aboveRate = Double.MAX_VALUE; |
| } |
| if (below != null) { |
| try { |
| belowRate = Double.parseDouble(String.valueOf(below)); |
| } catch (Exception e) { |
| throw new TriggerValidationException(name, BELOW_RATE_PROP, "Invalid configuration value: '" + below + "': " + e.toString()); |
| } |
| } else { |
| belowRate = -1; |
| } |
| |
| // node rates |
| above = properties.get(ABOVE_NODE_RATE_PROP); |
| below = properties.get(BELOW_NODE_RATE_PROP); |
| if (above != null) { |
| try { |
| aboveNodeRate = Double.parseDouble(String.valueOf(above)); |
| } catch (Exception e) { |
| throw new TriggerValidationException(name, ABOVE_NODE_RATE_PROP, "Invalid configuration value: '" + above + "': " + e.toString()); |
| } |
| } else { |
| aboveNodeRate = Double.MAX_VALUE; |
| } |
| if (below != null) { |
| try { |
| belowNodeRate = Double.parseDouble(String.valueOf(below)); |
| } catch (Exception e) { |
| throw new TriggerValidationException(name, BELOW_NODE_RATE_PROP, "Invalid configuration value: '" + below + "': " + e.toString()); |
| } |
| } else { |
| belowNodeRate = -1; |
| } |
| |
| String aboveOpStr = String.valueOf(properties.getOrDefault(ABOVE_OP_PROP, CollectionParams.CollectionAction.ADDREPLICA.toLower())); |
| String belowOpStr = String.valueOf(properties.getOrDefault(BELOW_OP_PROP, CollectionParams.CollectionAction.DELETEREPLICA.toLower())); |
| aboveOp = CollectionParams.CollectionAction.get(aboveOpStr); |
| if (aboveOp == null) { |
| throw new TriggerValidationException(getName(), ABOVE_OP_PROP, "unrecognized value: '" + aboveOpStr + "'"); |
| } |
| belowOp = CollectionParams.CollectionAction.get(belowOpStr); |
| if (belowOp == null) { |
| throw new TriggerValidationException(getName(), BELOW_OP_PROP, "unrecognized value: '" + belowOpStr + "'"); |
| } |
| Object aboveNodeObj = properties.getOrDefault(ABOVE_NODE_OP_PROP, CollectionParams.CollectionAction.MOVEREPLICA.toLower()); |
| // do NOT set the default to DELETENODE |
| Object belowNodeObj = properties.get(BELOW_NODE_OP_PROP); |
| try { |
| aboveNodeOp = CollectionParams.CollectionAction.get(String.valueOf(aboveNodeObj)); |
| } catch (Exception e) { |
| throw new TriggerValidationException(getName(), ABOVE_NODE_OP_PROP, "unrecognized value: '" + aboveNodeObj + "'"); |
| } |
| if (belowNodeObj != null) { |
| try { |
| belowNodeOp = CollectionParams.CollectionAction.get(String.valueOf(belowNodeObj)); |
| } catch (Exception e) { |
| throw new TriggerValidationException(getName(), BELOW_NODE_OP_PROP, "unrecognized value: '" + belowNodeObj + "'"); |
| } |
| } |
| } |
| |
| @VisibleForTesting |
| Map<String, Object> getConfig() { |
| Map<String, Object> config = new HashMap<>(); |
| config.put("name", name); |
| config.put(COLLECTIONS_PROP, collections); |
| config.put(AutoScalingParams.SHARD, shard); |
| config.put(AutoScalingParams.NODE, node); |
| config.put(METRIC_PROP, metric); |
| config.put(MAX_OPS_PROP, maxOps); |
| config.put(MIN_REPLICAS_PROP, minReplicas); |
| config.put(ABOVE_RATE_PROP, aboveRate); |
| config.put(BELOW_RATE_PROP, belowRate); |
| config.put(ABOVE_NODE_RATE_PROP, aboveNodeRate); |
| config.put(BELOW_NODE_RATE_PROP, belowNodeRate); |
| config.put(ABOVE_OP_PROP, aboveOp); |
| config.put(ABOVE_NODE_OP_PROP, aboveNodeOp); |
| config.put(BELOW_OP_PROP, belowOp); |
| config.put(BELOW_NODE_OP_PROP, belowNodeOp); |
| return config; |
| } |
| |
| @Override |
| protected Map<String, Object> getState() { |
| return state; |
| } |
| |
| @Override |
| protected void setState(Map<String, Object> state) { |
| lastCollectionEvent.clear(); |
| lastNodeEvent.clear(); |
| lastShardEvent.clear(); |
| lastReplicaEvent.clear(); |
| @SuppressWarnings({"unchecked"}) |
| Map<String, Long> collTimes = (Map<String, Long>)state.get("lastCollectionEvent"); |
| if (collTimes != null) { |
| lastCollectionEvent.putAll(collTimes); |
| } |
| @SuppressWarnings({"unchecked"}) |
| Map<String, Long> nodeTimes = (Map<String, Long>)state.get("lastNodeEvent"); |
| if (nodeTimes != null) { |
| lastNodeEvent.putAll(nodeTimes); |
| } |
| @SuppressWarnings({"unchecked"}) |
| Map<String, Long> shardTimes = (Map<String, Long>)state.get("lastShardEvent"); |
| if (shardTimes != null) { |
| lastShardEvent.putAll(shardTimes); |
| } |
| @SuppressWarnings({"unchecked"}) |
| Map<String, Long> replicaTimes = (Map<String, Long>)state.get("lastReplicaEvent"); |
| if (replicaTimes != null) { |
| lastReplicaEvent.putAll(replicaTimes); |
| } |
| } |
| |
| @Override |
| public void restoreState(AutoScaling.Trigger old) { |
| assert old.isClosed(); |
| if (old instanceof SearchRateTrigger) { |
| SearchRateTrigger that = (SearchRateTrigger)old; |
| assert this.name.equals(that.name); |
| this.lastCollectionEvent.clear(); |
| this.lastNodeEvent.clear(); |
| this.lastShardEvent.clear(); |
| this.lastReplicaEvent.clear(); |
| this.lastCollectionEvent.putAll(that.lastCollectionEvent); |
| this.lastNodeEvent.putAll(that.lastNodeEvent); |
| this.lastShardEvent.putAll(that.lastShardEvent); |
| this.lastReplicaEvent.putAll(that.lastReplicaEvent); |
| } else { |
| throw new SolrException(SolrException.ErrorCode.INVALID_STATE, |
| "Unable to restore state from an unknown type of trigger"); |
| } |
| |
| } |
| |
| @Override |
| public void run() { |
| AutoScaling.TriggerEventProcessor processor = processorRef.get(); |
| if (processor == null) { |
| return; |
| } |
| |
| // collection, shard, list(replica + rate) |
| Map<String, Map<String, List<ReplicaInfo>>> collectionRates = new HashMap<>(); |
| // node, rate |
| Map<String, AtomicDouble> nodeRates = new HashMap<>(); |
| // this replication factor only considers replica types that are searchable |
| // collection, shard, RF |
| Map<String, Map<String, AtomicInteger>> searchableReplicationFactors = new HashMap<>(); |
| |
| ClusterState clusterState = null; |
| try { |
| clusterState = cloudManager.getClusterStateProvider().getClusterState(); |
| } catch (IOException e) { |
| log.warn("Error getting ClusterState", e); |
| return; |
| } |
| for (String node : cloudManager.getClusterStateProvider().getLiveNodes()) { |
| Map<String, ReplicaInfo> metricTags = new HashMap<>(); |
| // coll, shard, replica |
| Map<String, Map<String, List<ReplicaInfo>>> infos = cloudManager.getNodeStateProvider().getReplicaInfo(node, Collections.emptyList()); |
| infos.forEach((coll, shards) -> { |
| Map<String, AtomicInteger> replPerShard = searchableReplicationFactors.computeIfAbsent(coll, c -> new HashMap<>()); |
| shards.forEach((sh, replicas) -> { |
| AtomicInteger repl = replPerShard.computeIfAbsent(sh, s -> new AtomicInteger()); |
| replicas.forEach(replica -> { |
| // skip non-active replicas |
| if (replica.getState() != Replica.State.ACTIVE) { |
| return; |
| } |
| repl.incrementAndGet(); |
| // we have to translate to the metrics registry name, which uses "_replica_nN" as suffix |
| String replicaName = Utils.parseMetricsReplicaName(coll, replica.getCore()); |
| if (replicaName == null) { // should never happen??? |
| replicaName = replica.getName(); // which is actually coreNode name... |
| } |
| String registry = SolrCoreMetricManager.createRegistryName(true, coll, sh, replicaName, null); |
| String tag = "metrics:" + registry + ":" + metric; |
| metricTags.put(tag, replica); |
| }); |
| }); |
| }); |
| if (metricTags.isEmpty()) { |
| continue; |
| } |
| Map<String, Object> rates = cloudManager.getNodeStateProvider().getNodeValues(node, metricTags.keySet()); |
| if (log.isDebugEnabled()) { |
| log.debug("### rates for node {}", node); |
| rates.forEach((tag, rate) -> log.debug("### " + tag + "\t" + rate)); // logOk |
| } |
| rates.forEach((tag, rate) -> { |
| ReplicaInfo info = metricTags.get(tag); |
| if (info == null) { |
| log.warn("Missing replica info for response tag {}", tag); |
| } else { |
| Map<String, List<ReplicaInfo>> perCollection = collectionRates.computeIfAbsent(info.getCollection(), s -> new HashMap<>()); |
| List<ReplicaInfo> perShard = perCollection.computeIfAbsent(info.getShard(), s -> new ArrayList<>()); |
| info = (ReplicaInfo)info.clone(); |
| info.getVariables().put(AutoScalingParams.RATE, ((Number)rate).doubleValue()); |
| perShard.add(info); |
| AtomicDouble perNode = nodeRates.computeIfAbsent(node, s -> new AtomicDouble()); |
| perNode.addAndGet(((Number)rate).doubleValue()); |
| } |
| }); |
| } |
| |
| if (log.isDebugEnabled()) { |
| collectionRates.forEach((coll, collRates) -> { |
| log.debug("## Collection: {}", coll); |
| collRates.forEach((s, replicas) -> { |
| log.debug("## - {}", s); |
| replicas.forEach(ri -> log.debug("## {} {}", ri.getCore(), ri.getVariable(AutoScalingParams.RATE))); //logOk |
| }); |
| }); |
| } |
| long now = cloudManager.getTimeSource().getTimeNs(); |
| Map<String, Double> hotNodes = new HashMap<>(); |
| Map<String, Double> coldNodes = new HashMap<>(); |
| |
| // check for exceeded rates and filter out those with less than waitFor from previous events |
| nodeRates.entrySet().stream() |
| .filter(entry -> node.equals(Policy.ANY) || node.equals(entry.getKey())) |
| .forEach(entry -> { |
| if (entry.getValue().get() > aboveNodeRate) { |
| if (waitForElapsed(entry.getKey(), now, lastNodeEvent)) { |
| hotNodes.put(entry.getKey(), entry.getValue().get()); |
| } |
| } else if (entry.getValue().get() < belowNodeRate) { |
| if (waitForElapsed(entry.getKey(), now, lastNodeEvent)) { |
| coldNodes.put(entry.getKey(), entry.getValue().get()); |
| } |
| } else { |
| // no violation - clear waitForElapsed |
| // (violation is only valid if it persists throughout waitFor) |
| lastNodeEvent.remove(entry.getKey()); |
| } |
| }); |
| |
| Map<String, Map<String, Double>> hotShards = new HashMap<>(); |
| Map<String, Map<String, Double>> coldShards = new HashMap<>(); |
| List<ReplicaInfo> hotReplicas = new ArrayList<>(); |
| List<ReplicaInfo> coldReplicas = new ArrayList<>(); |
| collectionRates.forEach((coll, shardRates) -> { |
| shardRates.forEach((sh, replicaRates) -> { |
| double totalShardRate = replicaRates.stream() |
| .map(r -> { |
| String elapsedKey = r.getCollection() + "." + r.getCore(); |
| if ((Double)r.getVariable(AutoScalingParams.RATE) > aboveRate) { |
| if (waitForElapsed(elapsedKey, now, lastReplicaEvent)) { |
| hotReplicas.add(r); |
| } |
| } else if ((Double)r.getVariable(AutoScalingParams.RATE) < belowRate) { |
| if (waitForElapsed(elapsedKey, now, lastReplicaEvent)) { |
| coldReplicas.add(r); |
| } |
| } else { |
| // no violation - clear waitForElapsed |
| lastReplicaEvent.remove(elapsedKey); |
| } |
| return r; |
| }) |
| .mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum(); |
| // calculate average shard rate over all searchable replicas (see SOLR-12470) |
| double shardRate = totalShardRate / searchableReplicationFactors.get(coll).get(sh).doubleValue(); |
| String elapsedKey = coll + "." + sh; |
| log.debug("-- {}: totalShardRate={}, shardRate={}", elapsedKey, totalShardRate, shardRate); |
| if ((collections.isEmpty() || collections.contains(coll)) && |
| (shard.equals(Policy.ANY) || shard.equals(sh))) { |
| if (shardRate > aboveRate) { |
| if (waitForElapsed(elapsedKey, now, lastShardEvent)) { |
| hotShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate); |
| } |
| } else if (shardRate < belowRate) { |
| if (waitForElapsed(elapsedKey, now, lastShardEvent)) { |
| coldShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate); |
| log.debug("-- coldShard waitFor elapsed {}", elapsedKey); |
| } else { |
| if (log.isDebugEnabled()) { |
| Long lastTime = lastShardEvent.computeIfAbsent(elapsedKey, s -> now); |
| long elapsed = TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS); |
| if (log.isDebugEnabled()) { |
| log.debug("-- waitFor didn't elapse for {}, waitFor={}, elapsed={}", elapsedKey, getWaitForSecond(), elapsed); |
| } |
| } |
| } |
| } else { |
| // no violation - clear waitForElapsed |
| lastShardEvent.remove(elapsedKey); |
| } |
| } |
| }); |
| }); |
| |
| Map<String, Double> hotCollections = new HashMap<>(); |
| Map<String, Double> coldCollections = new HashMap<>(); |
| collectionRates.forEach((coll, shardRates) -> { |
| double total = shardRates.entrySet().stream() |
| .mapToDouble(e -> e.getValue().stream() |
| .mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum()).sum(); |
| if (collections.isEmpty() || collections.contains(coll)) { |
| if (total > aboveRate) { |
| if (waitForElapsed(coll, now, lastCollectionEvent)) { |
| hotCollections.put(coll, total); |
| } |
| } else if (total < belowRate) { |
| if (waitForElapsed(coll, now, lastCollectionEvent)) { |
| coldCollections.put(coll, total); |
| } |
| } else { |
| // no violation - clear waitForElapsed |
| lastCollectionEvent.remove(coll); |
| } |
| } |
| }); |
| |
| if (hotCollections.isEmpty() && |
| hotShards.isEmpty() && |
| hotReplicas.isEmpty() && |
| hotNodes.isEmpty() && |
| coldCollections.isEmpty() && |
| coldShards.isEmpty() && |
| coldReplicas.isEmpty() && |
| coldNodes.isEmpty()) { |
| return; |
| } |
| |
| // generate event |
| |
| // find the earliest time when a condition was exceeded |
| final AtomicLong eventTime = new AtomicLong(now); |
| hotCollections.forEach((c, r) -> { |
| long time = lastCollectionEvent.get(c); |
| if (eventTime.get() > time) { |
| eventTime.set(time); |
| } |
| }); |
| coldCollections.forEach((c, r) -> { |
| long time = lastCollectionEvent.get(c); |
| if (eventTime.get() > time) { |
| eventTime.set(time); |
| } |
| }); |
| hotShards.forEach((c, shards) -> { |
| shards.forEach((s, r) -> { |
| long time = lastShardEvent.get(c + "." + s); |
| if (eventTime.get() > time) { |
| eventTime.set(time); |
| } |
| }); |
| }); |
| coldShards.forEach((c, shards) -> { |
| shards.forEach((s, r) -> { |
| long time = lastShardEvent.get(c + "." + s); |
| if (eventTime.get() > time) { |
| eventTime.set(time); |
| } |
| }); |
| }); |
| hotReplicas.forEach(r -> { |
| long time = lastReplicaEvent.get(r.getCollection() + "." + r.getCore()); |
| if (eventTime.get() > time) { |
| eventTime.set(time); |
| } |
| }); |
| coldReplicas.forEach(r -> { |
| long time = lastReplicaEvent.get(r.getCollection() + "." + r.getCore()); |
| if (eventTime.get() > time) { |
| eventTime.set(time); |
| } |
| }); |
| hotNodes.forEach((n, r) -> { |
| long time = lastNodeEvent.get(n); |
| if (eventTime.get() > time) { |
| eventTime.set(time); |
| } |
| }); |
| coldNodes.forEach((n, r) -> { |
| long time = lastNodeEvent.get(n); |
| if (eventTime.get() > time) { |
| eventTime.set(time); |
| } |
| }); |
| |
| final List<TriggerEvent.Op> ops = new ArrayList<>(); |
| final Set<String> violations = new HashSet<>(); |
| |
| calculateHotOps(ops, violations, searchableReplicationFactors, hotNodes, hotCollections, hotShards, hotReplicas); |
| calculateColdOps(ops, violations, clusterState, searchableReplicationFactors, coldNodes, coldCollections, coldShards, coldReplicas); |
| |
| if (ops.isEmpty()) { |
| return; |
| } |
| |
| if (processor.process(new SearchRateEvent(getName(), eventTime.get(), ops, |
| hotNodes, hotCollections, hotShards, hotReplicas, |
| coldNodes, coldCollections, coldShards, coldReplicas, violations))) { |
| // update lastEvent times |
| hotNodes.keySet().forEach(node -> lastNodeEvent.put(node, now)); |
| coldNodes.keySet().forEach(node -> lastNodeEvent.put(node, now)); |
| hotCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now)); |
| coldCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now)); |
| hotShards.entrySet().forEach(e -> e.getValue() |
| .forEach((sh, rate) -> lastShardEvent.put(e.getKey() + "." + sh, now))); |
| coldShards.entrySet().forEach(e -> e.getValue() |
| .forEach((sh, rate) -> lastShardEvent.put(e.getKey() + "." + sh, now))); |
| hotReplicas.forEach(r -> lastReplicaEvent.put(r.getCollection() + "." + r.getCore(), now)); |
| coldReplicas.forEach(r -> lastReplicaEvent.put(r.getCollection() + "." + r.getCore(), now)); |
| } |
| } |
| |
| private void calculateHotOps(List<TriggerEvent.Op> ops, |
| Set<String> violations, |
| Map<String, Map<String, AtomicInteger>> searchableReplicationFactors, |
| Map<String, Double> hotNodes, |
| Map<String, Double> hotCollections, |
| Map<String, Map<String, Double>> hotShards, |
| List<ReplicaInfo> hotReplicas) { |
| // calculate the number of replicas to add to each hot shard, based on how much the rate was |
| // exceeded - but within limits. |
| |
| // first resolve a situation when only a node is hot but no collection / shard is hot |
| // TODO: eventually we may want to commission a new node |
| if (!hotNodes.isEmpty()) { |
| if (hotShards.isEmpty() && hotCollections.isEmpty()) { |
| // move replicas around |
| if (aboveNodeOp != null) { |
| hotNodes.forEach((n, r) -> { |
| ops.add(new TriggerEvent.Op(aboveNodeOp, Suggester.Hint.SRC_NODE, n)); |
| violations.add(HOT_NODES); |
| }); |
| } |
| } else { |
| // ignore - hot shards will result in changes that will change hot node status anyway |
| } |
| } |
| // add replicas |
| Map<String, Map<String, List<Pair<String, String>>>> hints = new HashMap<>(); |
| |
| // HOT COLLECTIONS |
| // currently we don't do anything for hot collections. Theoretically we could add |
| // 1 replica more to each shard, based on how close to the threshold each shard is |
| // but it's probably better to wait for a shard to become hot and be more precise. |
| |
| // HOT SHARDS |
| |
| hotShards.forEach((coll, shards) -> shards.forEach((s, r) -> { |
| List<Pair<String, String>> perShard = hints |
| .computeIfAbsent(coll, c -> new HashMap<>()) |
| .computeIfAbsent(s, sh -> new ArrayList<>()); |
| addReplicaHints(coll, s, r, searchableReplicationFactors.get(coll).get(s).get(), perShard); |
| violations.add(HOT_SHARDS); |
| })); |
| |
| // HOT REPLICAS |
| // Hot replicas (while their shards are not hot) may be caused by |
| // dumb clients that use direct replica URLs - this is beyond our control |
| // so ignore them. |
| |
| hints.values().forEach(m -> m.values().forEach(lst -> lst.forEach(p -> { |
| ops.add(new TriggerEvent.Op(aboveOp, Suggester.Hint.COLL_SHARD, p)); |
| }))); |
| |
| } |
| |
| /** |
| * This method implements a primitive form of proportional controller with a limiter. |
| */ |
| @SuppressWarnings({"unchecked", "rawtypes"}) |
| private void addReplicaHints(String collection, String shard, double r, int replicationFactor, List<Pair<String, String>> hints) { |
| int numReplicas = (int)Math.round((r - aboveRate) / (double) replicationFactor); |
| // in one event add at least 1 replica |
| if (numReplicas < 1) { |
| numReplicas = 1; |
| } |
| // ... and at most maxOps replicas |
| if (numReplicas > maxOps) { |
| numReplicas = maxOps; |
| } |
| for (int i = 0; i < numReplicas; i++) { |
| hints.add(new Pair(collection, shard)); |
| } |
| } |
| |
| private void calculateColdOps(List<TriggerEvent.Op> ops, |
| Set<String> violations, |
| ClusterState clusterState, |
| Map<String, Map<String, AtomicInteger>> searchableReplicationFactors, |
| Map<String, Double> coldNodes, |
| Map<String, Double> coldCollections, |
| Map<String, Map<String, Double>> coldShards, |
| List<ReplicaInfo> coldReplicas) { |
| // COLD COLLECTIONS |
| // Probably can't do anything reasonable about whole cold collections |
| // because they may be needed even if not used. |
| |
| // COLD SHARDS & COLD REPLICAS: |
| // We remove cold replicas only from cold shards, otherwise we are susceptible to uneven |
| // replica routing (which is beyond our control). |
| // If we removed replicas from non-cold shards we could accidentally bring that shard into |
| // the hot range, which would result in adding replica, and that replica could again stay cold due to |
| // the same routing issue, which then would lead to removing that replica, etc, etc... |
| |
| // Remove cold replicas but only when there's at least a minimum number of searchable |
| // replicas still available (additional non-searchable replicas may exist, too) |
| // NOTE: do this before adding ops for DELETENODE because we don't want to attempt |
| // deleting replicas that have been already moved elsewhere |
| Map<String, Map<String, List<ReplicaInfo>>> byCollectionByShard = new HashMap<>(); |
| coldReplicas.forEach(ri -> { |
| byCollectionByShard.computeIfAbsent(ri.getCollection(), c -> new HashMap<>()) |
| .computeIfAbsent(ri.getShard(), s -> new ArrayList<>()) |
| .add(ri); |
| }); |
| coldShards.forEach((coll, perShard) -> { |
| perShard.forEach((shard, rate) -> { |
| List<ReplicaInfo> replicas = byCollectionByShard |
| .getOrDefault(coll, Collections.emptyMap()) |
| .getOrDefault(shard, Collections.emptyList()); |
| if (replicas.isEmpty()) { |
| return; |
| } |
| // only delete if there's at least minRF searchable replicas left |
| int rf = searchableReplicationFactors.get(coll).get(shard).get(); |
| // assume first that we only really need a leader and we may be |
| // allowed to remove other replicas |
| int minRF = 1; |
| // but check the official RF and don't go below that |
| Integer RF = clusterState.getCollection(coll).getReplicationFactor(); |
| if (RF != null) { |
| minRF = RF; |
| } |
| // unless minReplicas is set explicitly |
| if (minReplicas != null) { |
| minRF = minReplicas; |
| } |
| if (minRF < 1) { |
| minRF = 1; |
| } |
| if (rf > minRF) { |
| // delete at most maxOps replicas at a time |
| AtomicInteger limit = new AtomicInteger(Math.min(maxOps, rf - minRF)); |
| replicas.forEach(ri -> { |
| if (limit.get() == 0) { |
| return; |
| } |
| // don't delete a leader |
| if (ri.getBool(ZkStateReader.LEADER_PROP, false)) { |
| return; |
| } |
| TriggerEvent.Op op = new TriggerEvent.Op(belowOp, |
| Suggester.Hint.COLL_SHARD, new Pair<>(ri.getCollection(), ri.getShard())); |
| op.addHint(Suggester.Hint.REPLICA, ri.getName()); |
| ops.add(op); |
| violations.add(COLD_SHARDS); |
| limit.decrementAndGet(); |
| }); |
| } |
| }); |
| }); |
| |
| // COLD NODES: |
| // Unlike the case of hot nodes, if a node is cold then any monitored |
| // collections / shards / replicas located on that node are cold, too. |
| // HOWEVER, we check only replicas from selected collections / shards, |
| // so deleting a cold node is dangerous because it may interfere with these |
| // non-monitored resources - this is the reason the default belowNodeOp is null / ignored. |
| // |
| // Also, note that due to the way activity is measured only nodes that contain any |
| // monitored resources are considered - there may be cold nodes in the cluster that don't |
| // belong to the monitored collections and they will be ignored. |
| if (belowNodeOp != null) { |
| coldNodes.forEach((node, rate) -> { |
| ops.add(new TriggerEvent.Op(belowNodeOp, Suggester.Hint.SRC_NODE, node)); |
| violations.add(COLD_NODES); |
| }); |
| } |
| |
| |
| } |
| |
| private boolean waitForElapsed(String name, long now, Map<String, Long> lastEventMap) { |
| Long lastTime = lastEventMap.computeIfAbsent(name, s -> now); |
| long elapsed = TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS); |
| if (log.isTraceEnabled()) { |
| log.trace("name={}, lastTime={}, elapsed={}, waitFor={}", name, lastTime, elapsed, getWaitForSecond()); |
| } |
| if (TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS) < getWaitForSecond()) { |
| return false; |
| } |
| return true; |
| } |
| |
| public static class SearchRateEvent extends TriggerEvent { |
| public SearchRateEvent(String source, long eventTime, List<Op> ops, |
| Map<String, Double> hotNodes, |
| Map<String, Double> hotCollections, |
| Map<String, Map<String, Double>> hotShards, |
| List<ReplicaInfo> hotReplicas, |
| Map<String, Double> coldNodes, |
| Map<String, Double> coldCollections, |
| Map<String, Map<String, Double>> coldShards, |
| List<ReplicaInfo> coldReplicas, |
| Set<String> violations) { |
| super(TriggerEventType.SEARCHRATE, source, eventTime, null); |
| properties.put(TriggerEvent.REQUESTED_OPS, ops); |
| properties.put(HOT_NODES, hotNodes); |
| properties.put(HOT_COLLECTIONS, hotCollections); |
| properties.put(HOT_SHARDS, hotShards); |
| properties.put(HOT_REPLICAS, hotReplicas); |
| properties.put(COLD_NODES, coldNodes); |
| properties.put(COLD_COLLECTIONS, coldCollections); |
| properties.put(COLD_SHARDS, coldShards); |
| properties.put(COLD_REPLICAS, coldReplicas); |
| properties.put(VIOLATION_PROP, violations); |
| } |
| } |
| } |