blob: 36a6f154ad51faa98589f125a3ec5e3f445c7fc5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.node;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.node.states.ReportResult;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.scm.VersionInfo;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto
.ErrorCode;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.ObjectName;
import java.io.IOException;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Maintains information about the Datanodes on SCM side.
* <p>
* Heartbeats under SCM are very simple compared to HDFS heartbeatManager.
* <p>
* The getNode(byState) functions make copy of node maps and then creates a list
* based on that. It should be assumed that these get functions always report
* *stale* information. For example, getting the deadNodeCount followed by
* getNodes(DEAD) could very well produce totally different count. Also
* getNodeCount(HEALTHY) + getNodeCount(DEAD) + getNodeCount(STALE) is not
* guaranteed to add up to the total nodes that we know of. Please treat all
* get functions in this file as a snap-shot of information that is inconsistent
* as soon as you read it.
*/
public class SCMNodeManager
implements NodeManager, StorageContainerNodeProtocol {
// Logger for this class; not private so that tests can reach it.
@VisibleForTesting
static final Logger LOG =
    LoggerFactory.getLogger(SCMNodeManager.class);
// Holds the per-node state; most queries and updates in this class are
// forwarded to it.
private final NodeStateManager nodeStateManager;
// Should we maintain aggregated stats? If this is not frequently used, we
// can always calculate it from nodeStats whenever required.
// Aggregated node stats. The reference is fixed at construction; the
// instance itself is updated in place through add/subtract.
private final SCMNodeStat scmStat;
// Should we create ChillModeManager and extract all the chill mode logic
// to a new class?
// Number of registered nodes required to leave startup chill mode.
private int chillModeNodeCount;
// Cluster ID handed back to datanodes in the registration response.
private final String clusterID;
// SCM version reported to datanodes by getVersion.
private final VersionInfo version;
/**
 * During start up of SCM, it will enter into chill mode and will be there
 * until number of Datanodes registered reaches {@code chillModeNodeCount}.
 * This flag is for tracking startup chill mode.
 */
private final AtomicBoolean inStartupChillMode;
/**
 * Administrator can put SCM into chill mode manually.
 * This flag is for tracking manual chill mode.
 */
private final AtomicBoolean inManualChillMode;
// Per-datanode queue of commands that are handed out on heartbeat.
private final CommandQueue commandQueue;
// Node manager MXBean; null once it has been unregistered.
private ObjectName nmInfoBean;
// Owning StorageContainerManager; used to look up the SCM ID and cluster ID
// for version responses.
private final StorageContainerManager scmManager;
/**
 * Builds the node manager, puts it into startup chill mode and registers
 * its MXBean.
 *
 * @param conf configuration passed on to the {@link NodeStateManager}.
 * @param clusterID cluster ID returned to datanodes on registration.
 * @param scmManager the StorageContainerManager this node manager uses to
 *                   look up SCM storage information.
 * @param eventPublisher publisher passed on to the {@link NodeStateManager}.
 * @throws IOException declared by the signature; no statement visible in
 *                     this constructor is known to throw it.
 */
public SCMNodeManager(OzoneConfiguration conf, String clusterID,
    StorageContainerManager scmManager, EventPublisher eventPublisher)
    throws IOException {
  // Collaborators and identity.
  this.nodeStateManager = new NodeStateManager(conf, eventPublisher);
  this.scmManager = scmManager;
  this.clusterID = clusterID;
  this.version = VersionInfo.getLatestVersion();

  // Book-keeping owned by this instance.
  this.scmStat = new SCMNodeStat();
  this.commandQueue = new CommandQueue();

  // Chill mode: startup mode is active until enough nodes have registered.
  // TODO: Support this value as a Percentage of known machines.
  this.chillModeNodeCount = 1;
  this.inManualChillMode = new AtomicBoolean(false);
  this.inStartupChillMode = new AtomicBoolean(true);
  LOG.info("Entering startup chill mode.");

  registerMXBean();
}
/**
 * Registers this instance with {@link MBeans} under the service name
 * "SCMNodeManager" and bean name "SCMNodeManagerInfo", remembering the
 * returned object name so it can be unregistered later.
 */
private void registerMXBean() {
  final ObjectName registered =
      MBeans.register("SCMNodeManager", "SCMNodeManagerInfo", this);
  this.nmInfoBean = registered;
}
/**
 * Unregisters the MXBean if one is currently registered and clears the
 * stored object name, so that repeated calls do nothing.
 */
private void unregisterMXBean() {
  if (nmInfoBean == null) {
    // Nothing registered (or already unregistered).
    return;
  }
  MBeans.unregister(nmInfoBean);
  nmInfoBean = null;
}
/**
 * Stops managing the given datanode by delegating to the node state
 * manager.
 *
 * @param node datanode to remove.
 * @throws NodeNotFoundException propagated from the node state manager.
 */
@Override
public void removeNode(DatanodeDetails node) throws NodeNotFoundException {
  this.nodeStateManager.removeNode(node);
}
/**
 * Returns the datanodes that the node state manager reports for the given
 * state. As with every getter in this class, callers should treat the
 * result as a snapshot that may already be out of date.
 *
 * @param nodestate state to filter by.
 * @return List of Datanodes that are known to SCM in the requested state.
 */
@Override
public List<DatanodeDetails> getNodes(NodeState nodestate) {
  final List<DatanodeDetails> nodesInState =
      nodeStateManager.getNodes(nodestate);
  return nodesInState;
}
/**
 * Returns every datanode the node state manager knows about, regardless of
 * state.
 *
 * @return List of DatanodeDetails
 */
@Override
public List<DatanodeDetails> getAllNodes() {
  final List<DatanodeDetails> everyNode = nodeStateManager.getAllNodes();
  return everyNode;
}
/**
 * Returns the number of registered nodes required before startup chill
 * mode is left.
 *
 * @return the configured minimum node count.
 */
@Override
public int getMinimumChillModeNodes() {
  return this.chillModeNodeCount;
}
/**
 * Overrides the minimum number of nodes needed to leave startup chill mode.
 * Intended for tests only.
 *
 * @param count - Number of nodes.
 */
@VisibleForTesting
public void setMinimumChillModeNodes(int count) {
  this.chillModeNodeCount = count;
}
/**
 * Builds a human-readable description of the current chill mode state.
 * Startup chill mode takes precedence over manual chill mode; every variant
 * includes the total node count reported by the node state manager.
 *
 * @return status message.
 */
@Override
public String getChillModeStatus() {
  if (inStartupChillMode.get()) {
    final String startupDetails = String.format(
        " %d nodes reported, minimal %d nodes required.",
        nodeStateManager.getTotalNodeCount(), getMinimumChillModeNodes());
    return "Still in chill mode, waiting on nodes to report in."
        + startupDetails;
  }

  // Both remaining cases share the same trailing node-count sentence.
  final String headline = inManualChillMode.get()
      ? "Out of startup chill mode, but in manual chill mode."
      : "Out of chill mode.";
  final String reported = String.format(" %d nodes have reported in.",
      nodeStateManager.getTotalNodeCount());
  return headline + reported;
}
/**
 * Leaves chill mode unconditionally, even when the minimum node count has
 * not been reached. Both the startup flag and the manual flag are cleared;
 * a log line is written only for a flag that was actually set.
 */
@Override
public void forceExitChillMode() {
  clearChillModeFlag(inStartupChillMode, "Leaving startup chill mode.");
  clearChillModeFlag(inManualChillMode, "Leaving manual chill mode.");
}

/** Logs {@code message} and resets {@code flag} if the flag is set. */
private static void clearChillModeFlag(AtomicBoolean flag, String message) {
  if (flag.get()) {
    LOG.info(message);
    flag.set(false);
  }
}
/**
 * Switches manual chill mode on. The startup chill mode flag is not
 * touched.
 */
@Override
public void enterChillMode() {
  LOG.info("Entering manual chill mode.");
  this.inManualChillMode.set(true);
}
/**
 * Switches manual chill mode off. The startup chill mode flag is not
 * touched, so the node manager may still be in startup chill mode
 * afterwards.
 */
@Override
public void exitChillMode() {
  LOG.info("Leaving manual chill mode.");
  this.inManualChillMode.set(false);
}
/**
 * Tells whether the node manager has left chill mode entirely, i.e. neither
 * the startup flag nor the manual flag is set.
 *
 * @return true if out of chill mode, else false
 */
@Override
public boolean isOutOfChillMode() {
  return !inStartupChillMode.get() && !inManualChillMode.get();
}
/**
 * Returns how many datanodes the node state manager reports for the given
 * state.
 *
 * @param nodestate state to count.
 * @return int -- count
 */
@Override
public int getNodeCount(NodeState nodestate) {
  final int nodesInState = nodeStateManager.getNodeCount(nodestate);
  return nodesInState;
}
/**
 * Looks up the state of a single datanode.
 *
 * @param datanodeDetails - Datanode Details
 * @return the state reported by the node state manager, or {@code null}
 *         when the node state manager does not know the node.
 */
@Override
public NodeState getNodeState(DatanodeDetails datanodeDetails) {
  NodeState state = null;
  try {
    state = nodeStateManager.getNodeState(datanodeDetails);
  } catch (NodeNotFoundException e) {
    // TODO: should we throw NodeNotFoundException?
    // Unknown node: fall through and report null.
  }
  return state;
}
/**
 * Refreshes the stat of one datanode from its node report and keeps the
 * aggregated cluster stat in step.
 * <p>
 * If the node state manager has no stat for the node, a fresh empty stat is
 * used. When the report carries at least one storage report, the node's old
 * contribution is subtracted from the aggregate, the stat is overwritten
 * with the summed capacity / SCM-used / remaining values, and the new
 * contribution is added back. The stat is stored in the node state manager
 * in every case, including when the report is null or empty.
 *
 * @param dnId UUID of the reporting datanode.
 * @param nodeReport node report; may be null.
 */
private void updateNodeStat(UUID dnId, NodeReportProto nodeReport) {
  SCMNodeStat stat;
  try {
    stat = nodeStateManager.getNodeStat(dnId);
  } catch (NodeNotFoundException e) {
    // The trailing space matters: the two literals are concatenated.
    LOG.debug("SCM updateNodeStat based on heartbeat from previous " +
        "dead datanode {}", dnId);
    stat = new SCMNodeStat();
  }
  if (nodeReport != null && nodeReport.getStorageReportCount() > 0) {
    long totalCapacity = 0;
    long totalRemaining = 0;
    long totalScmUsed = 0;
    for (StorageReportProto report : nodeReport.getStorageReportList()) {
      totalCapacity += report.getCapacity();
      totalRemaining += report.getRemaining();
      totalScmUsed += report.getScmUsed();
    }
    // Replace the node's old contribution to the aggregate with the new one.
    scmStat.subtract(stat);
    stat.set(totalCapacity, totalScmUsed, totalRemaining);
    scmStat.add(stat);
  }
  nodeStateManager.setNodeStat(dnId, stat);
}
/**
 * Releases the resources held by this node manager; in this class that
 * means unregistering its MXBean. Calling it again after the bean has been
 * unregistered has no further effect.
 *
 * @throws IOException declared by the signature; the visible body does not
 *                     throw it.
 */
@Override
public void close() throws IOException {
  this.unregisterMXBean();
}
/**
 * Builds the version response sent to a datanode: the SCM version plus the
 * SCM ID and cluster ID read from the SCM storage.
 *
 * @param versionRequest - version Request; not read by this implementation.
 * @return - returns SCM version info and other required information needed by
 * datanode.
 */
@Override
public VersionResponse getVersion(SCMVersionRequestProto versionRequest) {
  return VersionResponse.newBuilder()
      .setVersion(version.getVersion())
      .addValue(OzoneConsts.SCM_ID, scmManager.getScmStorage().getScmId())
      .addValue(OzoneConsts.CLUSTER_ID,
          scmManager.getScmStorage().getClusterID())
      .build();
}
/**
 * Registers a datanode with this SCM.
 * <p>
 * When called inside an RPC, the datanode's hostname and IP address are
 * overwritten with those of the remote caller. The node is then added to
 * the node state manager with an empty stat, startup chill mode is left
 * once enough nodes have registered, and the node report is applied. If the
 * node is already registered, this is only traced and the call still
 * answers with a success response.
 *
 * @param datanodeDetails details of the registering datanode; its hostname
 *                        and IP address may be updated by this call.
 * @param nodeReport NodeReport used to initialise the node's stat; may be
 *                   null.
 * @param pipelineReportsProto pipeline reports sent with the registration;
 *                             not used by this implementation.
 *
 * @return a {@link RegisteredCommand} with error code success, carrying the
 *         datanode UUID, cluster ID, hostname and IP address.
 */
@Override
public RegisteredCommand register(
    DatanodeDetails datanodeDetails, NodeReportProto nodeReport,
    PipelineReportsProto pipelineReportsProto) {
  InetAddress dnAddress = Server.getRemoteIp();
  if (dnAddress != null) {
    // Mostly called inside an RPC, update ip and peer hostname
    datanodeDetails.setHostName(dnAddress.getHostName());
    datanodeDetails.setIpAddress(dnAddress.getHostAddress());
  }
  UUID dnId = datanodeDetails.getUuid();
  try {
    nodeStateManager.addNode(datanodeDetails);
    nodeStateManager.setNodeStat(dnId, new SCMNodeStat());
    // compareAndSet makes the transition atomic, so concurrent
    // registrations cannot both announce leaving startup chill mode.
    if (inStartupChillMode.get()
        && nodeStateManager.getTotalNodeCount() >= getMinimumChillModeNodes()
        && inStartupChillMode.compareAndSet(true, false)) {
      LOG.info("Leaving startup chill mode.");
    }
    // Updating Node Report, as registration is successful
    updateNodeStat(dnId, nodeReport);
    LOG.info("Data node with ID: {} Registered.", dnId);
  } catch (NodeAlreadyExistsException e) {
    // Pass the object itself so toString() only runs when trace is enabled.
    LOG.trace("Datanode is already registered. Datanode: {}",
        datanodeDetails);
  }
  return RegisteredCommand.newBuilder().setErrorCode(ErrorCode.success)
      .setDatanodeUUID(datanodeDetails.getUuidString())
      .setClusterID(this.clusterID)
      .setHostname(datanodeDetails.getHostName())
      .setIpAddress(datanodeDetails.getIpAddress())
      .build();
}
/**
 * Handles a heartbeat from a datanode: records the heartbeat time and
 * returns the commands queued for that datanode. If the node state manager
 * does not know the datanode, a {@link ReregisterCommand} is queued for it
 * first, so it is part of the returned commands.
 *
 * @param datanodeDetails - DatanodeDetailsProto; must not be null.
 * @return commands queued for the datanode.
 */
@Override
public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) {
  Preconditions.checkNotNull(datanodeDetails,
      "Heartbeat is missing DatanodeDetails.");
  final UUID dnId = datanodeDetails.getUuid();
  try {
    nodeStateManager.updateLastHeartbeatTime(datanodeDetails);
  } catch (NodeNotFoundException e) {
    LOG.warn("SCM receive heartbeat from unregistered datanode {}",
        datanodeDetails);
    // Ask the unknown datanode to register again.
    commandQueue.addCommand(dnId, new ReregisterCommand());
  }
  return commandQueue.getCommand(dnId);
}
/**
 * Applies a node report to the stat of the given datanode and to the
 * aggregated cluster stat.
 *
 * @param dnUuid UUID of the reporting datanode.
 * @param nodeReport the node report to apply.
 */
@Override
public void processNodeReport(UUID dnUuid, NodeReportProto nodeReport) {
  updateNodeStat(dnUuid, nodeReport);
}
/**
 * Returns a new {@link SCMNodeStat} constructed from the aggregated node
 * stats, rather than the internal instance itself.
 *
 * @return the aggregated node stats.
 */
@Override
public SCMNodeStat getStats() {
  final SCMNodeStat aggregatedCopy = new SCMNodeStat(scmStat);
  return aggregatedCopy;
}
/**
 * Returns the per-node stats map kept by the node state manager.
 *
 * @return a map of individual node stats (live/stale but not dead).
 */
@Override
public Map<UUID, SCMNodeStat> getNodeStats() {
  final Map<UUID, SCMNodeStat> statsByNode =
      nodeStateManager.getNodeStatsMap();
  return statsByNode;
}
/**
 * Returns the stat of one datanode wrapped in an {@link SCMNodeMetric}.
 *
 * @param datanodeDetails - datanode ID.
 * @return the node's metric, or {@code null} when the node state manager
 *         does not know the node (e.g. decommissioned or removed).
 */
@Override
public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) {
  final UUID dnId = datanodeDetails.getUuid();
  try {
    final SCMNodeStat stat = nodeStateManager.getNodeStat(dnId);
    return new SCMNodeMetric(stat);
  } catch (NodeNotFoundException e) {
    LOG.info("SCM getNodeStat from a decommissioned or removed datanode {}",
        dnId);
    return null;
  }
}
/**
 * Returns the node count for every {@link NodeState} value, keyed by the
 * state's string form.
 *
 * @return map from state name to number of nodes in that state.
 */
@Override
public Map<String, Integer> getNodeCount() {
  final NodeState[] allStates = NodeState.values();
  final Map<String, Integer> countsByState = new HashMap<>();
  for (NodeState current : allStates) {
    countsByState.put(current.toString(), getNodeCount(current));
  }
  return countsByState;
}
/**
 * Returns the pipelines that the node state manager associates with the
 * given datanode.
 *
 * @param dnId - datanodeID
 * @return Set of PipelineID
 */
@Override
public Set<PipelineID> getPipelineByDnID(UUID dnId) {
  final Set<PipelineID> pipelines = nodeStateManager.getPipelineByDnID(dnId);
  return pipelines;
}
/**
 * Records a pipeline by handing it to the node state manager.
 *
 * @param pipeline - Pipeline to be added
 */
@Override
public void addPipeline(Pipeline pipeline) {
  this.nodeStateManager.addPipeline(pipeline);
}
/**
 * Drops a pipeline by asking the node state manager to remove it.
 *
 * @param pipeline - Pipeline to be removed
 */
@Override
public void removePipeline(Pipeline pipeline) {
  this.nodeStateManager.removePipeline(pipeline);
}
/**
 * Replaces the set of containers recorded for a datanode; delegates to the
 * node state manager.
 *
 * @param uuid - DatanodeID
 * @param containerIds - Set of containerIDs
 * @throws SCMException - if datanode is not known. For new datanode use
 *                        addDatanodeInContainerMap call.
 */
@Override
public void setContainersForDatanode(UUID uuid,
    Set<ContainerID> containerIds) throws SCMException {
  this.nodeStateManager.setContainersForDatanode(uuid, containerIds);
}
/**
 * Hands a datanode's container report to the node state manager and
 * returns its result.
 *
 * @param uuid - DatanodeID
 * @param containerIds - Set of containerIDs
 * @return The result after processing containerReport
 */
@Override
public ReportResult<ContainerID> processContainerReport(UUID uuid,
    Set<ContainerID> containerIds) {
  final ReportResult<ContainerID> outcome =
      nodeStateManager.processContainerReport(uuid, containerIds);
  return outcome;
}
/**
 * Returns the containers that the node state manager has recorded for a
 * datanode.
 *
 * @param uuid - DatanodeID
 * @return - set of containerIDs
 */
@Override
public Set<ContainerID> getContainers(UUID uuid) {
  final Set<ContainerID> containers = nodeStateManager.getContainers(uuid);
  return containers;
}
/**
 * Adds a new datanode together with the containers available on it;
 * delegates to the node state manager.
 *
 * @param uuid - DatanodeID
 * @param containerIDs - Set of ContainerIDs
 * @throws SCMException - if datanode already exists
 */
@Override
public void addDatanodeInContainerMap(UUID uuid,
    Set<ContainerID> containerIDs) throws SCMException {
  this.nodeStateManager.addDatanodeInContainerMap(uuid, containerIDs);
}
/**
 * Queues a command for the given datanode; it is returned to the datanode
 * by a later {@code processHeartbeat} call.
 *
 * @param dnId UUID of the target datanode.
 * @param command command to queue.
 */
// TODO:
// Since datanode commands are added through event queue, onMessage method
// should take care of adding commands to command queue.
// Refactor and remove all the usage of this method and delete this method.
@Override
public void addDatanodeCommand(UUID dnId, SCMCommand command) {
  commandQueue.addCommand(dnId, command);
}
/**
 * Event handler invoked by the EventQueue whenever a new DATANODE_COMMAND
 * is published. The command is queued for its target datanode through
 * {@code addDatanodeCommand}.
 *
 * @param commandForDatanode DatanodeCommand
 * @param ignored publisher; not used.
 */
@Override
public void onMessage(CommandForDatanode commandForDatanode,
    EventPublisher ignored) {
  final UUID targetDatanode = commandForDatanode.getDatanodeId();
  final SCMCommand queuedCommand = commandForDatanode.getCommand();
  addDatanodeCommand(targetDatanode, queuedCommand);
}
/**
 * Removes a dead datanode's contribution from the aggregated cluster stat
 * and zeroes the node's own stat. If the node state manager does not know
 * the node, a warning is logged and nothing is changed.
 *
 * @param dnUuid datanode uuid.
 */
@Override
public void processDeadNode(UUID dnUuid) {
  final SCMNodeStat deadNodeStat;
  try {
    deadNodeStat = nodeStateManager.getNodeStat(dnUuid);
  } catch (NodeNotFoundException e) {
    LOG.warn("Can't update stats based on message of dead Datanode {}, it"
        + " doesn't exist or decommissioned already.", dnUuid);
    return;
  }
  if (deadNodeStat == null) {
    // No stat recorded for this node; nothing to subtract.
    return;
  }
  LOG.trace("Update stat values as Datanode {} is dead.", dnUuid);
  scmStat.subtract(deadNodeStat);
  deadNodeStat.set(0, 0, 0);
}
}