blob: 0b8a02b74ce6d0aa1076efe8f43caf17059e652a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.net;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The class represents a cluster of computer with a tree hierarchical
* network topology.
* For example, a cluster may be consists of many data centers filled
* with racks of computers.
* In a network topology, leaves represent data nodes (computers) and inner
* nodes represent switches/routers that manage traffic in/out of data centers
* or racks.
*
*/
public class NetworkTopologyImpl implements NetworkTopology {
public static final int DEFAULT_HOST_LEVEL = 2;
public static final Logger LOG = LoggerFactory.getLogger(NetworkTopologyImpl.class);
public static final String NODE_SEPARATOR = ",";
/**
* A marker for an InvalidTopology Exception.
*/
public static class InvalidTopologyException extends RuntimeException {
private static final long serialVersionUID = 1L;
public InvalidTopologyException(String msg) {
super(msg);
}
}
/** InnerNode represents a switch/router of a data center or rack.
* Different from a leaf node, it has non-null children.
*/
static class InnerNode extends NodeBase {
protected List<Node> children = new ArrayList<Node>();
private int numOfLeaves;
/**
* Construct an InnerNode from a path-like string.
*/
InnerNode(String path) {
super(path);
}
/**
* Construct an InnerNode from its name and its network location.
*/
InnerNode(String name, String location) {
super(name, location);
}
/**
* Construct an InnerNode from its name, its network location, its parent, and its level.
*/
InnerNode(String name, String location, InnerNode parent, int level) {
super(name, location, parent, level);
}
/**
* @return its children
*/
List<Node> getChildren() {
return children;
}
/**
* @return the number of children this node has
*/
int getNumOfChildren() {
return children.size();
}
/**
* Judge if this node represents a rack.
* @return true if it has no child or its children are not InnerNodes
*/
boolean isRack() {
if (children.isEmpty()) {
return true;
}
Node firstChild = children.get(0);
return !(firstChild instanceof InnerNode);
}
/**
* Judge if this node is an ancestor of node <i>n</i>.
*
* @param n a node
* @return true if this node is an ancestor of <i>n</i>
*/
boolean isAncestor(Node n) {
return getPath(this).equals(NodeBase.PATH_SEPARATOR_STR)
|| (n.getNetworkLocation() + NodeBase.PATH_SEPARATOR_STR).startsWith(getPath(this)
+ NodeBase.PATH_SEPARATOR_STR);
}
/**
* Judge if this node is the parent of node <i>n</i>.
*
* @param n a node
* @return true if this node is the parent of <i>n</i>
*/
boolean isParent(Node n) {
return n.getNetworkLocation().equals(getPath(this));
}
/**
* Return a child name of this node who is an ancestor of node <i>n</i>.
*/
private String getNextAncestorName(Node n) {
if (!isAncestor(n)) {
throw new IllegalArgumentException(this + "is not an ancestor of " + n);
}
String name = n.getNetworkLocation().substring(getPath(this).length());
if (name.charAt(0) == PATH_SEPARATOR) {
name = name.substring(1);
}
int index = name.indexOf(PATH_SEPARATOR);
if (index != -1) {
name = name.substring(0, index);
}
return name;
}
/**
* Add node <i>n</i> to the subtree of this node.
* @param n node to be added
* @return true if the node is added; false otherwise
*/
boolean add(Node n) {
if (!isAncestor(n)) {
throw new IllegalArgumentException(n.getName() + ", which is located at " + n.getNetworkLocation()
+ ", is not a decendent of " + getPath(this));
}
if (isParent(n)) {
// this node is the parent of n; add n directly
n.setParent(this);
n.setLevel(this.level + 1);
for (int i = 0; i < children.size(); i++) {
if (children.get(i).getName().equals(n.getName())) {
children.set(i, n);
return false;
}
}
children.add(n);
numOfLeaves++;
return true;
} else {
// find the next ancestor node
String parentName = getNextAncestorName(n);
InnerNode parentNode = null;
for (int i = 0; i < children.size(); i++) {
if (children.get(i).getName().equals(parentName)) {
parentNode = (InnerNode) children.get(i);
break;
}
}
if (parentNode == null) {
// create a new InnerNode
parentNode = createParentNode(parentName);
children.add(parentNode);
}
// add n to the subtree of the next ancestor node
if (parentNode.add(n)) {
numOfLeaves++;
return true;
} else {
return false;
}
}
}
/**
* Creates a parent node to be added to the list of children.
* Creates a node using the InnerNode four argument constructor specifying
* the name, location, parent, and level of this node.
*
* <p>To be overridden in subclasses for specific InnerNode implementations,
* as alternative to overriding the full {@link #add(Node)} method.
*
* @param parentName The name of the parent node
* @return A new inner node
* @see InnerNode#InnerNode(String, String, InnerNode, int)
*/
protected InnerNode createParentNode(String parentName) {
return new InnerNode(parentName, getPath(this), this, this.getLevel() + 1);
}
/**
* Remove node <i>n</i> from the subtree of this node.
* @param n node to be deleted
* @return true if the node is deleted; false otherwise
*/
boolean remove(Node n) {
String parent = n.getNetworkLocation();
String currentPath = getPath(this);
if (!isAncestor(n)) {
throw new IllegalArgumentException(n.getName() + ", which is located at " + parent
+ ", is not a descendent of " + currentPath);
}
if (isParent(n)) {
// this node is the parent of n; remove n directly
for (int i = 0; i < children.size(); i++) {
if (children.get(i).getName().equals(n.getName())) {
children.remove(i);
numOfLeaves--;
n.setParent(null);
return true;
}
}
return false;
} else {
// find the next ancestor node: the parent node
String parentName = getNextAncestorName(n);
InnerNode parentNode = null;
int i;
for (i = 0; i < children.size(); i++) {
if (children.get(i).getName().equals(parentName)) {
parentNode = (InnerNode) children.get(i);
break;
}
}
if (parentNode == null) {
return false;
}
// remove n from the parent node
boolean isRemoved = parentNode.remove(n);
// if the parent node has no children, remove the parent node too
if (isRemoved) {
if (parentNode.getNumOfChildren() == 0) {
children.remove(i);
}
numOfLeaves--;
}
return isRemoved;
}
} // end of remove
/**
* Given a node's string representation, return a reference to the node.
* @param loc string location of the form /rack/node
* @return null if the node is not found or the childnode is there but
* not an instance of {@link InnerNode}
*/
private Node getLoc(String loc) {
if (loc == null || loc.length() == 0) {
return this;
}
String[] path = loc.split(PATH_SEPARATOR_STR, 2);
Node childnode = null;
for (int i = 0; i < children.size(); i++) {
if (children.get(i).getName().equals(path[0])) {
childnode = children.get(i);
}
}
if (childnode == null) {
return null; // non-existing node
} else if (path.length == 1) {
return childnode;
} else if (childnode instanceof InnerNode) {
return ((InnerNode) childnode).getLoc(path[1]);
} else {
return null;
}
}
/**
* Get <i>leafIndex</i> leaf of this subtree
* if it is not in the <i>excludedNode</i>.
*
* @param leafIndex an indexed leaf of the node
* @param excludedNode an excluded node (can be null)
* @return
*/
Node getLeaf(int leafIndex, Node excludedNode) {
int count = 0;
// check if the excluded node a leaf
boolean isLeaf = excludedNode == null || !(excludedNode instanceof InnerNode);
// calculate the total number of excluded leaf nodes
int numOfExcludedLeaves = isLeaf ? 1 : ((InnerNode) excludedNode).getNumOfLeaves();
if (isLeafParent()) { // children are leaves
if (isLeaf) { // excluded node is a leaf node
int excludedIndex = children.indexOf(excludedNode);
if (excludedIndex != -1 && leafIndex >= 0) {
// excluded node is one of the children so adjust the leaf index
leafIndex = leafIndex >= excludedIndex ? leafIndex + 1 : leafIndex;
}
}
// range check
if (leafIndex < 0 || leafIndex >= this.getNumOfChildren()) {
return null;
}
return children.get(leafIndex);
} else {
for (int i = 0; i < children.size(); i++) {
InnerNode child = (InnerNode) children.get(i);
if (excludedNode == null || excludedNode != child) {
// not the excludedNode
int numOfLeaves = child.getNumOfLeaves();
if (excludedNode != null && child.isAncestor(excludedNode)) {
numOfLeaves -= numOfExcludedLeaves;
}
if (count + numOfLeaves > leafIndex) {
// the leaf is in the child subtree
return child.getLeaf(leafIndex - count, excludedNode);
} else {
// go to the next child
count = count + numOfLeaves;
}
} else { // it is the excluededNode
// skip it and set the excludedNode to be null
excludedNode = null;
}
}
return null;
}
}
protected boolean isLeafParent() {
return isRack();
}
/**
* Determine if children a leaves, default implementation calls {@link #isRack()}.
*
* <p>To be overridden in subclasses for specific InnerNode implementations,
* as alternative to overriding the full {@link #getLeaf(int, Node)} method.
*
* @return true if children are leaves, false otherwise
*/
protected boolean areChildrenLeaves() {
return isRack();
}
/**
* Get number of leaves.
*/
int getNumOfLeaves() {
return numOfLeaves;
}
} // end of InnerNode
/**
* The root cluster map.
*/
InnerNode clusterMap;
/**
* Depth of all leaf nodes.
*/
private int depthOfAllLeaves = -1;
/**
* Rack counter.
*/
protected int numOfRacks = 0;
/**
* The lock used to manage access.
*/
protected ReadWriteLock netlock = new ReentrantReadWriteLock();
public NetworkTopologyImpl() {
clusterMap = new InnerNode(InnerNode.ROOT);
}
/**
* Add a leaf node.
* Update node counter and rack counter if necessary
* @param node node to be added; can be null
* @exception IllegalArgumentException if add a node to a leave
or node to be added is not a leaf
*/
public void add(Node node) {
if (node == null) {
return;
}
String oldTopoStr = this.toString();
if (node instanceof InnerNode) {
throw new IllegalArgumentException("Not allow to add an inner node: " + NodeBase.getPath(node));
}
int newDepth = NodeBase.locationToDepth(node.getNetworkLocation()) + 1;
netlock.writeLock().lock();
try {
if ((depthOfAllLeaves != -1) && (depthOfAllLeaves != newDepth)) {
LOG.error("Error: can't add leaf node {} at depth {} to topology:\n{}", node, newDepth, oldTopoStr);
throw new InvalidTopologyException("Invalid network topology. "
+ "You cannot have a rack and a non-rack node at the same level of the network topology.");
}
Node rack = getNodeForNetworkLocation(node);
if (rack != null && !(rack instanceof InnerNode)) {
LOG.error("Unexpected data node {} at an illegal network location", node);
throw new IllegalArgumentException("Unexpected data node " + node.toString()
+ " at an illegal network location");
}
if (clusterMap.add(node)) {
LOG.info("Adding a new node: " + NodeBase.getPath(node));
if (rack == null) {
numOfRacks++;
}
if (!(node instanceof InnerNode)) {
if (depthOfAllLeaves == -1) {
depthOfAllLeaves = node.getLevel();
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("NetworkTopology became:\n" + this.toString());
}
} finally {
netlock.writeLock().unlock();
}
}
/**
* Return a reference to the node given its string representation.
* Default implementation delegates to {@link #getNode(String)}.
*
* <p>To be overridden in subclasses for specific NetworkTopology
* implementations, as alternative to overriding the full {@link #add(Node)}
* method.
*
* @param node The string representation of this node's network location is
* used to retrieve a Node object.
* @return a reference to the node; null if the node is not in the tree
*
* @see #add(Node)
* @see #getNode(String)
*/
protected Node getNodeForNetworkLocation(Node node) {
return getNode(node.getNetworkLocation());
}
/**
* Given a string representation of a rack, return its children.
* @param loc a path-like string representation of a rack
* @return a newly allocated list with all the node's children
*/
public List<Node> getDatanodesInRack(String loc) {
netlock.readLock().lock();
try {
loc = NodeBase.normalize(loc);
if (!NodeBase.ROOT.equals(loc)) {
loc = loc.substring(1);
}
InnerNode rack = (InnerNode) clusterMap.getLoc(loc);
if (rack == null) {
return null;
}
return new ArrayList<Node>(rack.getChildren());
} finally {
netlock.readLock().unlock();
}
}
/**
* Remove a node.
* Update node counter and rack counter if necessary.
*
* @param node node to be removed; can be null
*/
@Override
public void remove(Node node) {
if (node == null) {
return;
} else if (node instanceof InnerNode) {
throw new IllegalArgumentException("Not allow to remove an inner node: " + NodeBase.getPath(node));
}
LOG.info("Removing a node: " + NodeBase.getPath(node));
netlock.writeLock().lock();
try {
if (clusterMap.remove(node)) {
InnerNode rack = (InnerNode) getNode(node.getNetworkLocation());
if (rack == null) {
numOfRacks--;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("NetworkTopology became:\n" + this.toString());
}
} finally {
netlock.writeLock().unlock();
}
}
/**
* Check if the tree contains node <i>node</i>.
*
* @param node a node
* @return true if <i>node</i> is already in the tree; false otherwise
*/
@Override
public boolean contains(Node node) {
if (node == null) {
return false;
}
netlock.readLock().lock();
try {
Node parent = node.getParent();
for (int level = node.getLevel(); parent != null && level > 0; parent = parent.getParent(), level--) {
if (parent == clusterMap) {
return true;
}
}
} finally {
netlock.readLock().unlock();
}
return false;
}
/**
* Given a string representation of a node, return its reference.
*
* @param loc
* a path-like string representation of a node
* @return a reference to the node; null if the node is not in the tree
*/
@Override
public Node getNode(String loc) {
netlock.readLock().lock();
try {
loc = NodeBase.normalize(loc);
if (!NodeBase.ROOT.equals(loc)) {
loc = loc.substring(1);
}
return clusterMap.getLoc(loc);
} finally {
netlock.readLock().unlock();
}
}
/**
* Given a string representation of a rack for a specific network location.
*
* <p>To be overridden in subclasses for specific NetworkTopology
* implementations, as alternative to overriding the full
* {@link #getRack(String)} method.
* @param loc
* a path-like string representation of a network location
* @return a rack string
*/
public String getRack(String loc) {
return loc;
}
/** @return the total number of racks */
@Override
public int getNumOfRacks() {
netlock.readLock().lock();
try {
return numOfRacks;
} finally {
netlock.readLock().unlock();
}
}
/** @return the total number of leaf nodes */
public int getNumOfLeaves() {
netlock.readLock().lock();
try {
return clusterMap.getNumOfLeaves();
} finally {
netlock.readLock().unlock();
}
}
/**
* Return the distance between two nodes.
*
* <p>It is assumed that the distance from one node to its parent is 1
* The distance between two nodes is calculated by summing up their distances
* to their closest common ancestor.
* @param node1 one node
* @param node2 another node
* @return the distance between node1 and node2 which is zero if they are the same
* or {@link Integer#MAX_VALUE} if node1 or node2 do not belong to the cluster
*/
public int getDistance(Node node1, Node node2) {
if (node1 == node2) {
return 0;
}
Node n1 = node1, n2 = node2;
int dis = 0;
netlock.readLock().lock();
try {
int level1 = node1.getLevel(), level2 = node2.getLevel();
while (n1 != null && level1 > level2) {
n1 = n1.getParent();
level1--;
dis++;
}
while (n2 != null && level2 > level1) {
n2 = n2.getParent();
level2--;
dis++;
}
while (n1 != null && n2 != null && n1.getParent() != n2.getParent()) {
n1 = n1.getParent();
n2 = n2.getParent();
dis += 2;
}
} finally {
netlock.readLock().unlock();
}
if (n1 == null) {
LOG.warn("The cluster does not contain node: {}", NodeBase.getPath(node1));
return Integer.MAX_VALUE;
}
if (n2 == null) {
LOG.warn("The cluster does not contain node: {}", NodeBase.getPath(node2));
return Integer.MAX_VALUE;
}
return dis + 2;
}
/**
* Check if two nodes are on the same rack.
* @param node1 one node (can be null)
* @param node2 another node (can be null)
* @return true if node1 and node2 are on the same rack; false otherwise
* @exception IllegalArgumentException when either node1 or node2 is null, or
* node1 or node2 do not belong to the cluster
*/
public boolean isOnSameRack(Node node1, Node node2) {
if (node1 == null || node2 == null) {
return false;
}
netlock.readLock().lock();
try {
return isSameParents(node1, node2);
} finally {
netlock.readLock().unlock();
}
}
/**
* Check if network topology is aware of NodeGroup.
*/
public boolean isNodeGroupAware() {
return false;
}
/**
* Return false directly as not aware of NodeGroup, to be override in sub-class.
*/
public boolean isOnSameNodeGroup(Node node1, Node node2) {
return false;
}
/**
* Compare the parents of each node for equality.
*
* <p>To be overridden in subclasses for specific NetworkTopology
* implementations, as alternative to overriding the full
* {@link #isOnSameRack(Node, Node)} method.
*
* @param node1 the first node to compare
* @param node2 the second node to compare
* @return true if their parents are equal, false otherwise
*
* @see #isOnSameRack(Node, Node)
*/
protected boolean isSameParents(Node node1, Node node2) {
return node1.getParent() == node2.getParent();
}
private static final Random r = new Random();
/**
* Randomly choose one node from <i>scope</i>.
*
* <p>If scope starts with ~, choose one from the all nodes except for the
* ones in <i>scope</i>; otherwise, choose one from <i>scope</i>.
* @param scope range of nodes from which a node will be chosen
* @return the chosen node
*/
public Node chooseRandom(String scope) {
netlock.readLock().lock();
try {
if (scope.startsWith("~")) {
return chooseRandom(NodeBase.ROOT, scope.substring(1));
} else {
return chooseRandom(scope, null);
}
} finally {
netlock.readLock().unlock();
}
}
private Node chooseRandom(String scope, String excludedScope) {
if (excludedScope != null) {
if (scope.startsWith(excludedScope)) {
return null;
}
if (!excludedScope.startsWith(scope)) {
excludedScope = null;
}
}
Node node = getNode(scope);
if (!(node instanceof InnerNode)) {
return node;
}
InnerNode innerNode = (InnerNode) node;
int numOfDatanodes = innerNode.getNumOfLeaves();
if (excludedScope == null) {
node = null;
} else {
node = getNode(excludedScope);
if (!(node instanceof InnerNode)) {
numOfDatanodes -= 1;
} else {
numOfDatanodes -= ((InnerNode) node).getNumOfLeaves();
}
}
int leaveIndex = r.nextInt(numOfDatanodes);
return innerNode.getLeaf(leaveIndex, node);
}
/**
* Return leaves in <i>scope</i>.
* @param scope a path string
* @return leaves nodes under specific scope
*/
private Set<Node> doGetLeaves(String scope) {
Node node = getNode(scope);
Set<Node> leafNodes = new HashSet<Node>();
if (!(node instanceof InnerNode)) {
leafNodes.add(node);
} else {
InnerNode innerNode = (InnerNode) node;
for (int i = 0; i < innerNode.getNumOfLeaves(); i++) {
leafNodes.add(innerNode.getLeaf(i, null));
}
}
return leafNodes;
}
@Override
public Set<Node> getLeaves(String scope) {
netlock.readLock().lock();
try {
if (scope.startsWith("~")) {
Set<Node> allNodes = doGetLeaves(NodeBase.ROOT);
String[] excludeScopes = scope.substring(1).split(NODE_SEPARATOR);
Set<Node> excludeNodes = new HashSet<Node>();
Arrays.stream(excludeScopes).forEach((excludeScope) -> {
excludeNodes.addAll(doGetLeaves(excludeScope));
});
allNodes.removeAll(excludeNodes);
return allNodes;
} else {
return doGetLeaves(scope);
}
} finally {
netlock.readLock().unlock();
}
}
@Override
public int countNumOfAvailableNodes(String scope, Collection<Node> excludedNodes) {
boolean isExcluded = false;
if (scope.startsWith("~")) {
isExcluded = true;
scope = scope.substring(1);
}
scope = NodeBase.normalize(scope);
int count = 0; // the number of nodes in both scope & excludedNodes
netlock.readLock().lock();
try {
for (Node node : excludedNodes) {
if ((NodeBase.getPath(node) + NodeBase.PATH_SEPARATOR_STR).startsWith(scope
+ NodeBase.PATH_SEPARATOR_STR)) {
count++;
}
}
Node n = getNode(scope);
int scopeNodeCount = 0;
if (n instanceof InnerNode) {
scopeNodeCount = ((InnerNode) n).getNumOfLeaves();
}
if (isExcluded) {
return clusterMap.getNumOfLeaves() - scopeNodeCount - excludedNodes.size() + count;
} else {
return scopeNodeCount - count;
}
} finally {
netlock.readLock().unlock();
}
}
/**
* Convert a network tree to a string.
*/
@Override
public String toString() {
// print the number of racks
StringBuilder tree = new StringBuilder();
tree.append("Number of racks: ");
tree.append(numOfRacks);
tree.append("\n");
// print the number of leaves
int numOfLeaves = getNumOfLeaves();
tree.append("Expected number of leaves:");
tree.append(numOfLeaves);
tree.append("\n");
// print nodes
for (int i = 0; i < numOfLeaves; i++) {
tree.append(NodeBase.getPath(clusterMap.getLeaf(i, null)));
tree.append("\n");
}
return tree.toString();
}
/**
* Divide networklocation string into two parts by last separator, and get
* the first part here.
*
* @param networkLocation
* @return
*/
public static String getFirstHalf(String networkLocation) {
int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
return networkLocation.substring(0, index);
}
/**
* Divide networklocation string into two parts by last separator, and get
* the second part here.
*
* @param networkLocation
* @return
*/
public static String getLastHalf(String networkLocation) {
int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
return networkLocation.substring(index);
}
/**
* Swap two array items.
*/
protected static void swap(Node[] nodes, int i, int j) {
Node tempNode;
tempNode = nodes[j];
nodes[j] = nodes[i];
nodes[i] = tempNode;
}
/** Sort nodes array by their distances to <i>reader</i>
* It linearly scans the array, if a local node is found, swap it with
* the first element of the array.
* If a local rack node is found, swap it with the first element following
* the local node.
* If neither local node or local rack node is found, put a random replica
* location at position 0.
* It leaves the rest nodes untouched.
* @param reader the node that wishes to read a block from one of the nodes
* @param nodes the list of nodes containing data for the reader
*/
public void pseudoSortByDistance(Node reader, Node[] nodes) {
int tempIndex = 0;
int localRackNode = -1;
if (reader != null) {
//scan the array to find the local node & local rack node
for (int i = 0; i < nodes.length; i++) {
if (tempIndex == 0 && reader == nodes[i]) { //local node
//swap the local node and the node at position 0
if (i != 0) {
swap(nodes, tempIndex, i);
}
tempIndex = 1;
if (localRackNode != -1) {
if (localRackNode == 0) {
localRackNode = i;
}
break;
}
} else if (localRackNode == -1 && isOnSameRack(reader, nodes[i])) {
//local rack
localRackNode = i;
if (tempIndex != 0) {
break;
}
}
}
// swap the local rack node and the node at position tempIndex
if (localRackNode != -1 && localRackNode != tempIndex) {
swap(nodes, tempIndex, localRackNode);
tempIndex++;
}
}
// put a random node at position 0 if it is not a local/local-rack node
if (tempIndex == 0 && localRackNode == -1 && nodes.length != 0) {
swap(nodes, 0, r.nextInt(nodes.length));
}
}
}