blob: 8d218aef9e8a8b5a52d1d200d95c9dcf3e603aa9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.base.Joiner;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
/**
* This class handles the block recovery work commands.
*/
@InterfaceAudience.Private
public class BlockRecoveryWorker {
public static final Logger LOG = DataNode.LOG;
private final DataNode datanode;
private final Configuration conf;
private final DNConf dnConf;
BlockRecoveryWorker(DataNode datanode) {
this.datanode = datanode;
conf = datanode.getConf();
dnConf = datanode.getDnConf();
}
/** A convenient class used in block recovery. */
static class BlockRecord {
private final DatanodeID id;
private final InterDatanodeProtocol datanode;
private final ReplicaRecoveryInfo rInfo;
private String storageID;
BlockRecord(DatanodeID id, InterDatanodeProtocol datanode,
ReplicaRecoveryInfo rInfo) {
this.id = id;
this.datanode = datanode;
this.rInfo = rInfo;
}
private void updateReplicaUnderRecovery(String bpid, long recoveryId,
long newBlockId, long newLength) throws IOException {
final ExtendedBlock b = new ExtendedBlock(bpid, rInfo);
storageID = datanode.updateReplicaUnderRecovery(b, recoveryId, newBlockId,
newLength);
}
@Override
public String toString() {
return "block:" + rInfo + " node:" + id;
}
}
/** A block recovery task for a contiguous block. */
class RecoveryTaskContiguous {
private final RecoveringBlock rBlock;
private final ExtendedBlock block;
private final String bpid;
private final DatanodeInfo[] locs;
private final long recoveryId;
RecoveryTaskContiguous(RecoveringBlock rBlock) {
this.rBlock = rBlock;
block = rBlock.getBlock();
bpid = block.getBlockPoolId();
locs = rBlock.getLocations();
recoveryId = rBlock.getNewGenerationStamp();
}
protected void recover() throws IOException {
List<BlockRecord> syncList = new ArrayList<>(locs.length);
int errorCount = 0;
int candidateReplicaCnt = 0;
// Check generation stamps, replica size and state. Replica must satisfy
// the following criteria to be included in syncList for recovery:
// - Valid generation stamp
// - Non-zero length
// - Original state is RWR or better
for(DatanodeID id : locs) {
try {
DatanodeID bpReg = new DatanodeID(
datanode.getBPOfferService(bpid).bpRegistration);
InterDatanodeProtocol proxyDN = bpReg.equals(id)?
datanode: DataNode.createInterDataNodeProtocolProxy(id, conf,
dnConf.socketTimeout, dnConf.connectToDnViaHostname);
ReplicaRecoveryInfo info = callInitReplicaRecovery(proxyDN, rBlock);
if (info != null &&
info.getGenerationStamp() >= block.getGenerationStamp() &&
info.getNumBytes() > 0) {
// Count the number of candidate replicas received.
++candidateReplicaCnt;
if (info.getOriginalReplicaState().getValue() <=
ReplicaState.RWR.getValue()) {
syncList.add(new BlockRecord(id, proxyDN, info));
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Block recovery: Ignored replica with invalid " +
"original state: " + info + " from DataNode: " + id);
}
}
} else {
if (LOG.isDebugEnabled()) {
if (info == null) {
LOG.debug("Block recovery: DataNode: " + id + " does not have "
+ "replica for block: " + block);
} else {
LOG.debug("Block recovery: Ignored replica with invalid "
+ "generation stamp or length: " + info + " from " +
"DataNode: " + id);
}
}
}
} catch (RecoveryInProgressException ripE) {
InterDatanodeProtocol.LOG.warn(
"Recovery for replica " + block + " on data-node " + id
+ " is already in progress. Recovery id = "
+ rBlock.getNewGenerationStamp() + " is aborted.", ripE);
return;
} catch (IOException e) {
++errorCount;
InterDatanodeProtocol.LOG.warn("Failed to recover block (block="
+ block + ", datanode=" + id + ")", e);
}
}
if (errorCount == locs.length) {
throw new IOException("All datanodes failed: block=" + block
+ ", datanodeids=" + Arrays.asList(locs));
}
// None of the replicas reported by DataNodes has the required original
// state, report the error.
if (candidateReplicaCnt > 0 && syncList.isEmpty()) {
throw new IOException("Found " + candidateReplicaCnt +
" replica(s) for block " + block + " but none is in " +
ReplicaState.RWR.name() + " or better state. datanodeids=" +
Arrays.asList(locs));
}
syncBlock(syncList);
}
/** Block synchronization. */
void syncBlock(List<BlockRecord> syncList) throws IOException {
DatanodeProtocolClientSideTranslatorPB nn =
getActiveNamenodeForBP(block.getBlockPoolId());
boolean isTruncateRecovery = rBlock.getNewBlock() != null;
long blockId = (isTruncateRecovery) ?
rBlock.getNewBlock().getBlockId() : block.getBlockId();
LOG.info("BlockRecoveryWorker: block={} (length={}),"
+ " isTruncateRecovery={}, syncList={}", block,
block.getNumBytes(), isTruncateRecovery, syncList);
// syncList.isEmpty() means that all data-nodes do not have the block
// or their replicas have 0 length.
// The block can be deleted.
if (syncList.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("syncBlock for block " + block + ", all datanodes don't " +
"have the block or their replicas have 0 length. The block can " +
"be deleted.");
}
nn.commitBlockSynchronization(block, recoveryId, 0,
true, true, DatanodeID.EMPTY_ARRAY, null);
return;
}
// Calculate the best available replica state.
ReplicaState bestState = ReplicaState.RWR;
long finalizedLength = -1;
for (BlockRecord r : syncList) {
assert r.rInfo.getNumBytes() > 0 : "zero length replica";
ReplicaState rState = r.rInfo.getOriginalReplicaState();
if (rState.getValue() < bestState.getValue()) {
bestState = rState;
}
if(rState == ReplicaState.FINALIZED) {
if (finalizedLength > 0 && finalizedLength != r.rInfo.getNumBytes()) {
throw new IOException("Inconsistent size of finalized replicas. " +
"Replica " + r.rInfo + " expected size: " + finalizedLength);
}
finalizedLength = r.rInfo.getNumBytes();
}
}
// Calculate list of nodes that will participate in the recovery
// and the new block size
List<BlockRecord> participatingList = new ArrayList<>();
final ExtendedBlock newBlock = new ExtendedBlock(bpid, blockId,
-1, recoveryId);
switch(bestState) {
case FINALIZED:
assert finalizedLength > 0 : "finalizedLength is not positive";
for(BlockRecord r : syncList) {
ReplicaState rState = r.rInfo.getOriginalReplicaState();
if (rState == ReplicaState.FINALIZED ||
rState == ReplicaState.RBW &&
r.rInfo.getNumBytes() == finalizedLength) {
participatingList.add(r);
}
if (LOG.isDebugEnabled()) {
LOG.debug("syncBlock replicaInfo: block=" + block +
", from datanode " + r.id + ", receivedState=" + rState.name() +
", receivedLength=" + r.rInfo.getNumBytes() +
", bestState=FINALIZED, finalizedLength=" + finalizedLength);
}
}
newBlock.setNumBytes(finalizedLength);
break;
case RBW:
case RWR:
long minLength = Long.MAX_VALUE;
for(BlockRecord r : syncList) {
ReplicaState rState = r.rInfo.getOriginalReplicaState();
if(rState == bestState) {
minLength = Math.min(minLength, r.rInfo.getNumBytes());
participatingList.add(r);
}
if (LOG.isDebugEnabled()) {
LOG.debug("syncBlock replicaInfo: block=" + block +
", from datanode " + r.id + ", receivedState=" + rState.name() +
", receivedLength=" + r.rInfo.getNumBytes() + ", bestState=" +
bestState.name());
}
}
// recover() guarantees syncList will have at least one replica with RWR
// or better state.
assert minLength != Long.MAX_VALUE : "wrong minLength";
newBlock.setNumBytes(minLength);
break;
case RUR:
case TEMPORARY:
assert false : "bad replica state: " + bestState;
default:
break; // we have 'case' all enum values
}
if (isTruncateRecovery) {
newBlock.setNumBytes(rBlock.getNewBlock().getNumBytes());
}
LOG.info("BlockRecoveryWorker: block={} (length={}), bestState={},"
+ " newBlock={} (length={}), participatingList={}",
block, block.getNumBytes(), bestState.name(), newBlock,
newBlock.getNumBytes(), participatingList);
List<DatanodeID> failedList = new ArrayList<>();
final List<BlockRecord> successList = new ArrayList<>();
for (BlockRecord r : participatingList) {
try {
r.updateReplicaUnderRecovery(bpid, recoveryId, blockId,
newBlock.getNumBytes());
successList.add(r);
} catch (IOException e) {
InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
+ newBlock + ", datanode=" + r.id + ")", e);
failedList.add(r.id);
}
}
// Abort if all failed.
if (successList.isEmpty()) {
StringBuilder b = new StringBuilder();
for(DatanodeID id : failedList) {
b.append("\n " + id);
}
throw new IOException("Cannot recover " + block + ", the following "
+ failedList.size() + " data-nodes failed {" + b + "\n}");
}
// Notify the name-node about successfully recovered replicas.
final DatanodeID[] datanodes = new DatanodeID[successList.size()];
final String[] storages = new String[datanodes.length];
for (int i = 0; i < datanodes.length; i++) {
final BlockRecord r = successList.get(i);
datanodes[i] = r.id;
storages[i] = r.storageID;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Datanode triggering commitBlockSynchronization, block=" +
block + ", newGs=" + newBlock.getGenerationStamp() +
", newLength=" + newBlock.getNumBytes());
}
nn.commitBlockSynchronization(block,
newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false,
datanodes, storages);
}
}
private static void logRecoverBlock(String who, RecoveringBlock rb) {
ExtendedBlock block = rb.getBlock();
DatanodeInfo[] targets = rb.getLocations();
LOG.info("BlockRecoveryWorker: " + who + " calls recoverBlock(" + block
+ ", targets=[" + Joiner.on(", ").join(targets) + "]"
+ ", newGenerationStamp=" + rb.getNewGenerationStamp()
+ ", newBlock=" + rb.getNewBlock()
+ ")");
}
/**
* Convenience method, which unwraps RemoteException.
* @throws IOException not a RemoteException.
*/
private static ReplicaRecoveryInfo callInitReplicaRecovery(
InterDatanodeProtocol datanode, RecoveringBlock rBlock)
throws IOException {
try {
return datanode.initReplicaRecovery(rBlock);
} catch(RemoteException re) {
throw re.unwrapRemoteException();
}
}
/**
* Get the NameNode corresponding to the given block pool.
*
* @param bpid Block pool Id
* @return Namenode corresponding to the bpid
* @throws IOException if unable to get the corresponding NameNode
*/
DatanodeProtocolClientSideTranslatorPB getActiveNamenodeForBP(
String bpid) throws IOException {
BPOfferService bpos = datanode.getBPOfferService(bpid);
if (bpos == null) {
throw new IOException("No block pool offer service for bpid=" + bpid);
}
DatanodeProtocolClientSideTranslatorPB activeNN = bpos.getActiveNN();
if (activeNN == null) {
throw new IOException(
"Block pool " + bpid + " has not recognized an active NN");
}
return activeNN;
}
public Daemon recoverBlocks(final String who,
final Collection<RecoveringBlock> blocks) {
Daemon d = new Daemon(datanode.threadGroup, new Runnable() {
@Override
public void run() {
for(RecoveringBlock b : blocks) {
try {
logRecoverBlock(who, b);
RecoveryTaskContiguous task = new RecoveryTaskContiguous(b);
task.recover();
} catch (IOException e) {
LOG.warn("recoverBlocks FAILED: " + b, e);
}
}
}
});
d.start();
return d;
}
}