/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.sps;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.balancer.Matcher;
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
* Setting a storage policy on a file after it has been written only updates
* the new policy in the Namespace; the physical blocks are not moved until
* the user explicitly runs the "Mover" tool on such files. The
* StoragePolicySatisfier daemon thread addresses the case where users want
* the blocks moved by a dedicated daemon (which can run inside the Namenode
* or standalone) instead of invoking the Mover tool. Simply calling the
* client API satisfyStoragePolicy on a file/dir automatically triggers the
* movement of its physical storage locations in an asynchronous manner. The
* SPS picks the file blocks whose storage needs to change, builds a mapping
* from each source block location to the expected storage type and target
* location, and then prepares the requests to send to the Datanodes for
* processing the physical block movements.
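*
* <p>A minimal usage sketch (a hedged illustration; it assumes an
* SPS-enabled cluster, a DistributedFileSystem handle named {@code dfs},
* and a hypothetical path):
* <pre>{@code
* // Update the policy in the Namespace only; no blocks move yet.
* dfs.setStoragePolicy(new Path("/data/reports"), "COLD");
* // Ask the SPS to move the physical blocks asynchronously.
* dfs.satisfyStoragePolicy(new Path("/data/reports"));
* }</pre>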
*/
@InterfaceAudience.Private
public class StoragePolicySatisfier implements SPSService, Runnable {
public static final Logger LOG =
LoggerFactory.getLogger(StoragePolicySatisfier.class);
private Daemon storagePolicySatisfierThread;
private BlockStorageMovementNeeded storageMovementNeeded;
private BlockStorageMovementAttemptedItems storageMovementsMonitor;
private volatile boolean isRunning = false;
private int spsWorkMultiplier;
private long blockCount = 0L;
private int blockMovementMaxRetry;
private Context ctxt;
private final Configuration conf;
private DatanodeCacheManager dnCacheMgr;
public StoragePolicySatisfier(Configuration conf) {
this.conf = conf;
}
/**
* Represents the collective analysis status for all blocks.
*/
private static class BlocksMovingAnalysis {
enum Status {
// The analysis was skipped and the item should be retried later, e.g.
// because the block collection is in an incomplete state.
ANALYSIS_SKIPPED_FOR_RETRY,
// Some or all blocks found a target to do the storage movement.
BLOCKS_TARGETS_PAIRED,
// None of the blocks found a target to do the storage movement.
NO_BLOCKS_TARGETS_PAIRED,
// No blocks need storage movement; the policy is already satisfied.
BLOCKS_ALREADY_SATISFIED,
// Target pairing was skipped, e.g. because the block collection has no
// blocks, or because analysis is not required for EC files with
// unsuitable storage policies.
BLOCKS_TARGET_PAIRING_SKIPPED,
// All the reported blocks satisfy the policy, but some of them are
// low redundant.
FEW_LOW_REDUNDANCY_BLOCKS,
// Block movements failed due to unexpected errors.
BLOCKS_FAILED_TO_MOVE
}
private Status status = null;
private Map<Block, Set<StorageTypeNodePair>> assignedBlocks = null;
BlocksMovingAnalysis(Status status,
Map<Block, Set<StorageTypeNodePair>> assignedBlocks) {
this.status = status;
this.assignedBlocks = assignedBlocks;
}
}
public void init(final Context context) {
this.ctxt = context;
this.storageMovementNeeded = new BlockStorageMovementNeeded(context);
this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(
this, storageMovementNeeded, context);
this.spsWorkMultiplier = getSPSWorkMultiplier(getConf());
this.blockMovementMaxRetry = getConf().getInt(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT);
}
/**
* Start the storage policy satisfier daemon thread. Also start the block
* storage movements monitor, which retries the attempts if needed.
*/
@Override
public synchronized void start(StoragePolicySatisfierMode serviceMode) {
if (serviceMode == StoragePolicySatisfierMode.NONE) {
LOG.error("Can't start StoragePolicySatisfier for the given mode:{}",
serviceMode);
return;
}
LOG.info("Starting {} StoragePolicySatisfier.",
StringUtils.toLowerCase(serviceMode.toString()));
isRunning = true;
storagePolicySatisfierThread = new Daemon(this);
storagePolicySatisfierThread.setName("StoragePolicySatisfier");
storagePolicySatisfierThread.start();
this.storageMovementsMonitor.start();
this.storageMovementNeeded.activate();
dnCacheMgr = new DatanodeCacheManager(conf);
}
@Override
public synchronized void stop(boolean forceStop) {
isRunning = false;
if (storagePolicySatisfierThread == null) {
return;
}
storageMovementNeeded.close();
storagePolicySatisfierThread.interrupt();
this.storageMovementsMonitor.stop();
if (forceStop) {
storageMovementNeeded.clearQueuesWithNotification();
} else {
LOG.info("Stopping StoragePolicySatisfier.");
}
}
@Override
public synchronized void stopGracefully() {
if (isRunning) {
stop(false);
}
if (this.storageMovementsMonitor != null) {
this.storageMovementsMonitor.stopGracefully();
}
if (storagePolicySatisfierThread != null) {
try {
storagePolicySatisfierThread.join(3000);
} catch (InterruptedException ie) {
if (LOG.isDebugEnabled()) {
LOG.debug("Interrupted Exception while waiting to join sps thread,"
+ " ignoring it", ie);
}
}
}
}
@Override
public boolean isRunning() {
return isRunning;
}
@Override
public void run() {
while (isRunning) {
// Check if the dependent service is running.
if (!ctxt.isRunning()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Upstream service is down, skipping the sps work.");
}
// Sleep briefly to avoid busy-waiting while the service is down.
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
continue;
}
ItemInfo itemInfo = null;
try {
boolean retryItem = false;
if (!ctxt.isInSafeMode()) {
itemInfo = storageMovementNeeded.get();
if (itemInfo != null) {
if (itemInfo.getRetryCount() >= blockMovementMaxRetry) {
LOG.info("Failed to satisfy the policy after "
+ blockMovementMaxRetry + " retries. Removing inode "
+ itemInfo.getFile() + " from the queue");
storageMovementNeeded.removeItemTrackInfo(itemInfo, false);
continue;
}
long trackId = itemInfo.getFile();
BlocksMovingAnalysis status = null;
BlockStoragePolicy existingStoragePolicy;
// TODO: presently, the context internally acquires the lock and
// returns the result. Need to discuss whether to move the lock outside.
HdfsFileStatus fileStatus = ctxt.getFileInfo(trackId);
// Check path existence.
if (fileStatus == null || fileStatus.isDir()) {
// File doesn't exist (maybe it got deleted) or it's a directory;
// just remove the trackId from the queue.
storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
} else {
byte existingStoragePolicyID = fileStatus.getStoragePolicy();
existingStoragePolicy = ctxt
.getStoragePolicy(existingStoragePolicyID);
HdfsLocatedFileStatus file = (HdfsLocatedFileStatus) fileStatus;
status = analyseBlocksStorageMovementsAndAssignToDN(file,
existingStoragePolicy);
switch (status.status) {
// Just add to the monitor, so it will be retried after a timeout.
case ANALYSIS_SKIPPED_FOR_RETRY:
// Just add to the monitor, so it will be tracked for reporting and
// removed once the storage movement attempt finished report arrives.
case BLOCKS_TARGETS_PAIRED:
if (LOG.isDebugEnabled()) {
LOG.debug("Block analysis status:{} for the file id:{}."
+ " Adding to attempt monitor queue for the storage "
+ "movement attempt finished report",
status.status, fileStatus.getFileId());
}
this.storageMovementsMonitor.add(itemInfo.getStartPath(),
itemInfo.getFile(), monotonicNow(), status.assignedBlocks,
itemInfo.getRetryCount());
break;
case NO_BLOCKS_TARGETS_PAIRED:
if (LOG.isDebugEnabled()) {
LOG.debug("Adding trackID:{} for the file id:{} back to"
+ " retry queue as none of the blocks found its eligible"
+ " targets.", trackId, fileStatus.getFileId());
}
retryItem = true;
break;
case FEW_LOW_REDUNDANCY_BLOCKS:
if (LOG.isDebugEnabled()) {
LOG.debug("Adding trackID:{} for the file id:{} back to "
+ "retry queue as some of the blocks are low redundant.",
trackId, fileStatus.getFileId());
}
retryItem = true;
break;
case BLOCKS_FAILED_TO_MOVE:
if (LOG.isDebugEnabled()) {
LOG.debug("Adding trackID:{} for the file id:{} back to "
+ "retry queue as some of the blocks movement failed.",
trackId, fileStatus.getFileId());
}
retryItem = true;
break;
// Just clean Xattrs
case BLOCKS_TARGET_PAIRING_SKIPPED:
case BLOCKS_ALREADY_SATISFIED:
default:
LOG.info("Block analysis status:{} for the file id:{}."
+ " So, Cleaning up the Xattrs.", status.status,
fileStatus.getFileId());
storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
break;
}
}
}
} else {
LOG.info("Namenode is in safemode. It will retry again.");
Thread.sleep(3000);
}
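// Throttle the work: when the queue is empty or the number of scheduled
// block moves exceeds (live datanodes * work multiplier), pause briefly
// and reset the counter.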
int numLiveDn = ctxt.getNumLiveDataNodes();
if (storageMovementNeeded.size() == 0
|| blockCount > (numLiveDn * spsWorkMultiplier)) {
Thread.sleep(3000);
blockCount = 0L;
}
if (retryItem) {
this.storageMovementNeeded.add(itemInfo);
}
} catch (IOException e) {
LOG.error("Exception during StoragePolicySatisfier execution - "
+ "will continue next cycle", e);
// Since it could not finish this item in previous iteration due to IOE,
// just try again.
this.storageMovementNeeded.add(itemInfo);
} catch (Throwable t) {
synchronized (this) {
if (isRunning) {
isRunning = false;
if (t instanceof InterruptedException) {
LOG.info("Stopping StoragePolicySatisfier.", t);
} else {
LOG.error("StoragePolicySatisfier thread received "
+ "runtime exception.", t);
}
// Stopping monitor thread and clearing queues as well
this.clearQueues();
this.storageMovementsMonitor.stopGracefully();
}
}
}
}
}
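/**
* Analyses the blocks of the given file against its storage policy, pairs
* each misplaced block replica with a target datanode storage and submits
* the resulting block move tasks. Returns the collective analysis status
* along with the per-block target assignments.
*/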
private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
HdfsLocatedFileStatus fileInfo,
BlockStoragePolicy existingStoragePolicy) throws IOException {
BlocksMovingAnalysis.Status status =
BlocksMovingAnalysis.Status.BLOCKS_ALREADY_SATISFIED;
final ErasureCodingPolicy ecPolicy = fileInfo.getErasureCodingPolicy();
final LocatedBlocks locatedBlocks = fileInfo.getLocatedBlocks();
final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete();
if (!lastBlkComplete) {
// Postpone, as the file is currently under construction.
LOG.info("File: {} is under construction. So, postponing"
+ " this to the next retry iteration", fileInfo.getFileId());
return new BlocksMovingAnalysis(
BlocksMovingAnalysis.Status.ANALYSIS_SKIPPED_FOR_RETRY,
new HashMap<>());
}
List<LocatedBlock> blocks = locatedBlocks.getLocatedBlocks();
if (blocks.isEmpty()) {
LOG.info("File: {} does not have any blocks."
+ " So, skipping the analysis.", fileInfo.getFileId());
return new BlocksMovingAnalysis(
BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
new HashMap<>());
}
List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
boolean hasLowRedundancyBlocks = false;
int replication = fileInfo.getReplication();
DatanodeMap liveDns = dnCacheMgr.getLiveDatanodeStorageReport(ctxt);
for (int i = 0; i < blocks.size(); i++) {
LocatedBlock blockInfo = blocks.get(i);
// A block is considered low redundant when its locations array length
// is less than the expected replication factor. If any block is low
// redundant, hasLowRedundancyBlocks is set to true.
hasLowRedundancyBlocks |= isLowRedundancyBlock(blockInfo, replication,
ecPolicy);
List<StorageType> expectedStorageTypes;
if (blockInfo.isStriped()) {
if (ErasureCodingPolicyManager
.checkStoragePolicySuitableForECStripedMode(
existingStoragePolicy.getId())) {
expectedStorageTypes = existingStoragePolicy
.chooseStorageTypes((short) blockInfo.getLocations().length);
} else {
// Currently only limited policies (HOT, COLD, ALLSSD) are supported
// for EC striped mode files. SPS will not move the blocks if the
// storage policy is not one of the supported striped-mode policies.
LOG.warn("The storage policy " + existingStoragePolicy.getName()
+ " is not suitable for striped EC files. "
+ "So, not moving the blocks");
return new BlocksMovingAnalysis(
BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
new HashMap<>());
}
} else {
expectedStorageTypes = existingStoragePolicy
.chooseStorageTypes(fileInfo.getReplication());
}
List<StorageType> existing = new LinkedList<StorageType>(
Arrays.asList(blockInfo.getStorageTypes()));
if (!removeOverlapBetweenStorageTypes(expectedStorageTypes,
existing, true)) {
boolean blocksPaired = computeBlockMovingInfos(blockMovingInfos,
blockInfo, expectedStorageTypes, existing, blockInfo.getLocations(),
liveDns, ecPolicy);
if (blocksPaired) {
status = BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED;
} else if (status !=
BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
// Check whether any previous block was successfully paired. The status
// is set to NO_BLOCKS_TARGETS_PAIRED only when none of the file's
// blocks found an eligible target to satisfy the storage policy.
status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED;
}
}
}
// If no block was paired and some blocks are low redundant, mark the
// status as FEW_LOW_REDUNDANCY_BLOCKS.
if (hasLowRedundancyBlocks
&& status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
}
Map<Block, Set<StorageTypeNodePair>> assignedBlocks = new HashMap<>();
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
// Submit a move task for each chosen block storage movement.
try {
ctxt.submitMoveTask(blkMovingInfo);
LOG.debug("BlockMovingInfo: {}", blkMovingInfo);
StorageTypeNodePair nodeStorage = new StorageTypeNodePair(
blkMovingInfo.getTargetStorageType(), blkMovingInfo.getTarget());
Set<StorageTypeNodePair> nodesWithStorage = assignedBlocks
.get(blkMovingInfo.getBlock());
if (nodesWithStorage == null) {
nodesWithStorage = new HashSet<>();
assignedBlocks.put(blkMovingInfo.getBlock(), nodesWithStorage);
}
nodesWithStorage.add(nodeStorage);
blockCount++;
} catch (IOException e) {
LOG.warn("Exception while scheduling movement task", e);
// failed to move the block.
status = BlocksMovingAnalysis.Status.BLOCKS_FAILED_TO_MOVE;
}
}
return new BlocksMovingAnalysis(status, assignedBlocks);
}
/**
* The given block is considered low redundant when the number of block
* locations is less than the expected replication factor. For EC blocks,
* the expected redundancy is the sum of data and parity blocks.
*
* @param blockInfo
* block
* @param replication
* replication factor of the given file block
* @param ecPolicy
* erasure coding policy of the given file block
* @return true if the given block is low redundant.
*/
private boolean isLowRedundancyBlock(LocatedBlock blockInfo, int replication,
ErasureCodingPolicy ecPolicy) {
if (blockInfo.isStriped()) {
// For EC blocks, redundancy is the sum of data and parity blocks.
replication = ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits();
}
// The block is low redundant when it has fewer locations than the
// expected replication factor.
return blockInfo.getLocations().length < replication;
}
/**
* Compute the list of block moving information corresponding to the given
* blockId. This checks whether each location of the given block satisfies
* the expected storage policy. If a block location does not satisfy the
* policy, a target node with the expected storage type is chosen to
* satisfy it.
*
* @param blockMovingInfos
* - list of block source and target node pair
* @param blockInfo
* - block details
* @param expectedStorageTypes
* - list of expected storage type to satisfy the storage policy
* @param existing
* - list to get existing storage types
* @param storages
* - available storages
* @param liveDns
* - live datanodes which can be used as targets
* @param ecPolicy
* - ec policy of sps invoked file
* @return false if some of the block locations failed to find target node to
* satisfy the storage policy, true otherwise
*/
private boolean computeBlockMovingInfos(
List<BlockMovingInfo> blockMovingInfos, LocatedBlock blockInfo,
List<StorageType> expectedStorageTypes, List<StorageType> existing,
DatanodeInfo[] storages, DatanodeMap liveDns,
ErasureCodingPolicy ecPolicy) {
boolean foundMatchingTargetNodesForBlock = true;
if (!removeOverlapBetweenStorageTypes(expectedStorageTypes,
existing, true)) {
List<StorageTypeNodePair> sourceWithStorageMap =
new ArrayList<StorageTypeNodePair>();
List<DatanodeInfo> existingBlockStorages = new ArrayList<DatanodeInfo>(
Arrays.asList(storages));
// Add the existing storages to the exclude nodes to avoid choosing
// them as remote targets later.
List<DatanodeInfo> excludeNodes = new ArrayList<>(existingBlockStorages);
// If an expected type already exists on a source node, a local movement
// is possible, so let's find such sources first.
Iterator<DatanodeInfo> iterator = existingBlockStorages.iterator();
while (iterator.hasNext()) {
DatanodeInfoWithStorage dnInfo = (DatanodeInfoWithStorage) iterator
.next();
if (checkSourceAndTargetTypeExists(dnInfo, existing,
expectedStorageTypes, liveDns)) {
sourceWithStorageMap
.add(new StorageTypeNodePair(dnInfo.getStorageType(), dnInfo));
iterator.remove();
existing.remove(dnInfo.getStorageType());
}
}
// Find sources for the remaining existing types.
for (StorageType existingType : existing) {
iterator = existingBlockStorages.iterator();
while (iterator.hasNext()) {
DatanodeInfoWithStorage dnStorageInfo =
(DatanodeInfoWithStorage) iterator.next();
StorageType storageType = dnStorageInfo.getStorageType();
if (storageType == existingType) {
iterator.remove();
sourceWithStorageMap.add(new StorageTypeNodePair(storageType,
dnStorageInfo));
break;
}
}
}
EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>> targetDns =
findTargetsForExpectedStorageTypes(expectedStorageTypes, liveDns);
foundMatchingTargetNodesForBlock = findSourceAndTargetToMove(
blockMovingInfos, blockInfo, sourceWithStorageMap,
expectedStorageTypes, targetDns,
ecPolicy, excludeNodes);
}
return foundMatchingTargetNodesForBlock;
}
/**
* Find a good target node for each source node whose block storage is
* misplaced.
*
* @param blockMovingInfos
* - list of block source and target node pair
* @param blockInfo
* - Block
* @param sourceWithStorageList
* - Source Datanode with storages list
* @param expectedTypes
* - Expecting storages to move
* @param targetDns
* - Available DNs for expected storage types
* @param ecPolicy
* - erasure coding policy of sps invoked file
* @param excludeNodes
* - existing source nodes, which has replica copy
* @return false if some of the block locations failed to find target node to
* satisfy the storage policy
*/
private boolean findSourceAndTargetToMove(
List<BlockMovingInfo> blockMovingInfos, LocatedBlock blockInfo,
List<StorageTypeNodePair> sourceWithStorageList,
List<StorageType> expectedTypes,
EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>> targetDns,
ErasureCodingPolicy ecPolicy, List<DatanodeInfo> excludeNodes) {
boolean foundMatchingTargetNodesForBlock = true;
// Loop over all the source node locations and choose a target storage
// within the same node if possible. This is done separately to avoid
// choosing a target which already has this block.
for (int i = 0; i < sourceWithStorageList.size(); i++) {
StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
// Check whether the block replica is already placed in the expected
// storage type in this source datanode.
if (!expectedTypes.contains(existingTypeNodePair.storageType)) {
StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(blockInfo,
existingTypeNodePair.dn, targetDns, expectedTypes);
if (chosenTarget != null) {
if (blockInfo.isStriped()) {
buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
existingTypeNodePair.storageType, chosenTarget.dn,
chosenTarget.storageType, blockMovingInfos,
ecPolicy);
} else {
buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
existingTypeNodePair.storageType, chosenTarget.dn,
chosenTarget.storageType, blockMovingInfos);
}
expectedTypes.remove(chosenTarget.storageType);
}
}
}
// If all the sources and targets are paired within the same node, then
// simply return.
if (expectedTypes.isEmpty()) {
return foundMatchingTargetNodesForBlock;
}
// Loop over all the source node locations. Choose a remote target
// storage node if one was not found within the same node.
for (int i = 0; i < sourceWithStorageList.size(); i++) {
StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
StorageTypeNodePair chosenTarget = null;
// The target storage was already chosen within the same datanode, so
// just skip this source node.
if (checkIfAlreadyChosen(blockMovingInfos, existingTypeNodePair.dn)) {
continue;
}
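// First, try to match a target within the same node group.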
if (chosenTarget == null && dnCacheMgr.getCluster().isNodeGroupAware()) {
chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn,
expectedTypes, Matcher.SAME_NODE_GROUP, targetDns,
excludeNodes);
}
// Then, match nodes on the same rack
if (chosenTarget == null) {
chosenTarget =
chooseTarget(blockInfo, existingTypeNodePair.dn, expectedTypes,
Matcher.SAME_RACK, targetDns, excludeNodes);
}
if (chosenTarget == null) {
chosenTarget =
chooseTarget(blockInfo, existingTypeNodePair.dn, expectedTypes,
Matcher.ANY_OTHER, targetDns, excludeNodes);
}
if (null != chosenTarget) {
if (blockInfo.isStriped()) {
buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
existingTypeNodePair.storageType, chosenTarget.dn,
chosenTarget.storageType, blockMovingInfos, ecPolicy);
} else {
buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
existingTypeNodePair.storageType, chosenTarget.dn,
chosenTarget.storageType, blockMovingInfos);
}
expectedTypes.remove(chosenTarget.storageType);
excludeNodes.add(chosenTarget.dn);
} else {
LOG.warn(
"Failed to choose target datanode for the required"
+ " storage types {}, block:{}, existing storage type:{}",
expectedTypes, blockInfo, existingTypeNodePair.storageType);
}
}
if (!expectedTypes.isEmpty()) {
foundMatchingTargetNodesForBlock = false;
}
return foundMatchingTargetNodesForBlock;
}
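/**
* Returns true if the given datanode has already been chosen as a source
* in the computed block moving infos.
*/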
private boolean checkIfAlreadyChosen(List<BlockMovingInfo> blockMovingInfos,
DatanodeInfo dn) {
for (BlockMovingInfo blockMovingInfo : blockMovingInfos) {
if (blockMovingInfo.getSource().equals(dn)) {
return true;
}
}
return false;
}
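/**
* Builds a block moving info for a contiguous (non-striped) block, from
* the given source storage to the chosen target storage.
*/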
private void buildContinuousBlockMovingInfos(LocatedBlock blockInfo,
DatanodeInfo sourceNode, StorageType sourceStorageType,
DatanodeInfo targetNode, StorageType targetStorageType,
List<BlockMovingInfo> blkMovingInfos) {
Block blk = ExtendedBlock.getLocalBlock(blockInfo.getBlock());
BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
targetNode, sourceStorageType, targetStorageType);
blkMovingInfos.add(blkMovingInfo);
}
private void buildStripedBlockMovingInfos(LocatedBlock blockInfo,
DatanodeInfo sourceNode, StorageType sourceStorageType,
DatanodeInfo targetNode, StorageType targetStorageType,
List<BlockMovingInfo> blkMovingInfos, ErasureCodingPolicy ecPolicy) {
// For a striped block, the internal block at the given index of the
// block group needs to be constructed. Iterate over all the block
// indices and construct the internal blocks, which can then be
// considered for block movement.
LocatedStripedBlock sBlockInfo = (LocatedStripedBlock) blockInfo;
byte[] indices = sBlockInfo.getBlockIndices();
DatanodeInfo[] locations = sBlockInfo.getLocations();
for (int i = 0; i < indices.length; i++) {
byte blkIndex = indices[i];
if (blkIndex >= 0) {
// pick block movement only for the given source node.
if (sourceNode.equals(locations[i])) {
// construct internal block
ExtendedBlock extBlock = sBlockInfo.getBlock();
long numBytes = StripedBlockUtil.getInternalBlockLength(
extBlock.getNumBytes(), ecPolicy, blkIndex);
Block blk = new Block(ExtendedBlock.getLocalBlock(extBlock));
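// The id of an internal block within a block group is the block group
// id plus the block index.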
long blkId = blk.getBlockId() + blkIndex;
blk.setBlockId(blkId);
blk.setNumBytes(numBytes);
BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
targetNode, sourceStorageType, targetStorageType);
blkMovingInfos.add(blkMovingInfo);
}
}
}
}
/**
* Choose a target storage within the same datanode if possible.
*
* @param blockInfo
* - block info
* @param source
* - source datanode
* @param targetDns
* - map of target datanode storages, keyed by storage type
* @param targetTypes
* - list of target storage types
* @return the chosen storage type and node pair if the source datanode has
* a suitable storage, otherwise null
*/
private StorageTypeNodePair chooseTargetTypeInSameNode(LocatedBlock blockInfo,
DatanodeInfo source,
EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>> targetDns,
List<StorageType> targetTypes) {
for (StorageType t : targetTypes) {
List<DatanodeWithStorage.StorageDetails> targetNodeStorages =
targetDns.get(t);
if (targetNodeStorages == null) {
continue;
}
for (DatanodeWithStorage.StorageDetails targetNode : targetNodeStorages) {
if (targetNode.getDatanodeInfo().equals(source)) {
// Good target with enough space to write the given block size.
if (targetNode.hasSpaceForScheduling(blockInfo.getBlockSize())) {
targetNode.incScheduledSize(blockInfo.getBlockSize());
return new StorageTypeNodePair(t, source);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Datanode:{} storage type:{} doesn't have sufficient "
+ "space:{} to move the target block size:{}",
source, t, targetNode, blockInfo.getBlockSize());
}
}
}
}
return null;
}
private StorageTypeNodePair chooseTarget(LocatedBlock block,
DatanodeInfo source, List<StorageType> targetTypes, Matcher matcher,
EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>>
locsForExpectedStorageTypes, List<DatanodeInfo> excludeNodes) {
for (StorageType t : targetTypes) {
List<DatanodeWithStorage.StorageDetails> nodesWithStorages =
locsForExpectedStorageTypes.get(t);
if (nodesWithStorages == null || nodesWithStorages.isEmpty()) {
continue; // no target nodes with the required storage type.
}
Collections.shuffle(nodesWithStorages);
for (DatanodeWithStorage.StorageDetails targetNode : nodesWithStorages) {
DatanodeInfo target = targetNode.getDatanodeInfo();
if (!excludeNodes.contains(target)
&& matcher.match(dnCacheMgr.getCluster(), source, target)) {
// Good target with enough space to write the given block size.
if (targetNode.hasSpaceForScheduling(block.getBlockSize())) {
targetNode.incScheduledSize(block.getBlockSize());
return new StorageTypeNodePair(t, target);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Datanode:{} storage type:{} doesn't have sufficient "
+ "space:{} to move the target block size:{}",
target, t, targetNode, block.getBlockSize());
}
}
}
}
return null;
}
/**
* Keeps datanode with its respective storage type.
*/
static final class StorageTypeNodePair {
private final StorageType storageType;
private final DatanodeInfo dn;
StorageTypeNodePair(StorageType storageType, DatanodeInfo dn) {
this.storageType = storageType;
this.dn = dn;
}
public DatanodeInfo getDatanodeInfo() {
return dn;
}
public StorageType getStorageType() {
return storageType;
}
@Override
public String toString() {
return new StringBuilder().append("StorageTypeNodePair(\n ")
.append("DatanodeInfo: ").append(dn).append(", StorageType: ")
.append(storageType).append(")").toString();
}
}
private EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>>
findTargetsForExpectedStorageTypes(List<StorageType> expected,
DatanodeMap liveDns) {
EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>> targetsMap =
new EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>>(
StorageType.class);
for (StorageType storageType : expected) {
List<DatanodeWithStorage> nodes = liveDns.getTarget(storageType);
if (nodes == null) {
return targetsMap;
}
List<DatanodeWithStorage.StorageDetails> listNodes = targetsMap
.get(storageType);
if (listNodes == null) {
listNodes = new ArrayList<>();
targetsMap.put(storageType, listNodes);
}
for (DatanodeWithStorage n : nodes) {
final DatanodeWithStorage.StorageDetails node = getMaxRemaining(n,
storageType);
if (node != null) {
listNodes.add(node);
}
}
}
return targetsMap;
}
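/**
* Returns the storage of the given type with the largest remaining space
* available for scheduling on the given node, or null if no such storage
* exists.
*/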
private static DatanodeWithStorage.StorageDetails getMaxRemaining(
DatanodeWithStorage node, StorageType storageType) {
long max = 0L;
DatanodeWithStorage.StorageDetails nodeInfo = null;
List<DatanodeWithStorage.StorageDetails> storages = node
.getNodesWithStorages(storageType);
for (DatanodeWithStorage.StorageDetails n : storages) {
if (n.availableSizeToMove() > max) {
max = n.availableSizeToMove();
nodeInfo = n;
}
}
return nodeInfo;
}
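/**
* Checks whether the given datanode has both one of the existing storage
* types and one of the expected storage types, i.e. whether a movement
* within this same node is possible.
*/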
private boolean checkSourceAndTargetTypeExists(DatanodeInfo dn,
List<StorageType> existingStorageTypes,
List<StorageType> expectedStorageTypes, DatanodeMap liveDns) {
boolean isExpectedTypeAvailable = false;
boolean isExistingTypeAvailable = false;
for (DatanodeWithStorage liveDn : liveDns.getTargets()) {
if (dn.equals(liveDn.datanode)) {
for (StorageType eachType : liveDn.getStorageTypes()) {
if (existingStorageTypes.contains(eachType)) {
isExistingTypeAvailable = true;
}
if (expectedStorageTypes.contains(eachType)) {
isExpectedTypeAvailable = true;
}
if (isExistingTypeAvailable && isExpectedTypeAvailable) {
return true;
}
}
}
}
return isExistingTypeAvailable && isExpectedTypeAvailable;
}
/**
* Maintains storage type map with the available datanodes in the cluster.
*/
public static class DatanodeMap {
private final EnumMap<StorageType, List<DatanodeWithStorage>> targetsMap =
new EnumMap<StorageType, List<DatanodeWithStorage>>(StorageType.class);
private List<DatanodeWithStorage> targets = new ArrayList<>();
/**
* Build datanode map with the available storage types.
*
* @param node
* datanode
* @param storageTypes
* list of available storage types in the given datanode
* @param maxSize2Move
* available space which can be used for scheduling block move
*/
void addTarget(DatanodeInfo node, List<StorageType> storageTypes,
List<Long> maxSize2Move) {
DatanodeWithStorage nodeStorage = new DatanodeWithStorage(node);
targets.add(nodeStorage);
for (int i = 0; i < storageTypes.size(); i++) {
StorageType type = storageTypes.get(i);
List<DatanodeWithStorage> nodeStorages = targetsMap.get(type);
nodeStorage.addStorageType(type, maxSize2Move.get(i));
if (nodeStorages == null) {
nodeStorages = new LinkedList<>();
targetsMap.put(type, nodeStorages);
}
nodeStorages.add(nodeStorage);
}
}
List<DatanodeWithStorage> getTarget(StorageType storageType) {
return targetsMap.get(storageType);
}
public List<DatanodeWithStorage> getTargets() {
return targets;
}
void reset() {
targetsMap.clear();
}
}
/**
* Keeps a datanode with its respective set of supported storage types. It
* holds the available space of each volume and is used while pairing the
* target datanodes.
*/
public static final class DatanodeWithStorage {
private final EnumMap<StorageType, List<StorageDetails>> storageMap =
new EnumMap<StorageType, List<StorageDetails>>(StorageType.class);
private final DatanodeInfo datanode;
private DatanodeWithStorage(DatanodeInfo datanode) {
this.datanode = datanode;
}
public DatanodeInfo getDatanodeInfo() {
return datanode;
}
Set<StorageType> getStorageTypes() {
return storageMap.keySet();
}
private void addStorageType(StorageType t, long maxSize2Move) {
List<StorageDetails> nodesWithStorages = getNodesWithStorages(t);
if (nodesWithStorages == null) {
nodesWithStorages = new LinkedList<StorageDetails>();
storageMap.put(t, nodesWithStorages);
}
nodesWithStorages.add(new StorageDetails(maxSize2Move));
}
/**
* Returns the storage details of this datanode for the given storage type.
*
* @param type
* - storage type
* @return the storage details list for the given storage type
*/
private List<StorageDetails> getNodesWithStorages(StorageType type) {
return storageMap.get(type);
}
@Override
public String toString() {
return new StringBuilder().append("DatanodeWithStorageInfo(\n ")
.append("Datanode: ").append(datanode).append(" StorageTypeNodeMap: ")
.append(storageMap).append(")").toString();
}
/** Storage details in a datanode storage type. */
final class StorageDetails {
private final long maxSize2Move;
private long scheduledSize = 0L;
private StorageDetails(long maxSize2Move) {
this.maxSize2Move = maxSize2Move;
}
private DatanodeInfo getDatanodeInfo() {
return DatanodeWithStorage.this.datanode;
}
/**
* Checks whether this datanode storage has sufficient space to accommodate
* the given block size.
*/
private synchronized boolean hasSpaceForScheduling(long size) {
return availableSizeToMove() > size;
}
/**
* @return the remaining space, in bytes, available to schedule block moves.
*/
private synchronized long availableSizeToMove() {
return maxSize2Move - scheduledSize;
}
/** Increment scheduled size. */
private synchronized void incScheduledSize(long size) {
scheduledSize += size;
}
@Override
public String toString() {
return new StringBuilder().append("StorageDetails(\n ")
.append("maxSize2Move: ").append(maxSize2Move)
.append(" scheduledSize: ").append(scheduledSize).append(")")
.toString();
}
}
}
/**
* Receives storage movement attempt finished block report.
*
* @param dnInfo
* reported datanode
* @param storageType
* - storage type
* @param block
* movement attempt finished block.
*/
@Override
public void notifyStorageMovementAttemptFinishedBlk(DatanodeInfo dnInfo,
StorageType storageType, Block block) {
storageMovementsMonitor.notifyReportedBlock(dnInfo, storageType, block);
}
@VisibleForTesting
public BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() {
return storageMovementsMonitor;
}
/**
* Clear the pending storage movement queues and the items tracked in the
* storage movement monitor.
*/
public void clearQueues() {
LOG.warn("Clearing all the queues from StoragePolicySatisfier. So, "
+ "user requests on satisfying block storages would be discarded.");
storageMovementNeeded.clearAll();
}
/**
* This class contains information about the attempted blocks and their last
* attempted or reported timestamp. This is used by
* {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
*/
static final class AttemptedItemInfo extends ItemInfo {
private long lastAttemptedOrReportedTime;
private final Set<Block> blocks;
/**
* AttemptedItemInfo constructor.
*
* @param rootId
* rootId for trackId
* @param trackId
* trackId for file.
* @param lastAttemptedOrReportedTime
* last attempted or reported time
* @param blocks
* scheduled blocks
* @param retryCount
* file retry count
*/
AttemptedItemInfo(long rootId, long trackId,
long lastAttemptedOrReportedTime,
Set<Block> blocks, int retryCount) {
super(rootId, trackId, retryCount);
this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
this.blocks = blocks;
}
/**
* @return last attempted or reported time stamp.
*/
long getLastAttemptedOrReportedTime() {
return lastAttemptedOrReportedTime;
}
/**
* Update lastAttemptedOrReportedTime, so that the expiration time will be
* postponed into the future.
*/
void touchLastReportedTimeStamp() {
this.lastAttemptedOrReportedTime = monotonicNow();
}
Set<Block> getBlocks() {
return this.blocks;
}
}
@Override
public void addFileToProcess(ItemInfo trackInfo, boolean scanCompleted) {
storageMovementNeeded.add(trackInfo, scanCompleted);
if (LOG.isDebugEnabled()) {
LOG.debug("Added track info for inode {} to block "
+ "storageMovementNeeded queue", trackInfo.getFile());
}
}
@Override
public void addAllFilesToProcess(long startPath, List<ItemInfo> itemInfoList,
boolean scanCompleted) {
getStorageMovementQueue().addAll(startPath, itemInfoList, scanCompleted);
}
@Override
public int processingQueueSize() {
return storageMovementNeeded.size();
}
@Override
public Configuration getConf() {
return conf;
}
@VisibleForTesting
public BlockStorageMovementNeeded getStorageMovementQueue() {
return storageMovementNeeded;
}
@Override
public void markScanCompletedForPath(long inodeId) {
getStorageMovementQueue().markScanCompletedForDir(inodeId);
}
/**
* Join main SPS thread.
*/
public void join() throws InterruptedException {
storagePolicySatisfierThread.join();
}
/**
* Remove the overlap between the expected types and the existing types.
*
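* For example, with expected = [SSD, DISK] and existing = [DISK, DISK],
* removing the overlap leaves expected = [SSD] and existing = [DISK];
* neither list is empty, so the method returns false and a movement is
* still required.
*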
* @param expected
* - Expected storage types list.
* @param existing
* - Existing storage types list.
* @param ignoreNonMovable
* ignore non-movable storage types by removing them from both
* expected and existing storage type list to prevent non-movable
* storage from being moved.
* @return true if the existing types or the expected types list is empty
* after removing the overlap.
*/
private static boolean removeOverlapBetweenStorageTypes(
List<StorageType> expected,
List<StorageType> existing, boolean ignoreNonMovable) {
for (Iterator<StorageType> i = existing.iterator(); i.hasNext();) {
final StorageType t = i.next();
if (expected.remove(t)) {
i.remove();
}
}
if (ignoreNonMovable) {
removeNonMovable(existing);
removeNonMovable(expected);
}
return expected.isEmpty() || existing.isEmpty();
}
private static void removeNonMovable(List<StorageType> types) {
for (Iterator<StorageType> i = types.iterator(); i.hasNext();) {
final StorageType t = i.next();
if (!t.isMovable()) {
i.remove();
}
}
}
/**
* Get DFS_SPS_WORK_MULTIPLIER_PER_ITERATION from
* configuration.
*
* @param conf Configuration
* @return Value of DFS_SPS_WORK_MULTIPLIER_PER_ITERATION
*/
private static int getSPSWorkMultiplier(Configuration conf) {
int spsWorkMultiplier = conf
.getInt(
DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION,
DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT);
Preconditions.checkArgument(
(spsWorkMultiplier > 0),
DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION +
" = '" + spsWorkMultiplier + "' is invalid. " +
"It should be a positive, non-zero integer value.");
return spsWorkMultiplier;
}
}