blob: 59ca11a54043df886ad5d96982565310d96861c6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.util.Time.now;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.*;
import com.google.common.base.Joiner;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
/**
* A thread per active or standby namenode to perform:
* <ul>
* <li> Pre-registration handshake with namenode</li>
* <li> Registration with namenode</li>
* <li> Send periodic heartbeats to the namenode</li>
* <li> Handle commands received from the namenode</li>
* </ul>
*/
@InterfaceAudience.Private
class BPServiceActor implements Runnable {
static final Log LOG = DataNode.LOG;
final InetSocketAddress nnAddr;
HAServiceState state;
final BPOfferService bpos;
// lastBlockReport, lastDeletedReport and lastHeartbeat may be assigned/read
// by testing threads (through BPServiceActor#triggerXXX), while also
// assigned/read by the actor thread. Thus they should be declared as volatile
// to make sure the "happens-before" consistency.
volatile long lastBlockReport = 0;
volatile long lastDeletedReport = 0;
boolean resetBlockReportTime = true;
volatile long lastCacheReport = 0;
Thread bpThread;
DatanodeProtocolClientSideTranslatorPB bpNamenode;
private volatile long lastHeartbeat = 0;
static enum RunningState {
CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED;
}
private volatile RunningState runningState = RunningState.CONNECTING;
/**
* Between block reports (which happen on the order of once an hour) the
* DN reports smaller incremental changes to its block list. This map,
* keyed by block ID, contains the pending changes which have yet to be
* reported to the NN. Access should be synchronized on this object.
*/
private final Map<DatanodeStorage, PerStoragePendingIncrementalBR>
pendingIncrementalBRperStorage = Maps.newHashMap();
// IBR = Incremental Block Report. If this flag is set then an IBR will be
// sent immediately by the actor thread without waiting for the IBR timer
// to elapse.
private volatile boolean sendImmediateIBR = false;
private volatile boolean shouldServiceRun = true;
private final DataNode dn;
private final DNConf dnConf;
private DatanodeRegistration bpRegistration;
BPServiceActor(InetSocketAddress nnAddr, BPOfferService bpos) {
this.bpos = bpos;
this.dn = bpos.getDataNode();
this.nnAddr = nnAddr;
this.dnConf = dn.getDnConf();
}
boolean isAlive() {
if (!shouldServiceRun || !bpThread.isAlive()) {
return false;
}
return runningState == BPServiceActor.RunningState.RUNNING
|| runningState == BPServiceActor.RunningState.CONNECTING;
}
@Override
public String toString() {
return bpos.toString() + " service to " + nnAddr;
}
InetSocketAddress getNNSocketAddress() {
return nnAddr;
}
/**
* Used to inject a spy NN in the unit tests.
*/
@VisibleForTesting
void setNameNode(DatanodeProtocolClientSideTranslatorPB dnProtocol) {
bpNamenode = dnProtocol;
}
@VisibleForTesting
DatanodeProtocolClientSideTranslatorPB getNameNodeProxy() {
return bpNamenode;
}
/**
* Perform the first part of the handshake with the NameNode.
* This calls <code>versionRequest</code> to determine the NN's
* namespace and version info. It automatically retries until
* the NN responds or the DN is shutting down.
*
* @return the NamespaceInfo
*/
@VisibleForTesting
NamespaceInfo retrieveNamespaceInfo() throws IOException {
NamespaceInfo nsInfo = null;
while (shouldRun()) {
try {
nsInfo = bpNamenode.versionRequest();
LOG.debug(this + " received versionRequest response: " + nsInfo);
break;
} catch(SocketTimeoutException e) { // namenode is busy
LOG.warn("Problem connecting to server: " + nnAddr);
} catch(IOException e ) { // namenode is not available
LOG.warn("Problem connecting to server: " + nnAddr);
}
// try again in a second
sleepAndLogInterrupts(5000, "requesting version info from NN");
}
if (nsInfo != null) {
checkNNVersion(nsInfo);
} else {
throw new IOException("DN shut down before block pool connected");
}
return nsInfo;
}
private void checkNNVersion(NamespaceInfo nsInfo)
throws IncorrectVersionException {
// build and layout versions should match
String nnVersion = nsInfo.getSoftwareVersion();
String minimumNameNodeVersion = dnConf.getMinimumNameNodeVersion();
if (VersionUtil.compareVersions(nnVersion, minimumNameNodeVersion) < 0) {
IncorrectVersionException ive = new IncorrectVersionException(
minimumNameNodeVersion, nnVersion, "NameNode", "DataNode");
LOG.warn(ive.getMessage());
throw ive;
}
String dnVersion = VersionInfo.getVersion();
if (!nnVersion.equals(dnVersion)) {
LOG.info("Reported NameNode version '" + nnVersion + "' does not match " +
"DataNode version '" + dnVersion + "' but is within acceptable " +
"limits. Note: This is normal during a rolling upgrade.");
}
}
private void connectToNNAndHandshake() throws IOException {
// get NN proxy
bpNamenode = dn.connectToNN(nnAddr);
// First phase of the handshake with NN - get the namespace
// info.
NamespaceInfo nsInfo = retrieveNamespaceInfo();
// Verify that this matches the other NN in this HA pair.
// This also initializes our block pool in the DN if we are
// the first NN connection for this BP.
bpos.verifyAndSetNamespaceInfo(nsInfo);
// Second phase of the handshake with the NN.
register();
}
// This is useful to make sure NN gets Heartbeat before Blockreport
// upon NN restart while DN keeps retrying Otherwise,
// 1. NN restarts.
// 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister.
// 3. After reregistration completes, DN will send Blockreport first.
// 4. Given NN receives Blockreport after Heartbeat, it won't mark
// DatanodeStorageInfo#blockContentsStale to false until the next
// Blockreport.
void scheduleHeartbeat() {
lastHeartbeat = 0;
}
/**
* This methods arranges for the data node to send the block report at
* the next heartbeat.
*/
void scheduleBlockReport(long delay) {
if (delay > 0) { // send BR after random delay
lastBlockReport = Time.now()
- ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
} else { // send at next heartbeat
lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;
}
resetBlockReportTime = true; // reset future BRs for randomness
}
void reportBadBlocks(ExtendedBlock block,
String storageUuid, StorageType storageType) {
if (bpRegistration == null) {
return;
}
DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
String[] uuids = { storageUuid };
StorageType[] types = { storageType };
LocatedBlock[] blocks = { new LocatedBlock(block, dnArr, uuids, types) };
try {
bpNamenode.reportBadBlocks(blocks);
} catch (IOException e){
/* One common reason is that NameNode could be in safe mode.
* Should we keep on retrying in that case?
*/
LOG.warn("Failed to report bad block " + block + " to namenode : "
+ " Exception", e);
}
}
/**
* Report received blocks and delete hints to the Namenode for each
* storage.
*
* @throws IOException
*/
private void reportReceivedDeletedBlocks() throws IOException {
// Generate a list of the pending reports for each storage under the lock
ArrayList<StorageReceivedDeletedBlocks> reports =
new ArrayList<StorageReceivedDeletedBlocks>(pendingIncrementalBRperStorage.size());
synchronized (pendingIncrementalBRperStorage) {
for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry :
pendingIncrementalBRperStorage.entrySet()) {
final DatanodeStorage storage = entry.getKey();
final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();
if (perStorageMap.getBlockInfoCount() > 0) {
// Send newly-received and deleted blockids to namenode
ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos();
reports.add(new StorageReceivedDeletedBlocks(storage, rdbi));
}
}
sendImmediateIBR = false;
}
if (reports.size() == 0) {
// Nothing new to report.
return;
}
// Send incremental block reports to the Namenode outside the lock
boolean success = false;
try {
bpNamenode.blockReceivedAndDeleted(bpRegistration,
bpos.getBlockPoolId(),
reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]));
success = true;
} finally {
if (!success) {
synchronized (pendingIncrementalBRperStorage) {
for (StorageReceivedDeletedBlocks report : reports) {
// If we didn't succeed in sending the report, put all of the
// blocks back onto our queue, but only in the case where we
// didn't put something newer in the meantime.
PerStoragePendingIncrementalBR perStorageMap =
pendingIncrementalBRperStorage.get(report.getStorage());
perStorageMap.putMissingBlockInfos(report.getBlocks());
sendImmediateIBR = true;
}
}
}
}
}
/**
* @return pending incremental block report for given {@code storage}
*/
private PerStoragePendingIncrementalBR getIncrementalBRMapForStorage(
DatanodeStorage storage) {
PerStoragePendingIncrementalBR mapForStorage =
pendingIncrementalBRperStorage.get(storage);
if (mapForStorage == null) {
// This is the first time we are adding incremental BR state for
// this storage so create a new map. This is required once per
// storage, per service actor.
mapForStorage = new PerStoragePendingIncrementalBR();
pendingIncrementalBRperStorage.put(storage, mapForStorage);
}
return mapForStorage;
}
/**
* Add a blockInfo for notification to NameNode. If another entry
* exists for the same block it is removed.
*
* Caller must synchronize access using pendingIncrementalBRperStorage.
*/
void addPendingReplicationBlockInfo(ReceivedDeletedBlockInfo bInfo,
DatanodeStorage storage) {
// Make sure another entry for the same block is first removed.
// There may only be one such entry.
for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry :
pendingIncrementalBRperStorage.entrySet()) {
if (entry.getValue().removeBlockInfo(bInfo)) {
break;
}
}
getIncrementalBRMapForStorage(storage).putBlockInfo(bInfo);
}
/*
* Informing the name node could take a long long time! Should we wait
* till namenode is informed before responding with success to the
* client? For now we don't.
*/
void notifyNamenodeBlockImmediately(
ReceivedDeletedBlockInfo bInfo, String storageUuid) {
synchronized (pendingIncrementalBRperStorage) {
addPendingReplicationBlockInfo(
bInfo, dn.getFSDataset().getStorage(storageUuid));
sendImmediateIBR = true;
pendingIncrementalBRperStorage.notifyAll();
}
}
void notifyNamenodeDeletedBlock(
ReceivedDeletedBlockInfo bInfo, String storageUuid) {
synchronized (pendingIncrementalBRperStorage) {
addPendingReplicationBlockInfo(
bInfo, dn.getFSDataset().getStorage(storageUuid));
}
}
/**
* Run an immediate block report on this thread. Used by tests.
*/
@VisibleForTesting
void triggerBlockReportForTests() {
synchronized (pendingIncrementalBRperStorage) {
lastBlockReport = 0;
lastHeartbeat = 0;
pendingIncrementalBRperStorage.notifyAll();
while (lastBlockReport == 0) {
try {
pendingIncrementalBRperStorage.wait(100);
} catch (InterruptedException e) {
return;
}
}
}
}
@VisibleForTesting
void triggerHeartbeatForTests() {
synchronized (pendingIncrementalBRperStorage) {
lastHeartbeat = 0;
pendingIncrementalBRperStorage.notifyAll();
while (lastHeartbeat == 0) {
try {
pendingIncrementalBRperStorage.wait(100);
} catch (InterruptedException e) {
return;
}
}
}
}
@VisibleForTesting
void triggerDeletionReportForTests() {
synchronized (pendingIncrementalBRperStorage) {
lastDeletedReport = 0;
pendingIncrementalBRperStorage.notifyAll();
while (lastDeletedReport == 0) {
try {
pendingIncrementalBRperStorage.wait(100);
} catch (InterruptedException e) {
return;
}
}
}
}
@VisibleForTesting
boolean hasPendingIBR() {
return sendImmediateIBR;
}
/**
* Report the list blocks to the Namenode
* @return DatanodeCommands returned by the NN. May be null.
* @throws IOException
*/
List<DatanodeCommand> blockReport() throws IOException {
// send block report if timer has expired.
final long startTime = now();
if (startTime - lastBlockReport <= dnConf.blockReportInterval) {
return null;
}
ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
// Flush any block information that precedes the block report. Otherwise
// we have a chance that we will miss the delHint information
// or we will report an RBW replica after the BlockReport already reports
// a FINALIZED one.
reportReceivedDeletedBlocks();
lastDeletedReport = startTime;
long brCreateStartTime = now();
Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());
// Convert the reports to the format expected by the NN.
int i = 0;
int totalBlockCount = 0;
StorageBlockReport reports[] =
new StorageBlockReport[perVolumeBlockLists.size()];
for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
BlockListAsLongs blockList = kvPair.getValue();
reports[i++] = new StorageBlockReport(
kvPair.getKey(), blockList.getBlockListAsLongs());
totalBlockCount += blockList.getNumberOfBlocks();
}
// Send the reports to the NN.
int numReportsSent;
long brSendStartTime = now();
if (totalBlockCount < dnConf.blockReportSplitThreshold) {
// Below split threshold, send all reports in a single message.
numReportsSent = 1;
DatanodeCommand cmd =
bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), reports);
if (cmd != null) {
cmds.add(cmd);
}
} else {
// Send one block report per message.
numReportsSent = i;
for (StorageBlockReport report : reports) {
StorageBlockReport singleReport[] = { report };
DatanodeCommand cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), singleReport);
if (cmd != null) {
cmds.add(cmd);
}
}
}
// Log the block report processing stats from Datanode perspective
long brSendCost = now() - brSendStartTime;
long brCreateCost = brSendStartTime - brCreateStartTime;
dn.getMetrics().addBlockReport(brSendCost);
LOG.info("Sent " + numReportsSent + " blockreports " + totalBlockCount +
" blocks total. Took " + brCreateCost +
" msec to generate and " + brSendCost +
" msecs for RPC and NN processing. " +
" Got back commands " +
(cmds.size() == 0 ? "none" : Joiner.on("; ").join(cmds)));
scheduleNextBlockReport(startTime);
return cmds.size() == 0 ? null : cmds;
}
private void scheduleNextBlockReport(long previousReportStartTime) {
// If we have sent the first set of block reports, then wait a random
// time before we start the periodic block reports.
if (resetBlockReportTime) {
lastBlockReport = previousReportStartTime -
DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
resetBlockReportTime = false;
} else {
/* say the last block report was at 8:20:14. The current report
* should have started around 9:20:14 (default 1 hour interval).
* If current time is :
* 1) normal like 9:20:18, next report should be at 10:20:14
* 2) unexpected like 11:35:43, next report should be at 12:20:14
*/
lastBlockReport += (now() - lastBlockReport) /
dnConf.blockReportInterval * dnConf.blockReportInterval;
}
}
DatanodeCommand cacheReport() throws IOException {
// If caching is disabled, do not send a cache report
if (dn.getFSDataset().getCacheCapacity() == 0) {
return null;
}
// send cache report if timer has expired.
DatanodeCommand cmd = null;
final long startTime = Time.monotonicNow();
if (startTime - lastCacheReport > dnConf.cacheReportInterval) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending cacheReport from service actor: " + this);
}
lastCacheReport = startTime;
String bpid = bpos.getBlockPoolId();
List<Long> blockIds = dn.getFSDataset().getCacheReport(bpid);
long createTime = Time.monotonicNow();
cmd = bpNamenode.cacheReport(bpRegistration, bpid, blockIds);
long sendTime = Time.monotonicNow();
long createCost = createTime - startTime;
long sendCost = sendTime - createTime;
dn.getMetrics().addCacheReport(sendCost);
LOG.debug("CacheReport of " + blockIds.size()
+ " block(s) took " + createCost + " msec to generate and "
+ sendCost + " msecs for RPC and NN processing");
}
return cmd;
}
HeartbeatResponse sendHeartBeat() throws IOException {
StorageReport[] reports =
dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
if (LOG.isDebugEnabled()) {
LOG.debug("Sending heartbeat with " + reports.length +
" storage reports from service actor: " + this);
}
return bpNamenode.sendHeartbeat(bpRegistration,
reports,
dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getCacheUsed(),
dn.getXmitsInProgress(),
dn.getXceiverCount(),
dn.getFSDataset().getNumFailedVolumes());
}
//This must be called only by BPOfferService
void start() {
if ((bpThread != null) && (bpThread.isAlive())) {
//Thread is started already
return;
}
bpThread = new Thread(this, formatThreadName());
bpThread.setDaemon(true); // needed for JUnit testing
bpThread.start();
}
private String formatThreadName() {
Collection<StorageLocation> dataDirs =
DataNode.getStorageLocations(dn.getConf());
return "DataNode: [" + dataDirs.toString() + "] " +
" heartbeating to " + nnAddr;
}
//This must be called only by blockPoolManager.
void stop() {
shouldServiceRun = false;
if (bpThread != null) {
bpThread.interrupt();
}
}
//This must be called only by blockPoolManager
void join() {
try {
if (bpThread != null) {
bpThread.join();
}
} catch (InterruptedException ie) { }
}
//Cleanup method to be called by current thread before exiting.
private synchronized void cleanUp() {
shouldServiceRun = false;
IOUtils.cleanup(LOG, bpNamenode);
bpos.shutdownActor(this);
}
private void handleRollingUpgradeStatus(HeartbeatResponse resp) {
RollingUpgradeStatus rollingUpgradeStatus = resp.getRollingUpdateStatus();
if (rollingUpgradeStatus != null &&
rollingUpgradeStatus.getBlockPoolId().compareTo(bpos.getBlockPoolId()) != 0) {
// Can this ever occur?
LOG.error("Invalid BlockPoolId " +
rollingUpgradeStatus.getBlockPoolId() +
" in HeartbeatResponse. Expected " +
bpos.getBlockPoolId());
} else {
bpos.signalRollingUpgrade(rollingUpgradeStatus != null);
}
}
/**
* Main loop for each BP thread. Run until shutdown,
* forever calling remote NameNode functions.
*/
private void offerService() throws Exception {
LOG.info("For namenode " + nnAddr + " using"
+ " DELETEREPORT_INTERVAL of " + dnConf.deleteReportInterval + " msec "
+ " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec"
+ " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec"
+ " Initial delay: " + dnConf.initialBlockReportDelay + "msec"
+ "; heartBeatInterval=" + dnConf.heartBeatInterval);
//
// Now loop for a long time....
//
while (shouldRun()) {
try {
final long startTime = now();
//
// Every so often, send heartbeat or block-report
//
if (startTime - lastHeartbeat >= dnConf.heartBeatInterval) {
//
// All heartbeat messages include following info:
// -- Datanode name
// -- data transfer port
// -- Total capacity
// -- Bytes remaining
//
lastHeartbeat = startTime;
if (!dn.areHeartbeatsDisabledForTests()) {
HeartbeatResponse resp = sendHeartBeat();
assert resp != null;
dn.getMetrics().addHeartbeat(now() - startTime);
// If the state of this NN has changed (eg STANDBY->ACTIVE)
// then let the BPOfferService update itself.
//
// Important that this happens before processCommand below,
// since the first heartbeat to a new active might have commands
// that we should actually process.
bpos.updateActorStatesFromHeartbeat(
this, resp.getNameNodeHaState());
state = resp.getNameNodeHaState().getState();
if (state == HAServiceState.ACTIVE) {
handleRollingUpgradeStatus(resp);
}
long startProcessCommands = now();
if (!processCommand(resp.getCommands()))
continue;
long endProcessCommands = now();
if (endProcessCommands - startProcessCommands > 2000) {
LOG.info("Took " + (endProcessCommands - startProcessCommands)
+ "ms to process " + resp.getCommands().length
+ " commands from NN");
}
}
}
if (sendImmediateIBR ||
(startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
reportReceivedDeletedBlocks();
lastDeletedReport = startTime;
}
List<DatanodeCommand> cmds = blockReport();
processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
DatanodeCommand cmd = cacheReport();
processCommand(new DatanodeCommand[]{ cmd });
// Now safe to start scanning the block pool.
// If it has already been started, this is a no-op.
if (dn.blockScanner != null) {
dn.blockScanner.addBlockPool(bpos.getBlockPoolId());
}
//
// There is no work to do; sleep until hearbeat timer elapses,
// or work arrives, and then iterate again.
//
long waitTime = dnConf.heartBeatInterval -
(Time.now() - lastHeartbeat);
synchronized(pendingIncrementalBRperStorage) {
if (waitTime > 0 && !sendImmediateIBR) {
try {
pendingIncrementalBRperStorage.wait(waitTime);
} catch (InterruptedException ie) {
LOG.warn("BPOfferService for " + this + " interrupted");
}
}
} // synchronized
} catch(RemoteException re) {
String reClass = re.getClassName();
if (UnregisteredNodeException.class.getName().equals(reClass) ||
DisallowedDatanodeException.class.getName().equals(reClass) ||
IncorrectVersionException.class.getName().equals(reClass)) {
LOG.warn(this + " is shutting down", re);
shouldServiceRun = false;
return;
}
LOG.warn("RemoteException in offerService", re);
try {
long sleepTime = Math.min(1000, dnConf.heartBeatInterval);
Thread.sleep(sleepTime);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
} catch (IOException e) {
LOG.warn("IOException in offerService", e);
}
} // while (shouldRun())
} // offerService
/**
* Register one bp with the corresponding NameNode
* <p>
* The bpDatanode needs to register with the namenode on startup in order
* 1) to report which storage it is serving now and
* 2) to receive a registrationID
*
* issued by the namenode to recognize registered datanodes.
*
* @see FSNamesystem#registerDatanode(DatanodeRegistration)
* @throws IOException
*/
void register() throws IOException {
// The handshake() phase loaded the block pool storage
// off disk - so update the bpRegistration object from that info
bpRegistration = bpos.createRegistration();
LOG.info(this + " beginning handshake with NN");
while (shouldRun()) {
try {
// Use returned registration from namenode with updated fields
bpRegistration = bpNamenode.registerDatanode(bpRegistration);
break;
} catch(SocketTimeoutException e) { // namenode is busy
LOG.info("Problem connecting to server: " + nnAddr);
sleepAndLogInterrupts(1000, "connecting to server");
}
}
LOG.info("Block pool " + this + " successfully registered with NN");
bpos.registrationSucceeded(this, bpRegistration);
// random short delay - helps scatter the BR from all DNs
scheduleBlockReport(dnConf.initialBlockReportDelay);
}
private void sleepAndLogInterrupts(int millis,
String stateString) {
try {
Thread.sleep(millis);
} catch (InterruptedException ie) {
LOG.info("BPOfferService " + this + " interrupted while " + stateString);
}
}
/**
* No matter what kind of exception we get, keep retrying to offerService().
* That's the loop that connects to the NameNode and provides basic DataNode
* functionality.
*
* Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can
* happen either at shutdown or due to refreshNamenodes.
*/
@Override
public void run() {
LOG.info(this + " starting to offer service");
try {
while (true) {
// init stuff
try {
// setup storage
connectToNNAndHandshake();
break;
} catch (IOException ioe) {
// Initial handshake, storage recovery or registration failed
runningState = RunningState.INIT_FAILED;
if (shouldRetryInit()) {
// Retry until all namenode's of BPOS failed initialization
LOG.error("Initialization failed for " + this + " "
+ ioe.getLocalizedMessage());
sleepAndLogInterrupts(5000, "initializing");
} else {
runningState = RunningState.FAILED;
LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
return;
}
}
}
runningState = RunningState.RUNNING;
while (shouldRun()) {
try {
offerService();
} catch (Exception ex) {
LOG.error("Exception in BPOfferService for " + this, ex);
sleepAndLogInterrupts(5000, "offering service");
}
}
runningState = RunningState.EXITED;
} catch (Throwable ex) {
LOG.warn("Unexpected exception in block pool " + this, ex);
runningState = RunningState.FAILED;
} finally {
LOG.warn("Ending block pool service for: " + this);
cleanUp();
}
}
private boolean shouldRetryInit() {
return shouldRun() && bpos.shouldRetryInit();
}
private boolean shouldRun() {
return shouldServiceRun && dn.shouldRun();
}
/**
* Process an array of datanode commands
*
* @param cmds an array of datanode commands
* @return true if further processing may be required or false otherwise.
*/
boolean processCommand(DatanodeCommand[] cmds) {
if (cmds != null) {
for (DatanodeCommand cmd : cmds) {
try {
if (bpos.processCommandFromActor(cmd, this) == false) {
return false;
}
} catch (IOException ioe) {
LOG.warn("Error processing datanode Command", ioe);
}
}
}
return true;
}
void trySendErrorReport(int errCode, String errMsg) {
try {
bpNamenode.errorReport(bpRegistration, errCode, errMsg);
} catch(IOException e) {
LOG.warn("Error reporting an error to NameNode " + nnAddr,
e);
}
}
/**
* Report a bad block from another DN in this cluster.
*/
void reportRemoteBadBlock(DatanodeInfo dnInfo, ExtendedBlock block)
throws IOException {
LocatedBlock lb = new LocatedBlock(block,
new DatanodeInfo[] {dnInfo});
bpNamenode.reportBadBlocks(new LocatedBlock[] {lb});
}
void reRegister() throws IOException {
if (shouldRun()) {
// re-retrieve namespace info to make sure that, if the NN
// was restarted, we still match its version (HDFS-2120)
retrieveNamespaceInfo();
// and re-register
register();
scheduleHeartbeat();
}
}
private static class PerStoragePendingIncrementalBR {
private final Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR =
Maps.newHashMap();
/**
* Return the number of blocks on this storage that have pending
* incremental block reports.
* @return
*/
int getBlockInfoCount() {
return pendingIncrementalBR.size();
}
/**
* Dequeue and return all pending incremental block report state.
* @return
*/
ReceivedDeletedBlockInfo[] dequeueBlockInfos() {
ReceivedDeletedBlockInfo[] blockInfos =
pendingIncrementalBR.values().toArray(
new ReceivedDeletedBlockInfo[getBlockInfoCount()]);
pendingIncrementalBR.clear();
return blockInfos;
}
/**
* Add blocks from blockArray to pendingIncrementalBR, unless the
* block already exists in pendingIncrementalBR.
* @param blockArray list of blocks to add.
* @return the number of missing blocks that we added.
*/
int putMissingBlockInfos(ReceivedDeletedBlockInfo[] blockArray) {
int blocksPut = 0;
for (ReceivedDeletedBlockInfo rdbi : blockArray) {
if (!pendingIncrementalBR.containsKey(rdbi.getBlock().getBlockId())) {
pendingIncrementalBR.put(rdbi.getBlock().getBlockId(), rdbi);
++blocksPut;
}
}
return blocksPut;
}
/**
* Add pending incremental block report for a single block.
* @param blockID
* @param blockInfo
*/
void putBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
pendingIncrementalBR.put(blockInfo.getBlock().getBlockId(), blockInfo);
}
/**
* Remove pending incremental block report for a single block if it
* exists.
*
* @param blockInfo
* @return true if a report was removed, false if no report existed for
* the given block.
*/
boolean removeBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
return (pendingIncrementalBR.remove(blockInfo.getBlock().getBlockId()) != null);
}
}
}