blob: c404f0719b13053f6c9b739b206c1b5d891e9e72 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.autoscaling.OverseerTriggerThread.MARKER_ACTIVE;
import static org.apache.solr.cloud.autoscaling.OverseerTriggerThread.MARKER_INACTIVE;
import static org.apache.solr.cloud.autoscaling.OverseerTriggerThread.MARKER_STATE;
import static org.apache.solr.common.params.AutoScalingParams.PREFERRED_OP;
import static org.apache.solr.common.params.AutoScalingParams.REPLICA_TYPE;
/**
* Trigger for the {@link TriggerEventType#NODEADDED} event
*
* @deprecated to be removed in Solr 9.0 (see SOLR-14656)
*/
public class NodeAddedTrigger extends TriggerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private Set<String> lastLiveNodes = new HashSet<>();
private Map<String, Long> nodeNameVsTimeAdded = new HashMap<>();
private String preferredOp;
private Replica.Type replicaType = Replica.Type.NRT;
public NodeAddedTrigger(String name) {
super(TriggerEventType.NODEADDED, name);
TriggerUtils.validProperties(validProperties, PREFERRED_OP, REPLICA_TYPE);
}
@Override
public void init() throws Exception {
super.init();
lastLiveNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes());
log.debug("NodeAddedTrigger {} - Initial livenodes: {}", name, lastLiveNodes);
log.debug("NodeAddedTrigger {} instantiated with properties: {}", name, properties);
// pick up added nodes for which marker paths were created
try {
List<String> added = stateManager.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
added.forEach(n -> {
String markerPath = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + n;
try {
Map<String, Object> markerData = Utils.getJson(stateManager, markerPath);
// skip inactive markers
if (markerData.getOrDefault(MARKER_STATE, MARKER_ACTIVE).equals(MARKER_INACTIVE)) {
return;
}
} catch (InterruptedException | IOException | KeeperException e) {
log.debug("-- ignoring marker {} state due to error{}", markerPath, e);
}
// don't add nodes that have since gone away
if (lastLiveNodes.contains(n) && !nodeNameVsTimeAdded.containsKey(n)) {
// since {@code #restoreState(AutoScaling.Trigger)} is called first, the timeAdded for a node may also be restored
log.debug("Adding node from marker path: {}", n);
nodeNameVsTimeAdded.put(n, cloudManager.getTimeSource().getTimeNs());
}
});
} catch (NoSuchElementException e) {
// ignore
} catch (Exception e) {
log.warn("Exception retrieving nodeLost markers", e);
}
}
@Override
public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException {
super.configure(loader, cloudManager, properties);
preferredOp = (String) properties.getOrDefault(PREFERRED_OP, CollectionParams.CollectionAction.MOVEREPLICA.toLower());
preferredOp = preferredOp.toLowerCase(Locale.ROOT);
String replicaTypeStr = (String) properties.getOrDefault(REPLICA_TYPE, Replica.Type.NRT.name());
// verify
try {
replicaType = Replica.Type.valueOf(replicaTypeStr);
} catch (IllegalArgumentException | NullPointerException e) {
throw new TriggerValidationException("Unsupported replicaType=" + replicaTypeStr + " specified for node added trigger");
}
CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(preferredOp);
switch (action) {
case ADDREPLICA:
case MOVEREPLICA:
case NONE:
break;
default:
throw new TriggerValidationException("Unsupported preferredOperation=" + preferredOp + " specified for node added trigger");
}
}
@Override
public void restoreState(AutoScaling.Trigger old) {
assert old.isClosed();
if (old instanceof NodeAddedTrigger) {
NodeAddedTrigger that = (NodeAddedTrigger) old;
assert this.name.equals(that.name);
this.lastLiveNodes.clear();
this.lastLiveNodes.addAll(that.lastLiveNodes);
this.nodeNameVsTimeAdded.clear();
this.nodeNameVsTimeAdded.putAll(that.nodeNameVsTimeAdded);
} else {
throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
"Unable to restore state from an unknown type of trigger");
}
}
@Override
protected Map<String, Object> getState() {
Map<String,Object> state = new HashMap<>();
state.put("lastLiveNodes", lastLiveNodes);
state.put("nodeNameVsTimeAdded", nodeNameVsTimeAdded);
return state;
}
@Override
protected void setState(Map<String, Object> state) {
this.lastLiveNodes.clear();
this.nodeNameVsTimeAdded.clear();
@SuppressWarnings({"unchecked"})
Collection<String> lastLiveNodes = (Collection<String>)state.get("lastLiveNodes");
if (lastLiveNodes != null) {
this.lastLiveNodes.addAll(lastLiveNodes);
}
@SuppressWarnings({"unchecked"})
Map<String,Long> nodeNameVsTimeAdded = (Map<String,Long>)state.get("nodeNameVsTimeAdded");
if (nodeNameVsTimeAdded != null) {
this.nodeNameVsTimeAdded.putAll(nodeNameVsTimeAdded);
}
}
@Override
public void run() {
try {
synchronized (this) {
if (isClosed) {
log.warn("NodeAddedTrigger ran but was already closed");
throw new RuntimeException("Trigger has been closed");
}
}
log.debug("Running NodeAddedTrigger {}", name);
Set<String> newLiveNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes());
if (log.isDebugEnabled()) {
log.debug("Found livenodes: {}", newLiveNodes.size());
}
// have any nodes that we were tracking been removed from the cluster?
// if so, remove them from the tracking map
Set<String> trackingKeySet = nodeNameVsTimeAdded.keySet();
trackingKeySet.retainAll(newLiveNodes);
// have any new nodes been added?
Set<String> copyOfNew = new HashSet<>(newLiveNodes);
copyOfNew.removeAll(lastLiveNodes);
copyOfNew.forEach(n -> {
long eventTime = cloudManager.getTimeSource().getTimeNs();
log.debug("Tracking new node: {} at time {}", n, eventTime);
nodeNameVsTimeAdded.put(n, eventTime);
});
// has enough time expired to trigger events for a node?
List<String> nodeNames = new ArrayList<>();
List<Long> times = new ArrayList<>();
for (Iterator<Map.Entry<String, Long>> it = nodeNameVsTimeAdded.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<String, Long> entry = it.next();
String nodeName = entry.getKey();
Long timeAdded = entry.getValue();
long now = cloudManager.getTimeSource().getTimeNs();
if (TimeUnit.SECONDS.convert(now - timeAdded, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
nodeNames.add(nodeName);
times.add(timeAdded);
}
}
AutoScaling.TriggerEventProcessor processor = processorRef.get();
if (!nodeNames.isEmpty()) {
if (processor != null) {
if (log.isDebugEnabled()) {
log.debug("NodeAddedTrigger {} firing registered processor for nodes: {} added at times {}, now={}", name,
nodeNames, times, cloudManager.getTimeSource().getTimeNs());
}
if (processor.process(new NodeAddedEvent(getEventType(), getName(), times, nodeNames, preferredOp, replicaType))) {
// remove from tracking set only if the fire was accepted
nodeNames.forEach(n -> {
log.debug("Removing new node from tracking: {}", n);
nodeNameVsTimeAdded.remove(n);
});
} else {
log.debug("Processor returned false for {}!", nodeNames);
}
} else {
nodeNames.forEach(n -> {
nodeNameVsTimeAdded.remove(n);
});
}
}
lastLiveNodes = new HashSet<>(newLiveNodes);
} catch (RuntimeException e) {
log.error("Unexpected exception in NodeAddedTrigger", e);
}
}
public static class NodeAddedEvent extends TriggerEvent {
public NodeAddedEvent(TriggerEventType eventType, String source, List<Long> times, List<String> nodeNames, String preferredOp, Replica.Type replicaType) {
// use the oldest time as the time of the event
super(eventType, source, times.get(0), null);
properties.put(NODE_NAMES, nodeNames);
properties.put(EVENT_TIMES, times);
properties.put(PREFERRED_OP, preferredOp);
properties.put(REPLICA_TYPE, replicaType);
}
}
}