/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.Node;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
* Pipeline placement policy that chooses datanodes based on load balancing
* and network topology to support pipeline creation.
* <p>
* 1. Get a list of healthy nodes.
* 2. Filter out nodes that are too heavily engaged in other pipelines.
* 3. Choose an anchor node among the viable nodes.
* 4. Choose the remaining nodes around the anchor based on network topology.
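* <p>
* An illustrative invocation (the node manager and configuration are the
* SCM's; nodesRequired is typically the replication factor, and
* sizeRequired is not taken into account by this policy):
* <pre>{@code
* PipelinePlacementPolicy policy =
*     new PipelinePlacementPolicy(nodeManager, conf);
* List<DatanodeDetails> nodes =
*     policy.chooseDatanodes(null, null, 3, 0);
* }</pre>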
*/
public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
@VisibleForTesting
static final Logger LOG =
LoggerFactory.getLogger(PipelinePlacementPolicy.class);
private final NodeManager nodeManager;
private final Configuration conf;
private final int heavyNodeCriteria;
/**
* Constructs a pipeline placement policy that considers network topology,
* load balancing and rack awareness.
*
* @param nodeManager Node Manager
* @param conf Configuration
*/
public PipelinePlacementPolicy(
final NodeManager nodeManager, final Configuration conf) {
super(nodeManager, conf);
this.nodeManager = nodeManager;
this.conf = conf;
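// Upper bound on a datanode's pipeline engagement, used to filter out
// heavily loaded nodes during placement.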
heavyNodeCriteria = conf.getInt(
ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
}
/**
* Returns true if this node meets the pipeline engagement criteria.
*
* @param datanodeDetails DatanodeDetails
* @param heavyNodeLimit maximum number of pipelines the node may engage in
* @return true if the node is not too heavily engaged in pipelines.
*/
@VisibleForTesting
boolean meetCriteria(DatanodeDetails datanodeDetails, long heavyNodeLimit) {
return (nodeManager.getPipelinesCount(datanodeDetails) <= heavyNodeLimit);
}
/**
* Filters the viable nodes, i.e. nodes that are
* 1. healthy
* 2. not too heavily engaged in other pipelines
*
* @param excludedNodes - excluded nodes
* @param nodesRequired - number of datanodes required.
* @return a list of viable nodes
* @throws SCMException when there are not enough viable nodes
*/
List<DatanodeDetails> filterViableNodes(
List<DatanodeDetails> excludedNodes, int nodesRequired)
throws SCMException {
// get nodes in HEALTHY state
List<DatanodeDetails> healthyNodes =
nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
if (excludedNodes != null) {
healthyNodes.removeAll(excludedNodes);
}
String msg;
if (healthyNodes.isEmpty()) {
msg = "No healthy node found to allocate pipeline.";
LOG.error(msg);
throw new SCMException(msg, SCMException.ResultCodes
.FAILED_TO_FIND_HEALTHY_NODES);
}
if (healthyNodes.size() < nodesRequired) {
msg = String.format("Not enough healthy nodes to allocate pipeline. %d "
+ " datanodes required. Found %d",
nodesRequired, healthyNodes.size());
LOG.error(msg);
throw new SCMException(msg,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
// Filter nodes that meet the pipeline engagement criterion.
// Pipeline placement does not take a node's remaining space into account.
List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d ->
meetCriteria(d, heavyNodeCriteria)).collect(Collectors.toList());
if (healthyList.size() < nodesRequired) {
msg = String.format("Unable to find enough nodes that meet " +
"the criteria that cannot engage in more than %d pipelines." +
" Nodes required: %d Found: %d",
heavyNodeCriteria, nodesRequired, healthyList.size());
LOG.error(msg);
throw new SCMException(msg,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
return healthyList;
}
/**
* Chooses datanodes to join the pipeline.
*
* @param excludedNodes - excluded nodes
* @param favoredNodes - list of nodes preferred.
* @param nodesRequired - number of datanodes required.
* @param sizeRequired - size required for the container or block.
* @return a list of chosen datanodeDetails
* @throws SCMException when not enough nodes can be chosen
*/
@Override
public List<DatanodeDetails> chooseDatanodes(
List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes,
int nodesRequired, final long sizeRequired) throws SCMException {
// Get a list of viable nodes based on criteria
// and make sure excludedNodes are excluded from list.
List<DatanodeDetails> healthyNodes =
filterViableNodes(excludedNodes, nodesRequired);
// Randomly picks nodes when all nodes are equal.
// This happens when network topology is absent or
// all nodes are on the same rack.
if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
LOG.info("All nodes are considered equal. Now randomly pick nodes. " +
"Required nodes: {}", nodesRequired);
return super.getResultSet(nodesRequired, healthyNodes);
} else {
// Since topology and rack awareness are available, pick nodes
// based on them.
return this.getResultSet(nodesRequired, healthyNodes);
}
}
/**
* Get result set based on the pipeline placement algorithm which considers
* network topology and rack awareness.
* @param nodesRequired - Nodes Required
* @param healthyNodes - List of healthy candidate nodes.
* @return a list of datanodes
* @throws SCMException when the required number of nodes cannot be found
*/
@Override
public List<DatanodeDetails> getResultSet(
int nodesRequired, List<DatanodeDetails> healthyNodes)
throws SCMException {
List<DatanodeDetails> results = new ArrayList<>(nodesRequired);
// Since nodes are widely distributed, the results should be selected
// based on distance in topology, rack awareness and load balancing.
List<DatanodeDetails> exclude = new ArrayList<>();
// First, choose an anchor node randomly.
DatanodeDetails anchor = chooseNode(healthyNodes);
if (anchor == null) {
LOG.error("Unable to find the first healthy nodes that " +
"meet the criteria. Required nodes: {}, Found nodes: {}",
nodesRequired, results.size());
throw new SCMException("Unable to find required number of nodes.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
results.add(anchor);
exclude.add(anchor);
nodesRequired--;
// Choose the second node on a different rack from the anchor.
DatanodeDetails nodeOnDifferentRack = chooseNodeBasedOnRackAwareness(
healthyNodes, exclude,
nodeManager.getClusterNetworkTopologyMap(), anchor);
if (nodeOnDifferentRack == null) {
LOG.error("Unable to find nodes on different racks that " +
"meet the criteria. Required nodes: {}, Found nodes: {}",
nodesRequired, results.size());
throw new SCMException("Unable to find required number of nodes.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
results.add(nodeOnDifferentRack);
exclude.add(nodeOnDifferentRack);
nodesRequired--;
// Then choose nodes close to anchor based on network topology
for (int x = 0; x < nodesRequired; x++) {
// invoke the choose function defined in the derived classes.
DatanodeDetails pick = chooseNodeFromNetworkTopology(
nodeManager.getClusterNetworkTopologyMap(), anchor, exclude);
if (pick != null) {
results.add(pick);
// exclude the picked node for next time
exclude.add(pick);
}
}
// nodesRequired was decremented for the anchor and the rack-aware node,
// which are already in results, hence the "+ 2".
if (results.size() < nodesRequired + 2) {
LOG.error("Unable to find the required number of healthy nodes that " +
"meet the criteria. Required nodes: {}, Found nodes: {}",
nodesRequired + 2, results.size());
throw new SCMException("Unable to find required number of nodes.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
return results;
}
/**
* Find a node from the healthy list and return it after removing it from the
* list that we are operating on.
*
* @param healthyNodes - Set of healthy nodes we can choose from.
* @return chosen datanodeDetails
*/
@Override
public DatanodeDetails chooseNode(
List<DatanodeDetails> healthyNodes) {
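// Pick two candidates at random and keep the one whose node metric
// compares greater (two-random-choices style load balancing).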
int firstNodeNdx = getRand().nextInt(healthyNodes.size());
int secondNodeNdx = getRand().nextInt(healthyNodes.size());
DatanodeDetails datanodeDetails;
// There is a possibility that both indexes are the same;
// if so, we just return that node.
if (firstNodeNdx == secondNodeNdx) {
datanodeDetails = healthyNodes.get(firstNodeNdx);
} else {
DatanodeDetails firstNodeDetails = healthyNodes.get(firstNodeNdx);
DatanodeDetails secondNodeDetails = healthyNodes.get(secondNodeNdx);
SCMNodeMetric firstNodeMetric =
nodeManager.getNodeStat(firstNodeDetails);
SCMNodeMetric secondNodeMetric =
nodeManager.getNodeStat(secondNodeDetails);
datanodeDetails = firstNodeMetric.isGreater(secondNodeMetric.get())
? firstNodeDetails : secondNodeDetails;
}
// the pick is decided and it should be removed from candidates.
healthyNodes.remove(datanodeDetails);
return datanodeDetails;
}
/**
* Choose a node on a different rack from the anchor, based on rack
* awareness. Returns null if no such node can be found.
* @param healthyNodes healthy nodes
* @param excludedNodes excluded nodes
* @param networkTopology network topology
* @param anchor anchor node
* @return a node on a different rack, or null if none is found
*/
@VisibleForTesting
protected DatanodeDetails chooseNodeBasedOnRackAwareness(
List<DatanodeDetails> healthyNodes, List<DatanodeDetails> excludedNodes,
NetworkTopology networkTopology, DatanodeDetails anchor) {
Preconditions.checkArgument(networkTopology != null);
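// With a flat topology (single rack or no rack info), there is no
// different rack to choose from.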
if (checkAllNodesAreEqual(networkTopology)) {
return null;
}
for (DatanodeDetails node : healthyNodes) {
if (excludedNodes.contains(node)
|| networkTopology.isSameParent(anchor, node)) {
continue;
} else {
// the pick is decided and it should be removed from candidates.
healthyNodes.remove(node);
return node;
}
}
return null;
}
/**
* Check if all nodes are equal in topology.
* They are equal when network topology is absent or all nodes are on
* the same rack.
* @param topology network topology
* @return true when all nodes are equal
*/
private boolean checkAllNodesAreEqual(NetworkTopology topology) {
if (topology == null) {
return true;
}
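// A single node at the level just above the datanodes means every
// datanode shares the same parent, i.e. a single rack.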
return (topology.getNumOfNodes(topology.getMaxLevel() - 1) == 1);
}
/**
* Choose node based on network topology.
* @param networkTopology network topology
* @param anchor anchor datanode to start with
* @param excludedNodes excluded datanodes
* @return chosen datanode
*/
@VisibleForTesting
protected DatanodeDetails chooseNodeFromNetworkTopology(
NetworkTopology networkTopology, DatanodeDetails anchor,
List<DatanodeDetails> excludedNodes) {
Preconditions.checkArgument(networkTopology != null);
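// The anchor and any previously chosen nodes must not be picked again.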
Collection<Node> excluded = new ArrayList<>();
if (excludedNodes != null && !excludedNodes.isEmpty()) {
excluded.addAll(excludedNodes);
}
excluded.add(anchor);
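// Ask the topology for a random node under the anchor's network
// location, skipping the excluded nodes.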
Node pick = networkTopology.chooseRandom(
anchor.getNetworkLocation(), excluded);
DatanodeDetails pickedNode = (DatanodeDetails) pick;
// exclude the picked node for next time; guard against a null pick
// when the topology cannot supply a node.
if (pickedNode != null && excludedNodes != null) {
excludedNodes.add(pickedNode);
}
return pickedNode;
}
}