/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils;
/**********************************************************
* NameNode serves as both directory namespace manager and
* "inode table" for the Hadoop DFS. There is a single NameNode
* running in any DFS deployment. (Well, except when there
* is a second backup/failover NameNode.)
*
* The NameNode controls two critical tables:
* 1) filename->blocksequence (namespace)
* 2) block->machinelist ("inodes")
*
* The first table is stored on disk and is very precious.
* The second table is rebuilt every time the NameNode comes
* up.
*
* 'NameNode' refers to both this class and the 'NameNode server'.
* The 'FSNamesystem' class actually performs most of the filesystem
* management. The majority of the 'NameNode' class itself is concerned
* with exposing the IPC interface and the http server to the outside world,
* plus some configuration management.
*
* NameNode implements the ClientProtocol interface, which allows
* clients to ask for DFS services. ClientProtocol is not
* designed for direct use by authors of DFS client code. End-users
* should instead use the org.apache.hadoop.fs.FileSystem class.
*
* NameNode also implements the DatanodeProtocol interface, used by
* DataNode programs that actually store DFS data blocks. These
* methods are invoked repeatedly and automatically by all the
* DataNodes in a DFS deployment.
*
* NameNode also implements the NamenodeProtocol interface, used by
* secondary namenodes and rebalancing processes to fetch parts of the
* NameNode's state, for example a portion of the blocksMap.
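*
* As a hedged illustration (the URI below is an assumption, not part of
* this class), end-user code reaches the NameNode through the FileSystem
* API rather than by speaking ClientProtocol directly:
* <pre>{@code
* Configuration conf = new HdfsConfiguration();
* FileSystem fs = FileSystem.get(
* URI.create("hdfs://namenode.example.com:8020"), conf);
* boolean rootExists = fs.exists(new Path("/"));
* }</pre>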
**********************************************************/
@InterfaceAudience.Private
public class NameNode implements NamenodeProtocols, FSConstants {
static{
Configuration.addDefaultResource("hdfs-default.xml");
Configuration.addDefaultResource("hdfs-site.xml");
}
/**
* HDFS federation configuration can have two types of parameters:
* <ol>
* <li>Parameter that is common for all the name services in the cluster.</li>
* <li>Parameters that are specific to a name service. These keys are
* suffixed with the nameserviceId in the configuration. For example,
* "dfs.namenode.rpc-address.nameservice1".</li>
* </ol>
*
* Following are nameservice specific keys.
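*
* As a hedged sketch ("ns1" and the host:port below are assumptions), a
* nameservice specific value can be set programmatically as:
* <pre>{@code
* conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + ".ns1",
* "nn1.example.com:8020");
* }</pre>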
*/
public static final String[] NAMESERVICE_SPECIFIC_KEYS = {
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY,
DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY,
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY,
DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY,
DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY,
DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY,
DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY,
DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY,
DFSConfigKeys.DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY
};
public long getProtocolVersion(String protocol,
long clientVersion) throws IOException {
if (protocol.equals(ClientProtocol.class.getName())) {
return ClientProtocol.versionID;
} else if (protocol.equals(DatanodeProtocol.class.getName())){
return DatanodeProtocol.versionID;
} else if (protocol.equals(NamenodeProtocol.class.getName())){
return NamenodeProtocol.versionID;
} else if (protocol.equals(RefreshAuthorizationPolicyProtocol.class.getName())){
return RefreshAuthorizationPolicyProtocol.versionID;
} else if (protocol.equals(RefreshUserMappingsProtocol.class.getName())){
return RefreshUserMappingsProtocol.versionID;
} else if (protocol.equals(GetUserMappingsProtocol.class.getName())){
return GetUserMappingsProtocol.versionID;
} else {
throw new IOException("Unknown protocol to name node: " + protocol);
}
}
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
return ProtocolSignature.getProtocolSignature(
this, protocol, clientVersion, clientMethodsHash);
}
public static final int DEFAULT_PORT = 8020;
public static final Log LOG = LogFactory.getLog(NameNode.class.getName());
public static final Log stateChangeLog = LogFactory.getLog("org.apache.hadoop.hdfs.StateChange");
public static final String NAMENODE_ADDRESS_ATTRIBUTE_KEY = "name.node.address";
protected FSNamesystem namesystem;
protected NamenodeRole role;
/** RPC server. Package-protected for use in tests. */
Server server;
/** RPC server for HDFS services communication.
BackupNode, Datanodes, and all other services
should connect to this server if it is
configured. Clients should only go to NameNode#server.
*/
protected Server serviceRpcServer;
/** RPC server address */
protected InetSocketAddress rpcAddress = null;
/** RPC server for DN address */
protected InetSocketAddress serviceRPCAddress = null;
/** httpServer */
protected HttpServer httpServer;
/** HTTP server address */
protected InetSocketAddress httpAddress = null;
private Thread emptier;
/** only used for testing purposes */
protected boolean stopRequested = false;
/** Registration information of this name-node */
protected NamenodeRegistration nodeRegistration;
/** Is service level authorization enabled? */
private boolean serviceAuthEnabled = false;
/** Activated plug-ins. */
private List<ServicePlugin> plugins;
/** Format a new filesystem. Destroys any filesystem that may already
* exist at this location. **/
public static void format(Configuration conf) throws IOException {
format(conf, false);
}
static NameNodeMetrics metrics;
/** Return the {@link FSNamesystem} object.
* @return {@link FSNamesystem} object.
*/
FSNamesystem getNamesystem() {
return namesystem;
}
static void initMetrics(Configuration conf, NamenodeRole role) {
metrics = NameNodeMetrics.create(conf, role);
}
public static NameNodeMetrics getNameNodeMetrics() {
return metrics;
}
public static InetSocketAddress getAddress(String address) {
return NetUtils.createSocketAddr(address, DEFAULT_PORT);
}
/**
* Set the configuration property for the service rpc address
* to the given address.
*/
public static void setServiceAddress(Configuration conf,
String address) {
LOG.info("Setting ADDRESS " + address);
conf.set(DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, address);
}
/**
* Fetches the address that services should use to connect to the
* namenode. If the service-specific RPC address is not configured,
* returns null when fallback is false, or the default namenode address
* (used by both clients and services) when fallback is true.
* Services here are datanodes, the backup node, and any other
* non-client connection.
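*
* A hedged usage sketch:
* <pre>{@code
* // null when no service-specific address is configured
* InetSocketAddress strict = NameNode.getServiceAddress(conf, false);
* // falls back to the default namenode address
* InetSocketAddress addr = NameNode.getServiceAddress(conf, true);
* }</pre>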
*/
public static InetSocketAddress getServiceAddress(Configuration conf,
boolean fallback) {
String addr = conf.get(DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY);
if (addr == null || addr.isEmpty()) {
return fallback ? getAddress(conf) : null;
}
return getAddress(addr);
}
public static InetSocketAddress getAddress(Configuration conf) {
URI filesystemURI = FileSystem.getDefaultUri(conf);
return getAddress(filesystemURI);
}
/**
* TODO:FEDERATION
* @param filesystemURI
* @return address of file system
*/
public static InetSocketAddress getAddress(URI filesystemURI) {
String authority = filesystemURI.getAuthority();
if (authority == null) {
throw new IllegalArgumentException(String.format(
"Invalid URI for NameNode address (check %s): %s has no authority.",
FileSystem.FS_DEFAULT_NAME_KEY, filesystemURI.toString()));
}
if (!FSConstants.HDFS_URI_SCHEME.equalsIgnoreCase(
filesystemURI.getScheme())) {
throw new IllegalArgumentException(String.format(
"Invalid URI for NameNode address (check %s): %s is not of scheme '%s'.",
FileSystem.FS_DEFAULT_NAME_KEY, filesystemURI.toString(),
FSConstants.HDFS_URI_SCHEME));
}
return getAddress(authority);
}
public static URI getUri(InetSocketAddress namenode) {
int port = namenode.getPort();
String portString = port == DEFAULT_PORT ? "" : (":"+port);
return URI.create(FSConstants.HDFS_URI_SCHEME + "://"
+ namenode.getHostName()+portString);
}
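// A hedged round-trip sketch (the hostname is an assumption):
// InetSocketAddress a = NameNode.getAddress("nn.example.com:8020");
// URI u = NameNode.getUri(a); // "hdfs://nn.example.com", default port elided
// a.equals(NameNode.getAddress(u)); // true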
/**
* Compose a "host:port" string from the address.
*/
public static String getHostPortString(InetSocketAddress addr) {
return addr.getHostName() + ":" + addr.getPort();
}
//
// Common NameNode methods implementation for the active name-node role.
//
public NamenodeRole getRole() {
return role;
}
boolean isRole(NamenodeRole that) {
return role.equals(that);
}
/**
* Given a configuration, get the address of the service rpc server.
* Returns null if the service rpc address is not configured.
*/
protected InetSocketAddress getServiceRpcServerAddress(Configuration conf)
throws IOException {
return NameNode.getServiceAddress(conf, false);
}
protected InetSocketAddress getRpcServerAddress(Configuration conf) throws IOException {
return getAddress(conf);
}
/**
* Modifies the passed-in configuration to contain the service rpc address setting.
*/
protected void setRpcServiceServerAddress(Configuration conf) {
setServiceAddress(conf, getHostPortString(serviceRPCAddress));
}
protected void setRpcServerAddress(Configuration conf) {
FileSystem.setDefaultUri(conf, getUri(rpcAddress));
}
protected InetSocketAddress getHttpServerAddress(Configuration conf) {
return NetUtils.createSocketAddr(
conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:50070"));
}
protected void setHttpServerAddress(Configuration conf){
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, getHostPortString(httpAddress));
}
protected void loadNamesystem(Configuration conf) throws IOException {
this.namesystem = new FSNamesystem(conf);
}
NamenodeRegistration getRegistration() {
return nodeRegistration;
}
NamenodeRegistration setRegistration() {
nodeRegistration = new NamenodeRegistration(
getHostPortString(rpcAddress),
getHostPortString(httpAddress),
getFSImage().getStorage(), getRole(), getFSImage().getStorage().getCheckpointTime());
return nodeRegistration;
}
/**
* Initialize name-node.
*
* @param conf the configuration
*/
protected void initialize(Configuration conf) throws IOException {
InetSocketAddress socAddr = getRpcServerAddress(conf);
UserGroupInformation.setConfiguration(conf);
SecurityUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());
int handlerCount =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY,
DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
NameNode.initMetrics(conf, this.getRole());
loadNamesystem(conf);
// create rpc server
InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
if (dnSocketAddr != null) {
int serviceHandlerCount =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
this.serviceRpcServer = RPC.getServer(NamenodeProtocols.class, this,
dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount,
false, conf, namesystem.getDelegationTokenSecretManager());
this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
setRpcServiceServerAddress(conf);
}
this.server = RPC.getServer(NamenodeProtocols.class, this,
socAddr.getHostName(), socAddr.getPort(),
handlerCount, false, conf,
namesystem.getDelegationTokenSecretManager());
// set service-level authorization security policy
if (serviceAuthEnabled =
conf.getBoolean(
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
this.server.refreshServiceAcl(conf, new HDFSPolicyProvider());
if (this.serviceRpcServer != null) {
this.serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
}
}
// The rpc-server port can be ephemeral... ensure we have the correct info
this.rpcAddress = this.server.getListenerAddress();
setRpcServerAddress(conf);
activate(conf);
LOG.info(getRole() + " up at: " + rpcAddress);
if (serviceRPCAddress != null) {
LOG.info(getRole() + " service server is up at: " + serviceRPCAddress);
}
}
/**
* Activate name-node servers and threads.
*/
void activate(Configuration conf) throws IOException {
if ((isRole(NamenodeRole.ACTIVE))
&& (UserGroupInformation.isSecurityEnabled())) {
namesystem.activateSecretManager();
}
namesystem.activate(conf);
startHttpServer(conf);
server.start(); //start RPC server
if (serviceRpcServer != null) {
serviceRpcServer.start();
}
startTrashEmptier(conf);
plugins = conf.getInstances("dfs.namenode.plugins", ServicePlugin.class);
for (ServicePlugin p: plugins) {
try {
p.start(this);
} catch (Throwable t) {
LOG.warn("ServicePlugin " + p + " could not be started", t);
}
}
}
private void startTrashEmptier(Configuration conf) throws IOException {
long trashInterval
= conf.getLong(CommonConfigurationKeys.FS_TRASH_INTERVAL_KEY,
CommonConfigurationKeys.FS_TRASH_INTERVAL_DEFAULT);
if(trashInterval == 0)
return;
this.emptier = new Thread(new Trash(conf).getEmptier(), "Trash Emptier");
this.emptier.setDaemon(true);
this.emptier.start();
}
private void startHttpServer(final Configuration conf) throws IOException {
final InetSocketAddress infoSocAddr = getHttpServerAddress(conf);
final String infoHost = infoSocAddr.getHostName();
if(UserGroupInformation.isSecurityEnabled()) {
String httpsUser = SecurityUtil.getServerPrincipal(conf
.get(DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY), infoHost);
if (httpsUser == null) {
LOG.warn(DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY
+ " not defined in config. Starting http server as "
+ SecurityUtil.getServerPrincipal(conf
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), rpcAddress
.getHostName())
+ ": Kerberized SSL may be not function correctly.");
} else {
// Kerberized SSL servers must be run from the host principal...
LOG.info("Logging in as " + httpsUser + " to start http server.");
SecurityUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY, infoHost);
}
}
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
try {
this.httpServer = ugi.doAs(new PrivilegedExceptionAction<HttpServer>() {
@Override
public HttpServer run() throws IOException, InterruptedException {
int infoPort = infoSocAddr.getPort();
httpServer = new HttpServer("hdfs", infoHost, infoPort,
infoPort == 0, conf,
new AccessControlList(conf.get(DFSConfigKeys.DFS_ADMIN, " ")));
boolean certSSL = conf.getBoolean("dfs.https.enable", false);
boolean useKrb = UserGroupInformation.isSecurityEnabled();
if (certSSL || useKrb) {
boolean needClientAuth = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT);
InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf
.get(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY,
DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT));
Configuration sslConf = new HdfsConfiguration(false);
if (certSSL) {
sslConf.addResource(conf.get(
"dfs.https.server.keystore.resource", "ssl-server.xml"));
}
httpServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth,
useKrb);
// assume same ssl port for all datanodes
InetSocketAddress datanodeSslPort = NetUtils.createSocketAddr(conf
.get("dfs.datanode.https.address", infoHost + ":" + 50475));
httpServer.setAttribute("datanode.https.port", datanodeSslPort
.getPort());
}
httpServer.setAttribute("name.node", NameNode.this);
httpServer.setAttribute(NAMENODE_ADDRESS_ATTRIBUTE_KEY,
getNameNodeAddress());
httpServer.setAttribute("name.system.image", getFSImage());
httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
httpServer.addInternalServlet("getDelegationToken",
GetDelegationTokenServlet.PATH_SPEC,
GetDelegationTokenServlet.class, true);
httpServer.addInternalServlet("renewDelegationToken",
RenewDelegationTokenServlet.PATH_SPEC,
RenewDelegationTokenServlet.class, true);
httpServer.addInternalServlet("cancelDelegationToken",
CancelDelegationTokenServlet.PATH_SPEC,
CancelDelegationTokenServlet.class, true);
httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class,
true);
httpServer.addInternalServlet("getimage", "/getimage",
GetImageServlet.class, true);
httpServer.addInternalServlet("listPaths", "/listPaths/*",
ListPathsServlet.class, false);
httpServer.addInternalServlet("data", "/data/*",
FileDataServlet.class, false);
httpServer.addInternalServlet("checksum", "/fileChecksum/*",
FileChecksumServlets.RedirectServlet.class, false);
httpServer.addInternalServlet("contentSummary", "/contentSummary/*",
ContentSummaryServlet.class, false);
httpServer.start();
// The web-server port can be ephemeral... ensure we have the correct
// info
infoPort = httpServer.getPort();
httpAddress = new InetSocketAddress(infoHost, infoPort);
setHttpServerAddress(conf);
LOG.info(getRole() + " Web-server up at: " + httpAddress);
return httpServer;
}
});
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
if(UserGroupInformation.isSecurityEnabled() &&
conf.get(DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY) != null) {
// Go back to being the correct Namenode principal
LOG.info("Logging back in as "
+ SecurityUtil.getServerPrincipal(conf
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), rpcAddress
.getHostName()) + " following http server start.");
SecurityUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, rpcAddress.getHostName());
}
}
}
/**
* Start NameNode.
* <p>
* The name-node can be started with one of the following startup options:
* <ul>
* <li>{@link StartupOption#REGULAR REGULAR} - normal name node startup</li>
* <li>{@link StartupOption#FORMAT FORMAT} - format name node</li>
* <li>{@link StartupOption#BACKUP BACKUP} - start backup node</li>
* <li>{@link StartupOption#CHECKPOINT CHECKPOINT} - start checkpoint node</li>
* <li>{@link StartupOption#UPGRADE UPGRADE} - start the cluster
* upgrade and create a snapshot of the current file system state</li>
* <li>{@link StartupOption#ROLLBACK ROLLBACK} - roll the
* cluster back to the previous state</li>
* <li>{@link StartupOption#FINALIZE FINALIZE} - finalize
* previous upgrade</li>
* <li>{@link StartupOption#IMPORT IMPORT} - import checkpoint</li>
* </ul>
* The option is passed via configuration field:
* <tt>dfs.namenode.startup</tt>
*
* The conf will be modified to reflect the actual ports on which
* the NameNode is up and running if the user passes the port as
* <code>zero</code> in the conf.
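*
* A minimal embedding sketch (assumes a fully prepared configuration;
* error handling omitted):
* <pre>{@code
* NameNode namenode = new NameNode(conf);
* namenode.join(); // blocks until the RPC server stops
* }</pre>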
*
* @param conf configuration
* @throws IOException
*/
public NameNode(Configuration conf) throws IOException {
this(conf, NamenodeRole.ACTIVE);
}
protected NameNode(Configuration conf, NamenodeRole role)
throws IOException {
this.role = role;
try {
initializeGenericKeys(conf);
initialize(conf);
} catch (IOException e) {
this.stop();
throw e;
}
}
/**
* Wait for service to finish.
* (Normally, it runs forever.)
*/
public void join() {
try {
this.server.join();
} catch (InterruptedException ie) {
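// Interrupted while waiting for the server to stop; return to the caller.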
}
}
/**
* Stop all NameNode threads and wait for all to finish.
*/
public void stop() {
synchronized(this) {
if (stopRequested)
return;
stopRequested = true;
}
if (plugins != null) {
for (ServicePlugin p : plugins) {
try {
p.stop();
} catch (Throwable t) {
LOG.warn("ServicePlugin " + p + " could not be stopped", t);
}
}
}
try {
if (httpServer != null) httpServer.stop();
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
}
if(namesystem != null) namesystem.close();
if(emptier != null) emptier.interrupt();
if(server != null) server.stop();
if(serviceRpcServer != null) serviceRpcServer.stop();
if (metrics != null) {
metrics.shutdown();
}
if (namesystem != null) {
namesystem.shutdown();
}
}
synchronized boolean isStopRequested() {
return stopRequested;
}
/////////////////////////////////////////////////////
// NamenodeProtocol
/////////////////////////////////////////////////////
@Override // NamenodeProtocol
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
throws IOException {
if(size <= 0) {
throw new IllegalArgumentException(
"Unexpected not positive size: "+size);
}
return namesystem.getBlocks(datanode, size);
}
/** {@inheritDoc} */
public ExportedBlockKeys getBlockKeys() throws IOException {
return namesystem.getBlockKeys();
}
@Override // NamenodeProtocol
public void errorReport(NamenodeRegistration registration,
int errorCode,
String msg) throws IOException {
verifyRequest(registration);
LOG.info("Error report from " + registration + ": " + msg);
if(errorCode == FATAL)
namesystem.releaseBackupNode(registration);
}
@Override // NamenodeProtocol
public NamenodeRegistration register(NamenodeRegistration registration)
throws IOException {
verifyVersion(registration.getVersion());
namesystem.registerBackupNode(registration);
return setRegistration();
}
@Override // NamenodeProtocol
public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
throws IOException {
verifyRequest(registration);
if(!isRole(NamenodeRole.ACTIVE))
throw new IOException("Only an ACTIVE node can invoke startCheckpoint.");
return namesystem.startCheckpoint(registration, setRegistration());
}
@Override // NamenodeProtocol
public void endCheckpoint(NamenodeRegistration registration,
CheckpointSignature sig) throws IOException {
verifyRequest(registration);
if(!isRole(NamenodeRole.ACTIVE))
throw new IOException("Only an ACTIVE node can invoke endCheckpoint.");
namesystem.endCheckpoint(registration, sig);
}
@Override // NamenodeProtocol
public long journalSize(NamenodeRegistration registration)
throws IOException {
verifyRequest(registration);
return namesystem.getEditLogSize();
}
/*
* Active name-node cannot journal.
*/
@Override // NamenodeProtocol
public void journal(NamenodeRegistration registration,
int jAction,
int length,
byte[] args) throws IOException {
throw new UnsupportedActionException("journal");
}
/////////////////////////////////////////////////////
// ClientProtocol
/////////////////////////////////////////////////////
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException {
return namesystem.getDelegationToken(renewer);
}
@Override
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
throws InvalidToken, IOException {
return namesystem.renewDelegationToken(token);
}
@Override
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
throws IOException {
namesystem.cancelDelegationToken(token);
}
/** {@inheritDoc} */
public LocatedBlocks getBlockLocations(String src,
long offset,
long length)
throws IOException {
metrics.incrGetBlockLocations();
return namesystem.getBlockLocations(getClientMachine(),
src, offset, length);
}
private static String getClientMachine() {
String clientMachine = Server.getRemoteAddress();
if (clientMachine == null) {
clientMachine = "";
}
return clientMachine;
}
/** {@inheritDoc} */
public FsServerDefaults getServerDefaults() throws IOException {
return namesystem.getServerDefaults();
}
/** {@inheritDoc} */
public void create(String src,
FsPermission masked,
String clientName,
EnumSetWritable<CreateFlag> flag,
boolean createParent,
short replication,
long blockSize) throws IOException {
String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.create: file "
+src+" for "+clientName+" at "+clientMachine);
}
if (!checkPathLength(src)) {
throw new IOException("create: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
namesystem.startFile(src,
new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(),
null, masked),
clientName, clientMachine, flag.get(), createParent, replication, blockSize);
metrics.incrFilesCreated();
metrics.incrCreateFileOps();
}
/** {@inheritDoc} */
public LocatedBlock append(String src, String clientName)
throws IOException {
String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.append: file "
+src+" for "+clientName+" at "+clientMachine);
}
LocatedBlock info = namesystem.appendFile(src, clientName, clientMachine);
metrics.incrFilesAppended();
return info;
}
/** {@inheritDoc} */
public boolean setReplication(String src, short replication)
throws IOException {
return namesystem.setReplication(src, replication);
}
/** {@inheritDoc} */
public void setPermission(String src, FsPermission permissions)
throws IOException {
namesystem.setPermission(src, permissions);
}
/** {@inheritDoc} */
public void setOwner(String src, String username, String groupname)
throws IOException {
namesystem.setOwner(src, username, groupname);
}
@Override
public LocatedBlock addBlock(String src,
String clientName,
ExtendedBlock previous,
DatanodeInfo[] excludedNodes)
throws IOException {
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
+src+" for "+clientName);
}
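// Index the excluded datanodes by themselves so downstream block
// placement can do constant-time membership checks.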
HashMap<Node, Node> excludedNodesSet = null;
if (excludedNodes != null) {
excludedNodesSet = new HashMap<Node, Node>(excludedNodes.length);
for (Node node:excludedNodes) {
excludedNodesSet.put(node, node);
}
}
LocatedBlock locatedBlock =
namesystem.getAdditionalBlock(src, clientName, previous, excludedNodesSet);
if (locatedBlock != null)
metrics.incrAddBlockOps();
return locatedBlock;
}
@Override
public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
final int numAdditionalNodes, final String clientName
) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("getAdditionalDatanode: src=" + src
+ ", blk=" + blk
+ ", existings=" + Arrays.asList(existings)
+ ", excludes=" + Arrays.asList(excludes)
+ ", numAdditionalNodes=" + numAdditionalNodes
+ ", clientName=" + clientName);
}
metrics.incrGetAdditionalDatanodeOps();
HashMap<Node, Node> excludeSet = null;
if (excludes != null) {
excludeSet = new HashMap<Node, Node>(excludes.length);
for (Node node : excludes) {
excludeSet.put(node, node);
}
}
return namesystem.getAdditionalDatanode(src, blk,
existings, excludeSet, numAdditionalNodes, clientName);
}
/**
* The client needs to give up on the block.
*/
public void abandonBlock(ExtendedBlock b, String src, String holder)
throws IOException {
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: "
+b+" of file "+src);
}
if (!namesystem.abandonBlock(b, src, holder)) {
throw new IOException("Cannot abandon block during write to " + src);
}
}
/** {@inheritDoc} */
public boolean complete(String src, String clientName, ExtendedBlock last)
throws IOException {
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.complete: "
+ src + " for " + clientName);
}
return namesystem.completeFile(src, clientName, last);
}
/**
* The client has detected an error on the specified located blocks
* and is reporting them to the server. For now, the namenode will
* mark the block as corrupt. In the future we might
* check whether the blocks are actually corrupt.
*/
public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
stateChangeLog.info("*DIR* NameNode.reportBadBlocks");
for (int i = 0; i < blocks.length; i++) {
ExtendedBlock blk = blocks[i].getBlock();
DatanodeInfo[] nodes = blocks[i].getLocations();
for (int j = 0; j < nodes.length; j++) {
DatanodeInfo dn = nodes[j];
namesystem.markBlockAsCorrupt(blk, dn);
}
}
}
/** {@inheritDoc} */
@Override
public LocatedBlock updateBlockForPipeline(ExtendedBlock block, String clientName)
throws IOException {
return namesystem.updateBlockForPipeline(block, clientName);
}
@Override
public void updatePipeline(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes)
throws IOException {
namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes);
}
/** {@inheritDoc} */
public void commitBlockSynchronization(ExtendedBlock block,
long newgenerationstamp, long newlength,
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets)
throws IOException {
namesystem.commitBlockSynchronization(block,
newgenerationstamp, newlength, closeFile, deleteblock, newtargets);
}
public long getPreferredBlockSize(String filename)
throws IOException {
return namesystem.getPreferredBlockSize(filename);
}
/** {@inheritDoc} */
@Deprecated
@Override
public boolean rename(String src, String dst) throws IOException {
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
}
if (!checkPathLength(dst)) {
throw new IOException("rename: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
boolean ret = namesystem.renameTo(src, dst);
if (ret) {
metrics.incrFilesRenamed();
}
return ret;
}
/**
* {@inheritDoc}
*/
public void concat(String trg, String[] src) throws IOException {
namesystem.concat(trg, src);
}
/** {@inheritDoc} */
@Override
public void rename(String src, String dst, Options.Rename... options)
throws IOException {
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
}
if (!checkPathLength(dst)) {
throw new IOException("rename: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
namesystem.renameTo(src, dst, options);
metrics.incrFilesRenamed();
}
/** @deprecated Use {@link #delete(String, boolean)} instead. */
@Deprecated
public boolean delete(String src) throws IOException {
return delete(src, true);
}
/** {@inheritDoc} */
public boolean delete(String src, boolean recursive) throws IOException {
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
+ ", recursive=" + recursive);
}
boolean ret = namesystem.delete(src, recursive);
if (ret)
metrics.incrDeleteFileOps();
return ret;
}
/**
* Check that the path length does not exceed the maximum. Returns true
* if both the length and the depth are within limits; returns false if
* the length is too long or the depth is too great.
*
*/
private boolean checkPathLength(String src) {
Path srcPath = new Path(src);
return (src.length() <= MAX_PATH_LENGTH &&
srcPath.depth() <= MAX_PATH_DEPTH);
}
/** {@inheritDoc} */
public boolean mkdirs(String src, FsPermission masked, boolean createParent)
throws IOException {
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src);
}
if (!checkPathLength(src)) {
throw new IOException("mkdirs: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
return namesystem.mkdirs(src,
new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(),
null, masked), createParent);
}
/** Renew the lease held by the given client. */
public void renewLease(String clientName) throws IOException {
namesystem.renewLease(clientName);
}
/** Get a partial listing of the indicated directory. */
@Override
public DirectoryListing getListing(String src, byte[] startAfter,
boolean needLocation)
throws IOException {
DirectoryListing files = namesystem.getListing(
src, startAfter, needLocation);
if (files != null) {
metrics.incrGetListingOps();
metrics.incrFilesInGetListingOps(files.getPartialListing().length);
}
return files;
}
/**
* Get the file info for a specific file.
* @param src The string representation of the path to the file
* @return object containing information regarding the file
* or null if file not found
*/
public HdfsFileStatus getFileInfo(String src) throws IOException {
metrics.incrFileInfoOps();
return namesystem.getFileInfo(src, true);
}
/**
* Get the file info for a specific file. If the path refers to a
* symlink then the FileStatus of the symlink is returned.
* @param src The string representation of the path to the file
* @return object containing information regarding the file
* or null if file not found
*/
public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
metrics.incrFileInfoOps();
return namesystem.getFileInfo(src, false);
}
@Override
public long[] getStats() {
return namesystem.getStats();
}
/** Get a report on the current datanodes. */
public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
throws IOException {
DatanodeInfo results[] = namesystem.datanodeReport(type);
if (results == null ) {
throw new IOException("Cannot find datanode report");
}
return results;
}
@Override
public boolean setSafeMode(SafeModeAction action) throws IOException {
return namesystem.setSafeMode(action);
}
/**
* Is the cluster currently in safe mode?
*/
public boolean isInSafeMode() {
return namesystem.isInSafeMode();
}
@Override
public boolean restoreFailedStorage(String arg)
throws AccessControlException {
return namesystem.restoreFailedStorage(arg);
}
@Override
public void saveNamespace() throws IOException {
namesystem.saveNamespace();
}
/**
* Refresh the list of datanodes that the namenode should allow to
* connect. Re-reads the conf by creating a new HdfsConfiguration object
* and uses the host include/exclude files it lists to update the list.
*/
public void refreshNodes() throws IOException {
namesystem.refreshNodes(new HdfsConfiguration());
}
/**
* Returns the size of the current edit log.
*/
@Deprecated
public long getEditLogSize() throws IOException {
return namesystem.getEditLogSize();
}
/**
* Roll the edit log.
*/
@Deprecated
public CheckpointSignature rollEditLog() throws IOException {
return namesystem.rollEditLog();
}
/**
* Roll the image
*/
@Deprecated @Override
public void rollFsImage(CheckpointSignature sig) throws IOException {
namesystem.rollFSImage(sig);
}
public void finalizeUpgrade() throws IOException {
namesystem.finalizeUpgrade();
}
public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action)
throws IOException {
return namesystem.distributedUpgradeProgress(action);
}
/**
* Dumps the namenode state into the specified file.
*/
public void metaSave(String filename) throws IOException {
namesystem.metaSave(filename);
}
/**
* {@inheritDoc}
*/
@Override
public CorruptFileBlocks
listCorruptFileBlocks(String path, String cookie)
throws IOException {
Collection<FSNamesystem.CorruptFileBlockInfo> fbs =
namesystem.listCorruptFileBlocks(path, cookie);
String[] files = new String[fbs.size()];
String lastCookie = "";
int i = 0;
for(FSNamesystem.CorruptFileBlockInfo fb: fbs) {
files[i++] = fb.path;
lastCookie = fb.block.getBlockName();
}
return new CorruptFileBlocks(files, lastCookie);
}
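// A hedged pagination sketch (the accessor names on CorruptFileBlocks
// are assumptions):
// CorruptFileBlocks cfb = nn.listCorruptFileBlocks("/", "");
// while (cfb.getFiles().length > 0) {
// // ... process cfb.getFiles() ...
// cfb = nn.listCorruptFileBlocks("/", cfb.getCookie());
// }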
/** {@inheritDoc} */
public ContentSummary getContentSummary(String path) throws IOException {
return namesystem.getContentSummary(path);
}
/** {@inheritDoc} */
public void setQuota(String path, long namespaceQuota, long diskspaceQuota)
throws IOException {
namesystem.setQuota(path, namespaceQuota, diskspaceQuota);
}
/** {@inheritDoc} */
public void fsync(String src, String clientName) throws IOException {
namesystem.fsync(src, clientName);
}
@Override
public void setTimes(String src, long mtime, long atime)
throws IOException {
namesystem.setTimes(src, mtime, atime);
}
@Override
public void createSymlink(String target, String link, FsPermission dirPerms,
boolean createParent)
throws IOException {
metrics.incrCreateSymlinkOps();
/* We enforce the MAX_PATH_LENGTH limit even though a symlink target
* URI may refer to a non-HDFS file system.
*/
if (!checkPathLength(link)) {
throw new IOException("Symlink path exceeds " + MAX_PATH_LENGTH +
" character limit");
}
if ("".equals(target)) {
throw new IOException("Invalid symlink target");
}
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
namesystem.createSymlink(target, link,
new PermissionStatus(ugi.getShortUserName(), null, dirPerms), createParent);
}
@Override
public String getLinkTarget(String path) throws IOException {
metrics.incrGetLinkTargetOps();
/* Resolves the first symlink in the given path, returning a
* new path consisting of the target of the symlink and any
* remaining path components from the original path.
*/
try {
HdfsFileStatus stat = namesystem.getFileInfo(path, false);
if (stat != null) {
// NB: getSymlink throws IOException if !stat.isSymlink()
return stat.getSymlink();
}
} catch (UnresolvedPathException e) {
return e.getResolvedPath().toString();
} catch (UnresolvedLinkException e) {
// The NameNode should only throw an UnresolvedPathException
throw new AssertionError("UnresolvedLinkException thrown");
}
return null;
}
////////////////////////////////////////////////////////////////
// DatanodeProtocol
////////////////////////////////////////////////////////////////
/** Register the given datanode with the namenode. */
public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
throws IOException {
verifyVersion(nodeReg.getVersion());
namesystem.registerDatanode(nodeReg);
return nodeReg;
}
/**
* The data node notifies the name node that it is alive.
* Returns an array of block-oriented commands for the datanode to execute;
* each is either a transfer or a delete operation.
*/
public DatanodeCommand[] sendHeartbeat(DatanodeRegistration nodeReg,
long capacity,
long dfsUsed,
long remaining,
long blockPoolUsed,
int xmitsInProgress,
int xceiverCount,
int failedVolumes) throws IOException {
verifyRequest(nodeReg);
return namesystem.handleHeartbeat(nodeReg, capacity, dfsUsed, remaining,
blockPoolUsed, xceiverCount, xmitsInProgress, failedVolumes);
}
/**
* Sends a block report to the corresponding namenode (for the poolId).
* @return DatanodeCommand from the namenode
* @throws IOException
*/
public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
String poolId,
long[] blocks) throws IOException {
verifyRequest(nodeReg);
BlockListAsLongs blist = new BlockListAsLongs(blocks);
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
+ "from " + nodeReg.getName() + " " + blist.getNumberOfBlocks()
+ " blocks");
}
namesystem.processReport(nodeReg, poolId, blist);
if (getFSImage().isUpgradeFinalized())
return new DatanodeCommand.Finalize(poolId);
return null;
}
public void blockReceived(DatanodeRegistration nodeReg,
String poolId,
Block blocks[],
String delHints[]) throws IOException {
verifyRequest(nodeReg);
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*BLOCK* NameNode.blockReceived: "
+"from "+nodeReg.getName()+" "+blocks.length+" blocks.");
}
for (int i = 0; i < blocks.length; i++) {
namesystem.blockReceived(nodeReg, poolId, blocks[i], delHints[i]);
}
}
/**
* Handle an error report from a datanode.
*/
public void errorReport(DatanodeRegistration nodeReg,
int errorCode, String msg) throws IOException {
String dnName = (nodeReg == null ? "unknown DataNode" : nodeReg.getName());
if (errorCode == DatanodeProtocol.NOTIFY) {
LOG.info("Error report from " + dnName + ": " + msg);
return;
}
verifyRequest(nodeReg);
if (errorCode == DatanodeProtocol.DISK_ERROR) {
LOG.warn("Disk error on " + dnName + ": " + msg);
} else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) {
LOG.warn("Fatal disk error on " + dnName + ": " + msg);
namesystem.removeDatanode(nodeReg);
} else {
LOG.info("Error report from " + dnName + ": " + msg);
}
}
public NamespaceInfo versionRequest() throws IOException {
return namesystem.getNamespaceInfo();
}
public UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException {
return namesystem.processDistributedUpgradeCommand(comm);
}
/**
* Verify request.
*
* Verifies that the datanode's version and registration ID are correct
* and that the datanode does not need to be shut down.
*
* @param nodeReg data node registration
* @throws IOException
*/
public void verifyRequest(NodeRegistration nodeReg) throws IOException {
verifyVersion(nodeReg.getVersion());
if (!namesystem.getRegistrationID().equals(nodeReg.getRegistrationID())) {
LOG.warn("Invalid registrationID - expected: "
+ namesystem.getRegistrationID() + " received: "
+ nodeReg.getRegistrationID());
throw new UnregisteredNodeException(nodeReg);
}
}
/**
* Verify version.
*
* @param version
* @throws IOException
*/
public void verifyVersion(int version) throws IOException {
if (version != LAYOUT_VERSION)
throw new IncorrectVersionException(version, "data node");
}
/**
* Returns the name of the fsImage file
*/
public File getFsImageName() throws IOException {
return getFSImage().getStorage().getFsImageName();
}
public FSImage getFSImage() {
return namesystem.dir.fsImage;
}
/**
* Returns the name of the fsImage file uploaded by periodic
* checkpointing
*/
public File[] getFsImageNameCheckpoint() throws IOException {
return getFSImage().getStorage().getFsImageNameCheckpoint();
}
/**
* Returns the address on which the NameNode is listening.
* @return namenode rpc address
*/
public InetSocketAddress getNameNodeAddress() {
return rpcAddress;
}
/**
* Returns namenode service rpc address, if set. Otherwise returns
* namenode rpc address.
* @return namenode service rpc address used by datanodes
*/
public InetSocketAddress getServiceRpcAddress() {
return serviceRPCAddress != null ? serviceRPCAddress : rpcAddress;
}
/**
* Returns the address of the NameNode's http server,
* which is used to access the name-node web UI.
*
* @return the http address.
*/
public InetSocketAddress getHttpAddress() {
return httpAddress;
}
NetworkTopology getNetworkTopology() {
return this.namesystem.clusterMap;
}
/**
* Verify that the configured directories exist, then interactively
* confirm that formatting is desired for each existing directory,
* and format them.
*
* @param conf
* @param isConfirmationNeeded
* @return true if formatting was aborted, false otherwise
* @throws IOException
*/
private static boolean format(Configuration conf,
boolean isConfirmationNeeded)
throws IOException {
if (!conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY,
DFSConfigKeys.DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_DEFAULT)) {
throw new IOException("The option " + DFSConfigKeys.DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY
+ " is set to false for this filesystem, so it "
+ "cannot be formatted. You will need to set "
+ DFSConfigKeys.DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY +" parameter "
+ "to true in order to format this filesystem");
}
Collection<URI> dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
Collection<URI> editDirsToFormat =
FSNamesystem.getNamespaceEditsDirs(conf);
for(Iterator<URI> it = dirsToFormat.iterator(); it.hasNext();) {
File curDir = new File(it.next().getPath());
if (!curDir.exists())
continue;
if (isConfirmationNeeded) {
if (!confirmPrompt("Re-format filesystem in " + curDir + " ?")) {
System.err.println("Format aborted in "+ curDir);
return true;
}
}
}
FSImage fsImage = new FSImage(dirsToFormat, editDirsToFormat);
FSNamesystem nsys = new FSNamesystem(fsImage, conf);
// if clusterID is not provided - see if you can find the current one
String clusterId = StartupOption.FORMAT.getClusterId();
if(clusterId == null || clusterId.equals("")) {
// try to get one from the existing storage
clusterId = fsImage.getStorage().determineClusterId();
if (clusterId == null || clusterId.equals("")) {
throw new IllegalArgumentException("Format must be provided with clusterid");
}
if(isConfirmationNeeded) {
if (!confirmPrompt("Use existing cluster id=" + clusterId + "?")) {
throw new IllegalArgumentException("Format must be provided with clusterid");
}
}
}
nsys.dir.fsImage.getStorage().format(clusterId);
return false;
}
private static boolean finalize(Configuration conf,
boolean isConfirmationNeeded
) throws IOException {
Collection<URI> dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
Collection<URI> editDirsToFormat =
FSNamesystem.getNamespaceEditsDirs(conf);
FSNamesystem nsys = new FSNamesystem(new FSImage(dirsToFormat,
editDirsToFormat), conf);
System.err.print(
"\"finalize\" will remove the previous state of the files system.\n"
+ "Recent upgrade will become permanent.\n"
+ "Rollback option will not be available anymore.\n");
if (isConfirmationNeeded) {
if (!confirmPrompt("Finalize filesystem state?")) {
System.err.println("Finalize aborted.");
return true;
}
}
nsys.dir.fsImage.finalizeUpgrade();
return false;
}
@Override
public void refreshServiceAcl() throws IOException {
if (!serviceAuthEnabled) {
throw new AuthorizationException("Service Level Authorization not enabled!");
}
this.server.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
if (this.serviceRpcServer != null) {
this.serviceRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
}
}
@Override
public void refreshUserToGroupsMappings() throws IOException {
LOG.info("Refreshing all user-to-groups mappings. Requested by user: " +
UserGroupInformation.getCurrentUser().getShortUserName());
Groups.getUserToGroupsMappingService().refresh();
}
@Override
public void refreshSuperUserGroupsConfiguration() {
LOG.info("Refreshing SuperUser proxy group mapping list ");
ProxyUsers.refreshSuperUserGroupsConfiguration();
}
@Override
public String[] getGroupsForUser(String user) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Getting groups for user " + user);
}
return UserGroupInformation.createRemoteUser(user).getGroupNames();
}
private static void printUsage() {
System.err.println(
"Usage: java NameNode [" +
StartupOption.BACKUP.getName() + "] | [" +
StartupOption.CHECKPOINT.getName() + "] | [" +
StartupOption.FORMAT.getName() + "[" + StartupOption.CLUSTERID.getName() +
" cid ]] | [" +
StartupOption.UPGRADE.getName() + "] | [" +
StartupOption.ROLLBACK.getName() + "] | [" +
StartupOption.FINALIZE.getName() + "] | [" +
StartupOption.IMPORT.getName() + "]");
}
private static StartupOption parseArguments(String args[]) {
int argsLen = (args == null) ? 0 : args.length;
StartupOption startOpt = StartupOption.REGULAR;
for(int i=0; i < argsLen; i++) {
String cmd = args[i];
if (StartupOption.FORMAT.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.FORMAT;
// might be followed by two args
if (i + 2 < argsLen
&& args[i + 1].equalsIgnoreCase(StartupOption.CLUSTERID.getName())) {
i += 2;
startOpt.setClusterId(args[i]);
}
} else if (StartupOption.GENCLUSTERID.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.GENCLUSTERID;
} else if (StartupOption.REGULAR.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.REGULAR;
} else if (StartupOption.BACKUP.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.BACKUP;
} else if (StartupOption.CHECKPOINT.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.CHECKPOINT;
} else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.UPGRADE;
// might be followed by two args
if (i + 2 < argsLen
&& args[i + 1].equalsIgnoreCase(StartupOption.CLUSTERID.getName())) {
i += 2;
startOpt.setClusterId(args[i]);
}
} else if (StartupOption.ROLLBACK.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.ROLLBACK;
} else if (StartupOption.FINALIZE.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.FINALIZE;
} else if (StartupOption.IMPORT.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.IMPORT;
} else
return null;
}
return startOpt;
}
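// A hedged parsing sketch (the cluster id is hypothetical):
// parseArguments(new String[] {"-upgrade", "-clusterid", "cid-1"})
// returns StartupOption.UPGRADE with its clusterId set to "cid-1".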
private static void setStartupOption(Configuration conf, StartupOption opt) {
conf.set("dfs.namenode.startup", opt.toString());
}
static StartupOption getStartupOption(Configuration conf) {
return StartupOption.valueOf(conf.get("dfs.namenode.startup",
StartupOption.REGULAR.toString()));
}
/**
* Print out a prompt to the user, and return true if the user
* responds with "Y" or "yes".
*/
static boolean confirmPrompt(String prompt) throws IOException {
while (true) {
System.err.print(prompt + " (Y or N) ");
StringBuilder responseBuilder = new StringBuilder();
while (true) {
int c = System.in.read();
if (c == -1 || c == '\r' || c == '\n') {
break;
}
responseBuilder.append((char)c);
}
String response = responseBuilder.toString();
if (response.equalsIgnoreCase("y") ||
response.equalsIgnoreCase("yes")) {
return true;
} else if (response.equalsIgnoreCase("n") ||
response.equalsIgnoreCase("no")) {
return false;
}
// else ask them again
}
}
public static NameNode createNameNode(String argv[], Configuration conf)
throws IOException {
if (conf == null)
conf = new HdfsConfiguration();
StartupOption startOpt = parseArguments(argv);
if (startOpt == null) {
printUsage();
return null;
}
setStartupOption(conf, startOpt);
switch (startOpt) {
case FORMAT:
boolean aborted = format(conf, true);
System.exit(aborted ? 1 : 0);
return null; // avoid javac warning
case GENCLUSTERID:
System.err.println("Generating new cluster id:");
System.out.println(NNStorage.newClusterID());
System.exit(0);
return null;
case FINALIZE:
aborted = finalize(conf, true);
System.exit(aborted ? 1 : 0);
return null; // avoid javac warning
case BACKUP:
case CHECKPOINT:
NamenodeRole role = startOpt.toNodeRole();
DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
return new BackupNode(conf, role);
default:
DefaultMetricsSystem.initialize("NameNode");
return new NameNode(conf);
}
}
/**
* In a federated cluster, configuration is set for a group of
* namenodes and secondary namenode/backup/checkpointer nodes that are
* grouped under a logical nameservice ID. The configuration keys specific
* to them are suffixed with the configured nameserviceId.
*
* This method copies the value from the specific key of the form
* key.nameserviceId to the generic key, to set up the generic
* configuration. Once this is done, only the generic version of the
* configuration is read in the rest of the code, for backward
* compatibility and simpler code changes.
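*
* A hedged sketch ("ns1" is a hypothetical nameservice id, and the
* federation id key name is an assumption):
* <pre>{@code
* conf.set("dfs.federation.nameservice.id", "ns1");
* conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + ".ns1",
* "nn1.example.com:8020");
* NameNode.initializeGenericKeys(conf);
* // conf.get(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY)
* // now returns "nn1.example.com:8020"
* }</pre>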
*
* @param conf
* Configuration object to lookup specific key and to set the value
* to the key passed. Note the conf object is modified
* @see DFSUtil#setGenericConf(Configuration, String, String...)
*/
public static void initializeGenericKeys(Configuration conf) {
final String nameserviceId = DFSUtil.getNameServiceId(conf);
if ((nameserviceId == null) || nameserviceId.isEmpty()) {
return;
}
DFSUtil.setGenericConf(conf, nameserviceId, NAMESERVICE_SPECIFIC_KEYS);
if (conf.get(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY) != null) {
URI defaultUri = URI.create(FSConstants.HDFS_URI_SCHEME + "://"
+ conf.get(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY));
conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, defaultUri.toString());
}
}
/**
* Command-line entry point for the NameNode daemon.
*/
public static void main(String argv[]) throws Exception {
try {
StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
NameNode namenode = createNameNode(argv, null);
if (namenode != null)
namenode.join();
} catch (Throwable e) {
LOG.error(StringUtils.stringifyException(e));
System.exit(-1);
}
}
}