blob: 4062598c2fbec9b90097f1437df9e6dd00106664 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This interface is used for choosing the desired number of targets
* for placing block replicas.
*/
@InterfaceAudience.Private
public abstract class BlockPlacementPolicy {
static final Logger LOG = LoggerFactory.getLogger(
BlockPlacementPolicy.class);
@InterfaceAudience.Private
public static class NotEnoughReplicasException extends Exception {
private static final long serialVersionUID = 1L;
NotEnoughReplicasException(String msg) {
super(msg);
}
}
/**
* choose <i>numOfReplicas</i> data nodes for <i>writer</i>
* to re-replicate a block with size <i>blocksize</i>
* If not, return as many as we can.
*
* @param srcPath the file to which this chooseTargets is being invoked.
* @param numOfReplicas additional number of replicas wanted.
* @param writer the writer's machine, null if not in the cluster.
* @param chosen datanodes that have been chosen as targets.
* @param returnChosenNodes decide if the chosenNodes are returned.
* @param excludedNodes datanodes that should not be considered as targets.
* @param blocksize size of the data to be written.
* @param flags Block placement flags.
* @return array of DatanodeDescriptor instances chosen as target
* and sorted as a pipeline.
*/
public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,
int numOfReplicas,
Node writer,
List<DatanodeStorageInfo> chosen,
boolean returnChosenNodes,
Set<Node> excludedNodes,
long blocksize,
BlockStoragePolicy storagePolicy,
EnumSet<AddBlockFlag> flags);
/**
* Same as {@link #chooseTarget(String, int, Node, Set, long, List, StorageType)}
* with added parameter {@code favoredDatanodes}
* @param favoredNodes datanodes that should be favored as targets. This
* is only a hint and due to cluster state, namenode may not be
* able to place the blocks on these datanodes.
*/
DatanodeStorageInfo[] chooseTarget(String src,
int numOfReplicas, Node writer,
Set<Node> excludedNodes,
long blocksize,
List<DatanodeDescriptor> favoredNodes,
BlockStoragePolicy storagePolicy,
EnumSet<AddBlockFlag> flags) {
// This class does not provide the functionality of placing
// a block in favored datanodes. The implementations of this class
// are expected to provide this functionality
return chooseTarget(src, numOfReplicas, writer,
new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
excludedNodes, blocksize, storagePolicy, flags);
}
/**
* @param storageTypes storage types that should be used as targets.
*/
public DatanodeStorageInfo[] chooseTarget(String srcPath, int numOfReplicas,
Node writer, List<DatanodeStorageInfo> chosen, boolean returnChosenNodes,
Set<Node> excludedNodes, long blocksize, BlockStoragePolicy storagePolicy,
EnumSet<AddBlockFlag> flags, EnumMap<StorageType, Integer> storageTypes) {
return chooseTarget(srcPath, numOfReplicas, writer, chosen,
returnChosenNodes, excludedNodes, blocksize, storagePolicy, flags);
}
/**
* Verify if the block's placement meets requirement of placement policy,
* i.e. replicas are placed on no less than minRacks racks in the system.
*
* @param locs block with locations
* @param numOfReplicas replica number of file to be verified
* @return the result of verification
*/
abstract public BlockPlacementStatus verifyBlockPlacement(
DatanodeInfo[] locs, int numOfReplicas);
/**
* Select the excess replica storages for deletion based on either
* delNodehint/Excess storage types.
*
* @param candidates
* available replicas
* @param expectedNumOfReplicas
* The required number of replicas for this block
* @param excessTypes
* type of the storagepolicy
* @param addedNode
* New replica reported
* @param delNodeHint
* Hint for excess storage selection
* @return Returns the list of excess replicas chosen for deletion
*/
abstract public List<DatanodeStorageInfo> chooseReplicasToDelete(
Collection<DatanodeStorageInfo> candidates, int expectedNumOfReplicas,
List<StorageType> excessTypes, DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint);
/**
* Used to setup a BlockPlacementPolicy object. This should be defined by
* all implementations of a BlockPlacementPolicy.
*
* @param conf the configuration object
* @param stats retrieve cluster status from here
* @param clusterMap cluster topology
*/
abstract protected void initialize(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap,
Host2NodesMap host2datanodeMap);
/**
* Get an instance of the configured Block Placement Policy based on the
* the configuration property
* {@link DFSConfigKeys#DFS_BLOCK_REPLICATOR_CLASSNAME_KEY}.
*
* @param conf the configuration to be used
* @param stats an object that is used to retrieve the load on the cluster
* @param clusterMap the network topology of the cluster
* @return an instance of BlockPlacementPolicy
*/
public static BlockPlacementPolicy getInstance(Configuration conf,
FSClusterStats stats,
NetworkTopology clusterMap,
Host2NodesMap host2datanodeMap) {
final Class<? extends BlockPlacementPolicy> replicatorClass = conf.getClass(
DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
BlockPlacementPolicy.class);
final BlockPlacementPolicy replicator = ReflectionUtils.newInstance(
replicatorClass, conf);
replicator.initialize(conf, stats, clusterMap, host2datanodeMap);
return replicator;
}
/**
* Check if the move is allowed. Used by balancer and other tools.
* @
*
* @param candidates all replicas including source and target
* @param source source replica of the move
* @param target target replica of the move
*/
abstract public boolean isMovable(Collection<DatanodeInfo> candidates,
DatanodeInfo source, DatanodeInfo target);
/**
* Adjust rackmap, moreThanOne, and exactlyOne after removing replica on cur.
*
* @param rackMap a map from rack to replica
* @param moreThanOne The List of replica nodes on rack which has more than
* one replica
* @param exactlyOne The List of replica nodes on rack with only one replica
* @param cur current replica to remove
*/
public void adjustSetsWithChosenReplica(
final Map<String, List<DatanodeStorageInfo>> rackMap,
final List<DatanodeStorageInfo> moreThanOne,
final List<DatanodeStorageInfo> exactlyOne,
final DatanodeStorageInfo cur) {
final String rack = getRack(cur.getDatanodeDescriptor());
final List<DatanodeStorageInfo> storages = rackMap.get(rack);
storages.remove(cur);
if (storages.isEmpty()) {
rackMap.remove(rack);
}
if (moreThanOne.remove(cur)) {
if (storages.size() == 1) {
final DatanodeStorageInfo remaining = storages.get(0);
moreThanOne.remove(remaining);
exactlyOne.add(remaining);
}
} else {
exactlyOne.remove(cur);
}
}
protected <T> DatanodeInfo getDatanodeInfo(T datanode) {
Preconditions.checkArgument(
datanode instanceof DatanodeInfo ||
datanode instanceof DatanodeStorageInfo,
"class " + datanode.getClass().getName() + " not allowed");
if (datanode instanceof DatanodeInfo) {
return ((DatanodeInfo)datanode);
} else if (datanode instanceof DatanodeStorageInfo) {
return ((DatanodeStorageInfo)datanode).getDatanodeDescriptor();
} else {
return null;
}
}
/**
* Get rack string from a data node
* @return rack of data node
*/
protected String getRack(final DatanodeInfo datanode) {
return datanode.getNetworkLocation();
}
/**
* Split data nodes into two sets, one set includes nodes on rack with
* more than one replica, the other set contains the remaining nodes.
*
* @param storagesOrDataNodes DatanodeStorageInfo/DatanodeInfo to be split
* into two sets
* @param rackMap a map from rack to datanodes
* @param moreThanOne contains nodes on rack with more than one replica
* @param exactlyOne remains contains the remaining nodes
*/
public <T> void splitNodesWithRack(
final Iterable<T> storagesOrDataNodes,
final Map<String, List<T>> rackMap,
final List<T> moreThanOne,
final List<T> exactlyOne) {
for(T s: storagesOrDataNodes) {
final String rackName = getRack(getDatanodeInfo(s));
List<T> storageList = rackMap.get(rackName);
if (storageList == null) {
storageList = new ArrayList<T>();
rackMap.put(rackName, storageList);
}
storageList.add(s);
}
// split nodes into two sets
for(List<T> storageList : rackMap.values()) {
if (storageList.size() == 1) {
// exactlyOne contains nodes on rack with only one replica
exactlyOne.add(storageList.get(0));
} else {
// moreThanOne contains nodes on rack with more than one replica
moreThanOne.addAll(storageList);
}
}
}
}