blob: 33c969700643a41eaba425d3e2e1924ea45e2f40 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.node;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
import org.apache.hadoop.hdds.scm.DatanodeAdminError;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Class used to manage datanodes scheduled for maintenance or decommission.
*/
public class NodeDecommissionManager {
// Single-threaded scheduler that periodically runs the admin monitor.
private final ScheduledExecutorService executor;
// Tracks nodes that are being decommissioned or put into maintenance.
private final DatanodeAdminMonitor monitor;
private final NodeManager nodeManager;
private final SCMContext scmContext;
private final EventPublisher eventQueue;
private final ReplicationManager replicationManager;
private final OzoneConfiguration conf;
// When true, resolved hosts are looked up in the node manager by host name,
// otherwise by IP address.
private final boolean useHostnames;
// Period, in seconds, between two runs of the monitor. Left non-final
// because the constructor may re-read it after falling back to the default.
private long monitorInterval;
private static final Logger LOG =
    LoggerFactory.getLogger(NodeDecommissionManager.class);
/**
 * Parsed representation of a user supplied host string of the form
 * {@code hostname} or {@code hostname:port}.
 */
static class HostDefinition {
  private final String rawHostname;
  private String hostname;
  private int port;

  /**
   * Parses the given host string.
   * @param hostname Host string in the form hostname or hostname:port
   * @throws InvalidHostStringException If the string is null or does not
   *         contain a hostname that can be parsed
   */
  HostDefinition(String hostname) throws InvalidHostStringException {
    this.rawHostname = hostname;
    parseHostname();
  }

  /**
   * @return The host string exactly as it was passed to the constructor.
   */
  public String getRawHostname() {
    return rawHostname;
  }

  /**
   * @return The host part extracted from the raw host string.
   */
  public String getHostname() {
    return hostname;
  }

  /**
   * @return The port extracted from the raw host string, or -1 if the
   *         string did not define a port.
   */
  public int getPort() {
    return port;
  }

  private void parseHostname() throws InvalidHostStringException {
    // Report a missing host string through the declared exception rather
    // than letting trim() fail with a NullPointerException.
    if (rawHostname == null) {
      throw new InvalidHostStringException(
          "The host string must not be null");
    }
    try {
      // A URI *must* have a scheme, so just create a fake one
      URI uri = new URI("empty://" + rawHostname.trim());
      this.hostname = uri.getHost();
      this.port = uri.getPort();
      if (this.hostname == null) {
        throw new InvalidHostStringException("The string " + rawHostname
            + " does not contain a valid hostname or hostname:port"
            + " definition");
      }
    } catch (URISyntaxException e) {
      throw new InvalidHostStringException(
          "Unable to parse the hoststring " + rawHostname, e);
    }
  }
}
/**
 * Resolves each host string to the datanode registered with the node manager
 * for that address.
 * @param hosts Host strings in the form hostname or hostname:port
 * @return One DatanodeDetails per entry in hosts, in the same order
 * @throws InvalidHostStringException If a host string cannot be parsed or
 *         resolved, no datanode is registered for it, or the given port does
 *         not match a registered datanode
 */
private List<DatanodeDetails> mapHostnamesToDatanodes(List<String> hosts)
    throws InvalidHostStringException {
  List<DatanodeDetails> resolved = new LinkedList<>();
  for (String hostString : hosts) {
    HostDefinition definition = new HostDefinition(hostString);

    InetAddress address;
    try {
      address = InetAddress.getByName(definition.getHostname());
    } catch (UnknownHostException e) {
      throw new InvalidHostStringException("Unable to resolve host "
          + definition.getRawHostname(), e);
    }

    // Look the node up by host name or by IP address, depending on config.
    String dnsName = useHostnames
        ? address.getHostName()
        : address.getHostAddress();
    List<DatanodeDetails> candidates = nodeManager.getNodesByAddress(dnsName);

    if (candidates.isEmpty()) {
      throw new InvalidHostStringException("Host "
          + definition.getRawHostname()
          + " (" + dnsName + ") is not running any datanodes registered"
          + " with SCM."
          + " Please check the host name.");
    }

    if (candidates.size() == 1) {
      // A port is only checked when the host string actually defined one.
      boolean portGiven = definition.getPort() != -1;
      if (portGiven
          && !validateDNPortMatch(definition.getPort(), candidates.get(0))) {
        throw new InvalidHostStringException("Host "
            + definition.getRawHostname()
            + " is running a datanode registered with SCM,"
            + " but the port number doesn't match."
            + " Please check the port number.");
      }
      resolved.add(candidates.get(0));
      continue;
    }

    // Several datanodes share the address: the port must pick one of them.
    int requestedPort = definition.getPort();
    DatanodeDetails match = candidates.stream()
        .filter(dn -> validateDNPortMatch(requestedPort, dn))
        .findFirst()
        .orElse(null);
    if (match == null) {
      throw new InvalidHostStringException("Host "
          + definition.getRawHostname()
          + " is running multiple datanodes registered with SCM,"
          + " but no port numbers match."
          + " Please check the port number.");
    }
    resolved.add(match);
  }
  return resolved;
}
/**
 * Tells whether any of the ports of the given datanode has the given value.
 * @param port Port number to look for
 * @param dn Datanode whose ports are inspected
 * @return True if one of the datanode's ports equals port, false otherwise.
 */
private boolean validateDNPortMatch(int port, DatanodeDetails dn) {
  return dn.getPorts().stream()
      .anyMatch(candidate -> candidate.getValue() == port);
}
public NodeDecommissionManager(OzoneConfiguration config, NodeManager nm,
ContainerManagerV2 containerManager, SCMContext scmContext,
EventPublisher eventQueue, ReplicationManager rm) {
this.nodeManager = nm;
conf = config;
//this.containerManager = containerManager;
this.scmContext = scmContext;
this.eventQueue = eventQueue;
this.replicationManager = rm;
executor = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setNameFormat("DatanodeAdminManager-%d")
.setDaemon(true).build());
useHostnames = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
monitorInterval = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL,
ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL_DEFAULT,
TimeUnit.SECONDS);
if (monitorInterval <= 0) {
LOG.warn("{} must be greater than zero, defaulting to {}",
ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL,
ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL_DEFAULT);
conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL,
ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL_DEFAULT);
monitorInterval = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL,
ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL_DEFAULT,
TimeUnit.SECONDS);
}
monitor = new DatanodeAdminMonitorImpl(conf, eventQueue, nodeManager,
replicationManager);
executor.scheduleAtFixedRate(monitor, monitorInterval, monitorInterval,
TimeUnit.SECONDS);
}
/**
 * @return The monitor created by this manager to track node admin progress.
 */
@VisibleForTesting
public DatanodeAdminMonitor getMonitor() {
  return this.monitor;
}
/**
 * Starts decommission on every datanode identified by the given host strings.
 * @param nodes Host strings in the form hostname or hostname:port
 * @return One error per host on which decommission could not be started. The
 *         list is empty when all hosts were processed successfully.
 * @throws InvalidHostStringException If any host string cannot be mapped to
 *         a registered datanode. In that case no node is decommissioned.
 */
public synchronized List<DatanodeAdminError> decommissionNodes(
    List<String> nodes) throws InvalidHostStringException {
  List<DatanodeAdminError> failures = new ArrayList<>();
  for (DatanodeDetails target : mapHostnamesToDatanodes(nodes)) {
    try {
      startDecommission(target);
    } catch (InvalidNodeStateException e) {
      failures.add(
          new DatanodeAdminError(target.getHostName(), e.getMessage()));
    } catch (NodeNotFoundException e) {
      // The host strings were already validated and the datanode details
      // came from the node manager, so this can only happen if the node is
      // removed in the very short window between validation and starting
      // decommission. Log a warning and report the host as an error rather
      // than failing the whole request.
      LOG.warn("The host {} was not found in SCM. Ignoring the request to "
          + "decommission it", target.getHostName());
      failures.add(new DatanodeAdminError(target.getHostName(),
          "The host was not found in SCM"));
    }
  }
  return failures;
}
/**
 * If a SCM is restarted, then upon re-registration the datanode may already
 * be in DECOMMISSIONING, ENTERING_MAINTENANCE or IN_MAINTENANCE state. In
 * that case, it needs to be added back into the monitor to track its
 * progress. Nodes in any other state are left alone, and the call is ignored
 * entirely when this SCM is not the leader.
 * @param dn Datanode to add back to tracking.
 * @throws NodeNotFoundException If the node manager has no status for dn.
 */
public synchronized void continueAdminForNode(DatanodeDetails dn)
    throws NodeNotFoundException {
  if (!scmContext.isLeader()) {
    LOG.info("follower SCM ignored continue admin for datanode {}", dn);
    return;
  }

  NodeOperationalState current = getNodeStatus(dn).getOperationalState();
  boolean adminInProgress =
      current == NodeOperationalState.DECOMMISSIONING
          || current == NodeOperationalState.ENTERING_MAINTENANCE
          || current == NodeOperationalState.IN_MAINTENANCE;
  if (!adminInProgress) {
    return;
  }

  LOG.info("Continue admin for datanode {}", dn);
  monitor.startMonitoring(dn);
}
/**
 * Moves an IN_SERVICE node to DECOMMISSIONING and hands it to the monitor.
 * Nothing is changed if the node status already reports decommission.
 * @param dn Datanode to decommission
 * @throws NodeNotFoundException If the node manager has no status for dn.
 * @throws InvalidNodeStateException If the node is in any other state.
 */
public synchronized void startDecommission(DatanodeDetails dn)
    throws NodeNotFoundException, InvalidNodeStateException {
  NodeStatus status = getNodeStatus(dn);
  NodeOperationalState current = status.getOperationalState();

  if (current == NodeOperationalState.IN_SERVICE) {
    LOG.info("Starting Decommission for node {}", dn);
    nodeManager.setNodeOperationalState(
        dn, NodeOperationalState.DECOMMISSIONING);
    monitor.startMonitoring(dn);
    return;
  }

  if (status.isDecommission()) {
    LOG.info("Start Decommission called on node {} in state {}. Nothing to "
        + "do.", dn, current);
    return;
  }

  LOG.error("Cannot decommission node {} in state {}", dn, current);
  throw new InvalidNodeStateException("Cannot decommission node "
      + dn + " in state " + current);
}
/**
 * Requests recommission of every datanode identified by the given host
 * strings.
 * @param nodes Host strings in the form hostname or hostname:port
 * @return One error per host that could not be recommissioned. The list is
 *         empty when all hosts were processed successfully.
 * @throws InvalidHostStringException If any host string cannot be mapped to
 *         a registered datanode. In that case no node is recommissioned.
 */
public synchronized List<DatanodeAdminError> recommissionNodes(
    List<String> nodes) throws InvalidHostStringException {
  List<DatanodeAdminError> failures = new ArrayList<>();
  for (DatanodeDetails target : mapHostnamesToDatanodes(nodes)) {
    try {
      recommission(target);
    } catch (NodeNotFoundException e) {
      // The host strings were already validated and the datanode details
      // came from the node manager, so this can only happen if the node is
      // removed in the very short window between validation and the
      // recommission request. Log a warning and report the host as an error
      // rather than failing the whole request.
      LOG.warn("Host {} was not found in SCM. Ignoring the request to "
          + "recommission it.", target.getHostName());
      failures.add(new DatanodeAdminError(target.getHostName(),
          "The host was not found in SCM"));
    }
  }
  return failures;
}
/**
 * Asks the monitor to stop tracking the given node unless the node is
 * already IN_SERVICE, in which case nothing is done.
 * @param dn Datanode to recommission
 * @throws NodeNotFoundException If the node manager has no status for dn.
 */
public synchronized void recommission(DatanodeDetails dn)
    throws NodeNotFoundException {
  NodeOperationalState current = getNodeStatus(dn).getOperationalState();
  if (current == NodeOperationalState.IN_SERVICE) {
    LOG.info("Recommission called on node {} with state {}. "
        + "Nothing to do.", dn, current);
    return;
  }
  // The node will be set back to IN_SERVICE when it is processed by the
  // monitor
  monitor.stopMonitoring(dn);
  LOG.info("Queued node {} for recommission", dn);
}
/**
 * Starts maintenance on every datanode identified by the given host strings.
 * @param nodes Host strings in the form hostname or hostname:port
 * @param endInHours Number of hours from now at which maintenance should
 *                   end. It is passed unchanged to
 *                   {@link #startMaintenance(DatanodeDetails, int)}.
 * @return One error per host on which maintenance could not be started,
 *         including hosts that were no longer found in SCM. The list is
 *         empty when all hosts were processed successfully.
 * @throws InvalidHostStringException If any host string cannot be mapped to
 *         a registered datanode. In that case no node is changed.
 */
public synchronized List<DatanodeAdminError> startMaintenanceNodes(
    List<String> nodes, int endInHours) throws InvalidHostStringException {
  List<DatanodeDetails> dns = mapHostnamesToDatanodes(nodes);
  List<DatanodeAdminError> errors = new ArrayList<>();
  for (DatanodeDetails dn : dns) {
    try {
      startMaintenance(dn, endInHours);
    } catch (NodeNotFoundException e) {
      // The host strings were already validated and the datanode details
      // came from the node manager, so this can only happen if the node is
      // removed in the very short window between validation and starting
      // maintenance. Log a warning and report the host as an error, as is
      // done for decommission and recommission, so the caller is not told
      // the request succeeded for this host.
      LOG.warn("The host {} was not found in SCM. Ignoring the request to "
          + "start maintenance on it", dn.getHostName());
      errors.add(new DatanodeAdminError(dn.getHostName(),
          "The host was not found in SCM"));
    } catch (InvalidNodeStateException e) {
      errors.add(new DatanodeAdminError(dn.getHostName(), e.getMessage()));
    }
  }
  return errors;
}
// TODO - If startMaintenance is called on a host already in maintenance,
//        then we should update the end time?
/**
 * Moves an IN_SERVICE node to ENTERING_MAINTENANCE and hands it to the
 * monitor. Nothing is changed if the node status already reports
 * maintenance.
 * @param dn Datanode to put into maintenance
 * @param endInHours Number of hours from now used to compute the maintenance
 *                   end, expressed in seconds since the epoch. When this is
 *                   0, an end time of 0 is passed to the node manager.
 * @throws NodeNotFoundException If the node manager has no status for dn.
 * @throws InvalidNodeStateException If the node is in any other state.
 */
public synchronized void startMaintenance(DatanodeDetails dn, int endInHours)
    throws NodeNotFoundException, InvalidNodeStateException {
  NodeStatus status = getNodeStatus(dn);
  NodeOperationalState current = status.getOperationalState();

  long maintenanceEnd = endInHours == 0
      ? 0
      : (System.currentTimeMillis() / 1000L)
          + TimeUnit.HOURS.toSeconds(endInHours);

  if (current == NodeOperationalState.IN_SERVICE) {
    nodeManager.setNodeOperationalState(
        dn, NodeOperationalState.ENTERING_MAINTENANCE, maintenanceEnd);
    monitor.startMonitoring(dn);
    LOG.info("Starting Maintenance for node {}", dn);
    return;
  }

  if (status.isMaintenance()) {
    LOG.info("Starting Maintenance called on node {} with state {}. "
        + "Nothing to do.", dn, current);
    return;
  }

  LOG.error("Cannot start maintenance on node {} in state {}", dn, current);
  throw new InvalidNodeStateException("Cannot start maintenance on node "
      + dn + " in state " + current);
}
/**
 * Shuts down the executor that runs the decommission monitor. Intended to be
 * called when SCM is shut down.
 */
public void stop() {
  if (executor == null) {
    return;
  }
  executor.shutdown();
}
/**
 * Fetches the current status of the given datanode from the node manager.
 * @param dn Datanode to look up
 * @return The status reported by the node manager
 * @throws NodeNotFoundException If the node manager has no status for dn.
 */
private NodeStatus getNodeStatus(DatanodeDetails dn)
    throws NodeNotFoundException {
  NodeStatus status = nodeManager.getNodeStatus(dn);
  return status;
}
/**
 * Called in SCMStateMachine#notifyLeaderChanged when current SCM becomes
 * leader. Every node known to the node manager is offered to
 * {@link #continueAdminForNode(DatanodeDetails)} so that admin operations
 * already in progress are tracked by the monitor again.
 */
public void onBecomeLeader() {
  for (DatanodeDetails node : nodeManager.getAllNodes()) {
    try {
      continueAdminForNode(node);
    } catch (NodeNotFoundException e) {
      // Should not happen, as the node was just returned by the node
      // manager. Log it and carry on with the remaining nodes.
      LOG.warn("NodeNotFound when adding the node to the decommissionManager",
          e);
    }
  }
}
}