| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.namenode; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.URI; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Date; |
| import java.util.Iterator; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.hdfs.DFSUtil.ErrorSimulator; |
| import org.apache.hadoop.hdfs.protocol.FSConstants; |
| import org.apache.hadoop.hdfs.server.common.HdfsConstants; |
| import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException; |
| import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType; |
| import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile; |
| import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.HdfsConfiguration; |
| import org.apache.hadoop.http.HttpServer; |
| import org.apache.hadoop.ipc.RPC; |
| import org.apache.hadoop.ipc.RemoteException; |
| import org.apache.hadoop.metrics.jvm.JvmMetrics; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.util.Daemon; |
| import org.apache.hadoop.util.StringUtils; |
| |
| /********************************************************** |
| * The Secondary NameNode is a helper to the primary NameNode. |
| * The Secondary is responsible for supporting periodic checkpoints |
| * of the HDFS metadata. The current design allows only one Secondary |
| * NameNode per HDFs cluster. |
| * |
| * The Secondary NameNode is a daemon that periodically wakes |
| * up (determined by the schedule specified in the configuration), |
| * triggers a periodic checkpoint and then goes back to sleep. |
| * The Secondary NameNode uses the ClientProtocol to talk to the |
| * primary NameNode. |
| * |
| **********************************************************/ |
| @Deprecated // use BackupNode with -checkpoint argument instead. |
| public class SecondaryNameNode implements Runnable { |
| |
| public static final Log LOG = |
| LogFactory.getLog(SecondaryNameNode.class.getName()); |
| |
| private final long starttime = System.currentTimeMillis(); |
| private volatile long lastCheckpointTime = 0; |
| |
| private String fsName; |
| private CheckpointStorage checkpointImage; |
| |
| private NamenodeProtocol namenode; |
| private Configuration conf; |
| private InetSocketAddress nameNodeAddr; |
| private volatile boolean shouldRun; |
| private HttpServer infoServer; |
| private int infoPort; |
| private String infoBindAddress; |
| |
| private Collection<URI> checkpointDirs; |
| private Collection<URI> checkpointEditsDirs; |
| private long checkpointPeriod; // in seconds |
| private long checkpointSize; // size (in MB) of current Edit Log |
| |
| /** {@inheritDoc} */ |
| public String toString() { |
| return getClass().getSimpleName() + " Status" |
| + "\nName Node Address : " + nameNodeAddr |
| + "\nStart Time : " + new Date(starttime) |
| + "\nLast Checkpoint Time : " + (lastCheckpointTime == 0? "--": new Date(lastCheckpointTime)) |
| + "\nCheckpoint Period : " + checkpointPeriod + " seconds" |
| + "\nCheckpoint Size : " + checkpointSize + " MB" |
| + "\nCheckpoint Dirs : " + checkpointDirs |
| + "\nCheckpoint Edits Dirs: " + checkpointEditsDirs; |
| } |
| |
| FSImage getFSImage() { |
| return checkpointImage; |
| } |
| |
| /** |
| * Create a connection to the primary namenode. |
| */ |
| public SecondaryNameNode(Configuration conf) throws IOException { |
| try { |
| initialize(conf); |
| } catch(IOException e) { |
| shutdown(); |
| throw e; |
| } |
| } |
| |
| /** |
| * Initialize SecondaryNameNode. |
| */ |
| private void initialize(Configuration conf) throws IOException { |
| // initiate Java VM metrics |
| JvmMetrics.init("SecondaryNameNode", conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY)); |
| |
| // Create connection to the namenode. |
| shouldRun = true; |
| nameNodeAddr = NameNode.getAddress(conf); |
| |
| this.conf = conf; |
| this.namenode = |
| (NamenodeProtocol) RPC.waitForProxy(NamenodeProtocol.class, |
| NamenodeProtocol.versionID, nameNodeAddr, conf); |
| |
| // initialize checkpoint directories |
| fsName = getInfoServer(); |
| checkpointDirs = FSImage.getCheckpointDirs(conf, |
| "/tmp/hadoop/dfs/namesecondary"); |
| checkpointEditsDirs = FSImage.getCheckpointEditsDirs(conf, |
| "/tmp/hadoop/dfs/namesecondary"); |
| checkpointImage = new CheckpointStorage(); |
| checkpointImage.recoverCreate(checkpointDirs, checkpointEditsDirs); |
| |
| // Initialize other scheduling parameters from the configuration |
| checkpointPeriod = conf.getLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, |
| DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT); |
| checkpointSize = conf.getLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_SIZE_KEY, |
| DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_SIZE_DEFAULT); |
| |
| // initialize the webserver for uploading files. |
| InetSocketAddress infoSocAddr = NetUtils.createSocketAddr( |
| conf.get(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, |
| DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_DEFAULT)); |
| infoBindAddress = infoSocAddr.getHostName(); |
| int tmpInfoPort = infoSocAddr.getPort(); |
| infoServer = new HttpServer("secondary", infoBindAddress, tmpInfoPort, |
| tmpInfoPort == 0, conf); |
| infoServer.setAttribute("secondary.name.node", this); |
| infoServer.setAttribute("name.system.image", checkpointImage); |
| this.infoServer.setAttribute("name.conf", conf); |
| infoServer.addInternalServlet("getimage", "/getimage", GetImageServlet.class); |
| infoServer.start(); |
| |
| // The web-server port can be ephemeral... ensure we have the correct info |
| infoPort = infoServer.getPort(); |
| conf.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, infoBindAddress + ":" +infoPort); |
| LOG.info("Secondary Web-server up at: " + infoBindAddress + ":" +infoPort); |
| LOG.warn("Checkpoint Period :" + checkpointPeriod + " secs " + |
| "(" + checkpointPeriod/60 + " min)"); |
| LOG.warn("Log Size Trigger :" + checkpointSize + " bytes " + |
| "(" + checkpointSize/1024 + " KB)"); |
| } |
| |
| /** |
| * Shut down this instance of the datanode. |
| * Returns only after shutdown is complete. |
| */ |
| public void shutdown() { |
| shouldRun = false; |
| try { |
| if (infoServer != null) infoServer.stop(); |
| } catch (Exception e) { |
| LOG.warn("Exception shutting down SecondaryNameNode", e); |
| } |
| try { |
| if (checkpointImage != null) checkpointImage.close(); |
| } catch(IOException e) { |
| LOG.warn(StringUtils.stringifyException(e)); |
| } |
| } |
| |
| // |
| // The main work loop |
| // |
| public void run() { |
| |
| // |
| // Poll the Namenode (once every 5 minutes) to find the size of the |
| // pending edit log. |
| // |
| long period = 5 * 60; // 5 minutes |
| if (checkpointPeriod < period) { |
| period = checkpointPeriod; |
| } |
| |
| while (shouldRun) { |
| try { |
| Thread.sleep(1000 * period); |
| } catch (InterruptedException ie) { |
| // do nothing |
| } |
| if (!shouldRun) { |
| break; |
| } |
| try { |
| long now = System.currentTimeMillis(); |
| |
| long size = namenode.getEditLogSize(); |
| if (size >= checkpointSize || |
| now >= lastCheckpointTime + 1000 * checkpointPeriod) { |
| doCheckpoint(); |
| lastCheckpointTime = now; |
| } |
| } catch (IOException e) { |
| LOG.error("Exception in doCheckpoint: "); |
| LOG.error(StringUtils.stringifyException(e)); |
| e.printStackTrace(); |
| } catch (Throwable e) { |
| LOG.error("Throwable Exception in doCheckpoint: "); |
| LOG.error(StringUtils.stringifyException(e)); |
| e.printStackTrace(); |
| Runtime.getRuntime().exit(-1); |
| } |
| } |
| } |
| |
| /** |
| * Download <code>fsimage</code> and <code>edits</code> |
| * files from the name-node. |
| * @throws IOException |
| */ |
| private void downloadCheckpointFiles(CheckpointSignature sig |
| ) throws IOException { |
| |
| checkpointImage.cTime = sig.cTime; |
| checkpointImage.checkpointTime = sig.checkpointTime; |
| |
| // get fsimage |
| String fileid = "getimage=1"; |
| Collection<File> list = checkpointImage.getFiles(NameNodeFile.IMAGE, |
| NameNodeDirType.IMAGE); |
| File[] srcNames = list.toArray(new File[list.size()]); |
| assert srcNames.length > 0 : "No checkpoint targets."; |
| TransferFsImage.getFileClient(fsName, fileid, srcNames); |
| LOG.info("Downloaded file " + srcNames[0].getName() + " size " + |
| srcNames[0].length() + " bytes."); |
| |
| // get edits file |
| fileid = "getedit=1"; |
| list = getFSImage().getFiles(NameNodeFile.EDITS, NameNodeDirType.EDITS); |
| srcNames = list.toArray(new File[list.size()]);; |
| assert srcNames.length > 0 : "No checkpoint targets."; |
| TransferFsImage.getFileClient(fsName, fileid, srcNames); |
| LOG.info("Downloaded file " + srcNames[0].getName() + " size " + |
| srcNames[0].length() + " bytes."); |
| |
| checkpointImage.checkpointUploadDone(); |
| } |
| |
| /** |
| * Copy the new fsimage into the NameNode |
| */ |
| private void putFSImage(CheckpointSignature sig) throws IOException { |
| String fileid = "putimage=1&port=" + infoPort + |
| "&machine=" + |
| InetAddress.getLocalHost().getHostAddress() + |
| "&token=" + sig.toString(); |
| LOG.info("Posted URL " + fsName + fileid); |
| TransferFsImage.getFileClient(fsName, fileid, (File[])null); |
| } |
| |
| /** |
| * Returns the Jetty server that the Namenode is listening on. |
| */ |
| private String getInfoServer() throws IOException { |
| URI fsName = FileSystem.getDefaultUri(conf); |
| if (!FSConstants.HDFS_URI_SCHEME.equalsIgnoreCase(fsName.getScheme())) { |
| throw new IOException("This is not a DFS"); |
| } |
| String configuredAddress = conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, |
| DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT); |
| InetSocketAddress sockAddr = NetUtils.createSocketAddr(configuredAddress); |
| if (sockAddr.getAddress().isAnyLocalAddress()) { |
| return fsName.getHost() + ":" + sockAddr.getPort(); |
| } else { |
| return configuredAddress; |
| } |
| } |
| |
| /** |
| * Create a new checkpoint |
| */ |
| void doCheckpoint() throws IOException { |
| |
| // Do the required initialization of the merge work area. |
| startCheckpoint(); |
| |
| // Tell the namenode to start logging transactions in a new edit file |
| // Returns a token that would be used to upload the merged image. |
| CheckpointSignature sig = namenode.rollEditLog(); |
| |
| // error simulation code for junit test |
| if (ErrorSimulator.getErrorSimulation(0)) { |
| throw new IOException("Simulating error0 " + |
| "after creating edits.new"); |
| } |
| |
| downloadCheckpointFiles(sig); // Fetch fsimage and edits |
| doMerge(sig); // Do the merge |
| |
| // |
| // Upload the new image into the NameNode. Then tell the Namenode |
| // to make this new uploaded image as the most current image. |
| // |
| putFSImage(sig); |
| |
| // error simulation code for junit test |
| if (ErrorSimulator.getErrorSimulation(1)) { |
| throw new IOException("Simulating error1 " + |
| "after uploading new image to NameNode"); |
| } |
| |
| namenode.rollFsImage(); |
| checkpointImage.endCheckpoint(); |
| |
| LOG.warn("Checkpoint done. New Image Size: " |
| + checkpointImage.getFsImageName().length()); |
| } |
| |
| private void startCheckpoint() throws IOException { |
| checkpointImage.unlockAll(); |
| checkpointImage.getEditLog().close(); |
| checkpointImage.recoverCreate(checkpointDirs, checkpointEditsDirs); |
| checkpointImage.startCheckpoint(); |
| } |
| |
| /** |
| * Merge downloaded image and edits and write the new image into |
| * current storage directory. |
| */ |
| private void doMerge(CheckpointSignature sig) throws IOException { |
| FSNamesystem namesystem = |
| new FSNamesystem(checkpointImage, conf); |
| assert namesystem.dir.fsImage == checkpointImage; |
| checkpointImage.doMerge(sig); |
| } |
| |
| /** |
| * @param argv The parameters passed to this program. |
| * @exception Exception if the filesystem does not exist. |
| * @return 0 on success, non zero on error. |
| */ |
| private int processArgs(String[] argv) throws Exception { |
| |
| if (argv.length < 1) { |
| printUsage(""); |
| return -1; |
| } |
| |
| int exitCode = -1; |
| int i = 0; |
| String cmd = argv[i++]; |
| |
| // |
| // verify that we have enough command line parameters |
| // |
| if ("-geteditsize".equals(cmd)) { |
| if (argv.length != 1) { |
| printUsage(cmd); |
| return exitCode; |
| } |
| } else if ("-checkpoint".equals(cmd)) { |
| if (argv.length != 1 && argv.length != 2) { |
| printUsage(cmd); |
| return exitCode; |
| } |
| if (argv.length == 2 && !"force".equals(argv[i])) { |
| printUsage(cmd); |
| return exitCode; |
| } |
| } |
| |
| exitCode = 0; |
| try { |
| if ("-checkpoint".equals(cmd)) { |
| long size = namenode.getEditLogSize(); |
| if (size >= checkpointSize || |
| argv.length == 2 && "force".equals(argv[i])) { |
| doCheckpoint(); |
| } else { |
| System.err.println("EditLog size " + size + " bytes is " + |
| "smaller than configured checkpoint " + |
| "size " + checkpointSize + " bytes."); |
| System.err.println("Skipping checkpoint."); |
| } |
| } else if ("-geteditsize".equals(cmd)) { |
| long size = namenode.getEditLogSize(); |
| System.out.println("EditLog size is " + size + " bytes"); |
| } else { |
| exitCode = -1; |
| LOG.error(cmd.substring(1) + ": Unknown command"); |
| printUsage(""); |
| } |
| } catch (RemoteException e) { |
| // |
| // This is a error returned by hadoop server. Print |
| // out the first line of the error mesage, ignore the stack trace. |
| exitCode = -1; |
| try { |
| String[] content; |
| content = e.getLocalizedMessage().split("\n"); |
| LOG.error(cmd.substring(1) + ": " |
| + content[0]); |
| } catch (Exception ex) { |
| LOG.error(cmd.substring(1) + ": " |
| + ex.getLocalizedMessage()); |
| } |
| } catch (IOException e) { |
| // |
| // IO exception encountered locally. |
| // |
| exitCode = -1; |
| LOG.error(cmd.substring(1) + ": " |
| + e.getLocalizedMessage()); |
| } finally { |
| // Does the RPC connection need to be closed? |
| } |
| return exitCode; |
| } |
| |
| /** |
| * Displays format of commands. |
| * @param cmd The command that is being executed. |
| */ |
| private void printUsage(String cmd) { |
| if ("-geteditsize".equals(cmd)) { |
| System.err.println("Usage: java SecondaryNameNode" |
| + " [-geteditsize]"); |
| } else if ("-checkpoint".equals(cmd)) { |
| System.err.println("Usage: java SecondaryNameNode" |
| + " [-checkpoint [force]]"); |
| } else { |
| System.err.println("Usage: java SecondaryNameNode " + |
| "[-checkpoint [force]] " + |
| "[-geteditsize] "); |
| } |
| } |
| |
| /** |
| * main() has some simple utility methods. |
| * @param argv Command line parameters. |
| * @exception Exception if the filesystem does not exist. |
| */ |
| public static void main(String[] argv) throws Exception { |
| StringUtils.startupShutdownMessage(SecondaryNameNode.class, argv, LOG); |
| Configuration tconf = new HdfsConfiguration(); |
| if (argv.length >= 1) { |
| SecondaryNameNode secondary = new SecondaryNameNode(tconf); |
| int ret = secondary.processArgs(argv); |
| System.exit(ret); |
| } |
| |
| // Create a never ending deamon |
| Daemon checkpointThread = new Daemon(new SecondaryNameNode(tconf)); |
| checkpointThread.start(); |
| } |
| |
| static class CheckpointStorage extends FSImage { |
| /** |
| */ |
| CheckpointStorage() throws IOException { |
| super(); |
| } |
| |
| @Override |
| public |
| boolean isConversionNeeded(StorageDirectory sd) { |
| return false; |
| } |
| |
| /** |
| * Analyze checkpoint directories. |
| * Create directories if they do not exist. |
| * Recover from an unsuccessful checkpoint is necessary. |
| * |
| * @param dataDirs |
| * @param editsDirs |
| * @throws IOException |
| */ |
| void recoverCreate(Collection<URI> dataDirs, |
| Collection<URI> editsDirs) throws IOException { |
| Collection<URI> tempDataDirs = new ArrayList<URI>(dataDirs); |
| Collection<URI> tempEditsDirs = new ArrayList<URI>(editsDirs); |
| this.storageDirs = new ArrayList<StorageDirectory>(); |
| setStorageDirectories(tempDataDirs, tempEditsDirs); |
| for (Iterator<StorageDirectory> it = |
| dirIterator(); it.hasNext();) { |
| StorageDirectory sd = it.next(); |
| boolean isAccessible = true; |
| try { // create directories if don't exist yet |
| if(!sd.getRoot().mkdirs()) { |
| // do nothing, directory is already created |
| } |
| } catch(SecurityException se) { |
| isAccessible = false; |
| } |
| if(!isAccessible) |
| throw new InconsistentFSStateException(sd.getRoot(), |
| "cannot access checkpoint directory."); |
| StorageState curState; |
| try { |
| curState = sd.analyzeStorage(HdfsConstants.StartupOption.REGULAR); |
| // sd is locked but not opened |
| switch(curState) { |
| case NON_EXISTENT: |
| // fail if any of the configured checkpoint dirs are inaccessible |
| throw new InconsistentFSStateException(sd.getRoot(), |
| "checkpoint directory does not exist or is not accessible."); |
| case NOT_FORMATTED: |
| break; // it's ok since initially there is no current and VERSION |
| case NORMAL: |
| break; |
| default: // recovery is possible |
| sd.doRecover(curState); |
| } |
| } catch (IOException ioe) { |
| sd.unlock(); |
| throw ioe; |
| } |
| } |
| } |
| |
| /** |
| * Prepare directories for a new checkpoint. |
| * <p> |
| * Rename <code>current</code> to <code>lastcheckpoint.tmp</code> |
| * and recreate <code>current</code>. |
| * @throws IOException |
| */ |
| void startCheckpoint() throws IOException { |
| for(StorageDirectory sd : storageDirs) { |
| moveCurrent(sd); |
| } |
| } |
| |
| void endCheckpoint() throws IOException { |
| for(StorageDirectory sd : storageDirs) { |
| moveLastCheckpoint(sd); |
| } |
| } |
| |
| /** |
| * Merge image and edits, and verify consistency with the signature. |
| */ |
| private void doMerge(CheckpointSignature sig) throws IOException { |
| getEditLog().open(); |
| StorageDirectory sdName = null; |
| StorageDirectory sdEdits = null; |
| Iterator<StorageDirectory> it = null; |
| it = dirIterator(NameNodeDirType.IMAGE); |
| if (it.hasNext()) |
| sdName = it.next(); |
| it = dirIterator(NameNodeDirType.EDITS); |
| if (it.hasNext()) |
| sdEdits = it.next(); |
| if ((sdName == null) || (sdEdits == null)) |
| throw new IOException("Could not locate checkpoint directories"); |
| this.layoutVersion = -1; // to avoid assert in loadFSImage() |
| loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE)); |
| loadFSEdits(sdEdits); |
| sig.validateStorageInfo(this); |
| saveNamespace(false); |
| } |
| } |
| } |