/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.util.ExitUtil.terminate;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

import javax.management.ObjectName;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**********************************************************
* The Secondary NameNode is a helper to the primary NameNode.
* The Secondary is responsible for supporting periodic checkpoints
 * of the HDFS metadata. The current design allows only one Secondary
 * NameNode per HDFS cluster.
*
* The Secondary NameNode is a daemon that periodically wakes
* up (determined by the schedule specified in the configuration),
* triggers a periodic checkpoint and then goes back to sleep.
* The Secondary NameNode uses the NamenodeProtocol to talk to the
* primary NameNode.
*
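 * <p>As a sketch of typical use (standard HDFS configuration keys and the
 * stock CLI; details vary by deployment):
 * <pre>
 *   dfs.namenode.checkpoint.period        seconds between checkpoints
 *   dfs.namenode.checkpoint.txns          txn count that forces a checkpoint
 *   dfs.namenode.checkpoint.check.period  how often the 2NN polls the NN
 *
 *   $ hdfs secondarynamenode                     # run as a daemon
 *   $ hdfs secondarynamenode -checkpoint [force] # one-shot checkpoint
 *   $ hdfs secondarynamenode -geteditsize        # print uncheckpointed txns
 * </pre>
 *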
**********************************************************/
@InterfaceAudience.Private
public class SecondaryNameNode implements Runnable,
SecondaryNameNodeInfoMXBean {
  static {
HdfsConfiguration.init();
}
public static final Log LOG =
LogFactory.getLog(SecondaryNameNode.class.getName());
private final long starttime = Time.now();
private volatile long lastCheckpointTime = 0;
private volatile long lastCheckpointWallclockTime = 0;
private URL fsName;
private CheckpointStorage checkpointImage;
private NamenodeProtocol namenode;
private Configuration conf;
private InetSocketAddress nameNodeAddr;
private volatile boolean shouldRun;
private HttpServer2 infoServer;
private Collection<URI> checkpointDirs;
private List<URI> checkpointEditsDirs;
private CheckpointConf checkpointConf;
private FSNamesystem namesystem;
private Thread checkpointThread;
private ObjectName nameNodeStatusBeanName;
private String legacyOivImageDir;
@Override
public String toString() {
return getClass().getSimpleName() + " Status"
+ "\nName Node Address : " + nameNodeAddr
+ "\nStart Time : " + new Date(starttime)
+ "\nLast Checkpoint : " + (lastCheckpointTime == 0? "--":
new Date(lastCheckpointWallclockTime))
+ " (" + ((Time.monotonicNow() - lastCheckpointTime) / 1000)
+ " seconds ago)"
+ "\nCheckpoint Period : " + checkpointConf.getPeriod() + " seconds"
+ "\nCheckpoint Transactions: " + checkpointConf.getTxnCount()
+ "\nCheckpoint Dirs : " + checkpointDirs
+ "\nCheckpoint Edits Dirs : " + checkpointEditsDirs;
}
@VisibleForTesting
FSImage getFSImage() {
return checkpointImage;
}
@VisibleForTesting
int getMergeErrorCount() {
return checkpointImage.getMergeErrorCount();
}
@VisibleForTesting
public FSNamesystem getFSNamesystem() {
return namesystem;
}
@VisibleForTesting
void setFSImage(CheckpointStorage image) {
this.checkpointImage = image;
}
@VisibleForTesting
NamenodeProtocol getNameNode() {
return namenode;
}
@VisibleForTesting
void setNameNode(NamenodeProtocol namenode) {
this.namenode = namenode;
}
/**
* Create a connection to the primary namenode.
*/
public SecondaryNameNode(Configuration conf) throws IOException {
this(conf, new CommandLineOpts());
}
public SecondaryNameNode(Configuration conf,
CommandLineOpts commandLineOpts) throws IOException {
try {
String nsId = DFSUtil.getSecondaryNameServiceId(conf);
if (HAUtil.isHAEnabled(conf, nsId)) {
throw new IOException(
"Cannot use SecondaryNameNode in an HA cluster." +
" The Standby Namenode will perform checkpointing.");
}
NameNode.initializeGenericKeys(conf, nsId, null);
initialize(conf, commandLineOpts);
} catch (IOException e) {
shutdown();
throw e;
} catch (HadoopIllegalArgumentException e) {
shutdown();
throw e;
}
}
public static InetSocketAddress getHttpAddress(Configuration conf) {
return NetUtils.createSocketAddr(conf.getTrimmed(
DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY,
DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_DEFAULT));
}
/**
* Initialize SecondaryNameNode.
*/
private void initialize(final Configuration conf,
CommandLineOpts commandLineOpts) throws IOException {
final InetSocketAddress infoSocAddr = getHttpAddress(conf);
final String infoBindAddress = infoSocAddr.getHostName();
UserGroupInformation.setConfiguration(conf);
if (UserGroupInformation.isSecurityEnabled()) {
SecurityUtil.login(conf,
DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY,
DFSConfigKeys.DFS_SECONDARY_NAMENODE_KERBEROS_PRINCIPAL_KEY, infoBindAddress);
}
    // initialize JVM metrics
DefaultMetricsSystem.initialize("SecondaryNameNode");
JvmMetrics.create("SecondaryNameNode",
conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY),
DefaultMetricsSystem.instance());
// Create connection to the namenode.
shouldRun = true;
nameNodeAddr = NameNode.getServiceAddress(conf, true);
this.conf = conf;
this.namenode = NameNodeProxies.createNonHAProxy(conf, nameNodeAddr,
NamenodeProtocol.class, UserGroupInformation.getCurrentUser(),
true).getProxy();
// initialize checkpoint directories
fsName = getInfoServer();
checkpointDirs = FSImage.getCheckpointDirs(conf,
"/tmp/hadoop/dfs/namesecondary");
checkpointEditsDirs = FSImage.getCheckpointEditsDirs(conf,
"/tmp/hadoop/dfs/namesecondary");
checkpointImage = new CheckpointStorage(conf, checkpointDirs, checkpointEditsDirs);
checkpointImage.recoverCreate(commandLineOpts.shouldFormat());
checkpointImage.deleteTempEdits();
namesystem = new FSNamesystem(conf, checkpointImage, true);
// Disable quota checks
namesystem.dir.disableQuotaChecks();
// Initialize other scheduling parameters from the configuration
checkpointConf = new CheckpointConf(conf);
nameNodeStatusBeanName = MBeans.register("SecondaryNameNode",
"SecondaryNameNodeInfo", this);
legacyOivImageDir = conf.get(
DFSConfigKeys.DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY);
LOG.info("Checkpoint Period :" + checkpointConf.getPeriod() + " secs "
+ "(" + checkpointConf.getPeriod() / 60 + " min)");
LOG.info("Log Size Trigger :" + checkpointConf.getTxnCount() + " txns");
}
/**
* Wait for the service to finish.
* (Normally, it runs forever.)
*/
private void join() {
try {
infoServer.join();
} catch (InterruptedException ie) {
LOG.debug("Exception ", ie);
}
}
  /**
   * Shut down this instance of the SecondaryNameNode.
   * Returns only after shutdown is complete.
   */
public void shutdown() {
shouldRun = false;
if (checkpointThread != null) {
checkpointThread.interrupt();
try {
checkpointThread.join(10000);
} catch (InterruptedException e) {
LOG.info("Interrupted waiting to join on checkpointer thread");
Thread.currentThread().interrupt(); // maintain status
}
}
try {
if (infoServer != null) {
infoServer.stop();
infoServer = null;
}
} catch (Exception e) {
LOG.warn("Exception shutting down SecondaryNameNode", e);
}
if (nameNodeStatusBeanName != null) {
MBeans.unregister(nameNodeStatusBeanName);
nameNodeStatusBeanName = null;
}
try {
if (checkpointImage != null) {
checkpointImage.close();
checkpointImage = null;
}
} catch(IOException e) {
LOG.warn("Exception while closing CheckpointStorage", e);
}
if (namesystem != null) {
namesystem.shutdown();
namesystem = null;
}
}
@Override
public void run() {
SecurityUtil.doAsLoginUserOrFatal(
new PrivilegedAction<Object>() {
@Override
public Object run() {
doWork();
return null;
}
});
}
//
// The main work loop
//
public void doWork() {
//
// Poll the Namenode (once every checkpointCheckPeriod seconds) to find the
// number of transactions in the edit log that haven't yet been checkpointed.
//
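    // Two triggers are evaluated on each poll; a checkpoint runs when either
    // fires (keys shown are the standard ones behind CheckpointConf):
    //   - txn count:  uncheckpointed txns >= dfs.namenode.checkpoint.txns
    //   - wall clock: dfs.namenode.checkpoint.period seconds since the last
    //     checkpoint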
long period = checkpointConf.getCheckPeriod();
int maxRetries = checkpointConf.getMaxRetriesOnMergeError();
while (shouldRun) {
try {
Thread.sleep(1000 * period);
} catch (InterruptedException ie) {
// do nothing
}
if (!shouldRun) {
break;
}
try {
        // We may have lost our ticket since the last checkpoint; log in
        // again, just in case.
        if (UserGroupInformation.isSecurityEnabled()) {
          UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
        }
final long monotonicNow = Time.monotonicNow();
final long now = Time.now();
if (shouldCheckpointBasedOnCount() ||
monotonicNow >= lastCheckpointTime + 1000 * checkpointConf.getPeriod()) {
doCheckpoint();
lastCheckpointTime = monotonicNow;
lastCheckpointWallclockTime = now;
}
} catch (IOException e) {
LOG.error("Exception in doCheckpoint", e);
e.printStackTrace();
// Prevent a huge number of edits from being created due to
// unrecoverable conditions and endless retries.
if (checkpointImage.getMergeErrorCount() > maxRetries) {
LOG.fatal("Merging failed " +
checkpointImage.getMergeErrorCount() + " times.");
terminate(1);
}
} catch (Throwable e) {
LOG.fatal("Throwable Exception in doCheckpoint", e);
e.printStackTrace();
terminate(1, e);
}
}
}
/**
* Download <code>fsimage</code> and <code>edits</code>
* files from the name-node.
* @return true if a new image has been downloaded and needs to be loaded
* @throws IOException
*/
static boolean downloadCheckpointFiles(
final URL nnHostPort,
final FSImage dstImage,
final CheckpointSignature sig,
final RemoteEditLogManifest manifest
) throws IOException {
    // Sanity check the manifest - this could happen if, e.g., someone on the
    // NN side accidentally removed the storage directories.
if (manifest.getLogs().isEmpty()) {
throw new IOException("Found no edit logs to download on NN since txid "
+ sig.mostRecentCheckpointTxId);
}
long expectedTxId = sig.mostRecentCheckpointTxId + 1;
if (manifest.getLogs().get(0).getStartTxId() != expectedTxId) {
throw new IOException("Bad edit log manifest (expected txid = " +
expectedTxId + ": " + manifest);
}
try {
Boolean b = UserGroupInformation.getCurrentUser().doAs(
new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws Exception {
dstImage.getStorage().cTime = sig.cTime;
// get fsimage
if (sig.mostRecentCheckpointTxId ==
dstImage.getStorage().getMostRecentCheckpointTxId()) {
LOG.info("Image has not changed. Will not download image.");
} else {
LOG.info("Image has changed. Downloading updated image from NN.");
MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(
nnHostPort, sig.mostRecentCheckpointTxId,
dstImage.getStorage(), true, false);
dstImage.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE,
sig.mostRecentCheckpointTxId, downloadedHash);
}
// get edits file
for (RemoteEditLog log : manifest.getLogs()) {
TransferFsImage.downloadEditsToStorage(
nnHostPort, log, dstImage.getStorage());
}
            // Return true if the downloaded image contains transactions
            // beyond what this checkpointer has already applied, i.e. the
            // image needs to be (re)loaded.
return dstImage.getLastAppliedTxId() < sig.mostRecentCheckpointTxId;
}
});
return b.booleanValue();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
InetSocketAddress getNameNodeAddress() {
return nameNodeAddr;
}
  /**
   * Returns the URL of the primary NameNode's HTTP(S) info server, derived
   * from the default filesystem URI.
   */
private URL getInfoServer() throws IOException {
URI fsName = FileSystem.getDefaultUri(conf);
if (!HdfsConstants.HDFS_URI_SCHEME.equalsIgnoreCase(fsName.getScheme())) {
throw new IOException("This is not a DFS");
}
final String scheme = DFSUtil.getHttpClientScheme(conf);
URI address = DFSUtil.getInfoServerWithDefaultHost(fsName.getHost(), conf,
scheme);
LOG.debug("Will connect to NameNode at " + address);
return address.toURL();
}
/**
* Start the web server.
*/
@VisibleForTesting
public void startInfoServer() throws IOException {
final InetSocketAddress httpAddr = getHttpAddress(conf);
final String httpsAddrString = conf.getTrimmed(
DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_KEY,
DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_DEFAULT);
InetSocketAddress httpsAddr = NetUtils.createSocketAddr(httpsAddrString);
HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf,
httpAddr, httpsAddr, "secondary", DFSConfigKeys.
DFS_SECONDARY_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY);
final boolean xFrameEnabled = conf.getBoolean(
DFSConfigKeys.DFS_XFRAME_OPTION_ENABLED,
DFSConfigKeys.DFS_XFRAME_OPTION_ENABLED_DEFAULT);
final String xFrameOptionValue = conf.getTrimmed(
DFSConfigKeys.DFS_XFRAME_OPTION_VALUE,
DFSConfigKeys.DFS_XFRAME_OPTION_VALUE_DEFAULT);
builder.configureXFrame(xFrameEnabled).setXFrameOption(xFrameOptionValue);
infoServer = builder.build();
infoServer.setAttribute("secondary.name.node", this);
infoServer.setAttribute("name.system.image", checkpointImage);
infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
infoServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
ImageServlet.class, true);
infoServer.start();
LOG.info("Web server init done");
HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
int connIdx = 0;
if (policy.isHttpEnabled()) {
InetSocketAddress httpAddress =
infoServer.getConnectorAddress(connIdx++);
conf.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY,
NetUtils.getHostPortString(httpAddress));
}
if (policy.isHttpsEnabled()) {
InetSocketAddress httpsAddress =
infoServer.getConnectorAddress(connIdx);
conf.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_KEY,
NetUtils.getHostPortString(httpsAddress));
}
}
  /**
   * Create a new checkpoint.
   * @return true if a new image was downloaded from the primary NameNode
   *         and had to be (re)loaded locally
   */
@VisibleForTesting
@SuppressWarnings("deprecated")
public boolean doCheckpoint() throws IOException {
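    // The checkpoint protocol, as implemented below:
    //   1. ask the primary to roll its edit log (rollEditLog)
    //   2. download the fsimage (if it changed) and the new edit segments
    //   3. merge them locally into a new fsimage (doMerge)
    //   4. upload the merged image back to the primary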
checkpointImage.ensureCurrentDirExists();
NNStorage dstStorage = checkpointImage.getStorage();
// Tell the namenode to start logging transactions in a new edit file
// Returns a token that would be used to upload the merged image.
CheckpointSignature sig = namenode.rollEditLog();
boolean loadImage = false;
boolean isFreshCheckpointer = (checkpointImage.getNamespaceID() == 0);
boolean isSameCluster =
(dstStorage.versionSupportsFederation(NameNodeLayoutVersion.FEATURES)
&& sig.isSameCluster(checkpointImage)) ||
(!dstStorage.versionSupportsFederation(NameNodeLayoutVersion.FEATURES)
&& sig.namespaceIdMatches(checkpointImage));
if (isFreshCheckpointer ||
(isSameCluster &&
!sig.storageVersionMatches(checkpointImage.getStorage()))) {
// if we're a fresh 2NN, or if we're on the same cluster and our storage
// needs an upgrade, just take the storage info from the server.
dstStorage.setStorageInfo(sig);
dstStorage.setClusterID(sig.getClusterID());
dstStorage.setBlockPoolID(sig.getBlockpoolID());
loadImage = true;
}
sig.validateStorageInfo(checkpointImage);
// error simulation code for junit test
CheckpointFaultInjector.getInstance().afterSecondaryCallsRollEditLog();
RemoteEditLogManifest manifest =
namenode.getEditLogManifest(sig.mostRecentCheckpointTxId + 1);
// Fetch fsimage and edits. Reload the image if previous merge failed.
loadImage |= downloadCheckpointFiles(
fsName, checkpointImage, sig, manifest) |
checkpointImage.hasMergeError();
try {
doMerge(sig, manifest, loadImage, checkpointImage, namesystem);
} catch (IOException ioe) {
// A merge error occurred. The in-memory file system state may be
// inconsistent, so the image and edits need to be reloaded.
checkpointImage.setMergeError();
throw ioe;
}
// Clear any error since merge was successful.
checkpointImage.clearMergeError();
//
// Upload the new image into the NameNode. Then tell the Namenode
// to make this new uploaded image as the most current image.
//
long txid = checkpointImage.getLastAppliedTxId();
TransferFsImage.uploadImageFromStorage(fsName, conf, dstStorage,
NameNodeFile.IMAGE, txid);
// error simulation code for junit test
CheckpointFaultInjector.getInstance().afterSecondaryUploadsNewImage();
LOG.warn("Checkpoint done. New Image Size: "
+ dstStorage.getFsImageName(txid).length());
if (legacyOivImageDir != null && !legacyOivImageDir.isEmpty()) {
try {
checkpointImage.saveLegacyOIVImage(namesystem, legacyOivImageDir,
new Canceler());
} catch (IOException e) {
LOG.warn("Failed to write legacy OIV image: ", e);
}
}
return loadImage;
}
  /**
   * Process a one-shot startup command (-checkpoint or -geteditsize).
   * @param opts The parameters passed to this program.
   * @return 0 on success, non zero on error.
   * @exception Exception if the command fails.
   */
private int processStartupCommand(CommandLineOpts opts) throws Exception {
if (opts.getCommand() == null) {
return 0;
}
String cmd = StringUtils.toLowerCase(opts.getCommand().toString());
int exitCode = 0;
try {
switch (opts.getCommand()) {
case CHECKPOINT:
long count = countUncheckpointedTxns();
if (count > checkpointConf.getTxnCount() ||
opts.shouldForceCheckpoint()) {
doCheckpoint();
} else {
System.err.println("EditLog size " + count + " transactions is " +
"smaller than configured checkpoint " +
"interval " + checkpointConf.getTxnCount() + " transactions.");
System.err.println("Skipping checkpoint.");
}
break;
case GETEDITSIZE:
long uncheckpointed = countUncheckpointedTxns();
System.out.println("NameNode has " + uncheckpointed +
" uncheckpointed transactions");
break;
default:
throw new AssertionError("bad command enum: " + opts.getCommand());
}
} catch (RemoteException e) {
//
      // This is an error returned by the hadoop server. Print out the
      // first line of the error message; ignore the stack trace.
exitCode = 1;
try {
String[] content;
content = e.getLocalizedMessage().split("\n");
LOG.error(cmd + ": " + content[0]);
} catch (Exception ex) {
LOG.error(cmd + ": " + ex.getLocalizedMessage());
}
} catch (IOException e) {
//
// IO exception encountered locally.
//
exitCode = 1;
LOG.error(cmd + ": " + e.getLocalizedMessage());
} finally {
// Does the RPC connection need to be closed?
}
return exitCode;
}
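  /**
   * @return the number of transactions the NameNode has logged beyond the
   *         most recently checkpointed transaction.
   */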
private long countUncheckpointedTxns() throws IOException {
long curTxId = namenode.getTransactionID();
long uncheckpointedTxns = curTxId -
checkpointImage.getStorage().getMostRecentCheckpointTxId();
assert uncheckpointedTxns >= 0;
return uncheckpointedTxns;
}
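  /**
   * @return true if enough uncheckpointed transactions have accumulated on
   *         the NameNode to warrant a checkpoint based on the configured
   *         transaction count.
   */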
boolean shouldCheckpointBasedOnCount() throws IOException {
return countUncheckpointedTxns() >= checkpointConf.getTxnCount();
}
  /**
   * Command-line entry point: parse the arguments, then either run a
   * one-shot command or start the checkpointing daemon.
   * @param argv Command line parameters.
   * @exception Exception if the filesystem does not exist.
   */
public static void main(String[] argv) throws Exception {
CommandLineOpts opts = SecondaryNameNode.parseArgs(argv);
if (opts == null) {
LOG.fatal("Failed to parse options");
terminate(1);
} else if (opts.shouldPrintHelp()) {
opts.usage();
System.exit(0);
}
try {
StringUtils.startupShutdownMessage(SecondaryNameNode.class, argv, LOG);
      Configuration tconf = new HdfsConfiguration();
      SecondaryNameNode secondary = new SecondaryNameNode(tconf, opts);
// SecondaryNameNode can be started in 2 modes:
// 1. run a command (i.e. checkpoint or geteditsize) then terminate
// 2. run as a daemon when {@link #parseArgs} yields no commands
if (opts != null && opts.getCommand() != null) {
// mode 1
int ret = secondary.processStartupCommand(opts);
terminate(ret);
} else {
// mode 2
secondary.startInfoServer();
secondary.startCheckpointThread();
secondary.join();
}
} catch (Throwable e) {
LOG.fatal("Failed to start secondary namenode", e);
terminate(1);
}
}
public void startCheckpointThread() {
Preconditions.checkState(checkpointThread == null,
"Should not already have a thread");
Preconditions.checkState(shouldRun, "shouldRun should be true");
checkpointThread = new Daemon(this);
checkpointThread.start();
}
@Override // SecondaryNameNodeInfoMXBean
public String getHostAndPort() {
return NetUtils.getHostPortString(nameNodeAddr);
}
@Override // SecondaryNameNodeInfoMXBean
public long getStartTime() {
return starttime;
}
@Override // SecondaryNameNodeInfoMXBean
public long getLastCheckpointTime() {
return lastCheckpointWallclockTime;
}
@Override // SecondaryNameNodeInfoMXBean
public long getLastCheckpointDeltaMs() {
if (lastCheckpointTime == 0) {
return -1;
} else {
return (Time.monotonicNow() - lastCheckpointTime);
}
}
@Override // SecondaryNameNodeInfoMXBean
public String[] getCheckpointDirectories() {
ArrayList<String> r = Lists.newArrayListWithCapacity(checkpointDirs.size());
for (URI d : checkpointDirs) {
r.add(d.toString());
}
return r.toArray(new String[r.size()]);
}
@Override // SecondaryNameNodeInfoMXBean
public String[] getCheckpointEditlogDirectories() {
ArrayList<String> r = Lists.newArrayListWithCapacity(checkpointEditsDirs.size());
for (URI d : checkpointEditsDirs) {
r.add(d.toString());
}
return r.toArray(new String[r.size()]);
}
@Override // VersionInfoMXBean
public String getCompileInfo() {
return VersionInfo.getDate() + " by " + VersionInfo.getUser() +
" from " + VersionInfo.getBranch();
}
@Override // VersionInfoMXBean
public String getSoftwareVersion() {
return VersionInfo.getVersion();
}
/**
* Container for parsed command-line options.
*/
@SuppressWarnings("static-access")
static class CommandLineOpts {
private final Options options = new Options();
private final Option geteditsizeOpt;
private final Option checkpointOpt;
private final Option formatOpt;
private final Option helpOpt;
Command cmd;
enum Command {
GETEDITSIZE,
CHECKPOINT;
}
private boolean shouldForce;
private boolean shouldFormat;
private boolean shouldPrintHelp;
CommandLineOpts() {
geteditsizeOpt = new Option("geteditsize",
"return the number of uncheckpointed transactions on the NameNode");
      checkpointOpt = OptionBuilder.withArgName("force")
          .hasOptionalArg().withDescription("checkpoint on startup")
          .create("checkpoint");
formatOpt = new Option("format", "format the local storage during startup");
helpOpt = new Option("h", "help", false, "get help information");
options.addOption(geteditsizeOpt);
options.addOption(checkpointOpt);
options.addOption(formatOpt);
options.addOption(helpOpt);
}
public boolean shouldFormat() {
return shouldFormat;
}
public boolean shouldPrintHelp() {
return shouldPrintHelp;
}
public void parse(String ... argv) throws ParseException {
CommandLineParser parser = new PosixParser();
CommandLine cmdLine = parser.parse(options, argv);
if (cmdLine.hasOption(helpOpt.getOpt())
|| cmdLine.hasOption(helpOpt.getLongOpt())) {
shouldPrintHelp = true;
return;
}
boolean hasGetEdit = cmdLine.hasOption(geteditsizeOpt.getOpt());
boolean hasCheckpoint = cmdLine.hasOption(checkpointOpt.getOpt());
if (hasGetEdit && hasCheckpoint) {
throw new ParseException("May not pass both "
+ geteditsizeOpt.getOpt() + " and "
+ checkpointOpt.getOpt());
}
if (hasGetEdit) {
cmd = Command.GETEDITSIZE;
} else if (hasCheckpoint) {
cmd = Command.CHECKPOINT;
String arg = cmdLine.getOptionValue(checkpointOpt.getOpt());
if ("force".equals(arg)) {
shouldForce = true;
} else if (arg != null) {
throw new ParseException("-checkpoint may only take 'force' as an "
+ "argument");
}
}
if (cmdLine.hasOption(formatOpt.getOpt())) {
shouldFormat = true;
}
}
public Command getCommand() {
return cmd;
}
public boolean shouldForceCheckpoint() {
return shouldForce;
}
void usage() {
String header = "The Secondary NameNode is a helper "
+ "to the primary NameNode. The Secondary is responsible "
+ "for supporting periodic checkpoints of the HDFS metadata. "
+ "The current design allows only one Secondary NameNode "
+ "per HDFS cluster.";
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("secondarynamenode", header, options, "", false);
}
}
private static CommandLineOpts parseArgs(String[] argv) {
CommandLineOpts opts = new CommandLineOpts();
try {
opts.parse(argv);
} catch (ParseException pe) {
LOG.error(pe.getMessage());
opts.usage();
return null;
}
return opts;
}
static class CheckpointStorage extends FSImage {
private int mergeErrorCount;
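    /**
     * Purger for the 2NN's local copies of downloaded edit log segments:
     * deletes any edit file in the checkpoint storage whose last txid falls
     * below the retention threshold.
     */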
private static class CheckpointLogPurger implements LogsPurgeable {
private final NNStorage storage;
private final StoragePurger purger
= new NNStorageRetentionManager.DeletionStoragePurger();
public CheckpointLogPurger(NNStorage storage) {
this.storage = storage;
}
@Override
public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
Iterator<StorageDirectory> iter = storage.dirIterator();
while (iter.hasNext()) {
StorageDirectory dir = iter.next();
List<EditLogFile> editFiles = FileJournalManager.matchEditLogs(
dir.getCurrentDir());
for (EditLogFile f : editFiles) {
if (f.getLastTxId() < minTxIdToKeep) {
purger.purgeLog(f);
}
}
}
}
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk, boolean onlyDurableTxns) {
Iterator<StorageDirectory> iter = storage.dirIterator();
while (iter.hasNext()) {
StorageDirectory dir = iter.next();
List<EditLogFile> editFiles;
try {
editFiles = FileJournalManager.matchEditLogs(
dir.getCurrentDir());
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
FileJournalManager.addStreamsToCollectionFromFiles(editFiles, streams,
fromTxId, Long.MAX_VALUE, inProgressOk);
}
}
}
/**
* Construct a checkpoint image.
* @param conf Node configuration.
* @param imageDirs URIs of storage for image.
* @param editsDirs URIs of storage for edit logs.
   * @throws IOException If storage cannot be accessed.
*/
CheckpointStorage(Configuration conf,
Collection<URI> imageDirs,
List<URI> editsDirs) throws IOException {
super(conf, imageDirs, editsDirs);
// the 2NN never writes edits -- it only downloads them. So
// we shouldn't have any editLog instance. Setting to null
// makes sure we don't accidentally depend on it.
editLog = null;
mergeErrorCount = 0;
// Replace the archival manager with one that can actually work on the
// 2NN's edits storage.
this.archivalManager = new NNStorageRetentionManager(conf, storage,
new CheckpointLogPurger(storage));
}
/**
* Analyze checkpoint directories.
* Create directories if they do not exist.
* Recover from an unsuccessful checkpoint if necessary.
*
* @throws IOException
*/
void recoverCreate(boolean format) throws IOException {
storage.attemptRestoreRemovedStorage();
storage.unlockAll();
for (Iterator<StorageDirectory> it =
storage.dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
boolean isAccessible = true;
      try { // create the directory if it doesn't exist yet
        if (!sd.getRoot().mkdirs()) {
          // do nothing; the directory already exists
        }
} catch(SecurityException se) {
isAccessible = false;
}
      if (!isAccessible) {
        throw new InconsistentFSStateException(sd.getRoot(),
            "cannot access checkpoint directory.");
      }
if (format) {
// Don't confirm, since this is just the secondary namenode.
LOG.info("Formatting storage directory " + sd);
sd.clearDirectory();
}
StorageState curState;
try {
curState = sd.analyzeStorage(HdfsServerConstants.StartupOption.REGULAR, storage);
// sd is locked but not opened
switch(curState) {
case NON_EXISTENT:
// fail if any of the configured checkpoint dirs are inaccessible
throw new InconsistentFSStateException(sd.getRoot(),
"checkpoint directory does not exist or is not accessible.");
case NOT_FORMATTED:
break; // it's ok since initially there is no current and VERSION
case NORMAL:
// Read the VERSION file. This verifies that:
// (a) the VERSION file for each of the directories is the same,
// and (b) when we connect to a NN, we can verify that the remote
// node matches the same namespace that we ran on previously.
storage.readProperties(sd);
break;
default: // recovery is possible
sd.doRecover(curState);
}
} catch (IOException ioe) {
sd.unlock();
throw ioe;
}
}
}
boolean hasMergeError() {
return (mergeErrorCount > 0);
}
int getMergeErrorCount() {
return mergeErrorCount;
}
void setMergeError() {
mergeErrorCount++;
}
void clearMergeError() {
mergeErrorCount = 0;
}
/**
* Ensure that the current/ directory exists in all storage
* directories
*/
void ensureCurrentDirExists() throws IOException {
for (Iterator<StorageDirectory> it
= storage.dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
File curDir = sd.getCurrentDir();
if (!curDir.exists() && !curDir.mkdirs()) {
throw new IOException("Could not create directory " + curDir);
}
}
}
void deleteTempEdits() throws IOException {
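      // Remove temporary edits files (the EDITS_TMP name followed by
      // "_<startTxId>-<endTxId>_<suffix>") left behind by an interrupted
      // download.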
FilenameFilter filter = new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.matches(NameNodeFile.EDITS_TMP.getName()
+ "_(\\d+)-(\\d+)_(\\d+)");
}
};
      Iterator<StorageDirectory> it =
          storage.dirIterator(NameNodeDirType.EDITS);
      while (it.hasNext()) {
        StorageDirectory dir = it.next();
File[] tempEdits = dir.getCurrentDir().listFiles(filter);
if (tempEdits != null) {
for (File t : tempEdits) {
boolean success = t.delete();
if (!success) {
LOG.warn("Failed to delete temporary edits file: "
+ t.getAbsolutePath());
}
}
}
}
}
}
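  /**
   * Merge the downloaded checkpoint into a new fsimage: optionally reload
   * the freshly downloaded image, replay the downloaded edit logs on top of
   * it, and save the result into all local storage directories (which also
   * purges old images and edit logs).
   */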
void doMerge(
CheckpointSignature sig, RemoteEditLogManifest manifest,
boolean loadImage, FSImage dstImage, FSNamesystem dstNamesystem)
throws IOException {
NNStorage dstStorage = dstImage.getStorage();
dstStorage.setStorageInfo(sig);
if (loadImage) {
File file = dstStorage.findImageFile(NameNodeFile.IMAGE,
sig.mostRecentCheckpointTxId);
if (file == null) {
throw new IOException("Couldn't find image file at txid " +
sig.mostRecentCheckpointTxId + " even though it should have " +
"just been downloaded");
}
dstNamesystem.writeLock();
try {
dstImage.reloadFromImageFile(file, dstNamesystem);
} finally {
dstNamesystem.writeUnlock();
}
dstNamesystem.imageLoadComplete();
}
// error simulation code for junit test
CheckpointFaultInjector.getInstance().duringMerge();
Checkpointer.rollForwardByApplyingLogs(manifest, dstImage, dstNamesystem);
// The following has the side effect of purging old fsimages/edit logs.
dstImage.saveFSImageInAllDirs(dstNamesystem, dstImage.getLastAppliedTxId());
if (!namenode.isRollingUpgrade()) {
dstImage.updateStorageVersion();
}
}
}