blob: b76ddb04f8069f14135393e2f4340e52349d8398 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.*;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.SolrResourceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.apache.solr.cloud.autoscaling.TriggerEvent.NODE_NAMES;
/**
 * This class is responsible for using the configured policy and preferences
 * with the hints provided by the trigger event to compute the required cluster operations.
 * <p>
 * The cluster operations computed here are put into the {@link ActionContext}'s properties
 * with the key name "operations". The value is a List of SolrRequest objects.
 * <p>
 * Supported action property:
 * <ul>
 *   <li>{@code collections} - either a comma-separated list of collection names, or a map of
 *   collection property name to required value. Only matching collections are considered when
 *   computing operations. When absent (or an empty string), all collections are considered.</li>
 * </ul>
 *
 * @deprecated to be removed in Solr 9.0 (see SOLR-14656)
 */
public class ComputePlanAction extends TriggerActionBase {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /** Event property key under which diagnostics are collected (only while debug logging is enabled). */
  public static final String DIAGNOSTICS = "__compute_diag__";

  /**
   * Event property key holding the index of the next item (requested op or lost node) to turn
   * into a suggester; it lets successive {@link #getSuggester} calls walk through a list.
   */
  private static final String START = "__start__";

  // accept all collections by default
  Predicate<String> collectionsPredicate = s -> true;

  public ComputePlanAction() {
    super();
    TriggerUtils.validProperties(validProperties, "collections");
  }

  /**
   * Configures the action and builds {@link #collectionsPredicate} from the optional
   * "collections" property (a comma-separated white-list, or a map of match conditions).
   */
  @Override
  public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException {
    super.configure(loader, cloudManager, properties);

    Object value = properties.get("collections");
    if (value instanceof String) {
      String colString = (String) value;
      if (!colString.isEmpty()) {
        // a Set gives constant-time membership checks regardless of white-list size
        Set<String> whiteListedCollections = new HashSet<>(StrUtils.splitSmart(colString, ','));
        collectionsPredicate = whiteListedCollections::contains;
      }
    } else if (value instanceof Map) {
      @SuppressWarnings({"unchecked"})
      Map<String, String> matchConditions = (Map<String, String>) value;
      collectionsPredicate = collectionName -> matchesConditions(cloudManager, collectionName, matchConditions);
    }
  }

  /**
   * Checks whether the named collection exists and every configured condition equals the
   * corresponding collection property.
   *
   * @return true only if the collection was found and all conditions match; false on lookup errors
   */
  private static boolean matchesConditions(SolrCloudManager cloudManager, String collectionName,
                                           Map<String, String> matchConditions) {
    try {
      DocCollection collection = cloudManager.getClusterStateProvider().getCollection(collectionName);
      if (collection == null) {
        log.debug("Collection: {} was not found while evaluating conditions", collectionName);
        return false;
      }
      for (Map.Entry<String, String> entry : matchConditions.entrySet()) {
        // Objects.equals tolerates null condition values instead of throwing an NPE
        if (!Objects.equals(entry.getValue(), collection.get(entry.getKey()))) {
          if (log.isDebugEnabled()) {
            log.debug("Collection: {} does not match condition: {}:{}", collectionName, entry.getKey(), entry.getValue());
          }
          return false;
        }
      }
      return true;
    } catch (IOException e) {
      log.error("Exception fetching collection information for: {}", collectionName, e);
      return false;
    }
  }

  /**
   * Computes cluster operations for the event using a policy session and appends each
   * non-null suggestion to the context's "operations" list.
   * <p>
   * Iteration stops on the first empty suggestion, unless the event explicitly requested a
   * number of operations, in which case exactly that many suggestions are attempted.
   *
   * @throws SolrException wrapping any failure, including a missing autoscaling policy
   */
  @Override
  public void process(TriggerEvent event, ActionContext context) throws Exception {
    if (log.isDebugEnabled()) {
      log.debug("-- processing event: {} with context properties: {}", event, context.getProperties());
    }
    SolrCloudManager cloudManager = context.getCloudManager();
    try {
      AutoScalingConfig autoScalingConf = cloudManager.getDistribStateManager().getAutoScalingConfig();
      if (autoScalingConf.isEmpty()) {
        throw new Exception("Action: " + getName() + " executed but no policy is configured");
      }
      PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getSession(cloudManager);
      Policy.Session session = sessionWrapper.get();
      ClusterState clusterState = cloudManager.getClusterStateProvider().getClusterState();
      if (log.isTraceEnabled()) {
        log.trace("-- session: {}", session);
        log.trace("-- state: {}", clusterState);
      }
      try {
        Suggester suggester = getSuggester(session, event, context, cloudManager);
        int maxOperations = getMaxNumOps(event, autoScalingConf, clusterState);
        int requestedOperations = getRequestedNumOps(event);
        if (requestedOperations > maxOperations) {
          log.warn("Requested number of operations {} higher than maximum {}, adjusting...",
              requestedOperations, maxOperations);
        }
        int opCount = 0;
        int opLimit = maxOperations;
        if (requestedOperations > 0) {
          // an explicit number of requested ops takes precedence over the estimated maximum
          log.debug("-- adjusting limit due to explicitly requested number of ops={}", requestedOperations);
          opLimit = requestedOperations;
        }
        addDiagnostics(event, "maxOperations", maxOperations);
        addDiagnostics(event, "requestedOperations", requestedOperations);
        addDiagnostics(event, "opLimit", opLimit);

        do {
          // computing changes in large clusters may take a long time
          if (Thread.currentThread().isInterrupted()) {
            throw new InterruptedException("stopping - thread was interrupted");
          }
          @SuppressWarnings({"rawtypes"})
          SolrRequest operation = suggester.getSuggestion();
          opCount++;
          // prepare suggester for the next iteration, carrying over the updated session
          if (suggester.getSession() != null) {
            session = suggester.getSession();
          }
          suggester = getSuggester(session, event, context, cloudManager);

          if (operation == null) {
            // break on first null op unless a specific number of ops was requested
            if (requestedOperations < 0) {
              log.debug("-- no more operations suggested, stopping after {} ops...", (opCount - 1));
              addDiagnostics(event, "noSuggestionsStopAfter", (opCount - 1));
              break;
            }
            // remaining attempts = limit minus attempts made so far (never negative here)
            log.info("Computed plan empty, remained {} requested ops to try.", opLimit - opCount);
            continue;
          }
          if (log.isDebugEnabled()) {
            log.debug("Computed Plan: {}", operation.getParams());
          }
          addOperation(context, operation);
          if (opCount >= opLimit) {
            log.debug("-- reached limit of maxOps={}, stopping.", opLimit);
            addDiagnostics(event, "opLimitReached", true);
          }
        } while (opCount < opLimit);
      } finally {
        releasePolicySession(sessionWrapper, session);
      }
    } catch (Exception e) {
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
          "Unexpected exception while processing event: " + event, e);
    }
  }

  /** Appends {@code operation} to the context's "operations" list, creating the list if needed. */
  @SuppressWarnings({"unchecked", "rawtypes"})
  private static void addOperation(ActionContext context, SolrRequest operation) {
    context.getProperties().compute("operations", (k, v) -> {
      List<SolrRequest> operations = (List<SolrRequest>) v;
      if (operations == null) operations = new ArrayList<>();
      operations.add(operation);
      return operations;
    });
  }

  private void releasePolicySession(PolicyHelper.SessionWrapper sessionWrapper, Policy.Session session) {
    sessionWrapper.returnSession(session);
    sessionWrapper.release();
  }

  /** Records a diagnostic key/value on the event, but only when debug logging is enabled. */
  @SuppressWarnings({"unchecked"})
  private void addDiagnostics(TriggerEvent event, String key, Object value) {
    if (log.isDebugEnabled()) {
      Map<String, Object> diag = (Map<String, Object>) event.getProperties()
          .computeIfAbsent(DIAGNOSTICS, n -> new HashMap<>());
      diag.put(key, value);
    }
  }

  /**
   * Determines the maximum number of operations to compute.
   * <p>
   * The default is an estimate (live nodes * total replicas * 3), which can be overridden by the
   * {@link AutoScalingParams#MAX_COMPUTE_OPERATIONS} autoscaling-config property, which in turn
   * can be overridden by the same event property. A negative value means unlimited; zero is
   * raised to 1.
   */
  protected int getMaxNumOps(TriggerEvent event, AutoScalingConfig autoScalingConfig, ClusterState clusterState) {
    // estimate a maximum default limit that should be sufficient for most purposes:
    // number of nodes * total number of replicas * 3
    AtomicInteger totalRF = new AtomicInteger();
    clusterState.forEachCollection(coll -> {
      Integer rf = coll.getReplicationFactor();
      if (rf == null) {
        int numSlices = coll.getSlices().size();
        rf = numSlices == 0 ? 1 : coll.getReplicas().size() / numSlices;
      }
      totalRF.addAndGet(rf * coll.getSlices().size());
    });
    // compute in long and clamp: an int overflow would yield a negative estimate,
    // which below would be misinterpreted as "unlimited"
    long estimate = (long) clusterState.getLiveNodes().size() * totalRF.get() * 3L;
    int totalMax = (int) Math.min(estimate, Integer.MAX_VALUE);
    addDiagnostics(event, "estimatedMaxOps", totalMax);

    int maxOp = totalMax;
    Object configured = autoScalingConfig.getProperties().get(AutoScalingParams.MAX_COMPUTE_OPERATIONS);
    if (configured instanceof Number) {
      maxOp = ((Number) configured).intValue();
    } else if (configured != null) {
      // tolerate string-valued config instead of failing with a ClassCastException
      try {
        maxOp = Integer.parseInt(String.valueOf(configured));
      } catch (NumberFormatException e) {
        log.warn("Invalid '{}' config property: {}, using default {}", AutoScalingParams.MAX_COMPUTE_OPERATIONS, configured, maxOp);
      }
    }

    Object o = event.getProperty(AutoScalingParams.MAX_COMPUTE_OPERATIONS, maxOp);
    if (o != null) {
      try {
        maxOp = Integer.parseInt(String.valueOf(o));
      } catch (NumberFormatException e) {
        log.warn("Invalid '{}' event property: {}, using default {}", AutoScalingParams.MAX_COMPUTE_OPERATIONS, o, maxOp);
      }
    }
    if (maxOp < 0) {
      // unlimited
      maxOp = Integer.MAX_VALUE;
    } else if (maxOp < 1) {
      // try at least one operation
      log.debug("-- estimated maxOp={}, resetting to 1...", maxOp);
      maxOp = 1;
    }
    log.debug("-- estimated total max ops={}, effective maxOps={}", totalMax, maxOp);
    return maxOp;
  }

  /**
   * Returns the number of operations explicitly requested by the event, or -1 when the event
   * carries no requested operations.
   */
  protected int getRequestedNumOps(TriggerEvent event) {
    @SuppressWarnings({"unchecked"})
    Collection<TriggerEvent.Op> ops = (Collection<TriggerEvent.Op>) event.getProperty(TriggerEvent.REQUESTED_OPS, Collections.emptyList());
    return ops.isEmpty() ? -1 : ops.size();
  }

  /**
   * Creates the suggester for the next operation, based on the event type.
   *
   * @throws UnsupportedOperationException for unsupported event types
   */
  protected Suggester getSuggester(Policy.Session session, TriggerEvent event, ActionContext context, SolrCloudManager cloudManager) throws IOException {
    Suggester suggester;
    switch (event.getEventType()) {
      case NODEADDED:
        suggester = getNodeAddedSuggester(cloudManager, session, event);
        break;
      case NODELOST:
        suggester = getNodeLostSuggester(cloudManager, session, event);
        break;
      case SEARCHRATE:
      case METRIC:
      case INDEXSIZE:
        suggester = getRequestedOpSuggester(session, event, context, cloudManager);
        break;
      case SCHEDULED:
        String preferredOp = (String) event.getProperty(AutoScalingParams.PREFERRED_OP, CollectionParams.CollectionAction.MOVEREPLICA.toLower());
        CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(preferredOp);
        suggester = session.getSuggester(action);
        if (applyCollectionHints(cloudManager, suggester) == 0) return NoneSuggester.get(session);
        break;
      default:
        throw new UnsupportedOperationException("No support for events other than nodeAdded, nodeLost, searchRate, metric, scheduled and indexSize. Received: " + event.getEventType());
    }
    return suggester;
  }

  /**
   * Builds a forced suggester for the next entry of the event's requested-ops list (tracked via
   * {@link #START}). Unsupported actions are recorded under {@link TriggerEvent#UNSUPPORTED_OPS}
   * in the context. Returns a {@link NoneSuggester} when no ops remain or no collection matches.
   */
  private Suggester getRequestedOpSuggester(Policy.Session session, TriggerEvent event, ActionContext context,
                                            SolrCloudManager cloudManager) throws IOException {
    @SuppressWarnings({"unchecked"})
    List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>) event.getProperty(TriggerEvent.REQUESTED_OPS, Collections.emptyList());
    int start = (Integer) event.getProperty(START, 0);
    if (ops.isEmpty() || start >= ops.size()) {
      return NoneSuggester.get(session);
    }
    TriggerEvent.Op op = ops.get(start);
    Suggester suggester = session.getSuggester(op.getAction());
    if (suggester instanceof UnsupportedSuggester) {
      @SuppressWarnings({"unchecked"})
      List<TriggerEvent.Op> unsupportedOps = (List<TriggerEvent.Op>) context.getProperties()
          .computeIfAbsent(TriggerEvent.UNSUPPORTED_OPS, k -> new ArrayList<TriggerEvent.Op>());
      unsupportedOps.add(op);
    }
    for (Map.Entry<Suggester.Hint, Object> e : op.getHints().entrySet()) {
      suggester = suggester.hint(e.getKey(), e.getValue());
    }
    if (applyCollectionHints(cloudManager, suggester) == 0) return NoneSuggester.get(session);
    suggester = suggester.forceOperation(true);
    event.getProperties().put(START, ++start);
    return suggester;
  }

  /**
   * Creates the suggester for a node-lost event according to the preferred operation:
   * MOVEREPLICA (move replicas away from all lost nodes), DELETENODE (one lost node per call,
   * tracked via {@link #START}) or NONE.
   *
   * @throws SolrException for any other or unknown preferred operation
   */
  private Suggester getNodeLostSuggester(SolrCloudManager cloudManager, Policy.Session session, TriggerEvent event) throws IOException {
    String preferredOp = (String) event.getProperty(AutoScalingParams.PREFERRED_OP, CollectionParams.CollectionAction.MOVEREPLICA.toLower());
    CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(preferredOp);
    if (action == null) {
      // unknown operation name: switching on a null enum would throw an opaque NullPointerException
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unsupported preferredOperation: " + preferredOp + " specified for node lost trigger");
    }
    switch (action) {
      case MOVEREPLICA:
        Suggester moveSuggester = session.getSuggester(action)
            .hint(Suggester.Hint.SRC_NODE, event.getProperty(NODE_NAMES));
        if (applyCollectionHints(cloudManager, moveSuggester) == 0) {
          addDiagnostics(event, "noRelevantCollections", true);
          return NoneSuggester.get(session);
        }
        return moveSuggester;
      case DELETENODE:
        int start = (Integer) event.getProperty(START, 0);
        @SuppressWarnings({"unchecked"})
        List<String> srcNodes = (List<String>) event.getProperty(NODE_NAMES);
        if (srcNodes == null || srcNodes.isEmpty() || start >= srcNodes.size()) {
          addDiagnostics(event, "noSourceNodes", true);
          return NoneSuggester.get(session);
        }
        // Lost nodes are processed one per call. SRC_NODE hints accumulate, so only the
        // current node is hinted; hinting every lost node would target all of them at once.
        String sourceNode = srcNodes.get(start);
        Suggester deleteSuggester = session.getSuggester(action)
            .hint(Suggester.Hint.SRC_NODE, Collections.singletonList(sourceNode));
        if (applyCollectionHints(cloudManager, deleteSuggester) == 0) {
          log.debug("-- no relevant collections on {}, no operations computed.", srcNodes);
          addDiagnostics(event, "noRelevantCollections", true);
          return NoneSuggester.get(session);
        }
        event.getProperties().put(START, ++start);
        return deleteSuggester;
      case NONE:
        return NoneSuggester.get(session);
      default:
        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unsupported preferredOperation: " + action.toLower() + " specified for node lost trigger");
    }
  }

  /**
   * Applies collection hints for all collections that match the {@link #collectionsPredicate}
   * and returns the number of collections that matched.
   * <p>
   * Hints are only added when the predicate selects a strict subset of collections; when every
   * collection matches, the suggester is left unrestricted.
   *
   * @return number of collections that match the {@link #collectionsPredicate}
   * @throws IOException if {@link org.apache.solr.client.solrj.impl.ClusterStateProvider} throws IOException
   */
  private int applyCollectionHints(SolrCloudManager cloudManager, Suggester s) throws IOException {
    ClusterState clusterState = cloudManager.getClusterStateProvider().getClusterState();
    Set<String> allCollections = clusterState.getCollectionStates().keySet();
    Set<String> selected = allCollections.stream()
        .filter(collectionsPredicate::test)
        .collect(Collectors.toSet());
    if (selected.size() < allCollections.size()) {
      // apply hints only if a subset of collections are selected
      selected.forEach(c -> s.hint(Suggester.Hint.COLL, c));
    }
    return selected.size();
  }

  /**
   * Creates the suggester for a node-added event, targeting the added node(s). For ADDREPLICA,
   * every active shard of every matching collection is offered as a candidate and the policy
   * engine picks which to place.
   *
   * @throws SolrException for any preferred operation other than ADDREPLICA, MOVEREPLICA or NONE
   */
  private Suggester getNodeAddedSuggester(SolrCloudManager cloudManager, Policy.Session session, TriggerEvent event) throws IOException {
    String preferredOp = (String) event.getProperty(AutoScalingParams.PREFERRED_OP, CollectionParams.CollectionAction.MOVEREPLICA.toLower());
    Replica.Type replicaType = (Replica.Type) event.getProperty(AutoScalingParams.REPLICA_TYPE, Replica.Type.NRT);
    CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(preferredOp);
    if (action == null) {
      // unknown operation name: fail with the same error as other unsupported operations
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
          "Unsupported preferredOperation=" + preferredOp + " for node added event");
    }

    Suggester suggester = session.getSuggester(action)
        .hint(Suggester.Hint.TARGET_NODE, event.getProperty(NODE_NAMES));
    switch (action) {
      case ADDREPLICA:
        // add all collection/shard pairs and let policy engine figure out which one
        // to place on the target node
        ClusterState clusterState = cloudManager.getClusterStateProvider().getClusterState();
        Set<Pair<String, String>> collShards = new HashSet<>();
        clusterState.getCollectionStates().entrySet().stream()
            .filter(e -> collectionsPredicate.test(e.getKey()))
            .forEach(entry -> {
              DocCollection docCollection = entry.getValue().get();
              if (docCollection != null) {
                docCollection.getActiveSlices().stream()
                    .map(slice -> new Pair<>(entry.getKey(), slice.getName()))
                    .forEach(collShards::add);
              }
            });
        log.debug("-- NODE_ADDED: ADDREPLICA suggester configured with {} collection/shard hints.", collShards.size());
        addDiagnostics(event, "relevantCollShard", collShards);
        suggester.hint(Suggester.Hint.COLL_SHARD, collShards);
        suggester.hint(Suggester.Hint.REPLICATYPE, replicaType);
        break;
      case MOVEREPLICA:
        log.debug("-- NODE_ADDED event specified MOVEREPLICA - no hints added.");
        break;
      case NONE:
        log.debug("-- NODE_ADDED event specified NONE - no operations suggested.");
        break;
      default:
        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
            "Unsupported preferredOperation=" + preferredOp + " for node added event");
    }
    return suggester;
  }
}