blob: 9e397b9114087a47b21ba388505b2eaf291187df [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.util.Preconditions;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurableBase;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.MetricsLoggerTask;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.TokenVerifier;
import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
import org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgressMetrics;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.ipc.ExternalCall;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.tracing.TraceUtils;
import org.apache.hadoop.util.ExitUtil.ExitException;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.GcTimeMonitor;
import org.apache.hadoop.util.GcTimeMonitor.Builder;
import org.apache.hadoop.tracing.Tracer;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.ObjectName;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_FENCE_METHODS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PLUGINS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STARTUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.FS_PROTECTED_DIRECTORIES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_GC_TIME_MONITOR_SLEEP_INTERVAL_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_GC_TIME_MONITOR_SLEEP_INTERVAL_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_GC_TIME_MONITOR_OBSERVATION_WINDOW_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_GC_TIME_MONITOR_OBSERVATION_WINDOW_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_GC_TIME_MONITOR_ENABLE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_GC_TIME_MONITOR_ENABLE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT;
import static org.apache.hadoop.util.ExitUtil.terminate;
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_NAMESPACE;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT;
/**********************************************************
* NameNode serves as both directory namespace manager and
* "inode table" for the Hadoop DFS. There is a single NameNode
* running in any DFS deployment. (Well, except when there
* is a second backup/failover NameNode, or when using federated NameNodes.)
*
* The NameNode controls two critical tables:
* 1) filename{@literal ->}blocksequence (namespace)
* 2) block{@literal ->}machinelist ("inodes")
*
* The first table is stored on disk and is very precious.
* The second table is rebuilt every time the NameNode comes up.
*
* 'NameNode' refers to both this class as well as the 'NameNode server'.
* The 'FSNamesystem' class actually performs most of the filesystem
* management. The majority of the 'NameNode' class itself is concerned
* with exposing the IPC interface and the HTTP server to the outside world,
* plus some configuration management.
*
* NameNode implements the
* {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} interface, which
* allows clients to ask for DFS services.
* {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} is not designed for
* direct use by authors of DFS client code. End-users should instead use the
* {@link org.apache.hadoop.fs.FileSystem} class.
*
* NameNode also implements the
* {@link org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol} interface,
* used by DataNodes that actually store DFS data blocks. These
* methods are invoked repeatedly and automatically by all the
* DataNodes in a DFS deployment.
*
* NameNode also implements the
* {@link org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol} interface,
* used by secondary namenodes or rebalancing processes to get partial
* NameNode state, for example partial blocksMap etc.
**********************************************************/
@InterfaceAudience.Private
public class NameNode extends ReconfigurableBase implements
NameNodeStatusMXBean, TokenVerifier<DelegationTokenIdentifier> {
static{
HdfsConfiguration.init();
}
private InMemoryLevelDBAliasMapServer levelDBAliasMapServer;
/**
* Categories of operations supported by the namenode.
*/
public enum OperationCategory {
/** Operations that are state agnostic */
UNCHECKED,
/** Read operation that does not change the namespace state */
READ,
/** Write operation that changes the namespace state */
WRITE,
/** Operations related to checkpointing */
CHECKPOINT,
/** Operations related to {@link JournalProtocol} */
JOURNAL
}
/**
* HDFS configuration can have three types of parameters:
* <ol>
* <li>Parameters that are common for all the name services in the cluster.</li>
* <li>Parameters that are specific to a name service. These keys are suffixed
* with nameserviceId in the configuration. For example,
* "dfs.namenode.rpc-address.nameservice1".</li>
* <li>Parameters that are specific to a single name node. These keys are suffixed
* with nameserviceId and namenodeId in the configuration. for example,
* "dfs.namenode.rpc-address.nameservice1.namenode1"</li>
* </ol>
*
* In the latter cases, operators may specify the configuration without
* any suffix, with a nameservice suffix, or with a nameservice and namenode
* suffix. The more specific suffix will take precedence.
*
* These keys are specific to a given namenode, and thus may be configured
* globally, for a nameservice, or for a specific namenode within a nameservice.
*/
public static final String[] NAMENODE_SPECIFIC_KEYS = {
DFS_NAMENODE_RPC_ADDRESS_KEY,
DFS_NAMENODE_RPC_BIND_HOST_KEY,
DFS_NAMENODE_NAME_DIR_KEY,
DFS_NAMENODE_EDITS_DIR_KEY,
DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
DFS_NAMENODE_CHECKPOINT_DIR_KEY,
DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY,
DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY,
DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY,
DFS_NAMENODE_HTTP_ADDRESS_KEY,
DFS_NAMENODE_HTTPS_ADDRESS_KEY,
DFS_NAMENODE_HTTP_BIND_HOST_KEY,
DFS_NAMENODE_HTTPS_BIND_HOST_KEY,
DFS_NAMENODE_KEYTAB_FILE_KEY,
DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY,
DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_KEY,
DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY,
DFS_NAMENODE_BACKUP_ADDRESS_KEY,
DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY,
DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY,
DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY,
DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
DFS_HA_FENCE_METHODS_KEY,
DFS_HA_ZKFC_PORT_KEY
};
/**
* @see #NAMENODE_SPECIFIC_KEYS
* These keys are specific to a nameservice, but may not be overridden
* for a specific namenode.
*/
public static final String[] NAMESERVICE_SPECIFIC_KEYS = {
DFS_HA_AUTO_FAILOVER_ENABLED_KEY
};
private String ipcClientRPCBackoffEnable;
/** A list of property that are reconfigurable at runtime. */
private final TreeSet<String> reconfigurableProperties = Sets
.newTreeSet(Lists.newArrayList(
DFS_HEARTBEAT_INTERVAL_KEY,
DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
FS_PROTECTED_DIRECTORIES,
HADOOP_CALLER_CONTEXT_ENABLED_KEY,
DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY,
DFS_IMAGE_PARALLEL_LOAD_KEY,
DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY,
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY,
DFS_BLOCK_INVALIDATE_LIMIT_KEY,
DFS_DATANODE_PEER_STATS_ENABLED_KEY,
DFS_DATANODE_MAX_NODES_TO_REPORT_KEY));
private static final String USAGE = "Usage: hdfs namenode ["
+ StartupOption.BACKUP.getName() + "] | \n\t["
+ StartupOption.CHECKPOINT.getName() + "] | \n\t["
+ StartupOption.FORMAT.getName() + " ["
+ StartupOption.CLUSTERID.getName() + " cid ] ["
+ StartupOption.FORCE.getName() + "] ["
+ StartupOption.NONINTERACTIVE.getName() + "] ] | \n\t["
+ StartupOption.UPGRADE.getName() +
" [" + StartupOption.CLUSTERID.getName() + " cid]" +
" [" + StartupOption.RENAMERESERVED.getName() + "<k-v pairs>] ] | \n\t["
+ StartupOption.UPGRADEONLY.getName() +
" [" + StartupOption.CLUSTERID.getName() + " cid]" +
" [" + StartupOption.RENAMERESERVED.getName() + "<k-v pairs>] ] | \n\t["
+ StartupOption.ROLLBACK.getName() + "] | \n\t["
+ StartupOption.ROLLINGUPGRADE.getName() + " "
+ RollingUpgradeStartupOption.getAllOptionString() + " ] | \n\t["
+ StartupOption.IMPORT.getName() + "] | \n\t["
+ StartupOption.INITIALIZESHAREDEDITS.getName() + "] | \n\t["
+ StartupOption.BOOTSTRAPSTANDBY.getName() + " ["
+ StartupOption.FORCE.getName() + "] ["
+ StartupOption.NONINTERACTIVE.getName() + "] ["
+ StartupOption.SKIPSHAREDEDITSCHECK.getName() + "] ] | \n\t["
+ StartupOption.RECOVER.getName() + " [ "
+ StartupOption.FORCE.getName() + "] ] | \n\t["
+ StartupOption.METADATAVERSION.getName() + " ]";
public long getProtocolVersion(String protocol,
long clientVersion) throws IOException {
if (protocol.equals(ClientProtocol.class.getName())) {
return ClientProtocol.versionID;
} else if (protocol.equals(DatanodeProtocol.class.getName())){
return DatanodeProtocol.versionID;
} else if (protocol.equals(NamenodeProtocol.class.getName())){
return NamenodeProtocol.versionID;
} else if (protocol.equals(RefreshAuthorizationPolicyProtocol.class.getName())){
return RefreshAuthorizationPolicyProtocol.versionID;
} else if (protocol.equals(RefreshUserMappingsProtocol.class.getName())){
return RefreshUserMappingsProtocol.versionID;
} else if (protocol.equals(RefreshCallQueueProtocol.class.getName())) {
return RefreshCallQueueProtocol.versionID;
} else if (protocol.equals(GetUserMappingsProtocol.class.getName())){
return GetUserMappingsProtocol.versionID;
} else {
throw new IOException("Unknown protocol to name node: " + protocol);
}
}
/**
* @deprecated Use {@link HdfsClientConfigKeys#DFS_NAMENODE_RPC_PORT_DEFAULT}
* instead.
*/
@Deprecated
public static final int DEFAULT_PORT = DFS_NAMENODE_RPC_PORT_DEFAULT;
public static final Logger LOG =
LoggerFactory.getLogger(NameNode.class.getName());
public static final Logger stateChangeLog =
LoggerFactory.getLogger("org.apache.hadoop.hdfs.StateChange");
public static final Logger blockStateChangeLog =
LoggerFactory.getLogger("BlockStateChange");
public static final HAState ACTIVE_STATE = new ActiveState();
public static final HAState STANDBY_STATE = new StandbyState();
public static final HAState OBSERVER_STATE = new StandbyState(true);
private static final String NAMENODE_HTRACE_PREFIX = "namenode.htrace.";
public static final Log MetricsLog =
LogFactory.getLog("NameNodeMetricsLog");
protected FSNamesystem namesystem;
protected final NamenodeRole role;
private volatile HAState state;
private final boolean haEnabled;
private final HAContext haContext;
protected final boolean allowStaleStandbyReads;
private AtomicBoolean started = new AtomicBoolean(false);
private final boolean notBecomeActiveInSafemode;
private final static int HEALTH_MONITOR_WARN_THRESHOLD_MS = 5000;
/** httpServer */
protected NameNodeHttpServer httpServer;
private Thread emptier;
/** only used for testing purposes */
protected boolean stopRequested = false;
/** Registration information of this name-node */
protected NamenodeRegistration nodeRegistration;
/** Activated plug-ins. */
private List<ServicePlugin> plugins;
private NameNodeRpcServer rpcServer;
private JvmPauseMonitor pauseMonitor;
private GcTimeMonitor gcTimeMonitor;
private ObjectName nameNodeStatusBeanName;
protected final Tracer tracer;
ScheduledThreadPoolExecutor metricsLoggerTimer;
/**
* The namenode address that clients will use to access this namenode
* or the name service. For HA configurations using logical URI, it
* will be the logical address.
*/
private String clientNamenodeAddress;
/** Format a new filesystem. Destroys any filesystem that may already
* exist at this location. **/
public static void format(Configuration conf) throws IOException {
format(conf, true, true);
}
static NameNodeMetrics metrics;
private static final StartupProgress startupProgress = new StartupProgress();
/** Return the {@link FSNamesystem} object.
* @return {@link FSNamesystem} object.
*/
public FSNamesystem getNamesystem() {
return namesystem;
}
public NamenodeProtocols getRpcServer() {
return rpcServer;
}
@VisibleForTesting
public HttpServer2 getHttpServer() {
return httpServer.getHttpServer();
}
public void queueExternalCall(ExternalCall<?> extCall)
throws IOException, InterruptedException {
if (rpcServer == null) {
throw new RetriableException("Namenode is in startup mode");
}
rpcServer.getClientRpcServer().queueCall(extCall);
}
public static void initMetrics(Configuration conf, NamenodeRole role) {
metrics = NameNodeMetrics.create(conf, role);
}
public static NameNodeMetrics getNameNodeMetrics() {
return metrics;
}
/**
* Try to obtain the actual client info according to the current user.
* @param ipProxyUsers Users who can override client infos
*/
private static String clientInfoFromContext(
final String[] ipProxyUsers) {
if (ipProxyUsers != null) {
UserGroupInformation user =
UserGroupInformation.getRealUserOrSelf(Server.getRemoteUser());
if (user != null &&
ArrayUtils.contains(ipProxyUsers, user.getShortUserName())) {
CallerContext context = CallerContext.getCurrent();
if (context != null && context.isContextValid()) {
return context.getContext();
}
}
}
return null;
}
/**
* Try to obtain the value corresponding to the key by parsing the content.
* @param content the full content to be parsed.
* @param key trying to obtain the value of the key.
* @return the value corresponding to the key.
*/
@VisibleForTesting
public static String parseSpecialValue(String content, String key) {
int posn = content.indexOf(key);
if (posn != -1) {
posn += key.length();
int end = content.indexOf(",", posn);
return end == -1 ? content.substring(posn)
: content.substring(posn, end);
}
return null;
}
/**
* Try to obtain the actual client's machine according to the current user.
* @param ipProxyUsers Users who can override client infos.
* @return The actual client's machine.
*/
public static String getClientMachine(final String[] ipProxyUsers) {
String clientMachine = null;
String cc = clientInfoFromContext(ipProxyUsers);
if (cc != null) {
// if the rpc has a caller context of "clientIp:1.2.3.4,CLI",
// return "1.2.3.4" as the client machine.
String key = CallerContext.CLIENT_IP_STR +
CallerContext.Builder.KEY_VALUE_SEPARATOR;
clientMachine = parseSpecialValue(cc, key);
}
if (clientMachine == null) {
clientMachine = Server.getRemoteAddress();
if (clientMachine == null) { //not a RPC client
clientMachine = "";
}
}
return clientMachine;
}
/**
* Try to obtain the actual client's id and call id
* according to the current user.
* @param ipProxyUsers Users who can override client infos
* @return The actual client's id and call id.
*/
public static Pair<byte[], Integer> getClientIdAndCallId(
final String[] ipProxyUsers) {
byte[] clientId = Server.getClientId();
int callId = Server.getCallId();
String cc = clientInfoFromContext(ipProxyUsers);
if (cc != null) {
String clientIdKey = CallerContext.CLIENT_ID_STR +
CallerContext.Builder.KEY_VALUE_SEPARATOR;
String clientIdStr = parseSpecialValue(cc, clientIdKey);
if (clientIdStr != null) {
clientId = StringUtils.hexStringToByte(clientIdStr);
}
String callIdKey = CallerContext.CLIENT_CALL_ID_STR +
CallerContext.Builder.KEY_VALUE_SEPARATOR;
String callIdStr = parseSpecialValue(cc, callIdKey);
if (callIdStr != null) {
callId = Integer.parseInt(callIdStr);
}
}
return Pair.of(clientId, callId);
}
/**
* Returns object used for reporting namenode startup progress.
*
* @return StartupProgress for reporting namenode startup progress
*/
public static StartupProgress getStartupProgress() {
return startupProgress;
}
/**
* Return the service name of the issued delegation token.
*
* @return The name service id in HA-mode, or the rpc address in non-HA mode
*/
public String getTokenServiceName() {
return getClientNamenodeAddress();
}
/**
* Get the namenode address to be used by clients.
* @return nn address
*/
public String getClientNamenodeAddress() {
return clientNamenodeAddress;
}
/**
* Set the configuration property for the service rpc address
* to address
*/
public static void setServiceAddress(Configuration conf,
String address) {
LOG.info("Setting ADDRESS {}", address);
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, address);
}
/**
* Fetches the address for services to use when connecting to namenode
* based on the value of fallback returns null if the special
* address is not specified or returns the default namenode address
* to be used by both clients and services.
* Services here are datanodes, backup node, any non client connection
*/
public static InetSocketAddress getServiceAddress(Configuration conf,
boolean fallback) {
String addr = conf.getTrimmed(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY);
if (addr == null || addr.isEmpty()) {
return fallback ? DFSUtilClient.getNNAddress(conf) : null;
}
return DFSUtilClient.getNNAddress(addr);
}
//
// Common NameNode methods implementation for the active name-node role.
//
public NamenodeRole getRole() {
return role;
}
boolean isRole(NamenodeRole that) {
return role.equals(that);
}
public static String composeNotStartedMessage(NamenodeRole role) {
return role + " still not started";
}
/**
* Given a configuration get the address of the lifeline RPC server.
* If the lifeline RPC is not configured returns null.
*
* @param conf configuration
* @return address or null
*/
InetSocketAddress getLifelineRpcServerAddress(Configuration conf) {
String addr = getTrimmedOrNull(conf, DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY);
if (addr == null) {
return null;
}
return NetUtils.createSocketAddr(addr);
}
/**
* Given a configuration get the address of the service rpc server
* If the service rpc is not configured returns null
*/
protected InetSocketAddress getServiceRpcServerAddress(Configuration conf) {
return NameNode.getServiceAddress(conf, false);
}
protected InetSocketAddress getRpcServerAddress(Configuration conf) {
return DFSUtilClient.getNNAddress(conf);
}
/**
* Given a configuration get the bind host of the lifeline RPC server.
* If the bind host is not configured returns null.
*
* @param conf configuration
* @return bind host or null
*/
String getLifelineRpcServerBindHost(Configuration conf) {
return getTrimmedOrNull(conf, DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY);
}
/** Given a configuration get the bind host of the service rpc server
* If the bind host is not configured returns null.
*/
protected String getServiceRpcServerBindHost(Configuration conf) {
return getTrimmedOrNull(conf, DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY);
}
/** Given a configuration get the bind host of the client rpc server
* If the bind host is not configured returns null.
*/
protected String getRpcServerBindHost(Configuration conf) {
return getTrimmedOrNull(conf, DFS_NAMENODE_RPC_BIND_HOST_KEY);
}
/**
* Gets a trimmed value from configuration, or null if no value is defined.
*
* @param conf configuration
* @param key configuration key to get
* @return trimmed value, or null if no value is defined
*/
private static String getTrimmedOrNull(Configuration conf, String key) {
String addr = conf.getTrimmed(key);
if (addr == null || addr.isEmpty()) {
return null;
}
return addr;
}
/**
* Modifies the configuration to contain the lifeline RPC address setting.
*
* @param conf configuration to modify
* @param lifelineRPCAddress lifeline RPC address
*/
void setRpcLifelineServerAddress(Configuration conf,
InetSocketAddress lifelineRPCAddress) {
LOG.info("Setting lifeline RPC address {}", lifelineRPCAddress);
conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY,
NetUtils.getHostPortString(lifelineRPCAddress));
}
/**
* Modifies the configuration passed to contain the service rpc address setting
*/
protected void setRpcServiceServerAddress(Configuration conf,
InetSocketAddress serviceRPCAddress) {
setServiceAddress(conf, NetUtils.getHostPortString(serviceRPCAddress));
}
protected void setRpcServerAddress(Configuration conf,
InetSocketAddress rpcAddress) {
FileSystem.setDefaultUri(conf, DFSUtilClient.getNNUri(rpcAddress));
}
protected InetSocketAddress getHttpServerAddress(Configuration conf) {
return getHttpAddress(conf);
}
/**
* HTTP server address for binding the endpoint. This method is
* for use by the NameNode and its derivatives. It may return
* a different address than the one that should be used by clients to
* connect to the NameNode. See
* {@link DFSConfigKeys#DFS_NAMENODE_HTTP_BIND_HOST_KEY}
*
* @param conf
* @return
*/
protected InetSocketAddress getHttpServerBindAddress(Configuration conf) {
InetSocketAddress bindAddress = getHttpServerAddress(conf);
// If DFS_NAMENODE_HTTP_BIND_HOST_KEY exists then it overrides the
// host name portion of DFS_NAMENODE_HTTP_ADDRESS_KEY.
final String bindHost = conf.getTrimmed(DFS_NAMENODE_HTTP_BIND_HOST_KEY);
if (bindHost != null && !bindHost.isEmpty()) {
bindAddress = new InetSocketAddress(bindHost, bindAddress.getPort());
}
return bindAddress;
}
/** @return the NameNode HTTP address. */
public static InetSocketAddress getHttpAddress(Configuration conf) {
return NetUtils.createSocketAddr(
conf.getTrimmed(DFS_NAMENODE_HTTP_ADDRESS_KEY, DFS_NAMENODE_HTTP_ADDRESS_DEFAULT));
}
protected void loadNamesystem(Configuration conf) throws IOException {
this.namesystem = FSNamesystem.loadFromDisk(conf);
}
NamenodeRegistration getRegistration() {
return nodeRegistration;
}
NamenodeRegistration setRegistration() {
nodeRegistration = new NamenodeRegistration(
NetUtils.getHostPortString(getNameNodeAddress()),
NetUtils.getHostPortString(getHttpAddress()),
getFSImage().getStorage(), getRole());
return nodeRegistration;
}
/* optimize ugi lookup for RPC operations to avoid a trip through
* UGI.getCurrentUser which is synch'ed
*/
public static UserGroupInformation getRemoteUser() throws IOException {
UserGroupInformation ugi = Server.getRemoteUser();
return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
}
@Override
public void verifyToken(DelegationTokenIdentifier id, byte[] password)
throws IOException {
// during startup namesystem is null, let client retry
if (namesystem == null) {
throw new RetriableException("Namenode is in startup mode");
}
namesystem.verifyToken(id, password);
}
/**
* Login as the configured user for the NameNode.
*/
void loginAsNameNodeUser(Configuration conf) throws IOException {
InetSocketAddress socAddr = getRpcServerAddress(conf);
SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, socAddr.getHostName());
}
/**
* Initialize name-node.
*
* @param conf the configuration
*/
protected void initialize(Configuration conf) throws IOException {
if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
if (intervals != null) {
conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
intervals);
}
}
UserGroupInformation.setConfiguration(conf);
loginAsNameNodeUser(conf);
NameNode.initMetrics(conf, this.getRole());
StartupProgressMetrics.register(startupProgress);
pauseMonitor = new JvmPauseMonitor();
pauseMonitor.init(conf);
pauseMonitor.start();
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
if (conf.getBoolean(DFS_NAMENODE_GC_TIME_MONITOR_ENABLE,
DFS_NAMENODE_GC_TIME_MONITOR_ENABLE_DEFAULT)) {
long observationWindow = conf.getTimeDuration(
DFS_NAMENODE_GC_TIME_MONITOR_OBSERVATION_WINDOW_MS,
DFS_NAMENODE_GC_TIME_MONITOR_OBSERVATION_WINDOW_MS_DEFAULT,
TimeUnit.MILLISECONDS);
long sleepInterval = conf.getTimeDuration(
DFS_NAMENODE_GC_TIME_MONITOR_SLEEP_INTERVAL_MS,
DFS_NAMENODE_GC_TIME_MONITOR_SLEEP_INTERVAL_MS_DEFAULT,
TimeUnit.MILLISECONDS);
gcTimeMonitor = new Builder().observationWindowMs(observationWindow)
.sleepIntervalMs(sleepInterval).build();
gcTimeMonitor.start();
metrics.getJvmMetrics().setGcTimeMonitor(gcTimeMonitor);
}
if (NamenodeRole.NAMENODE == role) {
startHttpServer(conf);
}
loadNamesystem(conf);
startAliasMapServerIfNecessary(conf);
rpcServer = createRpcServer(conf);
initReconfigurableBackoffKey();
if (clientNamenodeAddress == null) {
// This is expected for MiniDFSCluster. Set it now using
// the RPC server's bind address.
clientNamenodeAddress =
NetUtils.getHostPortString(getNameNodeAddress());
LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
+ " this namenode/service.");
}
if (NamenodeRole.NAMENODE == role) {
httpServer.setNameNodeAddress(getNameNodeAddress());
httpServer.setFSImage(getFSImage());
if (levelDBAliasMapServer != null) {
httpServer.setAliasMap(levelDBAliasMapServer.getAliasMap());
}
}
startCommonServices(conf);
startMetricsLogger(conf);
}
@VisibleForTesting
public InMemoryLevelDBAliasMapServer getAliasMapServer() {
return levelDBAliasMapServer;
}
private void startAliasMapServerIfNecessary(Configuration conf)
throws IOException {
if (conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED,
DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED_DEFAULT)
&& conf.getBoolean(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED,
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED_DEFAULT)) {
levelDBAliasMapServer = new InMemoryLevelDBAliasMapServer(
InMemoryAliasMap::init, namesystem.getBlockPoolId());
levelDBAliasMapServer.setConf(conf);
levelDBAliasMapServer.start();
}
}
private void initReconfigurableBackoffKey() {
ipcClientRPCBackoffEnable = buildBackoffEnableKey(rpcServer
.getClientRpcServer().getPort());
reconfigurableProperties.add(ipcClientRPCBackoffEnable);
}
static String buildBackoffEnableKey(final int port) {
// format used to construct backoff enable key, e.g. ipc.8020.backoff.enable
String format = "%s.%d.%s";
return String.format(format, IPC_NAMESPACE, port, IPC_BACKOFF_ENABLE);
}
/**
* Start a timer to periodically write NameNode metrics to the log
* file. This behavior can be disabled by configuration.
* @param conf
*/
protected void startMetricsLogger(Configuration conf) {
long metricsLoggerPeriodSec =
conf.getInt(DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY,
DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT);
if (metricsLoggerPeriodSec <= 0) {
return;
}
MetricsLoggerTask.makeMetricsLoggerAsync(MetricsLog);
// Schedule the periodic logging.
metricsLoggerTimer = new ScheduledThreadPoolExecutor(1);
metricsLoggerTimer.setExecuteExistingDelayedTasksAfterShutdownPolicy(
false);
metricsLoggerTimer.scheduleWithFixedDelay(new MetricsLoggerTask(MetricsLog,
"NameNode", (short) 128),
metricsLoggerPeriodSec,
metricsLoggerPeriodSec,
TimeUnit.SECONDS);
}
protected void stopMetricsLogger() {
if (metricsLoggerTimer != null) {
metricsLoggerTimer.shutdown();
metricsLoggerTimer = null;
}
}
/**
* Create the RPC server implementation. Used as an extension point for the
* BackupNode.
*/
protected NameNodeRpcServer createRpcServer(Configuration conf)
throws IOException {
return new NameNodeRpcServer(conf, this);
}
/** Start the services common to active and standby states */
private void startCommonServices(Configuration conf) throws IOException {
namesystem.startCommonServices(conf, haContext);
registerNNSMXBean();
if (NamenodeRole.NAMENODE != role) {
startHttpServer(conf);
httpServer.setNameNodeAddress(getNameNodeAddress());
httpServer.setFSImage(getFSImage());
if (levelDBAliasMapServer != null) {
httpServer.setAliasMap(levelDBAliasMapServer.getAliasMap());
}
}
rpcServer.start();
try {
plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
ServicePlugin.class);
} catch (RuntimeException e) {
String pluginsValue = conf.get(DFS_NAMENODE_PLUGINS_KEY);
LOG.error("Unable to load NameNode plugins. Specified list of plugins: " +
pluginsValue, e);
throw e;
}
for (ServicePlugin p: plugins) {
try {
p.start(this);
} catch (Throwable t) {
LOG.warn("ServicePlugin " + p + " could not be started", t);
}
}
LOG.info(getRole() + " RPC up at: " + getNameNodeAddress());
if (rpcServer.getServiceRpcAddress() != null) {
LOG.info(getRole() + " service RPC up at: "
+ rpcServer.getServiceRpcAddress());
}
}
private void stopCommonServices() {
if(rpcServer != null) rpcServer.stop();
if(namesystem != null) namesystem.close();
if (pauseMonitor != null) pauseMonitor.stop();
if (plugins != null) {
for (ServicePlugin p : plugins) {
try {
p.stop();
} catch (Throwable t) {
LOG.warn("ServicePlugin " + p + " could not be stopped", t);
}
}
}
stopHttpServer();
}
private void startTrashEmptier(final Configuration conf) throws IOException {
long trashInterval =
conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT);
if (trashInterval == 0) {
return;
} else if (trashInterval < 0) {
throw new IOException("Cannot start trash emptier with negative interval."
+ " Set " + FS_TRASH_INTERVAL_KEY + " to a positive value.");
}
// This may be called from the transitionToActive code path, in which
// case the current user is the administrator, not the NN. The trash
// emptier needs to run as the NN. See HDFS-3972.
FileSystem fs = SecurityUtil.doAsLoginUser(
new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws IOException {
FileSystem dfs = new DistributedFileSystem();
dfs.initialize(FileSystem.getDefaultUri(conf), conf);
return dfs;
}
});
this.emptier = new Thread(new Trash(fs, conf).getEmptier(), "Trash Emptier");
this.emptier.setDaemon(true);
this.emptier.start();
}
private void stopTrashEmptier() {
if (this.emptier != null) {
emptier.interrupt();
emptier = null;
}
}
private void startHttpServer(final Configuration conf) throws IOException {
httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));
httpServer.start();
httpServer.setStartupProgress(startupProgress);
}
private void stopHttpServer() {
try {
if (httpServer != null) httpServer.stop();
} catch (Exception e) {
LOG.error("Exception while stopping httpserver", e);
}
}
/**
* Start NameNode.
* <p>
* The name-node can be started with one of the following startup options:
* <ul>
* <li>{@link StartupOption#REGULAR REGULAR} - normal name node startup</li>
* <li>{@link StartupOption#FORMAT FORMAT} - format name node</li>
* <li>{@link StartupOption#BACKUP BACKUP} - start backup node</li>
* <li>{@link StartupOption#CHECKPOINT CHECKPOINT} - start checkpoint node</li>
* <li>{@link StartupOption#UPGRADE UPGRADE} - start the cluster
* <li>{@link StartupOption#UPGRADEONLY UPGRADEONLY} - upgrade the cluster
* upgrade and create a snapshot of the current file system state</li>
* <li>{@link StartupOption#RECOVER RECOVERY} - recover name node
* metadata</li>
* <li>{@link StartupOption#ROLLBACK ROLLBACK} - roll the
* cluster back to the previous state</li>
* <li>{@link StartupOption#IMPORT IMPORT} - import checkpoint</li>
* </ul>
* The option is passed via configuration field:
* <tt>dfs.namenode.startup</tt>
*
* The conf will be modified to reflect the actual ports on which
* the NameNode is up and running if the user passes the port as
* <code>zero</code> in the conf.
*
* @param conf confirguration
* @throws IOException
*/
public NameNode(Configuration conf) throws IOException {
this(conf, NamenodeRole.NAMENODE);
}
protected NameNode(Configuration conf, NamenodeRole role)
throws IOException {
super(conf);
this.tracer = new Tracer.Builder("NameNode").
conf(TraceUtils.wrapHadoopConf(NAMENODE_HTRACE_PREFIX, conf)).
build();
this.role = role;
String nsId = getNameServiceId(conf);
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
clientNamenodeAddress = NameNodeUtils.getClientNamenodeAddress(
conf, nsId);
if (clientNamenodeAddress != null) {
LOG.info("Clients should use {} to access"
+ " this namenode/service.", clientNamenodeAddress);
}
this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
state = createHAState(getStartupOption(conf));
this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
this.haContext = createHAContext();
try {
initializeGenericKeys(conf, nsId, namenodeId);
initialize(getConf());
state.prepareToEnterState(haContext);
try {
haContext.writeLock();
state.enterState(haContext);
} finally {
haContext.writeUnlock();
}
} catch (IOException e) {
this.stopAtException(e);
throw e;
} catch (HadoopIllegalArgumentException e) {
this.stopAtException(e);
throw e;
}
notBecomeActiveInSafemode = conf.getBoolean(
DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE,
DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE_DEFAULT);
this.started.set(true);
}
private void stopAtException(Exception e){
try {
this.stop();
} catch (Exception ex) {
LOG.warn("Encountered exception when handling exception ("
+ e.getMessage() + "):", ex);
}
}
protected HAState createHAState(StartupOption startOpt) {
if (!haEnabled || startOpt == StartupOption.UPGRADE
|| startOpt == StartupOption.UPGRADEONLY) {
return ACTIVE_STATE;
} else if (startOpt == StartupOption.OBSERVER) {
return OBSERVER_STATE;
} else {
return STANDBY_STATE;
}
}
protected HAContext createHAContext() {
return new NameNodeHAContext();
}
/**
* Wait for service to finish.
* (Normally, it runs forever.)
*/
public void join() {
try {
rpcServer.join();
} catch (InterruptedException ie) {
LOG.info("Caught interrupted exception", ie);
}
}
/**
* Stop all NameNode threads and wait for all to finish.
*/
public void stop() {
synchronized(this) {
if (stopRequested)
return;
stopRequested = true;
}
try {
if (state != null) {
state.exitState(haContext);
}
} catch (ServiceFailedException e) {
LOG.warn("Encountered exception while exiting state", e);
} finally {
stopMetricsLogger();
stopCommonServices();
if (metrics != null) {
metrics.shutdown();
}
if (namesystem != null) {
namesystem.shutdown();
}
if (nameNodeStatusBeanName != null) {
MBeans.unregister(nameNodeStatusBeanName);
nameNodeStatusBeanName = null;
}
if (levelDBAliasMapServer != null) {
levelDBAliasMapServer.close();
}
}
tracer.close();
}
synchronized boolean isStopRequested() {
return stopRequested;
}
/**
* Is the cluster currently in safe mode?
*/
public boolean isInSafeMode() {
return namesystem.isInSafeMode();
}
/** get FSImage */
@VisibleForTesting
public FSImage getFSImage() {
return namesystem.getFSImage();
}
/**
* @return NameNode RPC address
*/
public InetSocketAddress getNameNodeAddress() {
return rpcServer.getRpcAddress();
}
/**
* @return The auxiliary nameNode RPC addresses, or empty set if there
* is none.
*/
public Set<InetSocketAddress> getAuxiliaryNameNodeAddresses() {
return rpcServer.getAuxiliaryRpcAddresses();
}
/**
* @return NameNode RPC address in "host:port" string form
*/
public String getNameNodeAddressHostPortString() {
return NetUtils.getHostPortString(getNameNodeAddress());
}
/**
* Return a host:port format string corresponds to an auxiliary
* port configured on NameNode. If there are multiple auxiliary ports,
* an arbitrary one is returned. If there is no auxiliary listener, returns
* null.
*
* @return a string of format host:port that points to an auxiliary NameNode
* address, or null if there is no such address.
*/
@VisibleForTesting
public String getNNAuxiliaryRpcAddress() {
Set<InetSocketAddress> auxiliaryAddrs = getAuxiliaryNameNodeAddresses();
if (auxiliaryAddrs.isEmpty()) {
return null;
}
// since set has no particular order, returning the first element of
// from the iterator is effectively arbitrary.
InetSocketAddress addr = auxiliaryAddrs.iterator().next();
return NetUtils.getHostPortString(addr);
}
/**
* @return NameNode service RPC address if configured, the
* NameNode RPC address otherwise
*/
public InetSocketAddress getServiceRpcAddress() {
final InetSocketAddress serviceAddr = rpcServer.getServiceRpcAddress();
return serviceAddr == null ? getNameNodeAddress() : serviceAddr;
}
/**
* @return NameNode HTTP address, used by the Web UI, image transfer,
* and HTTP-based file system clients like WebHDFS
*/
public InetSocketAddress getHttpAddress() {
return httpServer.getHttpAddress();
}
/**
* @return NameNode HTTPS address, used by the Web UI, image transfer,
* and HTTP-based file system clients like WebHDFS
*/
public InetSocketAddress getHttpsAddress() {
return httpServer.getHttpsAddress();
}
/**
* NameNodeHttpServer, used by unit tests to ensure a full shutdown,
* so that no bind exception is thrown during restart.
*/
@VisibleForTesting
public void joinHttpServer() {
if (httpServer != null) {
try {
httpServer.join();
} catch (InterruptedException e) {
LOG.info("Caught InterruptedException joining NameNodeHttpServer", e);
Thread.currentThread().interrupt();
}
}
}
/**
* Verify that configured directories exist, then
* Interactively confirm that formatting is desired
* for each existing directory and format them.
*
* @param conf configuration to use
* @param force if true, format regardless of whether dirs exist
* @return true if formatting was aborted, false otherwise
* @throws IOException
*/
private static boolean format(Configuration conf, boolean force,
boolean isInteractive) throws IOException {
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
initializeGenericKeys(conf, nsId, namenodeId);
checkAllowFormat(conf);
if (UserGroupInformation.isSecurityEnabled()) {
InetSocketAddress socAddr = DFSUtilClient.getNNAddress(conf);
SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, socAddr.getHostName());
}
Collection<URI> nameDirsToFormat = FSNamesystem.getNamespaceDirs(conf);
List<URI> sharedDirs = FSNamesystem.getSharedEditsDirs(conf);
List<URI> dirsToPrompt = new ArrayList<URI>();
dirsToPrompt.addAll(nameDirsToFormat);
dirsToPrompt.addAll(sharedDirs);
List<URI> editDirsToFormat =
FSNamesystem.getNamespaceEditsDirs(conf);
// if clusterID is not provided - see if you can find the current one
String clusterId = StartupOption.FORMAT.getClusterId();
if(clusterId == null || clusterId.equals("")) {
//Generate a new cluster id
clusterId = NNStorage.newClusterID();
}
LOG.info("Formatting using clusterid: {}", clusterId);
FSImage fsImage = new FSImage(conf, nameDirsToFormat, editDirsToFormat);
FSNamesystem fsn = null;
try {
fsn = new FSNamesystem(conf, fsImage);
fsImage.getEditLog().initJournalsForWrite();
// Abort NameNode format if reformat is disabled and if
// meta-dir already exists
if (conf.getBoolean(DFSConfigKeys.DFS_REFORMAT_DISABLED,
DFSConfigKeys.DFS_REFORMAT_DISABLED_DEFAULT)) {
force = false;
isInteractive = false;
for (StorageDirectory sd : fsImage.storage.dirIterable(null)) {
if (sd.hasSomeData()) {
throw new NameNodeFormatException(
"NameNode format aborted as reformat is disabled for "
+ "this cluster.");
}
}
}
if (!fsImage.confirmFormat(force, isInteractive)) {
return true; // aborted
}
fsImage.format(fsn, clusterId, force);
} catch (IOException ioe) {
LOG.warn("Encountered exception during format", ioe);
throw ioe;
} finally {
if (fsImage != null) {
fsImage.close();
}
if (fsn != null) {
fsn.close();
}
}
return false;
}
public static void checkAllowFormat(Configuration conf) throws IOException {
if (!conf.getBoolean(DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY,
DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_DEFAULT)) {
throw new IOException("The option " + DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY
+ " is set to false for this filesystem, so it "
+ "cannot be formatted. You will need to set "
+ DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY +" parameter "
+ "to true in order to format this filesystem");
}
}
@VisibleForTesting
public static boolean initializeSharedEdits(Configuration conf) throws IOException {
return initializeSharedEdits(conf, true);
}
@VisibleForTesting
public static boolean initializeSharedEdits(Configuration conf,
boolean force) throws IOException {
return initializeSharedEdits(conf, force, false);
}
/**
* Clone the supplied configuration but remove the shared edits dirs.
*
* @param conf Supplies the original configuration.
* @return Cloned configuration without the shared edit dirs.
* @throws IOException on failure to generate the configuration.
*/
private static Configuration getConfigurationWithoutSharedEdits(
Configuration conf)
throws IOException {
List<URI> editsDirs = FSNamesystem.getNamespaceEditsDirs(conf, false);
String editsDirsString = Joiner.on(",").join(editsDirs);
Configuration confWithoutShared = new Configuration(conf);
confWithoutShared.unset(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
confWithoutShared.setStrings(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
editsDirsString);
return confWithoutShared;
}
/**
* Format a new shared edits dir and copy in enough edit log segments so that
* the standby NN can start up.
*
* @param conf configuration
* @param force format regardless of whether or not the shared edits dir exists
* @param interactive prompt the user when a dir exists
* @return true if the command aborts, false otherwise
*/
private static boolean initializeSharedEdits(Configuration conf,
boolean force, boolean interactive) throws IOException {
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
initializeGenericKeys(conf, nsId, namenodeId);
if (conf.get(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY) == null) {
LOG.error("No shared edits directory configured for namespace " +
nsId + " namenode " + namenodeId);
return false;
}
if (UserGroupInformation.isSecurityEnabled()) {
InetSocketAddress socAddr = DFSUtilClient.getNNAddress(conf);
SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, socAddr.getHostName());
}
NNStorage existingStorage = null;
FSImage sharedEditsImage = null;
try {
FSNamesystem fsns =
FSNamesystem.loadFromDisk(getConfigurationWithoutSharedEdits(conf));
existingStorage = fsns.getFSImage().getStorage();
NamespaceInfo nsInfo = existingStorage.getNamespaceInfo();
List<URI> sharedEditsDirs = FSNamesystem.getSharedEditsDirs(conf);
sharedEditsImage = new FSImage(conf,
Lists.<URI>newArrayList(),
sharedEditsDirs);
sharedEditsImage.getEditLog().initJournalsForWrite();
if (!sharedEditsImage.confirmFormat(force, interactive)) {
return true; // abort
}
NNStorage newSharedStorage = sharedEditsImage.getStorage();
// Call Storage.format instead of FSImage.format here, since we don't
// actually want to save a checkpoint - just prime the dirs with
// the existing namespace info
newSharedStorage.format(nsInfo);
sharedEditsImage.getEditLog().formatNonFileJournals(nsInfo, force);
// Need to make sure the edit log segments are in good shape to initialize
// the shared edits dir.
fsns.getFSImage().getEditLog().close();
fsns.getFSImage().getEditLog().initJournalsForWrite();
fsns.getFSImage().getEditLog().recoverUnclosedStreams();
copyEditLogSegmentsToSharedDir(fsns, sharedEditsDirs, newSharedStorage,
conf);
} catch (IOException ioe) {
LOG.error("Could not initialize shared edits dir", ioe);
return true; // aborted
} finally {
if (sharedEditsImage != null) {
try {
sharedEditsImage.close();
} catch (IOException ioe) {
LOG.warn("Could not close sharedEditsImage", ioe);
}
}
// Have to unlock storage explicitly for the case when we're running in a
// unit test, which runs in the same JVM as NNs.
if (existingStorage != null) {
try {
existingStorage.unlockAll();
} catch (IOException ioe) {
LOG.warn("Could not unlock storage directories", ioe);
return true; // aborted
}
}
}
return false; // did not abort
}
private static void copyEditLogSegmentsToSharedDir(FSNamesystem fsns,
Collection<URI> sharedEditsDirs, NNStorage newSharedStorage,
Configuration conf) throws IOException {
Preconditions.checkArgument(!sharedEditsDirs.isEmpty(),
"No shared edits specified");
// Copy edit log segments into the new shared edits dir.
List<URI> sharedEditsUris = new ArrayList<URI>(sharedEditsDirs);
FSEditLog newSharedEditLog = new FSEditLog(conf, newSharedStorage,
sharedEditsUris);
newSharedEditLog.initJournalsForWrite();
newSharedEditLog.recoverUnclosedStreams();
FSEditLog sourceEditLog = fsns.getFSImage().editLog;
long fromTxId = fsns.getFSImage().getMostRecentCheckpointTxId();
Collection<EditLogInputStream> streams = null;
try {
streams = sourceEditLog.selectInputStreams(fromTxId + 1, 0);
// Set the nextTxid to the CheckpointTxId+1
newSharedEditLog.setNextTxId(fromTxId + 1);
// Copy all edits after last CheckpointTxId to shared edits dir
for (EditLogInputStream stream : streams) {
LOG.debug("Beginning to copy stream {} to shared edits", stream);
FSEditLogOp op;
boolean segmentOpen = false;
while ((op = stream.readOp()) != null) {
LOG.trace("copying op: {}", op);
if (!segmentOpen) {
newSharedEditLog.startLogSegment(op.txid, false,
fsns.getEffectiveLayoutVersion());
segmentOpen = true;
}
newSharedEditLog.logEdit(op);
if (op.opCode == FSEditLogOpCodes.OP_END_LOG_SEGMENT) {
newSharedEditLog.endCurrentLogSegment(false);
LOG.debug("ending log segment because of END_LOG_SEGMENT op in {}",
stream);
segmentOpen = false;
}
}
if (segmentOpen) {
LOG.debug("ending log segment because of end of stream in {}",
stream);
newSharedEditLog.logSync();
newSharedEditLog.endCurrentLogSegment(false);
segmentOpen = false;
}
}
} finally {
if (streams != null) {
FSEditLog.closeAllStreams(streams);
}
}
}
@VisibleForTesting
public static boolean doRollback(Configuration conf,
boolean isConfirmationNeeded) throws IOException {
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
initializeGenericKeys(conf, nsId, namenodeId);
FSNamesystem nsys = new FSNamesystem(conf, new FSImage(conf));
System.err.print(
"\"rollBack\" will remove the current state of the file system,\n"
+ "returning you to the state prior to initiating your recent.\n"
+ "upgrade. This action is permanent and cannot be undone. If you\n"
+ "are performing a rollback in an HA environment, you should be\n"
+ "certain that no NameNode process is running on any host.");
if (isConfirmationNeeded) {
if (!confirmPrompt("Roll back file system state?")) {
System.err.println("Rollback aborted.");
return true;
}
}
nsys.getFSImage().doRollback(nsys);
return false;
}
private static void printUsage(PrintStream out) {
out.println(USAGE + "\n");
}
@VisibleForTesting
static StartupOption parseArguments(String args[]) {
int argsLen = (args == null) ? 0 : args.length;
StartupOption startOpt = StartupOption.REGULAR;
for(int i=0; i < argsLen; i++) {
String cmd = args[i];
if (StartupOption.FORMAT.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.FORMAT;
for (i = i + 1; i < argsLen; i++) {
if (args[i].equalsIgnoreCase(StartupOption.CLUSTERID.getName())) {
i++;
if (i >= argsLen) {
// if no cluster id specified, return null
LOG.error("Must specify a valid cluster ID after the "
+ StartupOption.CLUSTERID.getName() + " flag");
return null;
}
String clusterId = args[i];
// Make sure an id is specified and not another flag
if (clusterId.isEmpty() ||
clusterId.equalsIgnoreCase(StartupOption.FORCE.getName()) ||
clusterId.equalsIgnoreCase(
StartupOption.NONINTERACTIVE.getName())) {
LOG.error("Must specify a valid cluster ID after the "
+ StartupOption.CLUSTERID.getName() + " flag");
return null;
}
startOpt.setClusterId(clusterId);
}
if (args[i].equalsIgnoreCase(StartupOption.FORCE.getName())) {
startOpt.setForceFormat(true);
}
if (args[i].equalsIgnoreCase(StartupOption.NONINTERACTIVE.getName())) {
startOpt.setInteractiveFormat(false);
}
}
} else if (StartupOption.GENCLUSTERID.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.GENCLUSTERID;
} else if (StartupOption.REGULAR.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.REGULAR;
} else if (StartupOption.BACKUP.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.BACKUP;
} else if (StartupOption.CHECKPOINT.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.CHECKPOINT;
} else if (StartupOption.OBSERVER.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.OBSERVER;
} else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd)
|| StartupOption.UPGRADEONLY.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd) ?
StartupOption.UPGRADE : StartupOption.UPGRADEONLY;
/* Can be followed by CLUSTERID with a required parameter or
* RENAMERESERVED with an optional parameter
*/
while (i + 1 < argsLen) {
String flag = args[i + 1];
if (flag.equalsIgnoreCase(StartupOption.CLUSTERID.getName())) {
if (i + 2 < argsLen) {
i += 2;
startOpt.setClusterId(args[i]);
} else {
LOG.error("Must specify a valid cluster ID after the "
+ StartupOption.CLUSTERID.getName() + " flag");
return null;
}
} else if (flag.equalsIgnoreCase(StartupOption.RENAMERESERVED
.getName())) {
if (i + 2 < argsLen) {
FSImageFormat.setRenameReservedPairs(args[i + 2]);
i += 2;
} else {
FSImageFormat.useDefaultRenameReservedPairs();
i += 1;
}
} else {
LOG.error("Unknown upgrade flag: {}", flag);
return null;
}
}
} else if (StartupOption.ROLLINGUPGRADE.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.ROLLINGUPGRADE;
++i;
if (i >= argsLen) {
LOG.error("Must specify a rolling upgrade startup option "
+ RollingUpgradeStartupOption.getAllOptionString());
return null;
}
startOpt.setRollingUpgradeStartupOption(args[i]);
} else if (StartupOption.ROLLBACK.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.ROLLBACK;
} else if (StartupOption.IMPORT.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.IMPORT;
} else if (StartupOption.BOOTSTRAPSTANDBY.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.BOOTSTRAPSTANDBY;
return startOpt;
} else if (StartupOption.INITIALIZESHAREDEDITS.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.INITIALIZESHAREDEDITS;
for (i = i + 1 ; i < argsLen; i++) {
if (StartupOption.NONINTERACTIVE.getName().equals(args[i])) {
startOpt.setInteractiveFormat(false);
} else if (StartupOption.FORCE.getName().equals(args[i])) {
startOpt.setForceFormat(true);
} else {
LOG.error("Invalid argument: " + args[i]);
return null;
}
}
return startOpt;
} else if (StartupOption.RECOVER.getName().equalsIgnoreCase(cmd)) {
if (startOpt != StartupOption.REGULAR) {
throw new RuntimeException("Can't combine -recover with " +
"other startup options.");
}
startOpt = StartupOption.RECOVER;
while (++i < argsLen) {
if (args[i].equalsIgnoreCase(
StartupOption.FORCE.getName())) {
startOpt.setForce(MetaRecoveryContext.FORCE_FIRST_CHOICE);
} else {
throw new RuntimeException("Error parsing recovery options: " +
"can't understand option \"" + args[i] + "\"");
}
}
} else if (StartupOption.METADATAVERSION.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.METADATAVERSION;
} else {
return null;
}
}
return startOpt;
}
private static void setStartupOption(Configuration conf, StartupOption opt) {
conf.set(DFS_NAMENODE_STARTUP_KEY, opt.name());
}
public static StartupOption getStartupOption(Configuration conf) {
return StartupOption.valueOf(conf.get(DFS_NAMENODE_STARTUP_KEY,
StartupOption.REGULAR.toString()));
}
private static void doRecovery(StartupOption startOpt, Configuration conf)
throws IOException {
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
initializeGenericKeys(conf, nsId, namenodeId);
if (startOpt.getForce() < MetaRecoveryContext.FORCE_ALL) {
if (!confirmPrompt("You have selected Metadata Recovery mode. " +
"This mode is intended to recover lost metadata on a corrupt " +
"filesystem. Metadata recovery mode often permanently deletes " +
"data from your HDFS filesystem. Please back up your edit log " +
"and fsimage before trying this!\n\n" +
"Are you ready to proceed? (Y/N)\n")) {
System.err.println("Recovery aborted at user request.\n");
return;
}
}
MetaRecoveryContext.LOG.info("starting recovery...");
UserGroupInformation.setConfiguration(conf);
NameNode.initMetrics(conf, startOpt.toNodeRole());
FSNamesystem fsn = null;
try {
fsn = FSNamesystem.loadFromDisk(conf);
fsn.getFSImage().saveNamespace(fsn);
MetaRecoveryContext.LOG.info("RECOVERY COMPLETE");
} catch (IOException e) {
MetaRecoveryContext.LOG.info("RECOVERY FAILED: caught exception", e);
throw e;
} catch (RuntimeException e) {
MetaRecoveryContext.LOG.info("RECOVERY FAILED: caught exception", e);
throw e;
} finally {
if (fsn != null)
fsn.close();
}
}
/**
* Verify that configured directories exist, then print the metadata versions
* of the software and the image.
*
* @param conf configuration to use
* @throws IOException
*/
private static boolean printMetadataVersion(Configuration conf)
throws IOException {
final String nsId = DFSUtil.getNamenodeNameServiceId(conf);
final String namenodeId = HAUtil.getNameNodeId(conf, nsId);
NameNode.initializeGenericKeys(conf, nsId, namenodeId);
final FSImage fsImage = new FSImage(conf);
final FSNamesystem fs = new FSNamesystem(conf, fsImage, false);
return fsImage.recoverTransitionRead(
StartupOption.METADATAVERSION, fs, null);
}
public static NameNode createNameNode(String argv[], Configuration conf)
throws IOException {
LOG.info("createNameNode " + Arrays.asList(argv));
if (conf == null)
conf = new HdfsConfiguration();
// Parse out some generic args into Configuration.
GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
argv = hParser.getRemainingArgs();
// Parse the rest, NN specific args.
StartupOption startOpt = parseArguments(argv);
if (startOpt == null) {
printUsage(System.err);
return null;
}
setStartupOption(conf, startOpt);
boolean aborted = false;
switch (startOpt) {
case FORMAT:
aborted = format(conf, startOpt.getForceFormat(),
startOpt.getInteractiveFormat());
terminate(aborted ? 1 : 0);
return null; // avoid javac warning
case GENCLUSTERID:
String clusterID = NNStorage.newClusterID();
LOG.info("Generated new cluster id: {}", clusterID);
terminate(0);
return null;
case ROLLBACK:
aborted = doRollback(conf, true);
terminate(aborted ? 1 : 0);
return null; // avoid warning
case BOOTSTRAPSTANDBY:
String[] toolArgs = Arrays.copyOfRange(argv, 1, argv.length);
int rc = BootstrapStandby.run(toolArgs, conf);
terminate(rc);
return null; // avoid warning
case INITIALIZESHAREDEDITS:
aborted = initializeSharedEdits(conf,
startOpt.getForceFormat(),
startOpt.getInteractiveFormat());
terminate(aborted ? 1 : 0);
return null; // avoid warning
case BACKUP:
case CHECKPOINT:
NamenodeRole role = startOpt.toNodeRole();
DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
return new BackupNode(conf, role);
case RECOVER:
NameNode.doRecovery(startOpt, conf);
return null;
case METADATAVERSION:
printMetadataVersion(conf);
terminate(0);
return null; // avoid javac warning
case UPGRADEONLY:
DefaultMetricsSystem.initialize("NameNode");
new NameNode(conf);
terminate(0);
return null;
default:
DefaultMetricsSystem.initialize("NameNode");
return new NameNode(conf);
}
}
/**
* In federation configuration is set for a set of
* namenode and secondary namenode/backup/checkpointer, which are
* grouped under a logical nameservice ID. The configuration keys specific
* to them have suffix set to configured nameserviceId.
*
* This method copies the value from specific key of format key.nameserviceId
* to key, to set up the generic configuration. Once this is done, only
* generic version of the configuration is read in rest of the code, for
* backward compatibility and simpler code changes.
*
* @param conf
* Configuration object to lookup specific key and to set the value
* to the key passed. Note the conf object is modified
* @param nameserviceId name service Id (to distinguish federated NNs)
* @param namenodeId the namenode ID (to distinguish HA NNs)
* @see DFSUtil#setGenericConf(Configuration, String, String, String...)
*/
public static void initializeGenericKeys(Configuration conf,
String nameserviceId, String namenodeId) {
if ((nameserviceId != null && !nameserviceId.isEmpty()) ||
(namenodeId != null && !namenodeId.isEmpty())) {
if (nameserviceId != null) {
conf.set(DFS_NAMESERVICE_ID, nameserviceId);
}
if (namenodeId != null) {
conf.set(DFS_HA_NAMENODE_ID_KEY, namenodeId);
}
DFSUtil.setGenericConf(conf, nameserviceId, namenodeId,
NAMENODE_SPECIFIC_KEYS);
DFSUtil.setGenericConf(conf, nameserviceId, null,
NAMESERVICE_SPECIFIC_KEYS);
}
// If the RPC address is set use it to (re-)configure the default FS
if (conf.get(DFS_NAMENODE_RPC_ADDRESS_KEY) != null) {
URI defaultUri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://"
+ conf.get(DFS_NAMENODE_RPC_ADDRESS_KEY));
conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString());
LOG.debug("Setting {} to {}", FS_DEFAULT_NAME_KEY, defaultUri);
}
}
/**
* Get the name service Id for the node
* @return name service Id or null if federation is not configured
*/
protected String getNameServiceId(Configuration conf) {
return DFSUtil.getNamenodeNameServiceId(conf);
}
/**
*/
public static void main(String argv[]) throws Exception {
if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
System.exit(0);
}
try {
StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
NameNode namenode = createNameNode(argv, null);
if (namenode != null) {
namenode.join();
}
} catch (Throwable e) {
LOG.error("Failed to start namenode.", e);
terminate(1, e);
}
}
synchronized void monitorHealth() throws IOException {
String operationName = "monitorHealth";
namesystem.checkSuperuserPrivilege(operationName);
if (!haEnabled) {
return; // no-op, if HA is not enabled
}
long start = Time.monotonicNow();
getNamesystem().checkAvailableResources();
long end = Time.monotonicNow();
if (end - start >= HEALTH_MONITOR_WARN_THRESHOLD_MS) {
// log a warning if it take >= 5 seconds.
LOG.warn("Remote IP {} checking available resources took {}ms",
Server.getRemoteIp(), end - start);
}
if (!getNamesystem().nameNodeHasResourcesAvailable()) {
throw new HealthCheckFailedException(
"The NameNode has no resources available");
}
if (notBecomeActiveInSafemode && isInSafeMode()) {
throw new HealthCheckFailedException("The NameNode is configured to " +
"report UNHEALTHY to ZKFC in Safemode.");
}
}
synchronized void transitionToActive() throws IOException {
String operationName = "transitionToActive";
namesystem.checkSuperuserPrivilege(operationName);
if (!haEnabled) {
throw new ServiceFailedException("HA for namenode is not enabled");
}
if (state == OBSERVER_STATE) {
throw new ServiceFailedException(
"Cannot transition from '" + OBSERVER_STATE + "' to '" +
ACTIVE_STATE + "'");
}
if (notBecomeActiveInSafemode && isInSafeMode()) {
throw new ServiceFailedException(getRole() + " still not leave safemode");
}
state.setState(haContext, ACTIVE_STATE);
}
synchronized void transitionToStandby() throws IOException {
String operationName = "transitionToStandby";
namesystem.checkSuperuserPrivilege(operationName);
if (!haEnabled) {
throw new ServiceFailedException("HA for namenode is not enabled");
}
state.setState(haContext, STANDBY_STATE);
}
synchronized void transitionToObserver() throws IOException {
String operationName = "transitionToObserver";
namesystem.checkSuperuserPrivilege(operationName);
if (!haEnabled) {
throw new ServiceFailedException("HA for namenode is not enabled");
}
// Transition from ACTIVE to OBSERVER is forbidden.
if (state == ACTIVE_STATE) {
throw new ServiceFailedException(
"Cannot transition from '" + ACTIVE_STATE + "' to '" +
OBSERVER_STATE + "'");
}
state.setState(haContext, OBSERVER_STATE);
}
synchronized HAServiceStatus getServiceStatus()
throws ServiceFailedException, AccessControlException {
if (!haEnabled) {
throw new ServiceFailedException("HA for namenode is not enabled");
}
if (state == null) {
return new HAServiceStatus(HAServiceState.INITIALIZING);
}
HAServiceState retState = state.getServiceState();
HAServiceStatus ret = new HAServiceStatus(retState);
if (retState == HAServiceState.STANDBY) {
if (namesystem.isInSafeMode()) {
ret.setNotReadyToBecomeActive("The NameNode is in safemode. " +
namesystem.getSafeModeTip());
} else {
ret.setReadyToBecomeActive();
}
} else if (retState == HAServiceState.ACTIVE) {
ret.setReadyToBecomeActive();
} else {
ret.setNotReadyToBecomeActive("State is " + state);
}
return ret;
}
synchronized HAServiceState getServiceState() {
if (state == null) {
return HAServiceState.INITIALIZING;
}
return state.getServiceState();
}
/**
* Register NameNodeStatusMXBean
*/
private void registerNNSMXBean() {
nameNodeStatusBeanName = MBeans.register("NameNode", "NameNodeStatus", this);
}
@Override // NameNodeStatusMXBean
public String getNNRole() {
NamenodeRole role = getRole();
return Objects.toString(role, "");
}
@Override // NameNodeStatusMXBean
public String getState() {
HAServiceState servState = getServiceState();
return Objects.toString(servState, "");
}
@Override // NameNodeStatusMXBean
public String getHostAndPort() {
return getNameNodeAddressHostPortString();
}
@Override // NameNodeStatusMXBean
public boolean isSecurityEnabled() {
return UserGroupInformation.isSecurityEnabled();
}
@Override // NameNodeStatusMXBean
public long getLastHATransitionTime() {
return state.getLastHATransitionTime();
}
@Override //NameNodeStatusMXBean
public long getBytesWithFutureGenerationStamps() {
return getNamesystem().getBytesInFuture();
}
@Override
public String getSlowPeersReport() {
return namesystem.getBlockManager().getDatanodeManager()
.getSlowPeersReport();
}
@Override //NameNodeStatusMXBean
public String getSlowDisksReport() {
return namesystem.getBlockManager().getDatanodeManager()
.getSlowDisksReport();
}
/**
* Shutdown the NN immediately in an ungraceful way. Used when it would be
* unsafe for the NN to continue operating, e.g. during a failed HA state
* transition.
*
* @param t exception which warrants the shutdown. Printed to the NN log
* before exit.
* @throws ExitException thrown only for testing.
*/
protected synchronized void doImmediateShutdown(Throwable t)
throws ExitException {
try {
LOG.error("Error encountered requiring NN shutdown. " +
"Shutting down immediately.", t);
} catch (Throwable ignored) {
// This is unlikely to happen, but there's nothing we can do if it does.
}
terminate(1, t);
}
/**
* Class used to expose {@link NameNode} as context to {@link HAState}
*/
protected class NameNodeHAContext implements HAContext {
@Override
public void setState(HAState s) {
state = s;
}
@Override
public HAState getState() {
return state;
}
@Override
public void startActiveServices() throws IOException {
try {
namesystem.startActiveServices();
namesystem.checkAndProvisionSnapshotTrashRoots();
startTrashEmptier(getConf());
} catch (Throwable t) {
doImmediateShutdown(t);
}
}
@Override
public void stopActiveServices() throws IOException {
try {
if (namesystem != null) {
namesystem.stopActiveServices();
}
stopTrashEmptier();
} catch (Throwable t) {
doImmediateShutdown(t);
}
}
@Override
public void startStandbyServices() throws IOException {
try {
namesystem.startStandbyServices(getConf(),
state == NameNode.OBSERVER_STATE);
} catch (Throwable t) {
doImmediateShutdown(t);
}
}
@Override
public void prepareToStopStandbyServices() throws ServiceFailedException {
try {
namesystem.prepareToStopStandbyServices();
} catch (Throwable t) {
doImmediateShutdown(t);
}
}
@Override
public void stopStandbyServices() throws IOException {
try {
if (namesystem != null) {
namesystem.stopStandbyServices();
}
} catch (Throwable t) {
doImmediateShutdown(t);
}
}
@Override
public void writeLock() {
namesystem.writeLock();
namesystem.lockRetryCache();
}
@Override
public void writeUnlock() {
namesystem.unlockRetryCache();
namesystem.writeUnlock("HAState");
}
/** Check if an operation of given category is allowed */
@Override
public void checkOperation(final OperationCategory op)
throws StandbyException {
state.checkOperation(haContext, op);
}
@Override
public boolean allowStaleReads() {
if (state == OBSERVER_STATE) {
return true;
}
return allowStaleStandbyReads;
}
}
public boolean isStandbyState() {
return (state.equals(STANDBY_STATE));
}
public boolean isActiveState() {
return (state.equals(ACTIVE_STATE));
}
public boolean isObserverState() {
return state.equals(OBSERVER_STATE);
}
/**
* Returns whether the NameNode is completely started
*/
boolean isStarted() {
return this.started.get();
}
/**
* Check that a request to change this node's HA state is valid.
* In particular, verifies that, if auto failover is enabled, non-forced
* requests from the HAAdmin CLI are rejected, and vice versa.
*
* @param req the request to check
* @throws AccessControlException if the request is disallowed
*/
void checkHaStateChange(StateChangeRequestInfo req)
throws AccessControlException {
boolean autoHaEnabled = getConf().getBoolean(
DFS_HA_AUTO_FAILOVER_ENABLED_KEY, DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT);
switch (req.getSource()) {
case REQUEST_BY_USER:
if (autoHaEnabled) {
throw new AccessControlException(
"Manual HA control for this NameNode is disallowed, because " +
"automatic HA is enabled.");
}
break;
case REQUEST_BY_USER_FORCED:
if (autoHaEnabled) {
LOG.warn("Allowing manual HA control from " +
Server.getRemoteAddress() +
" even though automatic HA is enabled, because the user " +
"specified the force flag");
}
break;
case REQUEST_BY_ZKFC:
if (!autoHaEnabled) {
throw new AccessControlException(
"Request from ZK failover controller at " +
Server.getRemoteAddress() + " denied since automatic HA " +
"is not enabled");
}
break;
}
}
/*
* {@inheritDoc}
* */
@Override // ReconfigurableBase
public Collection<String> getReconfigurableProperties() {
return reconfigurableProperties;
}
/*
* {@inheritDoc}
* */
@Override // ReconfigurableBase
protected String reconfigurePropertyImpl(String property, String newVal)
throws ReconfigurationException {
final DatanodeManager datanodeManager = namesystem.getBlockManager()
.getDatanodeManager();
if (property.equals(DFS_HEARTBEAT_INTERVAL_KEY)) {
return reconfHeartbeatInterval(datanodeManager, property, newVal);
} else if (property.equals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY)) {
return reconfHeartbeatRecheckInterval(datanodeManager, property, newVal);
} else if (property.equals(FS_PROTECTED_DIRECTORIES)) {
return reconfProtectedDirectories(newVal);
} else if (property.equals(HADOOP_CALLER_CONTEXT_ENABLED_KEY)) {
return reconfCallerContextEnabled(newVal);
} else if (property.equals(ipcClientRPCBackoffEnable)) {
return reconfigureIPCBackoffEnabled(newVal);
} else if (property.equals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY)) {
return reconfigureSPSModeEvent(newVal, property);
} else if (property.equals(DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY)
|| property.equals(DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY)
|| property.equals(
DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)) {
return reconfReplicationParameters(newVal, property);
} else if (property.equals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY) || property
.equals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY)) {
reconfBlockPlacementPolicy();
return newVal;
} else if (property.equals(DFS_IMAGE_PARALLEL_LOAD_KEY)) {
return reconfigureParallelLoad(newVal);
} else if (property.equals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY) || (property.equals(
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY)) || (property.equals(
DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY)) || (property.equals(
DFS_DATANODE_PEER_STATS_ENABLED_KEY)) || property.equals(
DFS_DATANODE_MAX_NODES_TO_REPORT_KEY)) {
return reconfigureSlowNodesParameters(datanodeManager, property, newVal);
} else if (property.equals(DFS_BLOCK_INVALIDATE_LIMIT_KEY)) {
return reconfigureBlockInvalidateLimit(datanodeManager, property, newVal);
} else {
throw new ReconfigurationException(property, newVal, getConf().get(
property));
}
}
private String reconfReplicationParameters(final String newVal,
final String property) throws ReconfigurationException {
BlockManager bm = namesystem.getBlockManager();
int newSetting;
namesystem.writeLock();
try {
if (property.equals(DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY)) {
bm.setMaxReplicationStreams(
adjustNewVal(DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT, newVal));
newSetting = bm.getMaxReplicationStreams();
} else if (property.equals(
DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY)) {
bm.setReplicationStreamsHardLimit(
adjustNewVal(DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT,
newVal));
newSetting = bm.getReplicationStreamsHardLimit();
} else if (
property.equals(
DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)) {
bm.setBlocksReplWorkMultiplier(
adjustNewVal(
DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT,
newVal));
newSetting = bm.getBlocksReplWorkMultiplier();
} else {
throw new IllegalArgumentException("Unexpected property " +
property + " in reconfReplicationParameters");
}
LOG.info("RECONFIGURE* changed {} to {}", property, newSetting);
return String.valueOf(newSetting);
} catch (IllegalArgumentException e) {
throw new ReconfigurationException(property, newVal, getConf().get(
property), e);
} finally {
namesystem.writeUnlock("reconfReplicationParameters");
}
}
private void reconfBlockPlacementPolicy() {
getNamesystem().getBlockManager()
.refreshBlockPlacementPolicy(getNewConf());
}
private int adjustNewVal(int defaultVal, String newVal) {
if (newVal == null) {
return defaultVal;
} else {
return Integer.parseInt(newVal);
}
}
private String reconfHeartbeatInterval(final DatanodeManager datanodeManager,
final String property, final String newVal)
throws ReconfigurationException {
namesystem.writeLock();
try {
if (newVal == null) {
// set to default
datanodeManager.setHeartbeatInterval(DFS_HEARTBEAT_INTERVAL_DEFAULT);
return String.valueOf(DFS_HEARTBEAT_INTERVAL_DEFAULT);
} else {
long newInterval = getConf()
.getTimeDurationHelper(DFS_HEARTBEAT_INTERVAL_KEY,
newVal, TimeUnit.SECONDS);
datanodeManager.setHeartbeatInterval(newInterval);
return String.valueOf(datanodeManager.getHeartbeatInterval());
}
} catch (NumberFormatException nfe) {
throw new ReconfigurationException(property, newVal, getConf().get(
property), nfe);
} finally {
namesystem.writeUnlock("reconfHeartbeatInterval");
LOG.info("RECONFIGURE* changed heartbeatInterval to "
+ datanodeManager.getHeartbeatInterval());
}
}
private String reconfHeartbeatRecheckInterval(
final DatanodeManager datanodeManager, final String property,
final String newVal) throws ReconfigurationException {
namesystem.writeLock();
try {
if (newVal == null) {
// set to default
datanodeManager.setHeartbeatRecheckInterval(
DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT);
return String.valueOf(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT);
} else {
datanodeManager.setHeartbeatRecheckInterval(Integer.parseInt(newVal));
return String.valueOf(datanodeManager.getHeartbeatRecheckInterval());
}
} catch (NumberFormatException nfe) {
throw new ReconfigurationException(property, newVal, getConf().get(
property), nfe);
} finally {
namesystem.writeUnlock("reconfHeartbeatRecheckInterval");
LOG.info("RECONFIGURE* changed heartbeatRecheckInterval to "
+ datanodeManager.getHeartbeatRecheckInterval());
}
}
private String reconfProtectedDirectories(String newVal) {
return getNamesystem().getFSDirectory().setProtectedDirectories(newVal);
}
private String reconfCallerContextEnabled(String newVal) {
Boolean callerContextEnabled;
if (newVal == null) {
callerContextEnabled = HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT;
} else {
callerContextEnabled = Boolean.parseBoolean(newVal);
}
namesystem.setCallerContextEnabled(callerContextEnabled);
return Boolean.toString(callerContextEnabled);
}
String reconfigureIPCBackoffEnabled(String newVal) {
boolean clientBackoffEnabled;
if (newVal == null) {
clientBackoffEnabled = IPC_BACKOFF_ENABLE_DEFAULT;
} else {
clientBackoffEnabled = Boolean.parseBoolean(newVal);
}
rpcServer.getClientRpcServer()
.setClientBackoffEnabled(clientBackoffEnabled);
return Boolean.toString(clientBackoffEnabled);
}
String reconfigureSPSModeEvent(String newVal, String property)
throws ReconfigurationException {
if (newVal == null
|| StoragePolicySatisfierMode.fromString(newVal) == null) {
throw new ReconfigurationException(property, newVal,
getConf().get(property),
new HadoopIllegalArgumentException(
"For enabling or disabling storage policy satisfier, must "
+ "pass either internal/external/none string value only"));
}
if (!isActiveState()) {
throw new ReconfigurationException(property, newVal,
getConf().get(property),
new HadoopIllegalArgumentException(
"Enabling or disabling storage policy satisfier service on "
+ state + " NameNode is not allowed"));
}
StoragePolicySatisfierMode mode = StoragePolicySatisfierMode
.fromString(newVal);
if (mode == StoragePolicySatisfierMode.NONE) {
// disabling sps service
if (namesystem.getBlockManager().getSPSManager() != null) {
namesystem.getBlockManager().getSPSManager().changeModeEvent(mode);
namesystem.getBlockManager().disableSPS();
}
} else {
// enabling sps service
boolean spsCreated = (namesystem.getBlockManager()
.getSPSManager() != null);
if (!spsCreated) {
spsCreated = namesystem.getBlockManager().createSPSManager(getConf(),
newVal);
}
if (spsCreated) {
namesystem.getBlockManager().getSPSManager().changeModeEvent(mode);
}
}
return newVal;
}
String reconfigureParallelLoad(String newVal) {
boolean enableParallelLoad;
if (newVal == null) {
enableParallelLoad = DFS_IMAGE_PARALLEL_LOAD_DEFAULT;
} else {
enableParallelLoad = Boolean.parseBoolean(newVal);
}
FSImageFormatProtobuf.refreshParallelSaveAndLoad(enableParallelLoad);
return Boolean.toString(enableParallelLoad);
}
String reconfigureSlowNodesParameters(final DatanodeManager datanodeManager,
final String property, final String newVal) throws ReconfigurationException {
BlockManager bm = namesystem.getBlockManager();
namesystem.writeLock();
String result;
try {
switch (property) {
case DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY: {
boolean enable = (newVal == null ?
DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT :
Boolean.parseBoolean(newVal));
result = Boolean.toString(enable);
datanodeManager.setAvoidSlowDataNodesForReadEnabled(enable);
break;
}
case DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY: {
boolean enable = (newVal == null ?
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT :
Boolean.parseBoolean(newVal));
result = Boolean.toString(enable);
bm.setExcludeSlowNodesEnabled(enable);
break;
}
case DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY: {
int maxSlowpeerCollectNodes = (newVal == null ?
DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT :
Integer.parseInt(newVal));
result = Integer.toString(maxSlowpeerCollectNodes);
datanodeManager.setMaxSlowpeerCollectNodes(maxSlowpeerCollectNodes);
break;
}
case DFS_DATANODE_PEER_STATS_ENABLED_KEY: {
Timer timer = new Timer();
if (newVal != null && !newVal.equalsIgnoreCase("true") && !newVal.equalsIgnoreCase(
"false")) {
throw new IllegalArgumentException(newVal + " is not boolean value");
}
final boolean peerStatsEnabled = newVal == null ?
DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT :
Boolean.parseBoolean(newVal);
result = Boolean.toString(peerStatsEnabled);
datanodeManager.initSlowPeerTracker(getConf(), timer, peerStatsEnabled);
break;
}
case DFS_DATANODE_MAX_NODES_TO_REPORT_KEY: {
int maxSlowPeersToReport = (newVal == null
? DFS_DATANODE_MAX_NODES_TO_REPORT_DEFAULT : Integer.parseInt(newVal));
result = Integer.toString(maxSlowPeersToReport);
datanodeManager.setMaxSlowPeersToReport(maxSlowPeersToReport);
break;
}
default: {
throw new IllegalArgumentException(
"Unexpected property " + property + " in reconfigureSlowNodesParameters");
}
}
LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
return result;
} catch (IllegalArgumentException e) {
throw new ReconfigurationException(property, newVal, getConf().get(
property), e);
} finally {
namesystem.writeUnlock("reconfigureSlowNodesParameters");
}
}
private String reconfigureBlockInvalidateLimit(final DatanodeManager datanodeManager,
final String property, final String newVal) throws ReconfigurationException {
namesystem.writeLock();
try {
if (newVal == null) {
datanodeManager.setBlockInvalidateLimit(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
} else {
datanodeManager.setBlockInvalidateLimit(Integer.parseInt(newVal));
}
final String updatedBlockInvalidateLimit =
String.valueOf(datanodeManager.getBlockInvalidateLimit());
LOG.info("RECONFIGURE* changed blockInvalidateLimit to {}", updatedBlockInvalidateLimit);
return updatedBlockInvalidateLimit;
} catch (NumberFormatException e) {
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
} finally {
namesystem.writeUnlock("reconfigureBlockInvalidateLimit");
}
}
@Override // ReconfigurableBase
protected Configuration getNewConf() {
return new HdfsConfiguration();
}
}