| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.balancer; |
| |
| import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength; |
| import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; |
| |
| import java.io.BufferedInputStream; |
| import java.io.BufferedOutputStream; |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.net.Socket; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.EnumMap; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.ThreadPoolExecutor; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.StorageType; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.DFSUtilClient; |
| import org.apache.hadoop.hdfs.DistributedFileSystem; |
| import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; |
| import org.apache.hadoop.hdfs.protocol.Block; |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; |
| import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; |
| import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; |
| import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; |
| import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup; |
| import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicies; |
| import org.apache.hadoop.hdfs.protocol.BlockType; |
| import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; |
| import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; |
| import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations; |
| import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.net.NetworkTopology; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.util.Time; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| |
| /** Dispatching block replica moves between datanodes. */ |
| @InterfaceAudience.Private |
| public class Dispatcher { |
| static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class); |
| |
| /** |
| * the period of time to delay the usage of a DataNode after hitting |
| * errors when using it for migrating data |
| */ |
| private static long delayAfterErrors = 10 * 1000; |
| |
| private final NameNodeConnector nnc; |
| private final SaslDataTransferClient saslClient; |
| |
| /** Set of datanodes to be excluded. */ |
| private final Set<String> excludedNodes; |
| /** Restrict to the following nodes. */ |
| private final Set<String> includedNodes; |
| |
| private final Collection<Source> sources = new HashSet<Source>(); |
| private final Collection<StorageGroup> targets = new HashSet<StorageGroup>(); |
| |
| private final GlobalBlockMap globalBlocks = new GlobalBlockMap(); |
| private final MovedBlocks<StorageGroup> movedBlocks; |
| |
| /** Map (datanodeUuid,storageType -> StorageGroup) */ |
| private final StorageGroupMap<StorageGroup> storageGroupMap |
| = new StorageGroupMap<StorageGroup>(); |
| |
| private NetworkTopology cluster; |
| |
| private final ExecutorService dispatchExecutor; |
| |
| private final Allocator moverThreadAllocator; |
| |
| /** The maximum number of concurrent blocks moves at a datanode */ |
| private final int maxConcurrentMovesPerNode; |
| private final int maxMoverThreads; |
| |
| private final long getBlocksSize; |
| private final long getBlocksMinBlockSize; |
| private final long blockMoveTimeout; |
| /** |
| * If no block can be moved out of a {@link Source} after this configured |
| * amount of time, the Source should give up choosing the next possible move. |
| */ |
| private final int maxNoMoveInterval; |
| |
| private final int ioFileBufferSize; |
| |
| private final boolean connectToDnViaHostname; |
| private BlockPlacementPolicies placementPolicies; |
| |
| private long maxIterationTime; |
| |
| static class Allocator { |
| private final int max; |
| private int count = 0; |
| private int lotSize = 1; |
| |
| Allocator(int max) { |
| this.max = max; |
| } |
| |
| /** Allocate specified number of items */ |
| synchronized int allocate(int n) { |
| final int remaining = max - count; |
| if (remaining <= 0) { |
| return 0; |
| } else { |
| final int allocated = remaining < n? remaining: n; |
| count += allocated; |
| return allocated; |
| } |
| } |
| |
| /** Aloocate a single lot of items */ |
| int allocate() { |
| return allocate(lotSize); |
| } |
| |
| synchronized void reset() { |
| count = 0; |
| } |
| |
| /** Set the lot size */ |
| synchronized void setLotSize(int lotSize) { |
| this.lotSize = lotSize; |
| } |
| } |
| |
| private static class GlobalBlockMap { |
| private final Map<Block, DBlock> map = new HashMap<Block, DBlock>(); |
| |
| /** |
| * Put block in the map if it's not found |
| * @return the block which be put in the map the first time |
| */ |
| private DBlock putIfAbsent(Block blk, DBlock dblk) { |
| if (!map.containsKey(blk)) { |
| map.put(blk, dblk); |
| return dblk; |
| } |
| return map.get(blk); |
| } |
| |
| /** Remove all blocks except for the moved blocks. */ |
| private void removeAllButRetain(MovedBlocks<StorageGroup> movedBlocks) { |
| for (Iterator<Block> i = map.keySet().iterator(); i.hasNext();) { |
| if (!movedBlocks.contains(i.next())) { |
| i.remove(); |
| } |
| } |
| } |
| } |
| |
| public static class StorageGroupMap<G extends StorageGroup> { |
| private static String toKey(String datanodeUuid, StorageType storageType) { |
| return datanodeUuid + ":" + storageType; |
| } |
| |
| private final Map<String, G> map = new HashMap<String, G>(); |
| |
| public G get(String datanodeUuid, StorageType storageType) { |
| return map.get(toKey(datanodeUuid, storageType)); |
| } |
| |
| public void put(G g) { |
| final String key = toKey(g.getDatanodeInfo().getDatanodeUuid(), g.storageType); |
| final StorageGroup existing = map.put(key, g); |
| Preconditions.checkState(existing == null); |
| } |
| |
| int size() { |
| return map.size(); |
| } |
| |
| void clear() { |
| map.clear(); |
| } |
| |
| public Collection<G> values() { |
| return map.values(); |
| } |
| } |
| |
| /** This class keeps track of a scheduled reportedBlock move */ |
| public class PendingMove { |
| private DBlock reportedBlock; |
| private Source source; |
| private DDatanode proxySource; |
| private StorageGroup target; |
| |
| private PendingMove(Source source, StorageGroup target) { |
| this.source = source; |
| this.target = target; |
| } |
| |
| public DatanodeInfo getSource() { |
| return source.getDatanodeInfo(); |
| } |
| |
| @Override |
| public String toString() { |
| final Block b = reportedBlock != null ? reportedBlock.getBlock() : null; |
| String bStr = b != null ? (b + " with size=" + b.getNumBytes() + " ") |
| : " "; |
| return bStr + "from " + source.getDisplayName() + " to " + target |
| .getDisplayName() + " through " + (proxySource != null ? proxySource |
| .datanode : ""); |
| } |
| |
| /** |
| * Choose a good block/blockGroup from source & Get reportedBlock from |
| * the block & Choose a proxy source for the reportedBlock. |
| * |
| * @return true if a block and its proxy are chosen; false otherwise |
| */ |
| private boolean chooseBlockAndProxy() { |
| // source and target must have the same storage type |
| final StorageType t = source.getStorageType(); |
| // iterate all source's blocks until find a good one |
| for (Iterator<DBlock> i = source.getBlockIterator(); i.hasNext();) { |
| if (markMovedIfGoodBlock(i.next(), t)) { |
| i.remove(); |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * @return true if the given block is good for the tentative move. |
| */ |
| private boolean markMovedIfGoodBlock(DBlock block, StorageType targetStorageType) { |
| synchronized (block) { |
| synchronized (movedBlocks) { |
| if (isGoodBlockCandidate(source, target, targetStorageType, block)) { |
| if (block instanceof DBlockStriped) { |
| reportedBlock = ((DBlockStriped) block).getInternalBlock(source); |
| } else { |
| reportedBlock = block; |
| } |
| if (chooseProxySource()) { |
| movedBlocks.put(block); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Decided to move " + this); |
| } |
| return true; |
| } |
| } |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Choose a proxy source. |
| * |
| * @return true if a proxy is found; otherwise false |
| */ |
| private boolean chooseProxySource() { |
| final DatanodeInfo targetDN = target.getDatanodeInfo(); |
| // if source and target are same nodes then no need of proxy |
| if (source.getDatanodeInfo().equals(targetDN) && addTo(source)) { |
| return true; |
| } |
| // if node group is supported, first try add nodes in the same node group |
| if (cluster.isNodeGroupAware()) { |
| for (StorageGroup loc : reportedBlock.getLocations()) { |
| if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN) |
| && addTo(loc)) { |
| return true; |
| } |
| } |
| } |
| // check if there is replica which is on the same rack with the target |
| for (StorageGroup loc : reportedBlock.getLocations()) { |
| if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) { |
| return true; |
| } |
| } |
| // find out a non-busy replica |
| for (StorageGroup loc : reportedBlock.getLocations()) { |
| if (addTo(loc)) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /** add to a proxy source for specific reportedBlock movement */ |
| private boolean addTo(StorageGroup g) { |
| final DDatanode dn = g.getDDatanode(); |
| if (dn.addPendingBlock(this)) { |
| proxySource = dn; |
| return true; |
| } |
| return false; |
| } |
| |
| /** Dispatch the move to the proxy source & wait for the response. */ |
| private void dispatch() { |
| Socket sock = new Socket(); |
| DataOutputStream out = null; |
| DataInputStream in = null; |
| try { |
| if (source.isIterationOver()){ |
| LOG.info("Cancel moving " + this + |
| " as iteration is already cancelled due to" + |
| " dfs.balancer.max-iteration-time is passed."); |
| throw new IOException("Block move cancelled."); |
| } |
| LOG.info("Start moving " + this); |
| assert !(reportedBlock instanceof DBlockStriped); |
| |
| sock.connect( |
| NetUtils.createSocketAddr(target.getDatanodeInfo(). |
| getXferAddr(Dispatcher.this.connectToDnViaHostname)), |
| HdfsConstants.READ_TIMEOUT); |
| |
| // Set read timeout so that it doesn't hang forever against |
| // unresponsive nodes. Datanode normally sends IN_PROGRESS response |
| // twice within the client read timeout period (every 30 seconds by |
| // default). Here, we make it give up after 5 minutes of no response. |
| sock.setSoTimeout(HdfsConstants.READ_TIMEOUT * 5); |
| sock.setKeepAlive(true); |
| |
| OutputStream unbufOut = sock.getOutputStream(); |
| InputStream unbufIn = sock.getInputStream(); |
| ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), |
| reportedBlock.getBlock()); |
| final KeyManager km = nnc.getKeyManager(); |
| Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb, |
| new StorageType[]{target.storageType}, new String[0]); |
| IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, |
| unbufIn, km, accessToken, target.getDatanodeInfo()); |
| unbufOut = saslStreams.out; |
| unbufIn = saslStreams.in; |
| out = new DataOutputStream(new BufferedOutputStream(unbufOut, |
| ioFileBufferSize)); |
| in = new DataInputStream(new BufferedInputStream(unbufIn, |
| ioFileBufferSize)); |
| |
| sendRequest(out, eb, accessToken); |
| receiveResponse(in); |
| nnc.getBytesMoved().addAndGet(reportedBlock.getNumBytes()); |
| target.getDDatanode().setHasSuccess(); |
| LOG.info("Successfully moved " + this); |
| } catch (IOException e) { |
| LOG.warn("Failed to move " + this, e); |
| target.getDDatanode().setHasFailure(); |
| // Check that the failure is due to block pinning errors. |
| if (e instanceof BlockPinningException) { |
| // Pinned block can't be moved. Add this block into failure list. |
| // Later in the next iteration mover will exclude these blocks from |
| // pending moves. |
| target.getDDatanode().addBlockPinningFailures(this); |
| return; |
| } |
| |
| // Proxy or target may have some issues, delay before using these nodes |
| // further in order to avoid a potential storm of "threads quota |
| // exceeded" warnings when the dispatcher gets out of sync with work |
| // going on in datanodes. |
| proxySource.activateDelay(delayAfterErrors); |
| target.getDDatanode().activateDelay(delayAfterErrors); |
| } finally { |
| IOUtils.closeStream(out); |
| IOUtils.closeStream(in); |
| IOUtils.closeSocket(sock); |
| |
| proxySource.removePendingBlock(this); |
| target.getDDatanode().removePendingBlock(this); |
| |
| synchronized (this) { |
| reset(); |
| } |
| synchronized (Dispatcher.this) { |
| Dispatcher.this.notifyAll(); |
| } |
| } |
| } |
| |
| /** Send a reportedBlock replace request to the output stream */ |
| private void sendRequest(DataOutputStream out, ExtendedBlock eb, |
| Token<BlockTokenIdentifier> accessToken) throws IOException { |
| new Sender(out).replaceBlock(eb, target.storageType, accessToken, |
| source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode, |
| null); |
| } |
| |
| /** Check whether to continue waiting for response */ |
| private boolean stopWaitingForResponse(long startTime) { |
| return source.isIterationOver() || |
| (blockMoveTimeout > 0 && |
| (Time.monotonicNow() - startTime > blockMoveTimeout)); |
| } |
| |
| /** Receive a reportedBlock copy response from the input stream */ |
| private void receiveResponse(DataInputStream in) throws IOException { |
| long startTime = Time.monotonicNow(); |
| BlockOpResponseProto response = |
| BlockOpResponseProto.parseFrom(vintPrefixed(in)); |
| while (response.getStatus() == Status.IN_PROGRESS) { |
| // read intermediate responses |
| response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); |
| // Stop waiting for slow block moves. Even if it stops waiting, |
| // the actual move may continue. |
| if (stopWaitingForResponse(startTime)) { |
| throw new IOException("Block move timed out"); |
| } |
| } |
| String logInfo = "reportedBlock move is failed"; |
| DataTransferProtoUtil.checkBlockOpStatus(response, logInfo, true); |
| } |
| |
| /** reset the object */ |
| private void reset() { |
| reportedBlock = null; |
| source = null; |
| proxySource = null; |
| target = null; |
| } |
| } |
| |
| /** A class for keeping track of block locations in the dispatcher. */ |
| public static class DBlock extends MovedBlocks.Locations<StorageGroup> { |
| public DBlock(Block block) { |
| super(block); |
| } |
| |
| public long getNumBytes(StorageGroup storage) { |
| return super.getNumBytes(); |
| } |
| } |
| |
| public static class DBlockStriped extends DBlock { |
| |
| final byte[] indices; |
| final short dataBlockNum; |
| final int cellSize; |
| |
| public DBlockStriped(Block block, byte[] indices, short dataBlockNum, |
| int cellSize) { |
| super(block); |
| this.indices = indices; |
| this.dataBlockNum = dataBlockNum; |
| this.cellSize = cellSize; |
| } |
| |
| public DBlock getInternalBlock(StorageGroup storage) { |
| int idxInLocs = locations.indexOf(storage); |
| if (idxInLocs == -1) { |
| return null; |
| } |
| byte idxInGroup = indices[idxInLocs]; |
| long blkId = getBlock().getBlockId() + idxInGroup; |
| long numBytes = getInternalBlockLength(getNumBytes(), cellSize, |
| dataBlockNum, idxInGroup); |
| Block blk = new Block(getBlock()); |
| blk.setBlockId(blkId); |
| blk.setNumBytes(numBytes); |
| DBlock dblk = new DBlock(blk); |
| dblk.addLocation(storage); |
| return dblk; |
| } |
| |
| @Override |
| public long getNumBytes(StorageGroup storage) { |
| return getInternalBlock(storage).getNumBytes(); |
| } |
| } |
| |
| /** The class represents a desired move. */ |
| static class Task { |
| private final StorageGroup target; |
| private long size; // bytes scheduled to move |
| |
| Task(StorageGroup target, long size) { |
| this.target = target; |
| this.size = size; |
| } |
| |
| long getSize() { |
| return size; |
| } |
| } |
| |
| /** A class that keeps track of a datanode. */ |
| public static class DDatanode { |
| |
| /** A group of storages in a datanode with the same storage type. */ |
| public class StorageGroup { |
| final StorageType storageType; |
| final long maxSize2Move; |
| private long scheduledSize = 0L; |
| |
| private StorageGroup(StorageType storageType, long maxSize2Move) { |
| this.storageType = storageType; |
| this.maxSize2Move = maxSize2Move; |
| } |
| |
| public StorageType getStorageType() { |
| return storageType; |
| } |
| |
| private DDatanode getDDatanode() { |
| return DDatanode.this; |
| } |
| |
| public DatanodeInfo getDatanodeInfo() { |
| return DDatanode.this.datanode; |
| } |
| |
| /** Decide if still need to move more bytes */ |
| boolean hasSpaceForScheduling() { |
| return hasSpaceForScheduling(0L); |
| } |
| |
| synchronized boolean hasSpaceForScheduling(long size) { |
| return availableSizeToMove() > size; |
| } |
| |
| /** @return the total number of bytes that need to be moved */ |
| synchronized long availableSizeToMove() { |
| return maxSize2Move - scheduledSize; |
| } |
| |
| /** increment scheduled size */ |
| public synchronized void incScheduledSize(long size) { |
| scheduledSize += size; |
| } |
| |
| /** @return scheduled size */ |
| synchronized long getScheduledSize() { |
| return scheduledSize; |
| } |
| |
| /** Reset scheduled size to zero. */ |
| synchronized void resetScheduledSize() { |
| scheduledSize = 0L; |
| } |
| |
| private PendingMove addPendingMove(DBlock block, final PendingMove pm) { |
| if (getDDatanode().addPendingBlock(pm)) { |
| if (pm.markMovedIfGoodBlock(block, getStorageType())) { |
| incScheduledSize(pm.reportedBlock.getNumBytes()); |
| return pm; |
| } else { |
| getDDatanode().removePendingBlock(pm); |
| } |
| } |
| return null; |
| } |
| |
| /** @return the name for display */ |
| String getDisplayName() { |
| return datanode + ":" + storageType; |
| } |
| |
| @Override |
| public String toString() { |
| return getDisplayName(); |
| } |
| |
| @Override |
| public int hashCode() { |
| return getStorageType().hashCode() ^ getDatanodeInfo().hashCode(); |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (this == obj) { |
| return true; |
| } else if (obj == null || !(obj instanceof StorageGroup)) { |
| return false; |
| } else { |
| final StorageGroup that = (StorageGroup) obj; |
| return this.getStorageType() == that.getStorageType() |
| && this.getDatanodeInfo().equals(that.getDatanodeInfo()); |
| } |
| } |
| |
| } |
| |
| final DatanodeInfo datanode; |
| private final EnumMap<StorageType, Source> sourceMap |
| = new EnumMap<StorageType, Source>(StorageType.class); |
| private final EnumMap<StorageType, StorageGroup> targetMap |
| = new EnumMap<StorageType, StorageGroup>(StorageType.class); |
| protected long delayUntil = 0L; |
| /** blocks being moved but not confirmed yet */ |
| private final List<PendingMove> pendings; |
| private volatile boolean hasFailure = false; |
| private Map<Long, Set<DatanodeInfo>> blockPinningFailures = new HashMap<>(); |
| private volatile boolean hasSuccess = false; |
| private ExecutorService moveExecutor; |
| |
| @Override |
| public String toString() { |
| return getClass().getSimpleName() + ":" + datanode; |
| } |
| |
| private DDatanode(DatanodeInfo datanode, int maxConcurrentMoves) { |
| this.datanode = datanode; |
| this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves); |
| } |
| |
| public DatanodeInfo getDatanodeInfo() { |
| return datanode; |
| } |
| |
| synchronized ExecutorService initMoveExecutor(int poolSize) { |
| return moveExecutor = Executors.newFixedThreadPool(poolSize); |
| } |
| |
| synchronized ExecutorService getMoveExecutor() { |
| return moveExecutor; |
| } |
| |
| synchronized void shutdownMoveExecutor() { |
| if (moveExecutor != null) { |
| moveExecutor.shutdown(); |
| moveExecutor = null; |
| } |
| } |
| |
| private static <G extends StorageGroup> void put(StorageType storageType, |
| G g, EnumMap<StorageType, G> map) { |
| final StorageGroup existing = map.put(storageType, g); |
| Preconditions.checkState(existing == null); |
| } |
| |
| public StorageGroup addTarget(StorageType storageType, long maxSize2Move) { |
| final StorageGroup g = new StorageGroup(storageType, maxSize2Move); |
| put(storageType, g, targetMap); |
| return g; |
| } |
| |
| public Source addSource(StorageType storageType, long maxSize2Move, Dispatcher d) { |
| final Source s = d.new Source(storageType, maxSize2Move, this); |
| put(storageType, s, sourceMap); |
| return s; |
| } |
| |
| synchronized private void activateDelay(long delta) { |
| delayUntil = Time.monotonicNow() + delta; |
| LOG.info(this + " activateDelay " + delta/1000.0 + " seconds"); |
| } |
| |
| synchronized private boolean isDelayActive() { |
| if (delayUntil == 0 || Time.monotonicNow() > delayUntil) { |
| delayUntil = 0; |
| return false; |
| } |
| return true; |
| } |
| |
| /** Check if all the dispatched moves are done */ |
| synchronized boolean isPendingQEmpty() { |
| return pendings.isEmpty(); |
| } |
| |
| /** Add a scheduled block move to the node */ |
| synchronized boolean addPendingBlock(PendingMove pendingBlock) { |
| if (!isDelayActive()) { |
| return pendings.add(pendingBlock); |
| } |
| return false; |
| } |
| |
| /** Remove a scheduled block move from the node */ |
| synchronized boolean removePendingBlock(PendingMove pendingBlock) { |
| return pendings.remove(pendingBlock); |
| } |
| |
| void setHasFailure() { |
| this.hasFailure = true; |
| } |
| |
| void addBlockPinningFailures(PendingMove pendingBlock) { |
| synchronized (blockPinningFailures) { |
| long blockId = pendingBlock.reportedBlock.getBlock().getBlockId(); |
| Set<DatanodeInfo> pinnedLocations = blockPinningFailures.get(blockId); |
| if (pinnedLocations == null) { |
| pinnedLocations = new HashSet<>(); |
| blockPinningFailures.put(blockId, pinnedLocations); |
| } |
| pinnedLocations.add(pendingBlock.getSource()); |
| } |
| } |
| |
| Map<Long, Set<DatanodeInfo>> getBlockPinningFailureList() { |
| return blockPinningFailures; |
| } |
| |
| void setHasSuccess() { |
| this.hasSuccess = true; |
| } |
| } |
| |
| /** A node that can be the sources of a block move */ |
| public class Source extends DDatanode.StorageGroup { |
| |
| private final List<Task> tasks = new ArrayList<Task>(2); |
| private long blocksToReceive = 0L; |
| private final long startTime = Time.monotonicNow(); |
| /** |
| * Source blocks point to the objects in {@link Dispatcher#globalBlocks} |
| * because we want to keep one copy of a block and be aware that the |
| * locations are changing over time. |
| */ |
| private final List<DBlock> srcBlocks = new ArrayList<DBlock>(); |
| |
| private Source(StorageType storageType, long maxSize2Move, DDatanode dn) { |
| dn.super(storageType, maxSize2Move); |
| } |
| |
| /** |
| * Check if the iteration is over |
| */ |
| public boolean isIterationOver() { |
| if (maxIterationTime < 0){ |
| return false; |
| } |
| return (Time.monotonicNow()-startTime > maxIterationTime); |
| } |
| |
| /** Add a task */ |
| void addTask(Task task) { |
| Preconditions.checkState(task.target != this, |
| "Source and target are the same storage group " + getDisplayName()); |
| incScheduledSize(task.size); |
| tasks.add(task); |
| } |
| |
| /** @return an iterator to this source's blocks */ |
| Iterator<DBlock> getBlockIterator() { |
| return srcBlocks.iterator(); |
| } |
| |
| /** |
| * Fetch new blocks of this source from namenode and update this source's |
| * block list & {@link Dispatcher#globalBlocks}. |
| * |
| * @return the total size of the received blocks in the number of bytes. |
| */ |
| private long getBlockList() throws IOException { |
| final long size = Math.min(getBlocksSize, blocksToReceive); |
| final BlocksWithLocations newBlksLocs = |
| nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize); |
| |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("getBlocks(" + getDatanodeInfo() + ", " |
| + StringUtils.TraditionalBinaryPrefix.long2String(size, "B", 2) |
| + ") returns " + newBlksLocs.getBlocks().length + " blocks."); |
| } |
| |
| long bytesReceived = 0; |
| for (BlockWithLocations blkLocs : newBlksLocs.getBlocks()) { |
| // Skip small blocks. |
| if (blkLocs.getBlock().getNumBytes() < getBlocksMinBlockSize) { |
| continue; |
| } |
| |
| DBlock block; |
| if (blkLocs instanceof StripedBlockWithLocations) { |
| StripedBlockWithLocations sblkLocs = |
| (StripedBlockWithLocations) blkLocs; |
| // approximate size |
| bytesReceived += sblkLocs.getBlock().getNumBytes() / |
| sblkLocs.getDataBlockNum(); |
| block = new DBlockStriped(sblkLocs.getBlock(), sblkLocs.getIndices(), |
| sblkLocs.getDataBlockNum(), sblkLocs.getCellSize()); |
| } else { |
| bytesReceived += blkLocs.getBlock().getNumBytes(); |
| block = new DBlock(blkLocs.getBlock()); |
| } |
| |
| synchronized (globalBlocks) { |
| block = globalBlocks.putIfAbsent(blkLocs.getBlock(), block); |
| synchronized (block) { |
| block.clearLocations(); |
| |
| // update locations |
| final String[] datanodeUuids = blkLocs.getDatanodeUuids(); |
| final StorageType[] storageTypes = blkLocs.getStorageTypes(); |
| for (int i = 0; i < datanodeUuids.length; i++) { |
| final StorageGroup g = storageGroupMap.get( |
| datanodeUuids[i], storageTypes[i]); |
| if (g != null) { // not unknown |
| block.addLocation(g); |
| } |
| } |
| } |
| if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Add " + block + " to " + this); |
| } |
| srcBlocks.add(block); |
| } |
| } |
| } |
| return bytesReceived; |
| } |
| |
| /** Decide if the given block is a good candidate to move or not */ |
| private boolean isGoodBlockCandidate(DBlock block) { |
| // source and target must have the same storage type |
| final StorageType sourceStorageType = getStorageType(); |
| for (Task t : tasks) { |
| if (Dispatcher.this.isGoodBlockCandidate(this, t.target, |
| sourceStorageType, block)) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Choose a move for the source. The block's source, target, and proxy |
| * are determined too. When choosing proxy and target, source & |
| * target throttling has been considered. They are chosen only when they |
| * have the capacity to support this block move. The block should be |
| * dispatched immediately after this method is returned. |
| * If the block is a block group. Only the internal block on this source |
| * will be dispatched. |
| * |
| * @return a move that's good for the source to dispatch immediately. |
| */ |
| private PendingMove chooseNextMove() { |
| for (Iterator<Task> i = tasks.iterator(); i.hasNext();) { |
| final Task task = i.next(); |
| final DDatanode target = task.target.getDDatanode(); |
| final PendingMove pendingBlock = new PendingMove(this, task.target); |
| if (target.addPendingBlock(pendingBlock)) { |
| // target is not busy, so do a tentative block allocation |
| if (pendingBlock.chooseBlockAndProxy()) { |
| long blockSize = pendingBlock.reportedBlock.getNumBytes(this); |
| incScheduledSize(-blockSize); |
| task.size -= blockSize; |
| if (task.size <= 0) { |
| i.remove(); |
| } |
| return pendingBlock; |
| } else { |
| // cancel the tentative move |
| target.removePendingBlock(pendingBlock); |
| } |
| } |
| } |
| return null; |
| } |
| |
| /** Add a pending move */ |
| public PendingMove addPendingMove(DBlock block, StorageGroup target) { |
| return target.addPendingMove(block, new PendingMove(this, target)); |
| } |
| |
| /** Iterate all source's blocks to remove moved ones */ |
| private void removeMovedBlocks() { |
| for (Iterator<DBlock> i = getBlockIterator(); i.hasNext();) { |
| if (movedBlocks.contains(i.next().getBlock())) { |
| i.remove(); |
| } |
| } |
| } |
| |
| /** @return if should fetch more blocks from namenode */ |
| private boolean shouldFetchMoreBlocks() { |
| return blocksToReceive > 0; |
| } |
| |
| /** |
| * This method iteratively does the following: it first selects a block to |
| * move, then sends a request to the proxy source to start the block move |
| * when the source's block list falls below a threshold, it asks the |
| * namenode for more blocks. It terminates when it has dispatch enough block |
| * move tasks or it has received enough blocks from the namenode, or the |
| * elapsed time of the iteration has exceeded the max time limit. |
| * |
| */ |
| private void dispatchBlocks() { |
| this.blocksToReceive = 2 * getScheduledSize(); |
| long previousMoveTimestamp = Time.monotonicNow(); |
| while (getScheduledSize() > 0 && !isIterationOver() |
| && (!srcBlocks.isEmpty() || blocksToReceive > 0)) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace(this + " blocksToReceive=" + blocksToReceive |
| + ", scheduledSize=" + getScheduledSize() |
| + ", srcBlocks#=" + srcBlocks.size()); |
| } |
| final PendingMove p = chooseNextMove(); |
| if (p != null) { |
| // Reset previous move timestamp |
| previousMoveTimestamp = Time.monotonicNow(); |
| executePendingMove(p); |
| continue; |
| } |
| |
| // Since we cannot schedule any block to move, |
| // remove any moved blocks from the source block list and |
| removeMovedBlocks(); // filter already moved blocks |
| // check if we should fetch more blocks from the namenode |
| if (shouldFetchMoreBlocks()) { |
| // fetch new blocks |
| try { |
| final long received = getBlockList(); |
| if (received == 0) { |
| return; |
| } |
| blocksToReceive -= received; |
| continue; |
| } catch (IOException e) { |
| LOG.warn("Exception while getting reportedBlock list", e); |
| return; |
| } |
| } else { |
| // jump out of while-loop after the configured timeout. |
| long noMoveInterval = Time.monotonicNow() - previousMoveTimestamp; |
| if (noMoveInterval > maxNoMoveInterval) { |
| LOG.info("Failed to find a pending move for " + noMoveInterval |
| + " ms. Skipping " + this); |
| resetScheduledSize(); |
| } |
| } |
| |
| // Now we can not schedule any block to move and there are |
| // no new blocks added to the source block list, so we wait. |
| try { |
| synchronized (Dispatcher.this) { |
| Dispatcher.this.wait(1000); // wait for targets/sources to be idle |
| } |
| // Didn't find a possible move in this iteration of the while loop, |
| // adding a small delay before choosing next move again. |
| Thread.sleep(100); |
| } catch (InterruptedException ignored) { |
| } |
| } |
| |
| if (isIterationOver()) { |
| LOG.info("The maximum iteration time (" + maxIterationTime/1000 |
| + " seconds) has been reached. Stopping " + this); |
| } |
| } |
| |
| @Override |
| public int hashCode() { |
| return super.hashCode(); |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| return super.equals(obj); |
| } |
| } |
| |
| /** Constructor called by Mover. */ |
| public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes, |
| Set<String> excludedNodes, long movedWinWidth, int moverThreads, |
| int dispatcherThreads, int maxConcurrentMovesPerNode, |
| int maxNoMoveInterval, Configuration conf) { |
| this(nnc, includedNodes, excludedNodes, movedWinWidth, |
| moverThreads, dispatcherThreads, maxConcurrentMovesPerNode, |
| 0L, 0L, 0, maxNoMoveInterval, -1, conf); |
| } |
| |
| Dispatcher(NameNodeConnector nnc, Set<String> includedNodes, |
| Set<String> excludedNodes, long movedWinWidth, int moverThreads, |
| int dispatcherThreads, int maxConcurrentMovesPerNode, |
| long getBlocksSize, long getBlocksMinBlockSize, int blockMoveTimeout, |
| int maxNoMoveInterval, long maxIterationTime, Configuration conf) { |
| this.nnc = nnc; |
| this.excludedNodes = excludedNodes; |
| this.includedNodes = includedNodes; |
| this.movedBlocks = new MovedBlocks<StorageGroup>(movedWinWidth); |
| |
| this.cluster = NetworkTopology.getInstance(conf); |
| |
| this.dispatchExecutor = dispatcherThreads == 0? null |
| : Executors.newFixedThreadPool(dispatcherThreads); |
| this.moverThreadAllocator = new Allocator(moverThreads); |
| this.maxMoverThreads = moverThreads; |
| this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode; |
| |
| this.getBlocksSize = getBlocksSize; |
| this.getBlocksMinBlockSize = getBlocksMinBlockSize; |
| this.blockMoveTimeout = blockMoveTimeout; |
| this.maxNoMoveInterval = maxNoMoveInterval; |
| |
| this.saslClient = new SaslDataTransferClient(conf, |
| DataTransferSaslUtil.getSaslPropertiesResolver(conf), |
| TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth); |
| this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf); |
| this.connectToDnViaHostname = conf.getBoolean( |
| HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME, |
| HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); |
| placementPolicies = new BlockPlacementPolicies(conf, null, cluster, null); |
| this.maxIterationTime = maxIterationTime; |
| } |
| |
| public DistributedFileSystem getDistributedFileSystem() { |
| return nnc.getDistributedFileSystem(); |
| } |
| |
| public StorageGroupMap<StorageGroup> getStorageGroupMap() { |
| return storageGroupMap; |
| } |
| |
| public NetworkTopology getCluster() { |
| return cluster; |
| } |
| |
| long getBytesMoved() { |
| return nnc.getBytesMoved().get(); |
| } |
| |
| long bytesToMove() { |
| Preconditions.checkState( |
| storageGroupMap.size() >= sources.size() + targets.size(), |
| "Mismatched number of storage groups (" + storageGroupMap.size() |
| + " < " + sources.size() + " sources + " + targets.size() |
| + " targets)"); |
| |
| long b = 0L; |
| for (Source src : sources) { |
| b += src.getScheduledSize(); |
| } |
| return b; |
| } |
| |
| void add(Source source, StorageGroup target) { |
| sources.add(source); |
| targets.add(target); |
| } |
| |
| private boolean shouldIgnore(DatanodeInfo dn) { |
| // ignore out-of-service nodes |
| final boolean outOfService = !dn.isInService(); |
| // ignore nodes in exclude list |
| final boolean excluded = Util.isExcluded(excludedNodes, dn); |
| // ignore nodes not in the include list (if include list is not empty) |
| final boolean notIncluded = !Util.isIncluded(includedNodes, dn); |
| |
| if (outOfService || excluded || notIncluded) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Excluding datanode " + dn |
| + ": outOfService=" + outOfService |
| + ", excluded=" + excluded |
| + ", notIncluded=" + notIncluded); |
| } |
| return true; |
| } |
| return false; |
| } |
| |
| /** Get live datanode storage reports and then build the network topology. */ |
| public List<DatanodeStorageReport> init() throws IOException { |
| final DatanodeStorageReport[] reports = nnc.getLiveDatanodeStorageReport(); |
| final List<DatanodeStorageReport> trimmed = new ArrayList<DatanodeStorageReport>(); |
| // create network topology and classify utilization collections: |
| // over-utilized, above-average, below-average and under-utilized. |
| Collections.shuffle(Arrays.asList(reports)); |
| for (DatanodeStorageReport r : reports) { |
| final DatanodeInfo datanode = r.getDatanodeInfo(); |
| if (shouldIgnore(datanode)) { |
| continue; |
| } |
| trimmed.add(r); |
| cluster.add(datanode); |
| } |
| return trimmed; |
| } |
| |
| public DDatanode newDatanode(DatanodeInfo datanode) { |
| return new DDatanode(datanode, maxConcurrentMovesPerNode); |
| } |
| |
| |
| public void executePendingMove(final PendingMove p) { |
| // move the reportedBlock |
| final DDatanode targetDn = p.target.getDDatanode(); |
| ExecutorService moveExecutor = targetDn.getMoveExecutor(); |
| if (moveExecutor == null) { |
| final int nThreads = moverThreadAllocator.allocate(); |
| if (nThreads > 0) { |
| moveExecutor = targetDn.initMoveExecutor(nThreads); |
| } |
| } |
| if (moveExecutor == null) { |
| LOG.warn("No mover threads available: skip moving " + p); |
| targetDn.removePendingBlock(p); |
| p.proxySource.removePendingBlock(p); |
| return; |
| } |
| moveExecutor.execute(new Runnable() { |
| @Override |
| public void run() { |
| p.dispatch(); |
| } |
| }); |
| } |
| |
| public boolean dispatchAndCheckContinue() throws InterruptedException { |
| return nnc.shouldContinue(dispatchBlockMoves()); |
| } |
| |
| /** |
| * Dispatch block moves for each source. The thread selects blocks to move & |
| * sends request to proxy source to initiate block move. The process is flow |
| * controlled. Block selection is blocked if there are too many un-confirmed |
| * block moves. |
| * |
| * @return the total number of bytes successfully moved in this iteration. |
| */ |
| private long dispatchBlockMoves() throws InterruptedException { |
| final long bytesLastMoved = getBytesMoved(); |
| final Future<?>[] futures = new Future<?>[sources.size()]; |
| |
| int concurrentThreads = Math.min(sources.size(), |
| ((ThreadPoolExecutor)dispatchExecutor).getCorePoolSize()); |
| assert concurrentThreads > 0 : "Number of concurrent threads is 0."; |
| LOG.debug("Balancer concurrent dispatcher threads = {}", concurrentThreads); |
| |
| // Determine the size of each mover thread pool per target |
| int threadsPerTarget = maxMoverThreads/targets.size(); |
| if (threadsPerTarget == 0) { |
| // Some scheduled moves will get ignored as some targets won't have |
| // any threads allocated. |
| moverThreadAllocator.setLotSize(1); |
| LOG.warn(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY + "=" + |
| maxMoverThreads + " is too small for moving blocks to " + |
| targets.size() + " targets. Balancing may be slower."); |
| } else { |
| if (threadsPerTarget > maxConcurrentMovesPerNode) { |
| threadsPerTarget = maxConcurrentMovesPerNode; |
| LOG.info("Limiting threads per target to the specified max."); |
| } |
| moverThreadAllocator.setLotSize(threadsPerTarget); |
| LOG.info("Allocating " + threadsPerTarget + " threads per target."); |
| } |
| |
| final Iterator<Source> i = sources.iterator(); |
| for (int j = 0; j < futures.length; j++) { |
| final Source s = i.next(); |
| futures[j] = dispatchExecutor.submit(new Runnable() { |
| @Override |
| public void run() { |
| s.dispatchBlocks(); |
| } |
| }); |
| } |
| |
| // wait for all dispatcher threads to finish |
| for (Future<?> future : futures) { |
| try { |
| future.get(); |
| } catch (ExecutionException e) { |
| LOG.warn("Dispatcher thread failed", e.getCause()); |
| } |
| } |
| |
| // wait for all reportedBlock moving to be done |
| waitForMoveCompletion(targets); |
| |
| return getBytesMoved() - bytesLastMoved; |
| } |
| |
| /** |
| * Wait for all reportedBlock move confirmations. |
| * @return true if there is failed move execution |
| */ |
| public static boolean waitForMoveCompletion( |
| Iterable<? extends StorageGroup> targets) { |
| boolean hasFailure = false; |
| for(;;) { |
| boolean empty = true; |
| for (StorageGroup t : targets) { |
| if (!t.getDDatanode().isPendingQEmpty()) { |
| empty = false; |
| break; |
| } else { |
| hasFailure |= t.getDDatanode().hasFailure; |
| } |
| } |
| if (empty) { |
| return hasFailure; // all pending queues are empty |
| } |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException ignored) { |
| } |
| } |
| } |
| |
| /** |
| * Check any of the block movements are failed due to block pinning errors. If |
| * yes, add the failed blockId and its respective source node location to the |
| * excluded list. |
| */ |
| public static void checkForBlockPinningFailures( |
| Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks, |
| Iterable<? extends StorageGroup> targets) { |
| for (StorageGroup t : targets) { |
| Map<Long, Set<DatanodeInfo>> blockPinningFailureList = t.getDDatanode() |
| .getBlockPinningFailureList(); |
| Set<Entry<Long, Set<DatanodeInfo>>> entrySet = blockPinningFailureList |
| .entrySet(); |
| for (Entry<Long, Set<DatanodeInfo>> entry : entrySet) { |
| Long blockId = entry.getKey(); |
| Set<DatanodeInfo> locs = excludedPinnedBlocks.get(blockId); |
| if (locs == null) { |
| // blockId doesn't exists in the excluded list. |
| locs = entry.getValue(); |
| excludedPinnedBlocks.put(blockId, locs); |
| } else { |
| // blockId already exists in the excluded list, add the pinned node. |
| locs.addAll(entry.getValue()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * @return true if some moves are success. |
| */ |
| public static boolean checkForSuccess( |
| Iterable<? extends StorageGroup> targets) { |
| boolean hasSuccess = false; |
| for (StorageGroup t : targets) { |
| hasSuccess |= t.getDDatanode().hasSuccess; |
| } |
| return hasSuccess; |
| } |
| |
| /** |
| * Decide if the block/blockGroup is a good candidate to be moved from source |
| * to target. A block is a good candidate if |
| * 1. the block is not in the process of being moved/has not been moved; |
| * 2. the block does not have a replica/internalBlock on the target; |
| * 3. doing the move does not reduce the number of racks that the block has |
| */ |
| private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target, |
| StorageType targetStorageType, DBlock block) { |
| if (source.equals(target)) { |
| return false; |
| } |
| if (target.storageType != targetStorageType) { |
| return false; |
| } |
| // check if the block is moved or not |
| if (movedBlocks.contains(block.getBlock())) { |
| return false; |
| } |
| final DatanodeInfo targetDatanode = target.getDatanodeInfo(); |
| if (source.getDatanodeInfo().equals(targetDatanode)) { |
| // the reportedBlock is moved inside same DN |
| return true; |
| } |
| |
| // check if block has replica in target node |
| for (StorageGroup blockLocation : block.getLocations()) { |
| if (blockLocation.getDatanodeInfo().equals(targetDatanode)) { |
| return false; |
| } |
| } |
| |
| if (!isGoodBlockCandidateForPlacementPolicy(source, target, block)) { |
| return false; |
| } |
| return true; |
| } |
| |
| // Check if the move will violate the block placement policy. |
| private boolean isGoodBlockCandidateForPlacementPolicy(StorageGroup source, |
| StorageGroup target, DBlock block) { |
| List<DatanodeInfo> datanodeInfos = new ArrayList<>(); |
| synchronized (block) { |
| for (StorageGroup loc : block.locations) { |
| datanodeInfos.add(loc.getDatanodeInfo()); |
| } |
| datanodeInfos.add(target.getDatanodeInfo()); |
| } |
| return placementPolicies.getPolicy(BlockType.CONTIGUOUS).isMovable( |
| datanodeInfos, source.getDatanodeInfo(), target.getDatanodeInfo()); |
| } |
| |
| /** Reset all fields in order to prepare for the next iteration */ |
| void reset(Configuration conf) { |
| cluster = NetworkTopology.getInstance(conf); |
| storageGroupMap.clear(); |
| sources.clear(); |
| |
| moverThreadAllocator.reset(); |
| for(StorageGroup t : targets) { |
| t.getDDatanode().shutdownMoveExecutor(); |
| } |
| targets.clear(); |
| globalBlocks.removeAllButRetain(movedBlocks); |
| movedBlocks.cleanup(); |
| } |
| |
| @VisibleForTesting |
| public static void setDelayAfterErrors(long time) { |
| delayAfterErrors = time; |
| } |
| |
| /** shutdown thread pools */ |
| public void shutdownNow() { |
| if (dispatchExecutor != null) { |
| dispatchExecutor.shutdownNow(); |
| } |
| } |
| |
| static class Util { |
| /** @return true if data node is part of the excludedNodes. */ |
| static boolean isExcluded(Set<String> excludedNodes, DatanodeInfo dn) { |
| return isIn(excludedNodes, dn); |
| } |
| |
| /** |
| * @return true if includedNodes is empty or data node is part of the |
| * includedNodes. |
| */ |
| static boolean isIncluded(Set<String> includedNodes, DatanodeInfo dn) { |
| return (includedNodes.isEmpty() || isIn(includedNodes, dn)); |
| } |
| |
| /** |
| * Match is checked using host name , ip address with and without port |
| * number. |
| * |
| * @return true if the datanode's transfer address matches the set of nodes. |
| */ |
| private static boolean isIn(Set<String> datanodes, DatanodeInfo dn) { |
| return isIn(datanodes, dn.getPeerHostName(), dn.getXferPort()) |
| || isIn(datanodes, dn.getIpAddr(), dn.getXferPort()) |
| || isIn(datanodes, dn.getHostName(), dn.getXferPort()); |
| } |
| |
| /** @return true if nodes contains host or host:port */ |
| private static boolean isIn(Set<String> nodes, String host, int port) { |
| if (host == null) { |
| return false; |
| } |
| return (nodes.contains(host) || nodes.contains(host + ":" + port)); |
| } |
| } |
| } |