/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.metrics.SolrCoreMetricManager;
import org.apache.solr.update.SolrIndexSplitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type.CORE_IDX;

/**
 * Trigger that monitors the index sizes (in bytes and in document counts) of the leader
 * replicas of active shards and requests a shard split whenever a shard grows above the
 * configured {@code aboveBytes} / {@code aboveDocs} thresholds. Shards that shrink below
 * the {@code belowBytes} / {@code belowDocs} thresholds are reported with a placeholder
 * merge operation (shard merging is not implemented yet, see SOLR-9407).
 * @deprecated to be removed in Solr 9.0 (see SOLR-14656)
 */
public class IndexSizeTrigger extends TriggerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// configuration properties
public static final String ABOVE_BYTES_PROP = "aboveBytes";
public static final String ABOVE_DOCS_PROP = "aboveDocs";
public static final String ABOVE_OP_PROP = "aboveOp";
public static final String BELOW_BYTES_PROP = "belowBytes";
public static final String BELOW_DOCS_PROP = "belowDocs";
public static final String BELOW_OP_PROP = "belowOp";
public static final String COLLECTIONS_PROP = "collections";
public static final String MAX_OPS_PROP = "maxOps";
public static final String SPLIT_FUZZ_PROP = CommonAdminParams.SPLIT_FUZZ;
public static final String SPLIT_METHOD_PROP = CommonAdminParams.SPLIT_METHOD;
public static final String SPLIT_BY_PREFIX = CommonAdminParams.SPLIT_BY_PREFIX;
// event properties
public static final String BYTES_SIZE_KEY = "__bytes__";
public static final String TOTAL_BYTES_SIZE_KEY = "__total_bytes__";
public static final String DOCS_SIZE_KEY = "__docs__";
public static final String MAX_DOC_KEY = "__maxDoc__";
public static final String COMMIT_SIZE_KEY = "__commitBytes__";
public static final String ABOVE_SIZE_KEY = "aboveSize";
public static final String BELOW_SIZE_KEY = "belowSize";
public static final String VIOLATION_KEY = "violationType";
public static final int DEFAULT_MAX_OPS = 10;
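
  /** Units in which the index size thresholds are expressed. */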
public enum Unit { bytes, docs }
private long aboveBytes, aboveDocs, belowBytes, belowDocs;
private int maxOps;
private SolrIndexSplitter.SplitMethod splitMethod;
private boolean splitByPrefix;
private float splitFuzz;
private CollectionParams.CollectionAction aboveOp, belowOp;
private final Set<String> collections = new HashSet<>();
private final Map<String, Long> lastAboveEventMap = new ConcurrentHashMap<>();
private final Map<String, Long> lastBelowEventMap = new ConcurrentHashMap<>();
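
  /** Create the trigger and register the set of configuration properties it accepts. */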
public IndexSizeTrigger(String name) {
super(TriggerEventType.INDEXSIZE, name);
TriggerUtils.validProperties(validProperties,
ABOVE_BYTES_PROP, ABOVE_DOCS_PROP, ABOVE_OP_PROP,
BELOW_BYTES_PROP, BELOW_DOCS_PROP, BELOW_OP_PROP,
COLLECTIONS_PROP, MAX_OPS_PROP,
SPLIT_METHOD_PROP, SPLIT_FUZZ_PROP, SPLIT_BY_PREFIX);
}
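
  /**
   * Validate and apply the trigger configuration. Threshold values must be positive,
   * each "below" threshold must be less than half of the corresponding "above" threshold,
   * and the operation and split method names must resolve to known values.
   * <p>An illustrative configuration as it might be submitted via the autoscaling API
   * (the values here are made up for the example):</p>
   * <pre>
   * {
   *   "set-trigger": {
   *     "name": "index_size_trigger",
   *     "event": "indexSize",
   *     "aboveDocs": 100000000,
   *     "splitMethod": "link",
   *     "waitFor": "1h"
   *   }
   * }
   * </pre>
   */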
@Override
public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException {
super.configure(loader, cloudManager, properties);
String aboveStr = String.valueOf(properties.getOrDefault(ABOVE_BYTES_PROP, Long.MAX_VALUE));
String belowStr = String.valueOf(properties.getOrDefault(BELOW_BYTES_PROP, -1));
try {
aboveBytes = Long.parseLong(aboveStr);
if (aboveBytes <= 0) {
throw new Exception("value must be > 0");
}
} catch (Exception e) {
throw new TriggerValidationException(getName(), ABOVE_BYTES_PROP, "invalid value '" + aboveStr + "': " + e.toString());
}
try {
belowBytes = Long.parseLong(belowStr);
if (belowBytes < 0) {
belowBytes = -1;
}
} catch (Exception e) {
throw new TriggerValidationException(getName(), BELOW_BYTES_PROP, "invalid value '" + belowStr + "': " + e.toString());
}
// below must be at least 2x smaller than above, otherwise splitting a shard
// would immediately put the shard below the threshold and cause the mergeshards action
if (belowBytes > 0 && (belowBytes * 2 > aboveBytes)) {
throw new TriggerValidationException(getName(), BELOW_BYTES_PROP,
"invalid value " + belowBytes + ", should be less than half of '" + ABOVE_BYTES_PROP + "' value, which is " + aboveBytes);
}
// do the same for docs bounds
aboveStr = String.valueOf(properties.getOrDefault(ABOVE_DOCS_PROP, Long.MAX_VALUE));
belowStr = String.valueOf(properties.getOrDefault(BELOW_DOCS_PROP, -1));
try {
aboveDocs = Long.parseLong(aboveStr);
if (aboveDocs <= 0) {
throw new Exception("value must be > 0");
}
} catch (Exception e) {
throw new TriggerValidationException(getName(), ABOVE_DOCS_PROP, "invalid value '" + aboveStr + "': " + e.toString());
}
try {
belowDocs = Long.parseLong(belowStr);
if (belowDocs < 0) {
belowDocs = -1;
}
} catch (Exception e) {
throw new TriggerValidationException(getName(), BELOW_DOCS_PROP, "invalid value '" + belowStr + "': " + e.toString());
}
// below must be at least 2x smaller than above, otherwise splitting a shard
// would immediately put the shard below the threshold and cause the mergeshards action
if (belowDocs > 0 && (belowDocs * 2 > aboveDocs)) {
throw new TriggerValidationException(getName(), BELOW_DOCS_PROP,
"invalid value " + belowDocs + ", should be less than half of '" + ABOVE_DOCS_PROP + "' value, which is " + aboveDocs);
}
String collectionsString = (String) properties.get(COLLECTIONS_PROP);
if (collectionsString != null && !collectionsString.isEmpty()) {
collections.addAll(StrUtils.splitSmart(collectionsString, ','));
}
String aboveOpStr = String.valueOf(properties.getOrDefault(ABOVE_OP_PROP, CollectionParams.CollectionAction.SPLITSHARD.toLower()));
// TODO: this is a placeholder until SOLR-9407 is implemented
String belowOpStr = String.valueOf(properties.getOrDefault(BELOW_OP_PROP, CollectionParams.CollectionAction.MERGESHARDS.toLower()));
aboveOp = CollectionParams.CollectionAction.get(aboveOpStr);
if (aboveOp == null) {
throw new TriggerValidationException(getName(), ABOVE_OP_PROP, "unrecognized value of: '" + aboveOpStr + "'");
}
belowOp = CollectionParams.CollectionAction.get(belowOpStr);
if (belowOp == null) {
throw new TriggerValidationException(getName(), BELOW_OP_PROP, "unrecognized value of: '" + belowOpStr + "'");
}
String maxOpsStr = String.valueOf(properties.getOrDefault(MAX_OPS_PROP, DEFAULT_MAX_OPS));
try {
maxOps = Integer.parseInt(maxOpsStr);
if (maxOps < 1) {
throw new Exception("must be > 1");
}
} catch (Exception e) {
throw new TriggerValidationException(getName(), MAX_OPS_PROP, "invalid value: '" + maxOpsStr + "': " + e.getMessage());
}
String methodStr = (String)properties.getOrDefault(SPLIT_METHOD_PROP, SolrIndexSplitter.SplitMethod.LINK.toLower());
splitMethod = SolrIndexSplitter.SplitMethod.get(methodStr);
if (splitMethod == null) {
throw new TriggerValidationException(getName(), SPLIT_METHOD_PROP, "unrecognized value of: '" + methodStr + "'");
}
String fuzzStr = String.valueOf(properties.getOrDefault(SPLIT_FUZZ_PROP, 0.0f));
try {
splitFuzz = Float.parseFloat(fuzzStr);
} catch (Exception e) {
throw new TriggerValidationException(getName(), SPLIT_FUZZ_PROP, "invalid value: '" + fuzzStr + "': " + e.getMessage());
}
String splitByPrefixStr = String.valueOf(properties.getOrDefault(SPLIT_BY_PREFIX, false));
try {
splitByPrefix = getValidBool(splitByPrefixStr);
} catch (Exception e) {
throw new TriggerValidationException(getName(), SPLIT_BY_PREFIX, "invalid value: '" + splitByPrefixStr + "': " + e.getMessage());
}
}
private boolean getValidBool(String str) throws Exception {
if (str != null && (str.toLowerCase(Locale.ROOT).equals("true") || str.toLowerCase(Locale.ROOT).equals("false"))) {
return Boolean.parseBoolean(str);
}
throw new IllegalArgumentException("Expected a valid boolean value but got " + str);
}
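
  /**
   * The persisted state consists of the per-core timestamps of the most recent "above"
   * and "below" violations, so that waitFor accounting survives trigger restarts.
   */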
@Override
protected Map<String, Object> getState() {
Map<String, Object> state = new HashMap<>();
state.put("lastAboveEventMap", lastAboveEventMap);
state.put("lastBelowEventMap", lastBelowEventMap);
return state;
}
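
  /** Restore the last-event timestamp maps previously saved by {@link #getState()}. */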
@Override
@SuppressWarnings({"unchecked"})
protected void setState(Map<String, Object> state) {
this.lastAboveEventMap.clear();
this.lastBelowEventMap.clear();
Map<String, Long> replicaVsTime = (Map<String, Long>)state.get("lastAboveEventMap");
if (replicaVsTime != null) {
this.lastAboveEventMap.putAll(replicaVsTime);
}
replicaVsTime = (Map<String, Long>)state.get("lastBelowEventMap");
if (replicaVsTime != null) {
this.lastBelowEventMap.putAll(replicaVsTime);
}
}
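
  /** Copy the last-event timestamp maps from a previous incarnation of this trigger. */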
@Override
public void restoreState(AutoScaling.Trigger old) {
assert old.isClosed();
if (old instanceof IndexSizeTrigger) {
IndexSizeTrigger that = (IndexSizeTrigger)old;
assert this.name.equals(that.name);
this.lastAboveEventMap.clear();
this.lastBelowEventMap.clear();
this.lastAboveEventMap.putAll(that.lastAboveEventMap);
this.lastBelowEventMap.putAll(that.lastBelowEventMap);
} else {
throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
"Unable to restore state from an unknown type of trigger");
}
}
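
  /**
   * Collect index size metrics (bytes and docs) from the leader replica of every active
   * shard of the monitored collections, compare them against the configured thresholds,
   * and for shards that stay in violation longer than the waitFor period generate a
   * single event carrying the requested split (and placeholder merge) operations,
   * capped at maxOps.
   */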
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public void run() {
synchronized(this) {
if (isClosed) {
log.warn("{} ran but was already closed", getName());
return;
}
}
AutoScaling.TriggerEventProcessor processor = processorRef.get();
if (processor == null) {
return;
}
// replica name / info + size, retrieved from leaders only
Map<String, ReplicaInfo> currentSizes = new HashMap<>();
try {
ClusterState clusterState = cloudManager.getClusterStateProvider().getClusterState();
for (String node : clusterState.getLiveNodes()) {
Map<String, ReplicaInfo> metricTags = new HashMap<>();
// coll, shard, replica
Map<String, Map<String, List<ReplicaInfo>>> infos = cloudManager.getNodeStateProvider().getReplicaInfo(node, Collections.emptyList());
infos.forEach((coll, shards) -> {
if (!collections.isEmpty() && !collections.contains(coll)) {
return;
}
DocCollection docCollection = clusterState.getCollection(coll);
shards.forEach((sh, replicas) -> {
// check only the leader replica in an active shard
Slice s = docCollection.getSlice(sh);
if (s.getState() != Slice.State.ACTIVE) {
return;
}
Replica r = s.getLeader();
// no leader - don't do anything
if (r == null) {
return;
}
// not on this node
if (!r.getNodeName().equals(node)) {
return;
}
// find ReplicaInfo
ReplicaInfo info = null;
for (ReplicaInfo ri : replicas) {
if (r.getCoreName().equals(ri.getCore())) {
info = ri;
break;
}
}
            if (info == null) {
              // no matching core among the replicas reported for this node - skip it
              return;
            }
// we have to translate to the metrics registry name, which uses "_replica_nN" as suffix
String replicaName = Utils.parseMetricsReplicaName(coll, info.getCore());
            if (replicaName == null) { // should not happen with the standard core naming scheme
              replicaName = info.getName(); // which is actually the coreNode name...
}
String registry = SolrCoreMetricManager.createRegistryName(true, coll, sh, replicaName, null);
String tag = "metrics:" + registry + ":" + CORE_IDX.metricsAttribute;
metricTags.put(tag, info);
tag = "metrics:" + registry + ":SEARCHER.searcher.numDocs";
metricTags.put(tag, info);
tag = "metrics:" + registry + ":SEARCHER.searcher.maxDoc";
metricTags.put(tag, info);
tag = "metrics:" + registry + ":SEARCHER.searcher.indexCommitSize";
metricTags.put(tag, info);
});
});
if (metricTags.isEmpty()) {
continue;
}
Map<String, Object> sizes = cloudManager.getNodeStateProvider().getNodeValues(node, metricTags.keySet());
sizes.forEach((tag, size) -> {
final ReplicaInfo info = metricTags.get(tag);
if (info == null) {
log.warn("Missing replica info for response tag {}", tag);
} else {
// verify that it's a Number
if (!(size instanceof Number)) {
log.warn("invalid size value for tag {} - not a number: '{}' is {}", tag, size, size.getClass().getName());
return;
}
ReplicaInfo currentInfo = currentSizes.computeIfAbsent(info.getCore(), k -> (ReplicaInfo)info.clone());
if (tag.contains("INDEX")) {
currentInfo.getVariables().put(TOTAL_BYTES_SIZE_KEY, ((Number) size).longValue());
} else if (tag.endsWith("SEARCHER.searcher.numDocs")) {
currentInfo.getVariables().put(DOCS_SIZE_KEY, ((Number) size).longValue());
} else if (tag.endsWith("SEARCHER.searcher.maxDoc")) {
currentInfo.getVariables().put(MAX_DOC_KEY, ((Number) size).longValue());
} else if (tag.endsWith("SEARCHER.searcher.indexCommitSize")) {
currentInfo.getVariables().put(COMMIT_SIZE_KEY, ((Number) size).longValue());
}
}
});
}
} catch (IOException e) {
log.warn("Error running trigger {}", getName(), e);
return;
}
long now = cloudManager.getTimeSource().getTimeNs();
// now check thresholds
// collection / list(info)
Map<String, List<ReplicaInfo>> aboveSize = new HashMap<>();
Set<String> splittable = new HashSet<>();
currentSizes.forEach((coreName, info) -> {
// calculate estimated bytes
long maxDoc = (Long)info.getVariable(MAX_DOC_KEY);
long numDocs = (Long)info.getVariable(DOCS_SIZE_KEY);
long commitSize = (Long)info.getVariable(COMMIT_SIZE_KEY, 0L);
if (commitSize <= 0) {
commitSize = (Long)info.getVariable(TOTAL_BYTES_SIZE_KEY);
}
      // estimate the "live" index size, discounting space still held by deleted docs
      commitSize = estimatedSize(maxDoc, numDocs, commitSize);
info.getVariables().put(BYTES_SIZE_KEY, commitSize);
if ((Long)info.getVariable(BYTES_SIZE_KEY) > aboveBytes ||
(Long)info.getVariable(DOCS_SIZE_KEY) > aboveDocs) {
if (waitForElapsed(coreName, now, lastAboveEventMap)) {
List<ReplicaInfo> infos = aboveSize.computeIfAbsent(info.getCollection(), c -> new ArrayList<>());
if (!infos.contains(info)) {
if ((Long)info.getVariable(BYTES_SIZE_KEY) > aboveBytes) {
info.getVariables().put(VIOLATION_KEY, ABOVE_BYTES_PROP);
} else {
info.getVariables().put(VIOLATION_KEY, ABOVE_DOCS_PROP);
}
infos.add(info);
splittable.add(info.getName());
}
}
} else {
// no violation - clear waitForElapsed
lastAboveEventMap.remove(coreName);
}
});
// collection / list(info)
Map<String, List<ReplicaInfo>> belowSize = new HashMap<>();
currentSizes.forEach((coreName, info) -> {
if (((Long)info.getVariable(BYTES_SIZE_KEY) < belowBytes ||
(Long)info.getVariable(DOCS_SIZE_KEY) < belowDocs) &&
// make sure we don't produce conflicting ops
!splittable.contains(info.getName())) {
if (waitForElapsed(coreName, now, lastBelowEventMap)) {
List<ReplicaInfo> infos = belowSize.computeIfAbsent(info.getCollection(), c -> new ArrayList<>());
if (!infos.contains(info)) {
if ((Long)info.getVariable(BYTES_SIZE_KEY) < belowBytes) {
info.getVariables().put(VIOLATION_KEY, BELOW_BYTES_PROP);
} else {
info.getVariables().put(VIOLATION_KEY, BELOW_DOCS_PROP);
}
infos.add(info);
}
}
} else {
// no violation - clear waitForElapsed
lastBelowEventMap.remove(coreName);
}
});
if (aboveSize.isEmpty() && belowSize.isEmpty()) {
log.trace("NO VIOLATIONS: Now={}", now);
log.trace("lastAbove={}", lastAboveEventMap);
log.trace("lastBelow={}", lastBelowEventMap);
return;
}
// find the earliest time when a condition was exceeded
final AtomicLong eventTime = new AtomicLong(now);
// calculate ops
final List<TriggerEvent.Op> ops = new ArrayList<>();
aboveSize.forEach((coll, replicas) -> {
      // sort by decreasing size to split the largest shards first
      // XXX see the comment below about using DOCS_SIZE_KEY in lieu of BYTES_SIZE_KEY
      replicas.sort((r1, r2) ->
          Long.compare((Long) r2.getVariable(DOCS_SIZE_KEY), (Long) r1.getVariable(DOCS_SIZE_KEY)));
replicas.forEach(r -> {
if (ops.size() >= maxOps) {
return;
}
TriggerEvent.Op op = new TriggerEvent.Op(aboveOp);
op.addHint(Suggester.Hint.COLL_SHARD, new Pair<>(coll, r.getShard()));
Map<String, Object> params = new HashMap<>();
params.put(SPLIT_METHOD_PROP, splitMethod.toLower());
if (splitFuzz > 0) {
params.put(SPLIT_FUZZ_PROP, splitFuzz);
}
params.put(SPLIT_BY_PREFIX, splitByPrefix);
op.addHint(Suggester.Hint.PARAMS, params);
ops.add(op);
Long time = lastAboveEventMap.get(r.getCore());
if (time != null && eventTime.get() > time) {
eventTime.set(time);
}
});
});
belowSize.forEach((coll, replicas) -> {
if (replicas.size() < 2) {
return;
}
if (ops.size() >= maxOps) {
return;
}
      // sort by increasing size
      // XXX this is not quite correct - if the BYTES_SIZE_KEY condition put a replica here
      // then we should be sorting by BYTES_SIZE_KEY. However, since DOCS and BYTES are
      // loosely correlated, it's simpler to sort just by docs, which also better reflects
      // the "too small" condition than index size does (deleted docs may still occupy space)
      replicas.sort((r1, r2) ->
          Long.compare((Long) r1.getVariable(DOCS_SIZE_KEY), (Long) r2.getVariable(DOCS_SIZE_KEY)));
// TODO: MERGESHARDS is not implemented yet. For now take the top two smallest shards
// TODO: but in the future we probably need to get ones with adjacent ranges.
// TODO: generate as many MERGESHARDS as needed to consume all belowSize shards
TriggerEvent.Op op = new TriggerEvent.Op(belowOp);
      op.addHint(Suggester.Hint.COLL_SHARD, new Pair<>(coll, replicas.get(0).getShard()));
      op.addHint(Suggester.Hint.COLL_SHARD, new Pair<>(coll, replicas.get(1).getShard()));
ops.add(op);
Long time = lastBelowEventMap.get(replicas.get(0).getCore());
if (time != null && eventTime.get() > time) {
eventTime.set(time);
}
time = lastBelowEventMap.get(replicas.get(1).getCore());
if (time != null && eventTime.get() > time) {
eventTime.set(time);
}
});
if (ops.isEmpty()) {
return;
}
if (processor.process(new IndexSizeEvent(getName(), eventTime.get(), ops, aboveSize, belowSize))) {
// update last event times
aboveSize.forEach((coll, replicas) -> {
replicas.forEach(r -> lastAboveEventMap.put(r.getCore(), now));
});
belowSize.forEach((coll, replicas) -> {
if (replicas.size() < 2) {
return;
}
lastBelowEventMap.put(replicas.get(0).getCore(), now);
lastBelowEventMap.put(replicas.get(1).getCore(), now);
});
}
}
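
  /**
   * Estimate the size of the "live" portion of the index by scaling the commit size by
   * the ratio of live documents to all documents, thus discounting the space still held
   * by deleted documents. E.g. maxDoc=100, numDocs=75, commitSize=1000 yields 750.
   */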
public static long estimatedSize(long maxDoc, long numDocs, long commitSize) {
if (maxDoc == 0) {
return 0;
}
if (maxDoc == numDocs) {
return commitSize;
}
return commitSize * numDocs / maxDoc;
}
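
  /**
   * Returns true if at least waitFor seconds have elapsed since the given core was first
   * recorded in violation in the supplied map. The first call for a given name records
   * "now" and therefore returns false for any positive waitFor value.
   */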
  private boolean waitForElapsed(String name, long now, Map<String, Long> lastEventMap) {
    Long lastTime = lastEventMap.computeIfAbsent(name, s -> now);
    long elapsed = TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS);
    log.trace("name={}, lastTime={}, elapsed={}", name, lastTime, elapsed);
    return elapsed >= getWaitForSecond();
  }
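
  /**
   * Event generated by this trigger. Apart from the requested operations it carries
   * compact per-core size summaries under the aboveSize and belowSize properties.
   */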
public static class IndexSizeEvent extends TriggerEvent {
public IndexSizeEvent(String source, long eventTime, List<Op> ops, Map<String, List<ReplicaInfo>> aboveSize,
Map<String, List<ReplicaInfo>> belowSize) {
super(TriggerEventType.INDEXSIZE, source, eventTime, null);
properties.put(TriggerEvent.REQUESTED_OPS, ops);
// avoid passing very large amounts of data here - just use replica names
TreeMap<String, String> above = new TreeMap<>();
aboveSize.forEach((coll, replicas) ->
replicas.forEach(r -> above.put(r.getCore(), "docs=" + r.getVariable(DOCS_SIZE_KEY) + ", bytes=" + r.getVariable(BYTES_SIZE_KEY))));
properties.put(ABOVE_SIZE_KEY, above);
TreeMap<String, String> below = new TreeMap<>();
belowSize.forEach((coll, replicas) ->
replicas.forEach(r -> below.put(r.getCore(), "docs=" + r.getVariable(DOCS_SIZE_KEY) + ", bytes=" + r.getVariable(BYTES_SIZE_KEY))));
properties.put(BELOW_SIZE_KEY, below);
}
}
}