blob: ba749783f5f545be915cae17a6cd9e004ac4007b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.HdfsVolumeId;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.Token;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@InterfaceAudience.Private
@InterfaceStability.Unstable
class BlockStorageLocationUtil {
static final Log LOG = LogFactory
.getLog(BlockStorageLocationUtil.class);
/**
* Create a list of {@link VolumeBlockLocationCallable} corresponding to a set
* of datanodes and blocks. The blocks must all correspond to the same
* block pool.
*
* @param datanodeBlocks
* Map of datanodes to block replicas at each datanode
* @return callables Used to query each datanode for location information on
* the block replicas at the datanode
*/
private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallables(
Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
int timeout, boolean connectToDnViaHostname) {
if (datanodeBlocks.isEmpty()) {
return Lists.newArrayList();
}
// Construct the callables, one per datanode
List<VolumeBlockLocationCallable> callables =
new ArrayList<VolumeBlockLocationCallable>();
for (Map.Entry<DatanodeInfo, List<LocatedBlock>> entry : datanodeBlocks
.entrySet()) {
// Construct RPC parameters
DatanodeInfo datanode = entry.getKey();
List<LocatedBlock> locatedBlocks = entry.getValue();
if (locatedBlocks.isEmpty()) {
continue;
}
// Ensure that the blocks all are from the same block pool.
String poolId = locatedBlocks.get(0).getBlock().getBlockPoolId();
for (LocatedBlock lb : locatedBlocks) {
if (!poolId.equals(lb.getBlock().getBlockPoolId())) {
throw new IllegalArgumentException(
"All blocks to be queried must be in the same block pool: " +
locatedBlocks.get(0).getBlock() + " and " + lb +
" are from different pools.");
}
}
long[] blockIds = new long[locatedBlocks.size()];
int i = 0;
List<Token<BlockTokenIdentifier>> dnTokens =
new ArrayList<Token<BlockTokenIdentifier>>(
locatedBlocks.size());
for (LocatedBlock b : locatedBlocks) {
blockIds[i++] = b.getBlock().getBlockId();
dnTokens.add(b.getBlockToken());
}
VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable(
conf, datanode, poolId, blockIds, dnTokens, timeout,
connectToDnViaHostname);
callables.add(callable);
}
return callables;
}
/**
* Queries datanodes for the blocks specified in <code>datanodeBlocks</code>,
* making one RPC to each datanode. These RPCs are made in parallel using a
* threadpool.
*
* @param datanodeBlocks
* Map of datanodes to the blocks present on the DN
* @return metadatas Map of datanodes to block metadata of the DN
* @throws InvalidBlockTokenException
* if client does not have read access on a requested block
*/
static Map<DatanodeInfo, HdfsBlocksMetadata> queryDatanodesForHdfsBlocksMetadata(
Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
int poolsize, int timeoutMs, boolean connectToDnViaHostname)
throws InvalidBlockTokenException {
List<VolumeBlockLocationCallable> callables =
createVolumeBlockLocationCallables(conf, datanodeBlocks, timeoutMs,
connectToDnViaHostname);
// Use a thread pool to execute the Callables in parallel
List<Future<HdfsBlocksMetadata>> futures =
new ArrayList<Future<HdfsBlocksMetadata>>();
ExecutorService executor = new ScheduledThreadPoolExecutor(poolsize);
try {
futures = executor.invokeAll(callables, timeoutMs,
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// Swallow the exception here, because we can return partial results
}
executor.shutdown();
Map<DatanodeInfo, HdfsBlocksMetadata> metadatas =
Maps.newHashMapWithExpectedSize(datanodeBlocks.size());
// Fill in metadatas with results from DN RPCs, where possible
for (int i = 0; i < futures.size(); i++) {
VolumeBlockLocationCallable callable = callables.get(i);
DatanodeInfo datanode = callable.getDatanodeInfo();
Future<HdfsBlocksMetadata> future = futures.get(i);
try {
HdfsBlocksMetadata metadata = future.get();
metadatas.put(callable.getDatanodeInfo(), metadata);
} catch (CancellationException e) {
LOG.info("Cancelled while waiting for datanode "
+ datanode.getIpcAddr(false) + ": " + e.toString());
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof InvalidBlockTokenException) {
LOG.warn("Invalid access token when trying to retrieve "
+ "information from datanode " + datanode.getIpcAddr(false));
throw (InvalidBlockTokenException) t;
}
else if (t instanceof UnsupportedOperationException) {
LOG.info("Datanode " + datanode.getIpcAddr(false) + " does not support"
+ " required #getHdfsBlocksMetadata() API");
throw (UnsupportedOperationException) t;
} else {
LOG.info("Failed to query block locations on datanode " +
datanode.getIpcAddr(false) + ": " + t);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Could not fetch information from datanode", t);
}
} catch (InterruptedException e) {
// Shouldn't happen, because invokeAll waits for all Futures to be ready
LOG.info("Interrupted while fetching HdfsBlocksMetadata");
}
}
return metadatas;
}
/**
* Group the per-replica {@link VolumeId} info returned from
* {@link DFSClient#queryDatanodesForHdfsBlocksMetadata(Map)} to be
* associated
* with the corresponding {@link LocatedBlock}.
*
* @param blocks
* Original LocatedBlock array
* @param metadatas
* VolumeId information for the replicas on each datanode
* @return blockVolumeIds per-replica VolumeId information associated with the
* parent LocatedBlock
*/
static Map<LocatedBlock, List<VolumeId>> associateVolumeIdsWithBlocks(
List<LocatedBlock> blocks,
Map<DatanodeInfo, HdfsBlocksMetadata> metadatas) {
// Initialize mapping of ExtendedBlock to LocatedBlock.
// Used to associate results from DN RPCs to the parent LocatedBlock
Map<Long, LocatedBlock> blockIdToLocBlock =
new HashMap<Long, LocatedBlock>();
for (LocatedBlock b : blocks) {
blockIdToLocBlock.put(b.getBlock().getBlockId(), b);
}
// Initialize the mapping of blocks -> list of VolumeIds, one per replica
// This is filled out with real values from the DN RPCs
Map<LocatedBlock, List<VolumeId>> blockVolumeIds =
new HashMap<LocatedBlock, List<VolumeId>>();
for (LocatedBlock b : blocks) {
ArrayList<VolumeId> l = new ArrayList<VolumeId>(b.getLocations().length);
for (int i = 0; i < b.getLocations().length; i++) {
l.add(null);
}
blockVolumeIds.put(b, l);
}
// Iterate through the list of metadatas (one per datanode).
// For each metadata, if it's valid, insert its volume location information
// into the Map returned to the caller
for (Map.Entry<DatanodeInfo, HdfsBlocksMetadata> entry : metadatas.entrySet()) {
DatanodeInfo datanode = entry.getKey();
HdfsBlocksMetadata metadata = entry.getValue();
// Check if metadata is valid
if (metadata == null) {
continue;
}
long[] metaBlockIds = metadata.getBlockIds();
List<byte[]> metaVolumeIds = metadata.getVolumeIds();
List<Integer> metaVolumeIndexes = metadata.getVolumeIndexes();
// Add VolumeId for each replica in the HdfsBlocksMetadata
for (int j = 0; j < metaBlockIds.length; j++) {
int volumeIndex = metaVolumeIndexes.get(j);
long blockId = metaBlockIds[j];
// Skip if block wasn't found, or not a valid index into metaVolumeIds
// Also skip if the DN responded with a block we didn't ask for
if (volumeIndex == Integer.MAX_VALUE
|| volumeIndex >= metaVolumeIds.size()
|| !blockIdToLocBlock.containsKey(blockId)) {
if (LOG.isDebugEnabled()) {
LOG.debug("No data for block " + blockId);
}
continue;
}
// Get the VolumeId by indexing into the list of VolumeIds
// provided by the datanode
byte[] volumeId = metaVolumeIds.get(volumeIndex);
HdfsVolumeId id = new HdfsVolumeId(volumeId);
// Find out which index we are in the LocatedBlock's replicas
LocatedBlock locBlock = blockIdToLocBlock.get(blockId);
DatanodeInfo[] dnInfos = locBlock.getLocations();
int index = -1;
for (int k = 0; k < dnInfos.length; k++) {
if (dnInfos[k].equals(datanode)) {
index = k;
break;
}
}
if (index < 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Datanode responded with a block volume id we did" +
" not request, omitting.");
}
continue;
}
// Place VolumeId at the same index as the DN's index in the list of
// replicas
List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
volumeIds.set(index, id);
}
}
return blockVolumeIds;
}
/**
* Helper method to combine a list of {@link LocatedBlock} with associated
* {@link VolumeId} information to form a list of {@link BlockStorageLocation}
* .
*/
static BlockStorageLocation[] convertToVolumeBlockLocations(
List<LocatedBlock> blocks,
Map<LocatedBlock, List<VolumeId>> blockVolumeIds) throws IOException {
// Construct the final return value of VolumeBlockLocation[]
BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks);
List<BlockStorageLocation> volumeBlockLocs =
new ArrayList<BlockStorageLocation>(locations.length);
for (int i = 0; i < locations.length; i++) {
LocatedBlock locBlock = blocks.get(i);
List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
BlockStorageLocation bsLoc = new BlockStorageLocation(locations[i],
volumeIds.toArray(new VolumeId[0]));
volumeBlockLocs.add(bsLoc);
}
return volumeBlockLocs.toArray(new BlockStorageLocation[] {});
}
/**
* Callable that sets up an RPC proxy to a datanode and queries it for
* volume location information for a list of ExtendedBlocks.
*/
private static class VolumeBlockLocationCallable implements
Callable<HdfsBlocksMetadata> {
private final Configuration configuration;
private final int timeout;
private final DatanodeInfo datanode;
private final String poolId;
private final long[] blockIds;
private final List<Token<BlockTokenIdentifier>> dnTokens;
private final boolean connectToDnViaHostname;
VolumeBlockLocationCallable(Configuration configuration,
DatanodeInfo datanode, String poolId, long []blockIds,
List<Token<BlockTokenIdentifier>> dnTokens, int timeout,
boolean connectToDnViaHostname) {
this.configuration = configuration;
this.timeout = timeout;
this.datanode = datanode;
this.poolId = poolId;
this.blockIds = blockIds;
this.dnTokens = dnTokens;
this.connectToDnViaHostname = connectToDnViaHostname;
}
public DatanodeInfo getDatanodeInfo() {
return datanode;
}
@Override
public HdfsBlocksMetadata call() throws Exception {
HdfsBlocksMetadata metadata = null;
// Create the RPC proxy and make the RPC
ClientDatanodeProtocol cdp = null;
try {
cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration,
timeout, connectToDnViaHostname);
metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens);
} catch (IOException e) {
// Bubble this up to the caller, handle with the Future
throw e;
} finally {
if (cdp != null) {
RPC.stopProxy(cdp);
}
}
return metadata;
}
}
}