/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.Pair;
import org.apache.solr.core.SolrResourceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.AutoScalingParams.ABOVE;
import static org.apache.solr.common.params.AutoScalingParams.BELOW;
import static org.apache.solr.common.params.AutoScalingParams.METRIC;
import static org.apache.solr.common.params.AutoScalingParams.PREFERRED_OP;

/**
 * Trigger that generates a {@link TriggerEventType#METRIC} event when the value of the
 * configured metric on a node rises above or falls below the configured bounds.
*
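 * An illustrative configuration for this trigger (a sketch, not taken from this file: the
 * metric key and threshold are hypothetical, and the JSON keys assume the string values of
 * the {@link AutoScalingParams} constants used in {@code configure}) might look like:
 * <pre>
 * {
 *   "set-trigger": {
 *     "name": "metric_trigger",
 *     "event": "metric",
 *     "waitFor": "60s",
 *     "metric": "metrics:solr.node:CONTAINER.fs.usableSpace",
 *     "below": 107374182400
 *   }
 * }
 * </pre>
 *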
* @deprecated to be removed in Solr 9.0 (see SOLR-14656)
*/
public class MetricTrigger extends TriggerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String metric;
private Number above, below;
private String collection, shard, node, preferredOp;
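  // Time (in nanoseconds, from the cluster's time source) of the last event generated per node,
  // used to enforce the waitFor period between successive events for the same node.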
private final Map<String, Long> lastNodeEvent = new ConcurrentHashMap<>();
public MetricTrigger(String name) {
super(TriggerEventType.METRIC, name);
TriggerUtils.requiredProperties(requiredProperties, validProperties, METRIC);
TriggerUtils.validProperties(validProperties, ABOVE, BELOW, PREFERRED_OP,
AutoScalingParams.COLLECTION,
AutoScalingParams.SHARD,
AutoScalingParams.NODE);
}
@Override
public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException {
super.configure(loader, cloudManager, properties);
this.metric = (String) properties.get(METRIC);
this.above = (Number) properties.get(ABOVE);
this.below = (Number) properties.get(BELOW);
this.collection = (String) properties.getOrDefault(AutoScalingParams.COLLECTION, Policy.ANY);
shard = (String) properties.getOrDefault(AutoScalingParams.SHARD, Policy.ANY);
if (collection.equals(Policy.ANY) && !shard.equals(Policy.ANY)) {
      throw new TriggerValidationException("shard", "When 'shard' is other than #ANY then the collection name must also be other than #ANY");
}
node = (String) properties.getOrDefault(AutoScalingParams.NODE, Policy.ANY);
preferredOp = (String) properties.getOrDefault(PREFERRED_OP, CollectionParams.CollectionAction.MOVEREPLICA.toLower());
}
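
  // This trigger keeps no additional persisted state: getState() returns null, while setState()
  // restores the per-node last-event times when they are present in the supplied state.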
@Override
protected Map<String, Object> getState() {
return null;
}
@Override
protected void setState(Map<String, Object> state) {
lastNodeEvent.clear();
@SuppressWarnings({"unchecked"})
Map<String, Long> nodeTimes = (Map<String, Long>) state.get("lastNodeEvent");
if (nodeTimes != null) {
lastNodeEvent.putAll(nodeTimes);
}
}
@Override
public void restoreState(AutoScaling.Trigger old) {
assert old.isClosed();
if (old instanceof MetricTrigger) {
MetricTrigger that = (MetricTrigger) old;
assert this.name.equals(that.name);
this.lastNodeEvent.clear();
this.lastNodeEvent.putAll(that.lastNodeEvent);
} else {
throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
"Unable to restore state from an unknown type of trigger");
}
}
@Override
public void run() {
AutoScaling.TriggerEventProcessor processor = processorRef.get();
if (processor == null) {
return;
}
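    // Determine which nodes to inspect: the explicitly configured node, all replicas of the
    // configured collection (or shard), or every live node when nothing narrower is configured.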
Set<String> liveNodes = null;
if (node.equals(Policy.ANY)) {
if (collection.equals(Policy.ANY)) {
liveNodes = cloudManager.getClusterStateProvider().getLiveNodes();
} else {
final Set<String> nodes = new HashSet<>();
ClusterState.CollectionRef ref = cloudManager.getClusterStateProvider().getState(collection);
DocCollection docCollection;
if (ref == null || (docCollection = ref.get()) == null) {
log.warn("MetricTrigger could not find collection: {}", collection);
return;
}
if (shard.equals(Policy.ANY)) {
docCollection.getReplicas().forEach(replica -> {
nodes.add(replica.getNodeName());
});
} else {
Slice slice = docCollection.getSlice(shard);
if (slice == null) {
log.warn("MetricTrigger could not find collection: {} shard: {}", collection, shard);
return;
}
slice.getReplicas().forEach(replica -> nodes.add(replica.getNodeName()));
}
liveNodes = nodes;
}
} else {
liveNodes = Collections.singleton(node);
}
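    // Fetch the configured metric from each candidate node; since only a single metric is
    // requested, each node contributes at most one value to the rates map.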
Map<String, Number> rates = new HashMap<>(liveNodes.size());
for (String node : liveNodes) {
Map<String, Object> values = cloudManager.getNodeStateProvider().getNodeValues(node, Collections.singletonList(metric));
values.forEach((tag, rate) -> rates.computeIfAbsent(node, s -> (Number) rate));
}
long now = cloudManager.getTimeSource().getTimeNs();
// check for exceeded rates and filter out those with less than waitFor from previous events
Map<String, Number> hotNodes = rates.entrySet().stream()
.filter(entry -> waitForElapsed(entry.getKey(), now, lastNodeEvent))
.filter(entry -> (below != null && Double.compare(entry.getValue().doubleValue(), below.doubleValue()) < 0) || (above != null && Double.compare(entry.getValue().doubleValue(), above.doubleValue()) > 0))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (hotNodes.isEmpty()) return;
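    // The event time is the earliest per-node last-event time among the hot nodes.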
final AtomicLong eventTime = new AtomicLong(now);
hotNodes.forEach((n, r) -> {
long time = lastNodeEvent.get(n);
if (eventTime.get() > time) {
eventTime.set(time);
}
});
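    // Record the event time for each hot node only if the processor accepts the event, so that
    // the waitFor window restarts from this event.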
if (processor.process(new MetricBreachedEvent(getName(), collection, shard, preferredOp, eventTime.get(), metric, hotNodes))) {
hotNodes.keySet().forEach(node -> lastNodeEvent.put(node, now));
}
}
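
  /**
   * Returns true if at least {@code waitFor} seconds have elapsed since the last recorded event
   * for the given name. If no event has been recorded yet, the current time is stored, so the
   * elapsed time is zero.
   */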
private boolean waitForElapsed(String name, long now, Map<String, Long> lastEventMap) {
Long lastTime = lastEventMap.computeIfAbsent(name, s -> now);
long elapsed = TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS);
log.trace("name={}, lastTime={}, elapsed={}", name, lastTime, elapsed);
    return elapsed >= getWaitForSecond();
}
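
  /**
   * Event generated when the configured metric crosses one of the configured bounds on one or
   * more nodes. Carries the metric name, the affected nodes with their metric values, and the
   * requested operations (one per hot node) as event properties.
   */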
public static class MetricBreachedEvent extends TriggerEvent {
public MetricBreachedEvent(String source, String collection, String shard, String preferredOp, long eventTime, String metric, Map<String, Number> hotNodes) {
super(TriggerEventType.METRIC, source, eventTime, null);
properties.put(METRIC, metric);
properties.put(AutoScalingParams.NODE, hotNodes);
if (!collection.equals(Policy.ANY)) {
properties.put(AutoScalingParams.COLLECTION, collection);
}
if (!shard.equals(Policy.ANY)) {
properties.put(AutoScalingParams.SHARD, shard);
}
properties.put(PREFERRED_OP, preferredOp);
// specify requested ops
List<Op> ops = new ArrayList<>(hotNodes.size());
for (String n : hotNodes.keySet()) {
Op op = new Op(CollectionParams.CollectionAction.get(preferredOp));
op.addHint(Suggester.Hint.SRC_NODE, n);
if (!collection.equals(Policy.ANY)) {
if (!shard.equals(Policy.ANY)) {
op.addHint(Suggester.Hint.COLL_SHARD, new Pair<>(collection, shard));
} else {
op.addHint(Suggester.Hint.COLL, collection);
}
}
ops.add(op);
}
properties.put(TriggerEvent.REQUESTED_OPS, ops);
}
}
}