/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
import static org.apache.hadoop.hdfs.server.common.Util.now;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.PrivilegedExceptionAction;
import java.security.SecureRandom;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
/**********************************************************
* DataNode is a class (and program) that stores a set of
* blocks for a DFS deployment. A single deployment can
* have one or many DataNodes. Each DataNode communicates
* regularly with a single NameNode. It also communicates
* with client code and other DataNodes from time to time.
*
* DataNodes store a series of named blocks. The DataNode
* allows client code to read these blocks, or to write new
* block data. The DataNode may also, in response to instructions
* from its NameNode, delete blocks or copy blocks to/from other
* DataNodes.
*
* The DataNode maintains just one critical table:
* block -> stream of bytes (of BLOCK_SIZE or less)
*
* This info is stored on a local disk. The DataNode
* reports the table's contents to the NameNode upon startup
* and every so often afterwards.
*
* DataNodes spend their lives in an endless loop of asking
* the NameNode for something to do. A NameNode cannot connect
* to a DataNode directly; a NameNode simply returns values from
* functions invoked by a DataNode.
*
* DataNodes maintain an open server socket so that client code
* or other DataNodes can read/write data. The host/port for
* this server is reported to the NameNode, which then sends that
* information to clients or other DataNodes that might be interested.
*
**********************************************************/
@InterfaceAudience.Private
public class DataNode extends Configured
implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants,
DataNodeMXBean {
public static final Log LOG = LogFactory.getLog(DataNode.class);
static{
HdfsConfiguration.init();
}
public static final String DN_CLIENTTRACE_FORMAT =
"src: %s" + // src IP
", dest: %s" + // dst IP
", bytes: %s" + // byte count
", op: %s" + // operation
", cliID: %s" + // DFSClient id
", offset: %s" + // offset
", srvID: %s" + // DatanodeRegistration
", blockid: %s" + // block id
", duration: %s"; // duration time
static final Log ClientTraceLog =
LogFactory.getLog(DataNode.class.getName() + ".clienttrace");
/**
* Use {@link NetUtils#createSocketAddr(String)} instead.
*/
@Deprecated
public static InetSocketAddress createSocketAddr(String target
) throws IOException {
return NetUtils.createSocketAddr(target);
}
/**
* Manages the BPOfferService objects for the data node.
* Creation, removal, starting, stopping, and shutdown of BPOfferService
* objects must be done via the APIs in this class.
*/
@InterfaceAudience.Private
class BlockPoolManager {
private final Map<String, BPOfferService> bpMapping;
private final Map<InetSocketAddress, BPOfferService> nameNodeThreads;
//This lock is used only to ensure exclusion of refreshNamenodes
private final Object refreshNamenodesLock = new Object();
BlockPoolManager(Configuration conf)
throws IOException {
bpMapping = new HashMap<String, BPOfferService>();
nameNodeThreads = new HashMap<InetSocketAddress, BPOfferService>();
List<InetSocketAddress> isas = DFSUtil.getNNServiceRpcAddresses(conf);
for(InetSocketAddress isa : isas) {
BPOfferService bpos = new BPOfferService(isa);
nameNodeThreads.put(bpos.getNNSocketAddress(), bpos);
}
}
synchronized void addBlockPool(BPOfferService t) {
if (nameNodeThreads.get(t.getNNSocketAddress()) == null) {
throw new IllegalArgumentException(
"Unknown BPOfferService thread for namenode address:"
+ t.getNNSocketAddress());
}
if (t.getBlockPoolId() == null) {
throw new IllegalArgumentException("Null blockpool id");
}
bpMapping.put(t.getBlockPoolId(), t);
}
/**
* Returns the array of BPOfferService objects.
* Caution: The BPOfferService returned could be shutdown any time.
*/
synchronized BPOfferService[] getAllNamenodeThreads() {
BPOfferService[] bposArray = new BPOfferService[nameNodeThreads.values()
.size()];
return nameNodeThreads.values().toArray(bposArray);
}
synchronized BPOfferService get(InetSocketAddress addr) {
return nameNodeThreads.get(addr);
}
synchronized BPOfferService get(String bpid) {
return bpMapping.get(bpid);
}
synchronized void remove(BPOfferService t) {
nameNodeThreads.remove(t.getNNSocketAddress());
bpMapping.remove(t.getBlockPoolId());
}
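/**
* Stop all BPOfferService threads and wait for each of them to exit.
*/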
void shutDownAll() throws InterruptedException {
BPOfferService[] bposArray = this.getAllNamenodeThreads();
for (BPOfferService bpos : bposArray) {
bpos.stop(); //interrupts the threads
}
//now join
for (BPOfferService bpos : bposArray) {
bpos.join();
}
}
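/**
* Start all BPOfferService threads as the login user.
*/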
synchronized void startAll() throws IOException {
try {
UserGroupInformation.getLoginUser().doAs(
new PrivilegedExceptionAction<Object>() {
public Object run() throws Exception {
for (BPOfferService bpos : nameNodeThreads.values()) {
bpos.start();
}
return null;
}
});
} catch (InterruptedException ex) {
IOException ioe = new IOException();
ioe.initCause(ex.getCause());
throw ioe;
}
}
void joinAll() throws InterruptedException {
for (BPOfferService bpos: this.getAllNamenodeThreads()) {
bpos.join();
}
}
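/**
* Compare the namenode service RPC addresses in the new configuration with
* the BPOfferService threads currently running: stop and remove threads
* whose namenodes are no longer configured, and start new threads for
* namenodes that have been added.
*/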
void refreshNamenodes(Configuration conf)
throws IOException, InterruptedException {
LOG.info("Refresh request received for nameservices: "
+ conf.get(DFS_FEDERATION_NAMESERVICES));
List<InetSocketAddress> newAddresses =
DFSUtil.getNNServiceRpcAddresses(conf);
List<BPOfferService> toShutdown = new ArrayList<BPOfferService>();
List<InetSocketAddress> toStart = new ArrayList<InetSocketAddress>();
synchronized (refreshNamenodesLock) {
synchronized (this) {
for (InetSocketAddress nnaddr : nameNodeThreads.keySet()) {
if (!(newAddresses.contains(nnaddr))) {
toShutdown.add(nameNodeThreads.get(nnaddr));
}
}
for (InetSocketAddress nnaddr : newAddresses) {
if (!(nameNodeThreads.containsKey(nnaddr))) {
toStart.add(nnaddr);
}
}
for (InetSocketAddress nnaddr : toStart) {
BPOfferService bpos = new BPOfferService(nnaddr);
nameNodeThreads.put(bpos.getNNSocketAddress(), bpos);
}
for (BPOfferService bpos : toShutdown) {
remove(bpos);
}
}
for (BPOfferService bpos : toShutdown) {
bpos.stop();
bpos.join();
}
// Now start the threads that are not already running.
startAll();
}
}
}
volatile boolean shouldRun = true;
private BlockPoolManager blockPoolManager;
public volatile FSDatasetInterface data = null;
private String clusterId = null;
public final static String EMPTY_DEL_HINT = "";
AtomicInteger xmitsInProgress = new AtomicInteger();
Daemon dataXceiverServer = null;
ThreadGroup threadGroup = null;
long blockReportInterval;
boolean resetBlockReportTime = true;
long initialBlockReportDelay = BLOCKREPORT_INITIAL_DELAY * 1000L;
long heartBeatInterval;
private boolean heartbeatsDisabledForTests = false;
private DataStorage storage = null;
private HttpServer infoServer = null;
DataNodeMetrics metrics;
private InetSocketAddress selfAddr;
private volatile String hostName; // Host name of this datanode
private static String dnThreadName;
int socketTimeout;
int socketWriteTimeout = 0;
boolean transferToAllowed = true;
int writePacketSize = 0;
boolean isBlockTokenEnabled;
BlockPoolTokenSecretManager blockPoolTokenSecretManager;
boolean syncOnClose;
public DataBlockScanner blockScanner = null;
private DirectoryScanner directoryScanner = null;
/** Activated plug-ins. */
private List<ServicePlugin> plugins;
private static final Random R = new Random();
// For InterDataNodeProtocol
public Server ipcServer;
private SecureResources secureResources = null;
private AbstractList<File> dataDirs;
private Configuration conf;
/**
* Create the DataNode given a configuration and an array of dataDirs.
* 'dataDirs' is where the blocks are stored.
*/
DataNode(final Configuration conf,
final AbstractList<File> dataDirs) throws IOException {
this(conf, dataDirs, null);
}
/**
* Create the DataNode given a configuration, an array of dataDirs,
* and the secure resources needed when security is enabled
*/
DataNode(final Configuration conf,
final AbstractList<File> dataDirs,
final SecureResources resources) throws IOException {
super(conf);
try {
hostName = getHostName(conf);
startDataNode(conf, dataDirs, resources);
} catch (IOException ie) {
shutdown();
throw ie;
}
}
private synchronized void setClusterId(String cid) throws IOException {
if(clusterId != null && !clusterId.equals(cid)) {
throw new IOException ("cluster id doesn't match. old cid=" + clusterId
+ " new cid="+ cid);
}
// else
clusterId = cid;
}
private static String getHostName(Configuration config)
throws UnknownHostException {
String name = null;
// use configured nameserver & interface to get local hostname
if (config.get(DFS_DATANODE_HOST_NAME_KEY) != null) {
name = config.get(DFS_DATANODE_HOST_NAME_KEY);
}
if (name == null) {
name = DNS.getDefaultHost(config.get("dfs.datanode.dns.interface",
"default"), config.get("dfs.datanode.dns.nameserver", "default"));
}
return name;
}
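/**
* Read the datanode-wide settings from the configuration: socket read/write
* timeouts, transferTo and packet-size options, block report interval and
* initial delay, heartbeat interval, and whether block files are synced to
* disk on close.
*/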
private void initConfig(Configuration conf) {
this.socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsConstants.READ_TIMEOUT);
this.socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
HdfsConstants.WRITE_TIMEOUT);
/* Based on results on different platforms, we might need to set the default
* to false on some of them. */
this.transferToAllowed = conf.getBoolean("dfs.datanode.transferTo.allowed",
true);
this.writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
this.blockReportInterval =
conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, BLOCKREPORT_INTERVAL);
this.initialBlockReportDelay = conf.getLong(DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
BLOCKREPORT_INITIAL_DELAY)* 1000L;
if (this.initialBlockReportDelay >= blockReportInterval) {
this.initialBlockReportDelay = 0;
LOG.info("dfs.blockreport.initialDelay is greater than " +
"dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
}
this.heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL) * 1000L;
// do we need to sync block file contents to disk when blockfile is closed?
this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY,
DFS_DATANODE_SYNCONCLOSE_DEFAULT);
}
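/**
* Start the embedded HTTP info server, adding an SSL listener when
* dfs.https.enable is set, and register the streamFile, getFileChecksum
* and blockScannerReport servlets.
*/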
private void startInfoServer(Configuration conf) throws IOException {
// create a servlet to serve full-file content
InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
String infoHost = infoSocAddr.getHostName();
int tmpInfoPort = infoSocAddr.getPort();
this.infoServer = (secureResources == null)
? new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,
conf, new AccessControlList(conf.get(DFS_ADMIN, " ")))
: new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,
conf, new AccessControlList(conf.get(DFS_ADMIN, " ")),
secureResources.getListener());
if(LOG.isDebugEnabled()) {
LOG.debug("Datanode listening on " + infoHost + ":" + tmpInfoPort);
}
if (conf.getBoolean("dfs.https.enable", false)) {
boolean needClientAuth = conf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT);
InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
"dfs.datanode.https.address", infoHost + ":" + 0));
Configuration sslConf = new HdfsConfiguration(false);
sslConf.addResource(conf.get("dfs.https.server.keystore.resource",
"ssl-server.xml"));
this.infoServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);
if(LOG.isDebugEnabled()) {
LOG.debug("Datanode listening for SSL on " + secInfoSocAddr);
}
}
this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
this.infoServer.addInternalServlet(null, "/getFileChecksum/*",
FileChecksumServlets.GetServlet.class);
this.infoServer.setAttribute("datanode", this);
this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
this.infoServer.addServlet(null, "/blockScannerReport",
DataBlockScanner.Servlet.class);
this.infoServer.start();
}
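/**
* Instantiate and start the ServicePlugin implementations listed under
* dfs.datanode.plugins; a plugin that fails to start is logged and skipped.
*/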
private void startPlugins(Configuration conf) {
plugins = conf.getInstances("dfs.datanode.plugins", ServicePlugin.class);
for (ServicePlugin p: plugins) {
try {
p.start(this);
LOG.info("Started plug-in " + p);
} catch (Throwable t) {
LOG.warn("ServicePlugin " + p + " could not be started", t);
}
}
}
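/**
* Create the RPC server that serves ClientDatanodeProtocol and
* InterDatanodeProtocol requests, and enable service-level authorization
* if it is configured.
*/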
private void initIpcServer(Configuration conf) throws IOException {
InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
conf.get("dfs.datanode.ipc.address"));
ipcServer = RPC.getServer(DataNode.class, this, ipcAddr.getHostName(),
ipcAddr.getPort(),
conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
DFS_DATANODE_HANDLER_COUNT_DEFAULT),
false, conf, blockPoolTokenSecretManager);
// set service-level authorization security policy
if (conf.getBoolean(
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
ipcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
}
}
/**
* Initialize the datanode's periodic scanners:
* {@link DataBlockScanner}
* {@link DirectoryScanner}
* They report results on a per-blockpool basis but do their scanning
* on a per-Volume basis to minimize competition for disk iops.
*
* @param conf - Configuration has the run intervals and other
* parameters for these periodic scanners
*/
private void initPeriodicScanners(Configuration conf) {
initDataBlockScanner(conf);
initDirectoryScanner(conf);
}
private void shutdownPeriodicScanners() {
shutdownDirectoryScanner();
shutdownDataBlockScanner();
}
/**
* See {@link DataBlockScanner}
*/
private synchronized void initDataBlockScanner(Configuration conf) {
if (blockScanner != null) {
return;
}
String reason = null;
if (conf.getInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) {
reason = "verification is turned off by configuration";
} else if (!(data instanceof FSDataset)) {
reason = "verifcation is supported only with FSDataset";
}
if (reason == null) {
blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);
blockScanner.start();
} else {
LOG.info("Periodic Block Verification scan is disabled because " +
reason + ".");
}
}
private void shutdownDataBlockScanner() {
if (blockScanner != null) {
blockScanner.shutdown();
}
}
/**
* See {@link DirectoryScanner}
*/
private synchronized void initDirectoryScanner(Configuration conf) {
if (directoryScanner != null) {
return;
}
String reason = null;
if (conf.getInt(DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT) < 0) {
reason = "verification is turned off by configuration";
} else if (!(data instanceof FSDataset)) {
reason = "verification is supported only with FSDataset";
}
if (reason == null) {
directoryScanner = new DirectoryScanner(this, (FSDataset) data, conf);
directoryScanner.start();
} else {
LOG.info("Periodic Directory Tree Verification scan is disabled because " +
reason + ".");
}
}
private synchronized void shutdownDirectoryScanner() {
if (directoryScanner != null) {
directoryScanner.shutdown();
}
}
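/**
* Open the streaming (data transfer) server socket, either a plain or NIO
* socket depending on the write timeout, or the privileged socket supplied
* by SecureResources, and wrap it in a DataXceiverServer daemon thread.
*/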
private void initDataXceiver(Configuration conf) throws IOException {
InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);
// find free port or use privileged port provided
ServerSocket ss;
if(secureResources == null) {
ss = (socketWriteTimeout > 0) ?
ServerSocketChannel.open().socket() : new ServerSocket();
Server.bind(ss, socAddr, 0);
} else {
ss = secureResources.getStreamingSocket();
}
ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);
// adjust machine name with the actual port
int tmpPort = ss.getLocalPort();
selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
tmpPort);
LOG.info("Opened info server at " + tmpPort);
this.threadGroup = new ThreadGroup("dataXceiverServer");
this.dataXceiverServer = new Daemon(threadGroup,
new DataXceiverServer(ss, conf, this));
this.threadGroup.setDaemon(true); // auto destroy when empty
}
// calls specific to BP
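/**
* Route a block-received notification to the BPOfferService that owns the
* block's pool so it can be reported to the corresponding namenode.
*/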
protected void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if(bpos != null) {
bpos.notifyNamenodeReceivedBlock(block, delHint);
} else {
LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
+ block.getBlockPoolId());
}
}
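/**
* Report a corrupt block to the namenode that owns its block pool.
*/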
public void reportBadBlocks(ExtendedBlock block) throws IOException{
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if(bpos == null || bpos.bpNamenode == null) {
throw new IOException("cannot locate OfferService thread for bp="+block.getBlockPoolId());
}
bpos.reportBadBlocks(block);
}
// used only for testing
void setHeartbeatsDisabledForTests(
boolean heartbeatsDisabledForTests) {
this.heartbeatsDisabledForTests = heartbeatsDisabledForTests;
}
/**
* A thread per namenode to perform:
* <ul>
* <li> Pre-registration handshake with namenode</li>
* <li> Registration with namenode</li>
* <li> Send periodic heartbeats to the namenode</li>
* <li> Handle commands received from the namenode</li>
* </ul>
*/
@InterfaceAudience.Private
class BPOfferService implements Runnable {
final InetSocketAddress nnAddr;
DatanodeRegistration bpRegistration;
NamespaceInfo bpNSInfo;
long lastBlockReport = 0;
private Thread bpThread;
private DatanodeProtocol bpNamenode;
private String blockPoolId;
private long lastHeartbeat = 0;
private volatile boolean initialized = false;
private final LinkedList<Block> receivedBlockList = new LinkedList<Block>();
private final LinkedList<String> delHints = new LinkedList<String>();
private volatile boolean shouldServiceRun = true;
private boolean isBlockTokenInitialized = false;
UpgradeManagerDatanode upgradeManager = null;
BPOfferService(InetSocketAddress isa) {
this.bpRegistration = new DatanodeRegistration(getMachineName());
bpRegistration.setInfoPort(infoServer.getPort());
bpRegistration.setIpcPort(getIpcPort());
this.nnAddr = isa;
}
/**
* returns true if BP thread has completed initialization of storage
* and has registered with the corresponding namenode
* @return true if initialized
*/
public boolean initialized() {
return initialized;
}
public boolean isAlive() {
return shouldServiceRun && bpThread.isAlive();
}
public String getBlockPoolId() {
return blockPoolId;
}
private InetSocketAddress getNNSocketAddress() {
return nnAddr;
}
void setNamespaceInfo(NamespaceInfo nsinfo) {
bpNSInfo = nsinfo;
this.blockPoolId = nsinfo.getBlockPoolID();
blockPoolManager.addBlockPool(this);
}
void setNameNode(DatanodeProtocol dnProtocol) {
bpNamenode = dnProtocol;
}
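/**
* First phase of talking to the namenode: repeatedly call versionRequest()
* until the namenode answers with a matching build version, then return the
* namespace information.
*/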
private NamespaceInfo handshake() throws IOException {
NamespaceInfo nsInfo = new NamespaceInfo();
while (shouldRun && shouldServiceRun) {
try {
nsInfo = bpNamenode.versionRequest();
// verify build version
String nsVer = nsInfo.getBuildVersion();
String stVer = Storage.getBuildVersion();
LOG.info("handshake: namespace info = " + nsInfo);
if(! nsVer.equals(stVer)) {
String errorMsg = "Incompatible build versions: bp = " + blockPoolId +
"namenode BV = " + nsVer + "; datanode BV = " + stVer;
LOG.warn(errorMsg);
bpNamenode.errorReport( bpRegistration,
DatanodeProtocol.NOTIFY, errorMsg );
} else {
break;
}
} catch(SocketTimeoutException e) { // namenode is busy
LOG.warn("Problem connecting to server: " + nnAddr);
} catch(IOException e ) { // namenode is not available
LOG.warn("Problem connecting to server: " + nnAddr);
}
// try again in a few seconds
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {}
}
assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
"Data-node and name-node layout versions must be the same."
+ "Expected: "+ FSConstants.LAYOUT_VERSION
+ " actual "+ nsInfo.getLayoutVersion();
return nsInfo;
}
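/**
* Connect to the namenode, perform the handshake, verify that the namenode
* belongs to the same cluster as any previously registered namenode, set up
* the block pool storage, and finally start the periodic scanners.
*/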
void setupBP(Configuration conf, AbstractList<File> dataDirs)
throws IOException {
// get NN proxy
DatanodeProtocol dnp =
(DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
DatanodeProtocol.versionID, nnAddr, conf);
setNameNode(dnp);
// handshake with NN
NamespaceInfo nsInfo = handshake();
setNamespaceInfo(nsInfo);
synchronized(DataNode.this) {
// we do not allow namenode from different cluster to register
if(clusterId != null && !clusterId.equals(nsInfo.clusterID)) {
throw new IOException(
"cannot register with the namenode because clusterid do not match:"
+ " nn=" + nsInfo.getBlockPoolID() + "; nn cid=" + nsInfo.clusterID +
";dn cid=" + clusterId);
}
setupBPStorage();
setClusterId(nsInfo.clusterID);
}
initPeriodicScanners(conf);
}
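/**
* Initialize storage for this block pool. For a simulated dataset the
* registration is filled in from the namespace info; otherwise the storage
* directories are read (and transitioned if necessary) before the dataset
* is created and the block pool is added to it.
*/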
void setupBPStorage() throws IOException {
StartupOption startOpt = getStartupOption(conf);
assert startOpt != null : "Startup option must be set.";
boolean simulatedFSDataset =
conf.getBoolean("dfs.datanode.simulateddatastorage", false);
if (simulatedFSDataset) {
initFsDataSet(conf, dataDirs);
bpRegistration.setStorageID(getStorageId()); //same as DN
bpRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;
bpRegistration.storageInfo.namespaceID = bpNSInfo.namespaceID;
bpRegistration.storageInfo.clusterID = bpNSInfo.clusterID;
} else {
// read storage info, lock data dirs and transition fs state if necessary
storage.recoverTransitionRead(DataNode.this, blockPoolId, bpNSInfo,
dataDirs, startOpt);
LOG.info("setting up storage: nsid=" + storage.namespaceID + ";bpid="
+ blockPoolId + ";lv=" + storage.layoutVersion + ";nsInfo="
+ bpNSInfo);
bpRegistration.setStorageID(getStorageId());
bpRegistration.setStorageInfo(storage.getBPStorage(blockPoolId));
initFsDataSet(conf, dataDirs);
}
data.addBlockPool(blockPoolId, conf);
}
/**
* Schedule the next block report: immediately at the next heartbeat when
* delay is zero, or after a random delay of at most delay milliseconds.
*/
void scheduleBlockReport(long delay) {
if (delay > 0) { // send BR after random delay
lastBlockReport = System.currentTimeMillis()
- ( blockReportInterval - R.nextInt((int)(delay)));
} else { // send at next heartbeat
lastBlockReport = lastHeartbeat - blockReportInterval;
}
resetBlockReportTime = true; // reset future BRs for randomness
}
private void reportBadBlocks(ExtendedBlock block) {
DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) };
try {
bpNamenode.reportBadBlocks(blocks);
} catch (IOException e){
/* One common reason is that NameNode could be in safe mode.
* Should we keep on retrying in that case?
*/
LOG.warn("Failed to report bad block " + block + " to namenode : " +
" Exception : " + StringUtils.stringifyException(e));
}
}
/**
* Report received blocks and delete hints to the Namenode
* @throws IOException
*/
private void reportReceivedBlocks() throws IOException {
//check if there are newly received blocks
Block [] blockArray=null;
String [] delHintArray=null;
synchronized(receivedBlockList) {
synchronized(delHints){
int numBlocks = receivedBlockList.size();
if (numBlocks > 0) {
if(numBlocks!=delHints.size()) {
LOG.warn("Panic: receiveBlockList and delHints are not of " +
"the same length" );
}
//
// Send newly-received blockids to namenode
//
blockArray = receivedBlockList.toArray(new Block[numBlocks]);
delHintArray = delHints.toArray(new String[numBlocks]);
}
}
}
if (blockArray != null) {
if(delHintArray == null || delHintArray.length != blockArray.length ) {
LOG.warn("Panic: block array & delHintArray are not the same" );
}
bpNamenode.blockReceived(bpRegistration, blockPoolId, blockArray,
delHintArray);
synchronized(receivedBlockList) {
synchronized(delHints){
for(int i=0; i<blockArray.length; i++) {
receivedBlockList.remove(blockArray[i]);
delHints.remove(delHintArray[i]);
}
}
}
}
}
/*
* Informing the name node could take a long long time! Should we wait
* till namenode is informed before responding with success to the
* client? For now we don't.
*/
void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
if(block==null || delHint==null) {
throw new IllegalArgumentException(
block==null?"Block is null":"delHint is null");
}
if (!block.getBlockPoolId().equals(blockPoolId)) {
LOG.warn("BlockPool mismatch " + block.getBlockPoolId() +
" vs. " + blockPoolId);
return;
}
synchronized (receivedBlockList) {
synchronized (delHints) {
receivedBlockList.add(block.getLocalBlock());
delHints.add(delHint);
receivedBlockList.notifyAll();
}
}
}
/**
* Report the list of blocks to the Namenode
* @throws IOException
*/
DatanodeCommand blockReport() throws IOException {
// send block report
DatanodeCommand cmd = null;
long startTime = now();
if (startTime - lastBlockReport > blockReportInterval) {
//
// Send latest block report if timer has expired.
// Get back a list of local block(s) that are obsolete
// and can be safely GC'ed.
//
long brStartTime = now();
BlockListAsLongs bReport = data.getBlockReport(blockPoolId);
cmd = bpNamenode.blockReport(bpRegistration, blockPoolId, bReport
.getBlockListAsLongs());
long brTime = now() - brStartTime;
metrics.addBlockReport(brTime);
LOG.info("BlockReport of " + bReport.getNumberOfBlocks() +
" blocks got processed in " + brTime + " msecs");
//
// If we have sent the first block report, then wait a random
// time before we start the periodic block reports.
//
if (resetBlockReportTime) {
lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
resetBlockReportTime = false;
} else {
/* say the last block report was at 8:20:14. The current report
* should have started around 9:20:14 (default 1 hour interval).
* If current time is :
* 1) normal like 9:20:18, next report should be at 10:20:14
* 2) unexpected like 11:35:43, next report should be at 12:20:14
*/
lastBlockReport += (now() - lastBlockReport) /
blockReportInterval * blockReportInterval;
}
LOG.info("sent block report, processed command:" + cmd);
}
return cmd;
}
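/**
* Send a single heartbeat carrying capacity, usage, per-block-pool usage,
* transfer and xceiver counts, and the number of failed volumes; the
* namenode replies with the commands it wants executed.
*/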
DatanodeCommand [] sendHeartBeat() throws IOException {
return bpNamenode.sendHeartbeat(bpRegistration,
data.getCapacity(),
data.getDfsUsed(),
data.getRemaining(),
data.getBlockPoolUsed(blockPoolId),
xmitsInProgress.get(),
getXceiverCount(), data.getNumFailedVolumes());
}
//This must be called only by blockPoolManager
void start() {
if ((bpThread != null) && (bpThread.isAlive())) {
//Thread is started already
return;
}
bpThread = new Thread(this, dnThreadName);
bpThread.setDaemon(true); // needed for JUnit testing
bpThread.start();
}
//This must be called only by blockPoolManager.
void stop() {
shouldServiceRun = false;
if (bpThread != null) {
bpThread.interrupt();
}
}
//This must be called only by blockPoolManager
void join() {
try {
if (bpThread != null) {
bpThread.join();
}
} catch (InterruptedException ie) { }
}
//Cleanup method to be called by current thread before exiting.
private synchronized void cleanUp() {
if(upgradeManager != null)
upgradeManager.shutdownUpgrade();
blockPoolManager.remove(this);
shouldServiceRun = false;
RPC.stopProxy(bpNamenode);
if (blockScanner != null) {
blockScanner.removeBlockPool(this.getBlockPoolId());
}
if (data != null) {
data.shutdownBlockPool(this.getBlockPoolId());
}
if (storage != null) {
storage.removeBlockPoolStorage(this.getBlockPoolId());
}
}
/**
* Main loop for each BP thread. Run until shutdown,
* forever calling remote NameNode functions.
*/
private void offerService() throws Exception {
LOG.info("For namenode " + nnAddr + " using BLOCKREPORT_INTERVAL of "
+ blockReportInterval + "msec" + " Initial delay: "
+ initialBlockReportDelay + "msec" + "; heartBeatInterval="
+ heartBeatInterval);
//
// Now loop for a long time....
//
while (shouldRun && shouldServiceRun) {
try {
long startTime = now();
//
// Every so often, send heartbeat or block-report
//
if (startTime - lastHeartbeat > heartBeatInterval) {
//
// All heartbeat messages include following info:
// -- Datanode name
// -- data transfer port
// -- Total capacity
// -- Bytes remaining
//
lastHeartbeat = startTime;
if (!heartbeatsDisabledForTests) {
DatanodeCommand[] cmds = sendHeartBeat();
metrics.addHeartbeat(now() - startTime);
if (!processCommand(cmds))
continue;
}
}
reportReceivedBlocks();
DatanodeCommand cmd = blockReport();
processCommand(cmd);
// Now safe to start scanning the block pool
if (blockScanner != null) {
blockScanner.addBlockPool(this.blockPoolId);
}
//
// There is no work to do; sleep until heartbeat timer elapses,
// or work arrives, and then iterate again.
//
long waitTime = heartBeatInterval -
(System.currentTimeMillis() - lastHeartbeat);
synchronized(receivedBlockList) {
if (waitTime > 0 && receivedBlockList.size() == 0) {
try {
receivedBlockList.wait(waitTime);
} catch (InterruptedException ie) {
LOG.warn("BPOfferService for block pool="
+ this.getBlockPoolId() + " received exception:" + ie);
}
}
} // synchronized
} catch(RemoteException re) {
String reClass = re.getClassName();
if (UnregisteredNodeException.class.getName().equals(reClass) ||
DisallowedDatanodeException.class.getName().equals(reClass) ||
IncorrectVersionException.class.getName().equals(reClass)) {
LOG.warn("blockpool " + blockPoolId + " is shutting down: " +
StringUtils.stringifyException(re));
shouldServiceRun = false;
return;
}
LOG.warn(StringUtils.stringifyException(re));
try {
long sleepTime = Math.min(1000, heartBeatInterval);
Thread.sleep(sleepTime);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
} catch (IOException e) {
LOG.warn(StringUtils.stringifyException(e));
}
} // while (shouldRun && shouldServiceRun)
} // offerService
/**
* Register one bp with the corresponding NameNode
* <p>
* The bpDatanode needs to register with the namenode on startup in order
* 1) to report which storage it is serving now and
* 2) to receive a registrationID issued by the namenode to recognize
*    registered datanodes.
*
* @see FSNamesystem#registerDatanode(DatanodeRegistration)
* @throws IOException
*/
void register() throws IOException {
LOG.info("in register: sid=" + bpRegistration.getStorageID() + ";SI="
+ bpRegistration.storageInfo);
while(shouldRun && shouldServiceRun) {
try {
// Use returned registration from namenode with updated machine name.
bpRegistration = bpNamenode.registerDatanode(bpRegistration);
LOG.info("bpReg after =" + bpRegistration.storageInfo +
";sid=" + bpRegistration.storageID + ";name="+bpRegistration.getName());
NetUtils.getHostname();
hostName = bpRegistration.getHost();
break;
} catch(SocketTimeoutException e) { // namenode is busy
LOG.info("Problem connecting to server: " + nnAddr);
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {}
}
}
if (storage.getStorageID().equals("")) {
storage.setStorageID(bpRegistration.getStorageID());
storage.writeAll();
LOG.info("New storage id " + bpRegistration.getStorageID()
+ " is assigned to data-node " + bpRegistration.getName());
} else if(!storage.getStorageID().equals(bpRegistration.getStorageID())) {
throw new IOException("Inconsistent storage IDs. Name-node returned "
+ bpRegistration.getStorageID()
+ ". Expecting " + storage.getStorageID());
}
if (!isBlockTokenInitialized) {
/* first time registering with NN */
ExportedBlockKeys keys = bpRegistration.exportedKeys;
isBlockTokenEnabled = keys.isBlockTokenEnabled();
if (isBlockTokenEnabled) {
long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
long blockTokenLifetime = keys.getTokenLifetime();
LOG.info("Block token params received from NN: for block pool " +
blockPoolId + " keyUpdateInterval="
+ blockKeyUpdateInterval / (60 * 1000)
+ " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000)
+ " min(s)");
final BlockTokenSecretManager secretMgr =
new BlockTokenSecretManager(false, 0, blockTokenLifetime);
blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr);
}
isBlockTokenInitialized = true;
}
if (isBlockTokenEnabled) {
blockPoolTokenSecretManager.setKeys(blockPoolId,
bpRegistration.exportedKeys);
bpRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS;
}
LOG.info("in register:" + ";bpDNR="+bpRegistration.storageInfo);
// random short delay - helps scatter the BR from all DNs
scheduleBlockReport(initialBlockReportDelay);
}
/**
* No matter what kind of exception we get, keep retrying to offerService().
* That's the loop that connects to the NameNode and provides basic DataNode
* functionality.
*
* Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can
* happen either at shutdown or due to refreshNamenodes.
*/
@Override
public void run() {
LOG.info(bpRegistration + "In BPOfferService.run, data = " + data
+ ";bp=" + blockPoolId);
try {
// init stuff
try {
// setup storage
setupBP(conf, dataDirs);
register();
} catch (IOException ioe) {
// Initial handshake, storage recovery or registration failed
// End BPOfferService thread
LOG.fatal(bpRegistration + " initialization failed for block pool "
+ blockPoolId, ioe);
return;
}
initialized = true; // bp is initialized;
while (shouldRun && shouldServiceRun) {
try {
startDistributedUpgradeIfNeeded();
offerService();
} catch (Exception ex) {
LOG.error("Exception: " + StringUtils.stringifyException(ex));
if (shouldRun && shouldServiceRun) {
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
LOG.warn("Received exception: ", ie);
}
}
}
}
} catch (Throwable ex) {
LOG.warn("Unexpected exception ", ex);
} finally {
LOG.warn(bpRegistration + " ending block pool service for: "
+ blockPoolId);
cleanUp();
}
}
/**
* Process an array of datanode commands
*
* @param cmds an array of datanode commands
* @return true if further processing may be required or false otherwise.
*/
private boolean processCommand(DatanodeCommand[] cmds) {
if (cmds != null) {
for (DatanodeCommand cmd : cmds) {
try {
if (!processCommand(cmd)) {
return false;
}
} catch (IOException ioe) {
LOG.warn("Error processing datanode Command", ioe);
}
}
}
return true;
}
/**
* Process a single datanode command received from the namenode.
* @param cmd the command to process; may be null
* @return true if further processing may be required or false otherwise.
* @throws IOException
*/
private boolean processCommand(DatanodeCommand cmd) throws IOException {
if (cmd == null)
return true;
final BlockCommand bcmd =
cmd instanceof BlockCommand? (BlockCommand)cmd: null;
switch(cmd.getAction()) {
case DatanodeProtocol.DNA_TRANSFER:
// Send a copy of a block to another datanode
transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
metrics.incrBlocksReplicated(bcmd.getBlocks().length);
break;
case DatanodeProtocol.DNA_INVALIDATE:
//
// Some local block(s) are obsolete and can be
// safely garbage-collected.
//
Block toDelete[] = bcmd.getBlocks();
try {
if (blockScanner != null) {
blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete);
}
// using global fsdataset
data.invalidate(bcmd.getBlockPoolId(), toDelete);
} catch(IOException e) {
checkDiskError();
throw e;
}
metrics.incrBlocksRemoved(toDelete.length);
break;
case DatanodeProtocol.DNA_SHUTDOWN:
// shut down the data node
shouldServiceRun = false;
return false;
case DatanodeProtocol.DNA_REGISTER:
// namenode requested a registration - at start or if NN lost contact
LOG.info("DatanodeCommand action: DNA_REGISTER");
if (shouldRun && shouldServiceRun) {
register();
}
break;
case DatanodeProtocol.DNA_FINALIZE:
storage.finalizeUpgrade(((DatanodeCommand.Finalize) cmd)
.getBlockPoolId());
break;
case UpgradeCommand.UC_ACTION_START_UPGRADE:
// start distributed upgrade here
processDistributedUpgradeCommand((UpgradeCommand)cmd);
break;
case DatanodeProtocol.DNA_RECOVERBLOCK:
recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
break;
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
if (isBlockTokenEnabled) {
blockPoolTokenSecretManager.setKeys(blockPoolId,
((KeyUpdateCommand) cmd).getExportedKeys());
}
break;
default:
LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
}
return true;
}
private void processDistributedUpgradeCommand(UpgradeCommand comm)
throws IOException {
UpgradeManagerDatanode upgradeManager = getUpgradeManager();
upgradeManager.processUpgradeCommand(comm);
}
synchronized UpgradeManagerDatanode getUpgradeManager() {
if(upgradeManager == null)
upgradeManager =
new UpgradeManagerDatanode(DataNode.this, blockPoolId);
return upgradeManager;
}
/**
* Start distributed upgrade if it should be initiated by the data-node.
*/
private void startDistributedUpgradeIfNeeded() throws IOException {
UpgradeManagerDatanode um = getUpgradeManager();
if(!um.getUpgradeState())
return;
um.setUpgradeState(false, um.getUpgradeVersion());
um.startUpgrade();
return;
}
}
/**
* This method starts the data node with the specified conf.
*
* @param conf - the configuration; if conf's CONFIG_PROPERTY_SIMULATED
* property is set then a simulated storage-based data node is created.
*
* @param dataDirs - only for a non-simulated storage data node
* @throws IOException
*/
void startDataNode(Configuration conf,
AbstractList<File> dataDirs,
// DatanodeProtocol namenode,
SecureResources resources
) throws IOException {
if(UserGroupInformation.isSecurityEnabled() && resources == null)
throw new RuntimeException("Cannot start secure cluster without " +
"privileged resources.");
// settings global for all BPs in the Data Node
this.secureResources = resources;
this.dataDirs = dataDirs;
this.conf = conf;
storage = new DataStorage();
// global DN settings
initConfig(conf);
registerMXBean();
initDataXceiver(conf);
startInfoServer(conf);
// BlockPoolTokenSecretManager is required to create ipc server.
this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();
initIpcServer(conf);
metrics = DataNodeMetrics.create(conf, getMachineName());
blockPoolManager = new BlockPoolManager(conf);
}
BPOfferService[] getAllBpOs() {
return blockPoolManager.getAllNamenodeThreads();
}
/**
* Initializes the {@link #data}. The initialization is done only once, when
* handshake with the first namenode is completed.
*/
private synchronized void initFsDataSet(Configuration conf,
AbstractList<File> dataDirs) throws IOException {
if (data != null) { // Already initialized
return;
}
// get version and id info from the name-node
boolean simulatedFSDataset =
conf.getBoolean("dfs.datanode.simulateddatastorage", false);
if (simulatedFSDataset) {
storage.createStorageID(getPort());
// it would have been better to pass storage as a parameter to
// constructor below - need to augment ReflectionUtils used below.
conf.set(DFS_DATANODE_STORAGEID_KEY, getStorageId());
try {
data = (FSDatasetInterface) ReflectionUtils.newInstance(
Class.forName(
"org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"),
conf);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.stringifyException(e));
}
} else {
data = new FSDataset(this, storage, conf);
}
}
/**
* Determine the http server's effective addr
*/
public static InetSocketAddress getInfoAddr(Configuration conf) {
return NetUtils.createSocketAddr(
conf.get("dfs.datanode.http.address", "0.0.0.0:50075"));
}
private void registerMXBean() {
MBeans.register("DataNode", "DataNodeInfo", this);
}
int getPort() {
return selfAddr.getPort();
}
String getStorageId() {
return storage.getStorageID();
}
/**
* Get host:port with host set to Datanode host and port set to the
* port {@link DataXceiver} is serving.
* @return host:port string
*/
public String getMachineName() {
return hostName + ":" + getPort();
}
public int getIpcPort() {
return ipcServer.getListenerAddress().getPort();
}
/**
* get BP registration by blockPool id
* @param bpid
* @return BP registration object
* @throws IOException
*/
DatanodeRegistration getDNRegistrationForBP(String bpid)
throws IOException {
BPOfferService bpos = blockPoolManager.get(bpid);
if(bpos==null || bpos.bpRegistration==null) {
throw new IOException("cannot find BPOfferService for bpid="+bpid);
}
return bpos.bpRegistration;
}
/**
* Get BP registration by machine name (host:port)
* @param mName the machine name in host:port form
* @return BP registration, or null if none matches
*/
DatanodeRegistration getDNRegistrationByMachineName(String mName) {
BPOfferService [] bposArray = blockPoolManager.getAllNamenodeThreads();
for (BPOfferService bpos : bposArray) {
if(bpos.bpRegistration.getName().equals(mName))
return bpos.bpRegistration;
}
return null;
}
/**
* Creates either an NIO socket or a regular socket depending on socketWriteTimeout.
*/
protected Socket newSocket() throws IOException {
return (socketWriteTimeout > 0) ?
SocketChannel.open().socket() : new Socket();
}
public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
DatanodeID datanodeid, final Configuration conf, final int socketTimeout)
throws IOException {
final InetSocketAddress addr = NetUtils.createSocketAddr(
datanodeid.getHost() + ":" + datanodeid.getIpcPort());
if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
InterDatanodeProtocol.LOG.debug("InterDatanodeProtocol addr=" + addr);
}
UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
try {
return loginUgi
.doAs(new PrivilegedExceptionAction<InterDatanodeProtocol>() {
public InterDatanodeProtocol run() throws IOException {
return (InterDatanodeProtocol) RPC.getProxy(
InterDatanodeProtocol.class, InterDatanodeProtocol.versionID,
addr, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), socketTimeout);
}
});
} catch (InterruptedException ie) {
throw new IOException(ie.getMessage());
}
}
/**
* get the name node address based on the block pool id
* @param bpid block pool ID
* @return namenode address corresponding to the bpid
*/
public InetSocketAddress getNameNodeAddr(String bpid) {
BPOfferService bp = blockPoolManager.get(bpid);
if (bp != null) {
return bp.getNNSocketAddress();
}
LOG.warn("No name node address found for block pool ID " + bpid);
return null;
}
public InetSocketAddress getSelfAddr() {
return selfAddr;
}
DataNodeMetrics getMetrics() {
return metrics;
}
public static void setNewStorageID(DatanodeID dnId) {
LOG.info("Datanode is " + dnId);
dnId.storageID = createNewStorageId(dnId.getPort());
}
static String createNewStorageId(int port) {
/* Return
* "DS-randInt-ipaddr-currentTimeMillis"
* It is considered extermely rare for all these numbers to match
* on a different machine accidentally for the following
* a) SecureRandom(INT_MAX) is pretty much random (1 in 2 billion), and
* b) Good chance ip address would be different, and
* c) Even on the same machine, Datanode is designed to use different ports.
* d) Good chance that these are started at different times.
* For a confict to occur all the 4 above have to match!.
* The format of this string can be changed anytime in future without
* affecting its functionality.
*/
String ip = "unknownIP";
try {
ip = DNS.getDefaultIP("default");
} catch (UnknownHostException ignored) {
LOG.warn("Could not find ip address of \"default\" inteface.");
}
int rand = new SecureRandom().nextInt(Integer.MAX_VALUE);
return "DS-" + rand + "-" + ip + "-" + port + "-"
+ System.currentTimeMillis();
}
/**
* Shut down this instance of the datanode.
* Returns only after shutdown is complete.
* This method can only be called by the offerService thread.
* Otherwise, deadlock might occur.
*/
public void shutdown() {
if (plugins != null) {
for (ServicePlugin p : plugins) {
try {
p.stop();
LOG.info("Stopped plug-in " + p);
} catch (Throwable t) {
LOG.warn("ServicePlugin " + p + " could not be stopped", t);
}
}
}
shutdownPeriodicScanners();
if (infoServer != null) {
try {
infoServer.stop();
} catch (Exception e) {
LOG.warn("Exception shutting down DataNode", e);
}
}
if (ipcServer != null) {
ipcServer.stop();
}
this.shouldRun = false;
if (dataXceiverServer != null) {
((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
this.dataXceiverServer.interrupt();
// wait for all data receiver threads to exit
if (this.threadGroup != null) {
int sleepMs = 2;
while (true) {
this.threadGroup.interrupt();
LOG.info("Waiting for threadgroup to exit, active threads is " +
this.threadGroup.activeCount());
if (this.threadGroup.activeCount() == 0) {
break;
}
try {
Thread.sleep(sleepMs);
} catch (InterruptedException e) {}
sleepMs = sleepMs * 3 / 2; // exponential backoff
if (sleepMs > 1000) {
sleepMs = 1000;
}
}
}
// wait for dataXceiveServer to terminate
try {
this.dataXceiverServer.join();
} catch (InterruptedException ie) {
}
}
if(blockPoolManager != null) {
try {
this.blockPoolManager.shutDownAll();
} catch (InterruptedException ie) {
LOG.warn("Received exception in BlockPoolManager#shutDownAll: ", ie);
}
}
if (storage != null) {
try {
this.storage.unlockAll();
} catch (IOException ie) {
LOG.warn("Exception when unlocking storage: " + ie, ie);
}
}
if (data != null) {
data.shutdown();
}
if (metrics != null) {
metrics.shutdown();
}
}
/** Check whether the given exception indicates the disk is out of space;
* otherwise check the data directories for disk errors.
* @param e the exception that triggered this check
**/
protected void checkDiskError(Exception e ) throws IOException {
LOG.warn("checkDiskError: exception: ", e);
if (e.getMessage() != null &&
e.getMessage().startsWith("No space left on device")) {
throw new DiskOutOfSpaceException("No space left on device");
} else {
checkDiskError();
}
}
/**
* Check if there is a disk failure and if so, handle the error
*
**/
protected void checkDiskError( ) {
try {
data.checkDataDir();
} catch (DiskErrorException de) {
handleDiskError(de.getMessage());
}
}
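/**
* Report a disk failure to every namenode. If the remaining volumes still
* provide enough resources, keep running and schedule fresh block reports;
* otherwise shut the datanode down.
*/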
private void handleDiskError(String errMsgr) {
final boolean hasEnoughResources = data.hasEnoughResource();
LOG.warn("DataNode.handleDiskError: Keep Running: " + hasEnoughResources);
// If we have enough active valid volumes then we do not want to
// shutdown the DN completely.
int dpError = hasEnoughResources ? DatanodeProtocol.DISK_ERROR
: DatanodeProtocol.FATAL_DISK_ERROR;
metrics.incrVolumeFailures();
//inform NameNodes
for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) {
DatanodeProtocol nn = bpos.bpNamenode;
try {
nn.errorReport(bpos.bpRegistration, dpError, errMsgr);
} catch(IOException e) {
LOG.warn("Error reporting disk failure to NameNode: " +
StringUtils.stringifyException(e));
}
}
if(hasEnoughResources) {
scheduleAllBlockReport(0);
return; // do not shutdown
}
LOG.warn("DataNode is shutting down: " + errMsgr);
shouldRun = false;
}
/** Number of concurrent xceivers per node. */
int getXceiverCount() {
return threadGroup == null ? 0 : threadGroup.activeCount();
}
UpgradeManagerDatanode getUpgradeManagerDatanode(String bpid) {
BPOfferService bpos = blockPoolManager.get(bpid);
if(bpos==null) {
return null;
}
return bpos.getUpgradeManager();
}
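/**
* Start a DataTransfer thread to replicate the given block to the listed
* targets, after verifying that the local replica exists and is at least
* as long as the length recorded by the namenode.
*/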
private void transferBlock( ExtendedBlock block,
DatanodeInfo xferTargets[]
) throws IOException {
DatanodeProtocol nn = getBPNamenode(block.getBlockPoolId());
DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
if (!data.isValidBlock(block)) {
// block does not exist or is under-construction
String errStr = "Can't send invalid block " + block;
LOG.info(errStr);
nn.errorReport(bpReg, DatanodeProtocol.INVALID_BLOCK, errStr);
return;
}
// Check if NN recorded length matches on-disk length
long onDiskLength = data.getLength(block);
if (block.getNumBytes() > onDiskLength) {
// Shorter on-disk length indicates corruption, so report the corrupt block to the NN
nn.reportBadBlocks(new LocatedBlock[]{
new LocatedBlock(block, new DatanodeInfo[] {
new DatanodeInfo(bpReg)})});
LOG.info("Can't replicate block " + block
+ " because on-disk length " + onDiskLength
+ " is shorter than NameNode recorded length " + block.getNumBytes());
return;
}
int numTargets = xferTargets.length;
if (numTargets > 0) {
if (LOG.isInfoEnabled()) {
StringBuilder xfersBuilder = new StringBuilder();
for (int i = 0; i < numTargets; i++) {
xfersBuilder.append(xferTargets[i].getName());
xfersBuilder.append(" ");
}
LOG.info(bpReg + " Starting thread to transfer block " +
block + " to " + xfersBuilder);
}
new Daemon(new DataTransfer(xferTargets, block,
BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
}
}
private void transferBlocks(String poolId, Block blocks[],
DatanodeInfo xferTargets[][]) {
for (int i = 0; i < blocks.length; i++) {
try {
transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i]);
} catch (IOException ie) {
LOG.warn("Failed to transfer block " + blocks[i], ie);
}
}
}
/* ********************************************************************
Protocol when a client reads data from Datanode (Cur Ver: 9):
Client's Request :
=================
Processed in DataXceiver:
+----------------------------------------------+
| Common Header | 1 byte OP == OP_READ_BLOCK |
+----------------------------------------------+
Processed in readBlock() :
+-------------------------------------------------------------------------+
| 8 byte Block ID | 8 byte genstamp | 8 byte start offset | 8 byte length |
+-------------------------------------------------------------------------+
| vInt length | <DFSClient id> |
+-----------------------------------+
Client sends optional response only at the end of receiving data.
DataNode Response :
===================
In readBlock() :
If there is an error while initializing BlockSender :
+---------------------------+
| 2 byte OP_STATUS_ERROR | and connection will be closed.
+---------------------------+
Otherwise
+---------------------------+
| 2 byte OP_STATUS_SUCCESS |
+---------------------------+
Actual data, sent by BlockSender.sendBlock() :
ChecksumHeader :
+--------------------------------------------------+
| 1 byte CHECKSUM_TYPE | 4 byte BYTES_PER_CHECKSUM |
+--------------------------------------------------+
Followed by actual data in the form of PACKETS:
+------------------------------------+
| Sequence of data PACKETs .... |
+------------------------------------+
A "PACKET" is defined further below.
The client reads data until it receives a packet with
"LastPacketInBlock" set to true or with a zero length. If there is
no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK:
Client optional response at the end of data transmission of any length:
+------------------------------+
| 2 byte OP_STATUS_CHECKSUM_OK |
+------------------------------+
The DataNode always checks OP_STATUS_CHECKSUM_OK. It will close the
client connection if it is absent.
PACKET : Contains a packet header, checksum and data. Amount of data
======== carried is set by BUFFER_SIZE.
+-----------------------------------------------------+
| 4 byte packet length (excluding packet header) |
+-----------------------------------------------------+
| 8 byte offset in the block | 8 byte sequence number |
+-----------------------------------------------------+
| 1 byte isLastPacketInBlock |
+-----------------------------------------------------+
| 4 byte Length of actual data |
+-----------------------------------------------------+
| x byte checksum data. x is defined below |
+-----------------------------------------------------+
| actual data ...... |
+-----------------------------------------------------+
x = (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
CHECKSUM_SIZE
CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32)
The above packet format is used while writing data to DFS also.
Not all the fields might be used while reading.
************************************************************************ */
/**
* Used for transferring a block of data. This class
* sends a piece of data to another DataNode.
*/
private class DataTransfer implements Runnable {
final DatanodeInfo[] targets;
final ExtendedBlock b;
final BlockConstructionStage stage;
final private DatanodeRegistration bpReg;
final String clientname;
/**
* Connect to the first item in the target list. Pass along the
* entire target list, the block, and the data.
*/
DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage,
final String clientname) throws IOException {
if (DataTransferProtocol.LOG.isDebugEnabled()) {
DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
+ b + " (numBytes=" + b.getNumBytes() + ")"
+ ", stage=" + stage
+ ", clientname=" + clientname
+ ", targests=" + Arrays.asList(targets));
}
this.targets = targets;
this.b = b;
this.stage = stage;
BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
bpReg = bpos.bpRegistration;
this.clientname = clientname;
}
/**
* Do the deed, write the bytes
*/
public void run() {
xmitsInProgress.getAndIncrement();
Socket sock = null;
DataOutputStream out = null;
DataInputStream in = null;
BlockSender blockSender = null;
final boolean isClient = clientname.length() > 0;
try {
InetSocketAddress curTarget =
NetUtils.createSocketAddr(targets[0].getName());
sock = newSocket();
NetUtils.connect(sock, curTarget, socketTimeout);
sock.setSoTimeout(targets.length * socketTimeout);
long writeTimeout = socketWriteTimeout +
HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
out = new DataOutputStream(new BufferedOutputStream(baseStream,
SMALL_BUFFER_SIZE));
blockSender = new BlockSender(b, 0, b.getNumBytes(),
false, false, false, DataNode.this);
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
//
// Header info
//
Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
if (isBlockTokenEnabled) {
accessToken = blockPoolTokenSecretManager.generateToken(b,
EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
}
DataTransferProtocol.Sender.opWriteBlock(out,
b, 0, stage, 0, 0, 0, clientname, srcNode, targets, accessToken);
// send data & checksum
blockSender.sendBlock(out, baseStream, null);
// no response necessary
LOG.info(getClass().getSimpleName() + ": Transmitted " + b
+ " (numBytes=" + b.getNumBytes() + ") to " + curTarget);
// read ack
if (isClient) {
in = new DataInputStream(NetUtils.getInputStream(sock));
final DataTransferProtocol.Status s = DataTransferProtocol.Status.read(in);
if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ": close-ack=" + s);
}
if (s != SUCCESS) {
if (s == ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
"Got access token error for connect ack, targets="
+ Arrays.asList(targets));
} else {
throw new IOException("Bad connect ack, targets="
+ Arrays.asList(targets));
}
}
}
} catch (IOException ie) {
LOG.warn(bpReg + ":Failed to transfer " + b + " to " + targets[0].getName()
+ " got " + StringUtils.stringifyException(ie));
// check if there are any disk problems
checkDiskError();
} finally {
xmitsInProgress.getAndDecrement();
IOUtils.closeStream(blockSender);
IOUtils.closeStream(out);
IOUtils.closeStream(in);
IOUtils.closeSocket(sock);
}
}
}
/**
* After a block becomes finalized, a datanode increments its blocks-written
* metric, notifies the namenode that the block has been received, and adds
* the block to the block scanner.
* @param block the block that has been finalized
* @param delHint deletion hint passed to the namenode along with the
*                received-block notification
*/
void closeBlock(ExtendedBlock block, String delHint) {
metrics.incrBlocksWritten();
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if(bpos != null) {
bpos.notifyNamenodeReceivedBlock(block, delHint);
} else {
LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
+ block.getBlockPoolId());
}
if (blockScanner != null) {
blockScanner.addBlock(block);
}
}
/** Start the datanode daemon threads: the block pool services, the
* DataXceiver server, the IPC server, and any configured plugins.
*/
public void runDatanodeDaemon() throws IOException {
blockPoolManager.startAll();
// start dataXceiveServer
dataXceiverServer.start();
ipcServer.start();
startPlugins(conf);
}
/**
* A data node is considered to be up if at least one of its block pool services is alive
*/
public boolean isDatanodeUp() {
for (BPOfferService bp : blockPoolManager.getAllNamenodeThreads()) {
if (bp.isAlive()) {
return true;
}
}
return false;
}
/** Instantiate a single datanode object. The instance must subsequently
* be started by invoking {@link DataNode#runDatanodeDaemon()}.
*/
public static DataNode instantiateDataNode(String args[],
Configuration conf) throws IOException {
return instantiateDataNode(args, conf, null);
}
/** Instantiate a single datanode object, along with its secure resources.
* The instance must subsequently be started by invoking
* {@link DataNode#runDatanodeDaemon()}.
*/
public static DataNode instantiateDataNode(String args [], Configuration conf,
SecureResources resources) throws IOException {
if (conf == null)
conf = new HdfsConfiguration();
if (args != null) {
// parse generic hadoop options
GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
args = hParser.getRemainingArgs();
}
if (!parseArguments(args, conf)) {
printUsage();
return null;
}
if (conf.get("dfs.network.script") != null) {
LOG.error("This configuration for rack identification is not supported" +
" anymore. RackID resolution is handled by the NameNode.");
System.exit(-1);
}
Collection<URI> dataDirs = getStorageDirs(conf);
dnThreadName = "DataNode: [" +
StringUtils.uriToString(dataDirs.toArray(new URI[0])) + "]";
UserGroupInformation.setConfiguration(conf);
SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
DFS_DATANODE_USER_NAME_KEY);
return makeInstance(dataDirs, conf, resources);
}
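/** Read the data directories configured under DFS_DATANODE_DATA_DIR_KEY and return them as URIs. */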
static Collection<URI> getStorageDirs(Configuration conf) {
Collection<String> dirNames =
conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
return Util.stringCollectionAsURIs(dirNames);
}
/** Instantiate & Start a single datanode daemon and wait for it to finish.
* If this thread is specifically interrupted, it will stop waiting.
*/
public static DataNode createDataNode(String args[],
Configuration conf) throws IOException {
return createDataNode(args, conf, null);
}
/** Instantiate & Start a single datanode daemon and wait for it to finish.
* If this thread is specifically interrupted, it will stop waiting.
*/
@InterfaceAudience.Private
public static DataNode createDataNode(String args[], Configuration conf,
SecureResources resources) throws IOException {
DataNode dn = instantiateDataNode(args, conf, resources);
if (dn != null) {
dn.runDatanodeDaemon();
}
return dn;
}
void join() {
while (shouldRun) {
try {
blockPoolManager.joinAll();
Thread.sleep(2000);
} catch (InterruptedException ex) {
LOG.warn("Received exception in Datanode#join: " + ex);
}
}
}
/**
* Make an instance of DataNode after ensuring that at least one of the
* given data directories (and their parent directories, if necessary)
* can be created.
* @param dataDirs List of directories, where the new DataNode instance should
* keep its files.
* @param conf Configuration instance to use.
* @param resources Secure resources needed to run under Kerberos
* @return DataNode instance for given list of data dirs and conf, or null if
* no directory from this directory list can be created.
* @throws IOException
*/
static DataNode makeInstance(Collection<URI> dataDirs, Configuration conf,
SecureResources resources) throws IOException {
LocalFileSystem localFS = FileSystem.getLocal(conf);
FsPermission permission = new FsPermission(
conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
ArrayList<File> dirs = getDataDirsFromURIs(dataDirs, localFS, permission);
DefaultMetricsSystem.initialize("DataNode");
assert dirs.size() > 0 : "number of data directories should be > 0";
return new DataNode(conf, dirs, resources);
}
// DataNode ctor expects AbstractList instead of List or Collection...
static ArrayList<File> getDataDirsFromURIs(Collection<URI> dataDirs,
LocalFileSystem localFS, FsPermission permission) throws IOException {
ArrayList<File> dirs = new ArrayList<File>();
StringBuilder invalidDirs = new StringBuilder();
for (URI dirURI : dataDirs) {
if (!"file".equalsIgnoreCase(dirURI.getScheme())) {
LOG.warn("Unsupported URI schema in " + dirURI + ". Ignoring ...");
invalidDirs.append("\"").append(dirURI).append("\" ");
continue;
}
// drop any (illegal) authority in the URI for backwards compatibility
File data = new File(dirURI.getPath());
try {
DiskChecker.checkDir(localFS, new Path(data.toURI()), permission);
dirs.add(data);
} catch (IOException e) {
LOG.warn("Invalid directory in: "
+ DFS_DATANODE_DATA_DIR_KEY + ": ", e);
invalidDirs.append("\"").append(data.getCanonicalPath()).append("\" ");
}
}
if (dirs.size() == 0)
throw new IOException("All directories in "
+ DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
+ invalidDirs);
return dirs;
}
@Override
public String toString() {
return "DataNode{data=" + data + ", localName='" + getMachineName()
+ "', storageID='" + getStorageId() + "', xmitsInProgress="
+ xmitsInProgress.get() + "}";
}
private static void printUsage() {
System.err.println("Usage: java DataNode");
System.err.println(" [-rollback]");
}
/**
* Parse and verify command line arguments and set configuration parameters.
*
* @return false if the passed arguments are incorrect
*/
private static boolean parseArguments(String args[],
Configuration conf) {
int argsLen = (args == null) ? 0 : args.length;
StartupOption startOpt = StartupOption.REGULAR;
for(int i=0; i < argsLen; i++) {
String cmd = args[i];
if ("-r".equalsIgnoreCase(cmd) || "--rack".equalsIgnoreCase(cmd)) {
LOG.error("-r, --rack arguments are not supported anymore. RackID " +
"resolution is handled by the NameNode.");
System.exit(-1);
} else if ("-rollback".equalsIgnoreCase(cmd)) {
startOpt = StartupOption.ROLLBACK;
} else if ("-regular".equalsIgnoreCase(cmd)) {
startOpt = StartupOption.REGULAR;
} else
return false;
}
setStartupOption(conf, startOpt);
return true;
}
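/** Record the chosen startup option in the configuration under "dfs.datanode.startup". */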
private static void setStartupOption(Configuration conf, StartupOption opt) {
conf.set("dfs.datanode.startup", opt.toString());
}
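/** Read the startup option back from the configuration, defaulting to REGULAR. */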
static StartupOption getStartupOption(Configuration conf) {
return StartupOption.valueOf(conf.get("dfs.datanode.startup",
StartupOption.REGULAR.toString()));
}
/**
* This method schedules each block pool service to send
* its block report after the given delay.
*/
public void scheduleAllBlockReport(long delay) {
for(BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
bpos.scheduleBlockReport(delay);
}
}
/**
* This method is used for testing.
* Examples are adding and deleting blocks directly.
* The most common usage will be when the data node's storage is simulated.
*
* @return the fsdataset that stores the blocks
*/
public FSDatasetInterface getFSDataset() {
return data;
}
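/**
* Common entry point for secure and non-secure startup: print the startup
* banner, create and start the datanode, and block in join() until it shuts
* down. Any error terminates the JVM with a non-zero exit code.
*/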
public static void secureMain(String args[], SecureResources resources) {
try {
StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
DataNode datanode = createDataNode(args, null, resources);
if (datanode != null)
datanode.join();
} catch (Throwable e) {
LOG.error(StringUtils.stringifyException(e));
System.exit(-1);
}
}
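/** Non-secure entry point; delegates to {@link #secureMain(String[], SecureResources)} with no secure resources. */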
public static void main(String args[]) {
secureMain(args, null);
}
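/**
* Start a daemon thread that attempts recovery of each of the given blocks
* and return that daemon to the caller.
*/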
public Daemon recoverBlocks(final Collection<RecoveringBlock> blocks) {
Daemon d = new Daemon(threadGroup, new Runnable() {
/** Recover a list of blocks. It is run by the primary datanode. */
public void run() {
for(RecoveringBlock b : blocks) {
try {
logRecoverBlock("NameNode", b.getBlock(), b.getLocations());
recoverBlock(b);
} catch (IOException e) {
LOG.warn("recoverBlocks FAILED: " + b, e);
}
}
}
});
d.start();
return d;
}
// InterDataNodeProtocol implementation
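/** Delegate initialization of replica recovery for the given block to the local dataset. */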
@Override // InterDatanodeProtocol
public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
throws IOException {
return data.initReplicaRecovery(rBlock);
}
/**
* Convenience method, which unwraps RemoteException.
* @throws IOException if the call fails; a RemoteException is unwrapped before being rethrown.
*/
private static ReplicaRecoveryInfo callInitReplicaRecovery(
InterDatanodeProtocol datanode,
RecoveringBlock rBlock) throws IOException {
try {
return datanode.initReplicaRecovery(rBlock);
} catch(RemoteException re) {
throw re.unwrapRemoteException();
}
}
/**
* Update replica with the new generation stamp and length.
*/
@Override // InterDatanodeProtocol
public ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId,
long newLength) throws IOException {
ReplicaInfo r = data.updateReplicaUnderRecovery(oldBlock,
recoveryId, newLength);
return new ExtendedBlock(oldBlock.getBlockPoolId(), r);
}
/** {@inheritDoc} */
public long getProtocolVersion(String protocol, long clientVersion
) throws IOException {
if (protocol.equals(InterDatanodeProtocol.class.getName())) {
return InterDatanodeProtocol.versionID;
} else if (protocol.equals(ClientDatanodeProtocol.class.getName())) {
return ClientDatanodeProtocol.versionID;
}
throw new IOException("Unknown protocol to " + getClass().getSimpleName()
+ ": " + protocol);
}
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
return ProtocolSignature.getProtocolSignature(
this, protocol, clientVersion, clientMethodsHash);
}
/** A convenience class used in block recovery */
static class BlockRecord {
final DatanodeID id;
final InterDatanodeProtocol datanode;
final ReplicaRecoveryInfo rInfo;
BlockRecord(DatanodeID id,
InterDatanodeProtocol datanode,
ReplicaRecoveryInfo rInfo) {
this.id = id;
this.datanode = datanode;
this.rInfo = rInfo;
}
/** {@inheritDoc} */
public String toString() {
return "block:" + rInfo + " node:" + id;
}
}
/** Recover a block */
private void recoverBlock(RecoveringBlock rBlock) throws IOException {
ExtendedBlock block = rBlock.getBlock();
String blockPoolId = block.getBlockPoolId();
DatanodeInfo[] targets = rBlock.getLocations();
DatanodeID[] datanodeids = (DatanodeID[])targets;
List<BlockRecord> syncList = new ArrayList<BlockRecord>(datanodeids.length);
int errorCount = 0;
//check generation stamps
for(DatanodeID id : datanodeids) {
try {
BPOfferService bpos = blockPoolManager.get(blockPoolId);
DatanodeRegistration bpReg = bpos.bpRegistration;
InterDatanodeProtocol datanode = bpReg.equals(id)?
this: DataNode.createInterDataNodeProtocolProxy(id, getConf(),
socketTimeout);
ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock);
if (info != null &&
info.getGenerationStamp() >= block.getGenerationStamp() &&
info.getNumBytes() > 0) {
syncList.add(new BlockRecord(id, datanode, info));
}
} catch (RecoveryInProgressException ripE) {
InterDatanodeProtocol.LOG.warn(
"Recovery for replica " + block + " on data-node " + id
+ " is already in progress. Recovery id = "
+ rBlock.getNewGenerationStamp() + " is aborted.", ripE);
return;
} catch (IOException e) {
++errorCount;
InterDatanodeProtocol.LOG.warn(
"Failed to obtain replica info for block (=" + block
+ ") from datanode (=" + id + ")", e);
}
}
if (errorCount == datanodeids.length) {
throw new IOException("All datanodes failed: block=" + block
+ ", datanodeids=" + Arrays.asList(datanodeids));
}
syncBlock(rBlock, syncList);
}
/**
* Get namenode corresponding to a block pool
* @param bpid Block pool Id
* @return Namenode corresponding to the bpid
* @throws IOException
*/
public DatanodeProtocol getBPNamenode(String bpid) throws IOException {
BPOfferService bpos = blockPoolManager.get(bpid);
if(bpos == null || bpos.bpNamenode == null) {
throw new IOException("cannot find a namnode proxy for bpid=" + bpid);
}
return bpos.bpNamenode;
}
/**
* To be used by tests only to set a mock namenode in BPOfferService
*/
void setBPNamenode(String bpid, DatanodeProtocol namenode) {
BPOfferService bp = blockPoolManager.get(bpid);
if (bp != null) {
bp.setNameNode(namenode);
}
}
/**
* Block synchronization: determine a consistent length for the recovering
* block from the participating replicas, update those replicas to the new
* generation stamp and length, and report the result to the namenode.
*/
void syncBlock(RecoveringBlock rBlock,
List<BlockRecord> syncList) throws IOException {
ExtendedBlock block = rBlock.getBlock();
DatanodeProtocol nn = getBPNamenode(block.getBlockPoolId());
long recoveryId = rBlock.getNewGenerationStamp();
if (LOG.isDebugEnabled()) {
LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
+ "), syncList=" + syncList);
}
// syncList.isEmpty() means that all data-nodes do not have the block
// or their replicas have 0 length.
// The block can be deleted.
if (syncList.isEmpty()) {
nn.commitBlockSynchronization(block, recoveryId, 0,
true, true, DatanodeID.EMPTY_ARRAY);
return;
}
// Calculate the best available replica state.
ReplicaState bestState = ReplicaState.RWR;
long finalizedLength = -1;
for(BlockRecord r : syncList) {
assert r.rInfo.getNumBytes() > 0 : "zero length replica";
ReplicaState rState = r.rInfo.getOriginalReplicaState();
if(rState.getValue() < bestState.getValue())
bestState = rState;
if(rState == ReplicaState.FINALIZED) {
if(finalizedLength > 0 && finalizedLength != r.rInfo.getNumBytes())
throw new IOException("Inconsistent size of finalized replicas. " +
"Replica " + r.rInfo + " expected size: " + finalizedLength);
finalizedLength = r.rInfo.getNumBytes();
}
}
// Calculate list of nodes that will participate in the recovery
// and the new block size
List<BlockRecord> participatingList = new ArrayList<BlockRecord>();
final ExtendedBlock newBlock = new ExtendedBlock(block.getBlockPoolId(), block
.getBlockId(), -1, recoveryId);
switch(bestState) {
case FINALIZED:
assert finalizedLength > 0 : "finalizedLength is not positive";
for(BlockRecord r : syncList) {
ReplicaState rState = r.rInfo.getOriginalReplicaState();
if(rState == ReplicaState.FINALIZED ||
rState == ReplicaState.RBW &&
r.rInfo.getNumBytes() == finalizedLength)
participatingList.add(r);
}
newBlock.setNumBytes(finalizedLength);
break;
case RBW:
case RWR:
long minLength = Long.MAX_VALUE;
for(BlockRecord r : syncList) {
ReplicaState rState = r.rInfo.getOriginalReplicaState();
if(rState == bestState) {
minLength = Math.min(minLength, r.rInfo.getNumBytes());
participatingList.add(r);
}
}
newBlock.setNumBytes(minLength);
break;
case RUR:
case TEMPORARY:
assert false : "bad replica state: " + bestState;
}
List<DatanodeID> failedList = new ArrayList<DatanodeID>();
List<DatanodeID> successList = new ArrayList<DatanodeID>();
for(BlockRecord r : participatingList) {
try {
ExtendedBlock reply = r.datanode.updateReplicaUnderRecovery(
new ExtendedBlock(newBlock.getBlockPoolId(), r.rInfo), recoveryId,
newBlock.getNumBytes());
assert reply.equals(newBlock) &&
reply.getNumBytes() == newBlock.getNumBytes() :
"Updated replica must be the same as the new block.";
successList.add(r.id);
} catch (IOException e) {
InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
+ newBlock + ", datanode=" + r.id + ")", e);
failedList.add(r.id);
}
}
// If any of the data-nodes failed, the recovery fails, because
// we never know the actual state of the replica on failed data-nodes.
// The recovery should be started over.
if(!failedList.isEmpty()) {
StringBuilder b = new StringBuilder();
for(DatanodeID id : failedList) {
b.append("\n " + id);
}
throw new IOException("Cannot recover " + block + ", the following "
+ failedList.size() + " data-nodes failed {" + b + "\n}");
}
// Notify the name-node about successfully recovered replicas.
DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
nn.commitBlockSynchronization(block,
newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false,
nlist);
}
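/** Log who initiated recovery of a block, together with the target datanodes involved. */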
private static void logRecoverBlock(String who,
ExtendedBlock block, DatanodeID[] targets) {
StringBuilder msg = new StringBuilder(targets[0].getName());
for (int i = 1; i < targets.length; i++) {
msg.append(", " + targets[i].getName());
}
LOG.info(who + " calls recoverBlock(block=" + block
+ ", targets=[" + msg + "])");
}
// ClientDataNodeProtocol implementation
/** {@inheritDoc} */
@Override // ClientDataNodeProtocol
public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException {
checkWriteAccess(block);
return data.getReplicaVisibleLength(block);
}
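/**
* When block tokens are enabled, verify that the current user presents
* exactly one BlockTokenIdentifier and that it grants READ access to the
* given block.
*/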
private void checkWriteAccess(final ExtendedBlock block) throws IOException {
if (isBlockTokenEnabled) {
Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
.getTokenIdentifiers();
if (tokenIds.size() != 1) {
throw new IOException("Can't continue since none or more than one "
+ "BlockTokenIdentifier is found.");
}
for (TokenIdentifier tokenId : tokenIds) {
BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId;
if (LOG.isDebugEnabled()) {
LOG.debug("Got: " + id.toString());
}
blockPoolTokenSecretManager.checkAccess(id, null, block,
BlockTokenSecretManager.AccessMode.READ);
}
}
}
/**
* Transfer a replica to the datanode targets.
* @param b the block to transfer.
* The corresponding replica must be an RBW or a Finalized replica.
* Its GS and numBytes will be set to
* the stored GS and the visible length.
* @param targets the target datanodes to transfer the replica to
* @param client the name of the client requesting the transfer
*/
void transferReplicaForPipelineRecovery(final ExtendedBlock b,
final DatanodeInfo[] targets, final String client) throws IOException {
final long storedGS;
final long visible;
final BlockConstructionStage stage;
//get replica information
synchronized(data) {
if (data.isValidRbw(b)) {
stage = BlockConstructionStage.TRANSFER_RBW;
} else if (data.isValidBlock(b)) {
stage = BlockConstructionStage.TRANSFER_FINALIZED;
} else {
final String r = data.getReplicaString(b.getBlockPoolId(), b.getBlockId());
throw new IOException(b + " is neither a RBW nor a Finalized, r=" + r);
}
storedGS = data.getStoredBlock(b.getBlockPoolId(),
b.getBlockId()).getGenerationStamp();
if (storedGS < b.getGenerationStamp()) {
throw new IOException(
storedGS + " = storedGS < b.getGenerationStamp(), b=" + b);
}
visible = data.getReplicaVisibleLength(b);
}
//set storedGS and visible length
b.setGenerationStamp(storedGS);
b.setNumBytes(visible);
if (targets.length > 0) {
new DataTransfer(targets, b, stage, client).run();
}
}
// Determine a Datanode's streaming address
public static InetSocketAddress getStreamingAddr(Configuration conf) {
return NetUtils.createSocketAddr(
conf.get("dfs.datanode.address", "0.0.0.0:50010"));
}
@Override // DataNodeMXBean
public String getVersion() {
return VersionInfo.getVersion();
}
@Override // DataNodeMXBean
public String getRpcPort(){
InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
this.getConf().get("dfs.datanode.ipc.address"));
return Integer.toString(ipcAddr.getPort());
}
@Override // DataNodeMXBean
public String getHttpPort(){
return this.getConf().get("dfs.datanode.info.port");
}
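/** @return the port on which the datanode's HTTP info server is listening. */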
public int getInfoPort(){
return this.infoServer.getPort();
}
/**
* Returned information is a JSON representation of a map with
* name node host name as the key and block pool Id as the value
*/
@Override // DataNodeMXBean
public String getNamenodeAddresses() {
final Map<String, String> info = new HashMap<String, String>();
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
if (bpos != null && bpos.bpThread != null) {
info.put(bpos.getNNSocketAddress().getHostName(), bpos.blockPoolId);
}
}
return JSON.toString(info);
}
/**
* Returned information is a JSON representation of a map with
* volume name as the key and, as the value, a map of volume attribute
* names to their values.
*/
@Override // DataNodeMXBean
public String getVolumeInfo() {
final Map<String, Object> info = new HashMap<String, Object>();
Collection<VolumeInfo> volumes = ((FSDataset)this.data).getVolumeInfo();
for (VolumeInfo v : volumes) {
final Map<String, Object> innerInfo = new HashMap<String, Object>();
innerInfo.put("usedSpace", v.usedSpace);
innerInfo.put("freeSpace", v.freeSpace);
innerInfo.put("reservedSpace", v.reservedSpace);
info.put(v.directory, innerInfo);
}
return JSON.toString(info);
}
@Override // DataNodeMXBean
public synchronized String getClusterId() {
return clusterId;
}
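/**
* Refresh the set of namenode (block pool) services from the given
* configuration; an InterruptedException is rethrown as an IOException.
*/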
public void refreshNamenodes(Configuration conf) throws IOException {
try {
blockPoolManager.refreshNamenodes(conf);
} catch (InterruptedException ex) {
IOException eio = new IOException();
eio.initCause(ex);
throw eio;
}
}
@Override //ClientDatanodeProtocol
public void refreshNamenodes() throws IOException {
conf = new Configuration();
refreshNamenodes(conf);
}
@Override // ClientDatanodeProtocol
public void deleteBlockPool(String blockPoolId, boolean force)
throws IOException {
LOG.info("deleteBlockPool command received for block pool " + blockPoolId
+ ", force=" + force);
if (blockPoolManager.get(blockPoolId) != null) {
LOG.warn("The block pool "+blockPoolId+
" is still running, cannot be deleted.");
throw new IOException(
"The block pool is still running. First do a refreshNamenodes to " +
"shutdown the block pool service");
}
data.deleteBlockPool(blockPoolId, force);
}
/**
* @param addr rpc address of the namenode
* @return true - if BPOfferService corresponding to the namenode is alive
*/
public boolean isBPServiceAlive(InetSocketAddress addr) {
BPOfferService bp = blockPoolManager.get(addr);
return bp != null ? bp.isAlive() : false;
}
/**
* @param bpid block pool Id
* @return true - if BPOfferService thread is alive
*/
public boolean isBPServiceAlive(String bpid) {
BPOfferService bp = blockPoolManager.get(bpid);
return bp != null ? bp.isAlive() : false;
}
/**
* A datanode is considered to be fully started if all the BP threads are
* alive and all the block pools are initialized.
*
* @return true - if the data node is fully started
*/
public boolean isDatanodeFullyStarted() {
for (BPOfferService bp : blockPoolManager.getAllNamenodeThreads()) {
if (!bp.initialized() || !bp.isAlive()) {
return false;
}
}
return true;
}
/** Methods used by fault injection tests */
public DatanodeID getDatanodeId() {
return new DatanodeID(getMachineName(), getStorageId(),
infoServer.getPort(), getIpcPort());
}
}