| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.dfs; |
| |
| import org.apache.commons.logging.*; |
| |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.ipc.*; |
| import org.apache.hadoop.conf.*; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.util.Daemon; |
| import org.apache.hadoop.mapred.StatusHttpServer; |
| import org.apache.hadoop.net.NetUtils; |
| |
| import java.io.*; |
| import java.net.*; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import org.apache.hadoop.metrics.jvm.JvmMetrics; |
| |
| /********************************************************** |
| * The Secondary NameNode is a helper to the primary NameNode. |
| * The Secondary is responsible for supporting periodic checkpoints |
| * of the HDFS metadata. The current design allows only one Secondary |
 * NameNode per HDFS cluster.
| * |
| * The Secondary NameNode is a daemon that periodically wakes |
| * up (determined by the schedule specified in the configuration), |
| * triggers a periodic checkpoint and then goes back to sleep. |
| * The Secondary NameNode uses the ClientProtocol to talk to the |
| * primary NameNode. |
| * |
| **********************************************************/ |
| public class SecondaryNameNode implements FSConstants, Runnable { |
| |
  public static final Log LOG =
    LogFactory.getLog("org.apache.hadoop.dfs.NameNode.Secondary");

  private String fsName;                      // host:port of the primary name-node's HTTP server
  private CheckpointStorage checkpointImage;  // local storage holding the image/edits being merged

  private ClientProtocol namenode;            // RPC proxy to the primary name-node
  private Configuration conf;                 // configuration this instance was initialized with
  private InetSocketAddress nameNodeAddr;     // RPC address of the primary name-node
  private boolean shouldRun;                  // run() loops while true; cleared by shutdown()
  private StatusHttpServer infoServer;        // HTTP server the primary fetches the merged image from
  private int infoPort;                       // actual infoServer port (resolved after start; may be ephemeral)
  private String infoBindAddress;             // host name infoServer is bound to

  private Collection<File> checkpointDirs;    // directories backing checkpointImage
  private long checkpointPeriod;  // in seconds
  private long checkpointSize;    // in bytes: edit-log size that triggers a checkpoint
| |
| /** |
| * Utility class to facilitate junit test error simulation. |
| */ |
| static class ErrorSimulator { |
| private static boolean[] simulation = null; // error simulation events |
| static void initializeErrorSimulationEvent(int numberOfEvents) { |
| simulation = new boolean[numberOfEvents]; |
| for (int i = 0; i < numberOfEvents; i++) { |
| simulation[i] = false; |
| } |
| } |
| |
| static boolean getErrorSimulation(int index) { |
| if(simulation == null) |
| return false; |
| assert(index < simulation.length); |
| return simulation[index]; |
| } |
| |
| static void setErrorSimulation(int index) { |
| assert(index < simulation.length); |
| simulation[index] = true; |
| } |
| |
| static void clearErrorSimulation(int index) { |
| assert(index < simulation.length); |
| simulation[index] = false; |
| } |
| } |
| |
| FSImage getFSImage() { |
| return checkpointImage; |
| } |
| |
  /**
   * Create a connection to the primary namenode.
   * All real work happens in {@link #initialize(Configuration)}; if it
   * fails part-way, resources it managed to acquire (HTTP server,
   * storage locks) are released via shutdown() before rethrowing.
   */
  SecondaryNameNode(Configuration conf) throws IOException {
    try {
      initialize(conf);
    } catch(IOException e) {
      // release partially acquired resources before propagating
      shutdown();
      throw e;
    }
  }
| |
  /**
   * Initialize SecondaryNameNode: connect to the primary name-node over
   * RPC, recover/create the local checkpoint directories, read the
   * checkpoint schedule from the configuration, and start the HTTP
   * server from which the primary fetches the merged image.
   * @param conf configuration; also updated with the actual HTTP address
   * @throws IOException if the name-node is unreachable or checkpoint
   *         storage cannot be prepared
   */
  private void initialize(Configuration conf) throws IOException {
    // initiate Java VM metrics
    JvmMetrics.init("SecondaryNameNode", conf.get("session.id"));

    // Create connection to the namenode.
    shouldRun = true;
    nameNodeAddr = NameNode.getAddress(conf);

    this.conf = conf;
    // blocks until the name-node is up and answering RPCs
    this.namenode =
        (ClientProtocol) RPC.waitForProxy(ClientProtocol.class,
            ClientProtocol.versionID, nameNodeAddr, conf);

    // initialize checkpoint directories
    fsName = getInfoServer();  // HTTP address of the primary, used for file transfers
    checkpointDirs = FSImage.getCheckpointDirs(conf,
                                  "/tmp/hadoop/dfs/namesecondary");
    checkpointImage = new CheckpointStorage();
    checkpointImage.recoverCreate(checkpointDirs);

    // Initialize other scheduling parameters from the configuration
    checkpointPeriod = conf.getLong("fs.checkpoint.period", 3600);  // seconds
    checkpointSize = conf.getLong("fs.checkpoint.size", 4194304);   // bytes

    // initialize the webserver for uploading files.
    String infoAddr =
      NetUtils.getServerAddress(conf,
                                "dfs.secondary.info.bindAddress",
                                "dfs.secondary.info.port",
                                "dfs.secondary.http.address");
    InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
    infoBindAddress = infoSocAddr.getHostName();
    int tmpInfoPort = infoSocAddr.getPort();
    // port 0 means "pick any free port" -- findPort mode
    infoServer = new StatusHttpServer("secondary", infoBindAddress, tmpInfoPort,
                                      tmpInfoPort == 0);
    infoServer.setAttribute("name.system.image", checkpointImage);
    this.infoServer.setAttribute("name.conf", conf);
    infoServer.addServlet("getimage", "/getimage", GetImageServlet.class);
    infoServer.start();

    // The web-server port can be ephemeral... ensure we have the correct info
    infoPort = infoServer.getPort();
    conf.set("dfs.secondary.http.address", infoBindAddress + ":" +infoPort);
    LOG.info("Secondary Web-server up at: " + infoBindAddress + ":" +infoPort);
    LOG.warn("Checkpoint Period   :" + checkpointPeriod + " secs " +
             "(" + checkpointPeriod/60 + " min)");
    LOG.warn("Log Size Trigger    :" + checkpointSize + " bytes " +
             "(" + checkpointSize/1024 + " KB)");
  }
| |
  /**
   * Shut down this instance of the SecondaryNameNode.
   * Returns only after shutdown is complete.
   */
  public void shutdown() {
    shouldRun = false;  // signal the run() loop to exit
    try {
      if (infoServer != null) infoServer.stop();
    } catch(InterruptedException ie) {
      // best-effort shutdown: log and continue closing remaining resources
      LOG.warn(StringUtils.stringifyException(ie));
    }
    try {
      if (checkpointImage != null) checkpointImage.close();
    } catch(IOException e) {
      LOG.warn(StringUtils.stringifyException(e));
    }
  }
| |
| // |
| // The main work loop |
| // |
| public void run() { |
| |
| // |
| // Poll the Namenode (once every 5 minutes) to find the size of the |
| // pending edit log. |
| // |
| long period = 5 * 60; // 5 minutes |
| long lastCheckpointTime = 0; |
| if (checkpointPeriod < period) { |
| period = checkpointPeriod; |
| } |
| |
| while (shouldRun) { |
| try { |
| Thread.sleep(1000 * period); |
| } catch (InterruptedException ie) { |
| // do nothing |
| } |
| if (!shouldRun) { |
| break; |
| } |
| try { |
| long now = System.currentTimeMillis(); |
| |
| long size = namenode.getEditLogSize(); |
| if (size >= checkpointSize || |
| now >= lastCheckpointTime + 1000 * checkpointPeriod) { |
| doCheckpoint(); |
| lastCheckpointTime = now; |
| } |
| } catch (IOException e) { |
| LOG.error("Exception in doCheckpoint: "); |
| LOG.error(StringUtils.stringifyException(e)); |
| e.printStackTrace(); |
| } catch (Throwable e) { |
| LOG.error("Throwable Exception in doCheckpoint: "); |
| LOG.error(StringUtils.stringifyException(e)); |
| e.printStackTrace(); |
| Runtime.getRuntime().exit(-1); |
| } |
| } |
| } |
| |
  /**
   * Download <code>fsimage</code> and <code>edits</code>
   * files from the name-node.
   * The signature carries the name-node's cTime/checkpointTime so local
   * storage is stamped consistently with the downloaded files.
   * @throws IOException
   */
  private void downloadCheckpointFiles(CheckpointSignature sig
                                      ) throws IOException {

    checkpointImage.cTime = sig.cTime;
    checkpointImage.checkpointTime = sig.checkpointTime;

    // get fsimage
    String fileid = "getimage=1";
    File[] srcNames = checkpointImage.getImageFiles();
    assert srcNames.length > 0 : "No checkpoint targets.";
    TransferFsImage.getFileClient(fsName, fileid, srcNames);
    LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
             srcNames[0].length() + " bytes.");

    // get edits file
    fileid = "getedit=1";
    srcNames = checkpointImage.getEditsFiles();
    assert srcNames.length > 0 : "No checkpoint targets.";
    TransferFsImage.getFileClient(fsName, fileid, srcNames);
    LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
             srcNames[0].length() + " bytes.");

    checkpointImage.checkpointUploadDone();
  }
| |
| /** |
| * Copy the new fsimage into the NameNode |
| */ |
| private void putFSImage(CheckpointSignature sig) throws IOException { |
| String fileid = "putimage=1&port=" + infoPort + |
| "&machine=" + |
| InetAddress.getLocalHost().getHostAddress() + |
| "&token=" + sig.toString(); |
| LOG.info("Posted URL " + fsName + fileid); |
| TransferFsImage.getFileClient(fsName, fileid, (File[])null); |
| } |
| |
| /** |
| * Returns the Jetty server that the Namenode is listening on. |
| */ |
| private String getInfoServer() throws IOException { |
| URI fsName = FileSystem.getDefaultUri(conf); |
| if (!"hdfs".equals(fsName.getScheme())) { |
| throw new IOException("This is not a DFS"); |
| } |
| return NetUtils.getServerAddress(conf, "dfs.info.bindAddress", |
| "dfs.info.port", "dfs.http.address"); |
| } |
| |
  /**
   * Create a new checkpoint: roll the primary's edit log, download the
   * current image and edits, merge them locally, upload the merged
   * image, and tell the primary to adopt it. The order of these steps
   * is part of the checkpoint protocol and must not change.
   * @throws IOException if any step of the checkpoint fails
   */
  void doCheckpoint() throws IOException {

    // Do the required initialization of the merge work area.
    startCheckpoint();

    // Tell the namenode to start logging transactions in a new edit file
    // Returns a token that would be used to upload the merged image.
    CheckpointSignature sig = (CheckpointSignature)namenode.rollEditLog();

    // error simulation code for junit test
    if (ErrorSimulator.getErrorSimulation(0)) {
      throw new IOException("Simulating error0 " +
                            "after creating edits.new");
    }

    downloadCheckpointFiles(sig);   // Fetch fsimage and edits
    doMerge(sig);                   // Do the merge

    //
    // Upload the new image into the NameNode. Then tell the Namenode
    // to make this new uploaded image as the most current image.
    //
    putFSImage(sig);

    // error simulation code for junit test
    if (ErrorSimulator.getErrorSimulation(1)) {
      throw new IOException("Simulating error1 " +
                            "after uploading new image to NameNode");
    }

    namenode.rollFsImage();
    checkpointImage.endCheckpoint();

    LOG.warn("Checkpoint done. New Image Size: "
             + checkpointImage.getFsImageName().length());
  }
| |
  /**
   * Prepare the local work area for a merge: release storage locks,
   * close any open edit log, re-analyze/recover the checkpoint
   * directories, and move <code>current</code> aside for the new
   * checkpoint. The call order here matters.
   * @throws IOException if checkpoint storage cannot be prepared
   */
  private void startCheckpoint() throws IOException {
    checkpointImage.unlockAll();
    checkpointImage.getEditLog().close();
    checkpointImage.recoverCreate(checkpointDirs);
    checkpointImage.startCheckpoint();
  }
| |
  /**
   * Merge downloaded image and edits and write the new image into
   * current storage directory.
   * The FSNamesystem is constructed so that the namesystem's directory
   * state is wired to checkpointImage (the assert verifies this) before
   * delegating the actual merge to the storage object.
   */
  private void doMerge(CheckpointSignature sig) throws IOException {
    FSNamesystem namesystem =
            new FSNamesystem(checkpointImage, conf);
    assert namesystem.dir.fsImage == checkpointImage;
    checkpointImage.doMerge(sig);
  }
| |
  /**
   * Parse and execute a single command: <code>-geteditsize</code> or
   * <code>-checkpoint [force]</code>.
   * @param argv The parameters passed to this program.
   * @exception Exception if the filesystem does not exist.
   * @return 0 on success, non zero on error.
   */
  private int processArgs(String[] argv) throws Exception {

    if (argv.length < 1) {
      printUsage("");
      return -1;
    }

    int exitCode = -1;
    int i = 0;
    String cmd = argv[i++];  // i now indexes the first command argument

    //
    // verify that we have enough command line parameters
    //
    if ("-geteditsize".equals(cmd)) {
      // takes no arguments
      if (argv.length != 1) {
        printUsage(cmd);
        return exitCode;
      }
    } else if ("-checkpoint".equals(cmd)) {
      // at most one optional argument, which must be "force"
      if (argv.length != 1 && argv.length != 2) {
        printUsage(cmd);
        return exitCode;
      }
      if (argv.length == 2 && !"force".equals(argv[i])) {
        printUsage(cmd);
        return exitCode;
      }
    }

    exitCode = 0;
    try {
      if ("-checkpoint".equals(cmd)) {
        // checkpoint only if the edit log is big enough, unless forced
        long size = namenode.getEditLogSize();
        if (size >= checkpointSize ||
            argv.length == 2 && "force".equals(argv[i])) {
          doCheckpoint();
        } else {
          System.err.println("EditLog size " + size + " bytes is " +
                             "smaller than configured checkpoint " +
                             "size " + checkpointSize + " bytes.");
          System.err.println("Skipping checkpoint.");
        }
      } else if ("-geteditsize".equals(cmd)) {
        long size = namenode.getEditLogSize();
        System.out.println("EditLog size is " + size + " bytes");
      } else {
        exitCode = -1;
        LOG.error(cmd.substring(1) + ": Unknown command");
        printUsage("");
      }
    } catch (RemoteException e) {
      //
      // This is an error returned by the hadoop server. Print
      // out the first line of the error message, ignore the stack trace.
      exitCode = -1;
      try {
        String[] content;
        content = e.getLocalizedMessage().split("\n");
        LOG.error(cmd.substring(1) + ": "
                  + content[0]);
      } catch (Exception ex) {
        // the remote message was null or unusable; log what we have
        LOG.error(cmd.substring(1) + ": "
                  + ex.getLocalizedMessage());
      }
    } catch (IOException e) {
      //
      // IO exception encountered locally.
      //
      exitCode = -1;
      LOG.error(cmd.substring(1) + ": "
                + e.getLocalizedMessage());
    } finally {
      // Does the RPC connection need to be closed?
    }
    return exitCode;
  }
| |
| /** |
| * Displays format of commands. |
| * @param cmd The command that is being executed. |
| */ |
| private void printUsage(String cmd) { |
| if ("-geteditsize".equals(cmd)) { |
| System.err.println("Usage: java SecondaryNameNode" |
| + " [-geteditsize]"); |
| } else if ("-checkpoint".equals(cmd)) { |
| System.err.println("Usage: java SecondaryNameNode" |
| + " [-checkpoint [force]]"); |
| } else { |
| System.err.println("Usage: java SecondaryNameNode " + |
| "[-checkpoint [force]] " + |
| "[-geteditsize] "); |
| } |
| } |
| |
| /** |
| * main() has some simple utility methods. |
| * @param argv Command line parameters. |
| * @exception Exception if the filesystem does not exist. |
| */ |
| public static void main(String[] argv) throws Exception { |
| StringUtils.startupShutdownMessage(SecondaryNameNode.class, argv, LOG); |
| Configuration tconf = new Configuration(); |
| if (argv.length >= 1) { |
| SecondaryNameNode secondary = new SecondaryNameNode(tconf); |
| int ret = secondary.processArgs(argv); |
| System.exit(ret); |
| } |
| |
| // Create a never ending deamon |
| Daemon checkpointThread = new Daemon(new SecondaryNameNode(tconf)); |
| checkpointThread.start(); |
| } |
| |
| static class CheckpointStorage extends FSImage { |
    /**
     * Construct empty checkpoint storage; directories are attached
     * later via recoverCreate().
     */
    CheckpointStorage() throws IOException {
      super();
    }
| |
    // Checkpoint storage is always written in the current layout,
    // so no old-layout conversion is ever required.
    @Override
    boolean isConversionNeeded(StorageDirectory sd) {
      return false;
    }
| |
    /**
     * Analyze checkpoint directories.
     * Create directories if they do not exist.
     * Recover from an unsuccessful checkpoint if necessary.
     *
     * @param dataDirs checkpoint directories to use
     * @throws IOException if a directory is inaccessible or inconsistent
     */
    void recoverCreate(Collection<File> dataDirs) throws IOException {
      this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
      for(File dataDir : dataDirs) {
        boolean isAccessible = true;
        try { // create directories if don't exist yet
          if(!dataDir.mkdirs()) {
            // do nothing, directory is already created
          }
        } catch(SecurityException se) {
          isAccessible = false;
        }
        if(!isAccessible)
          throw new InconsistentFSStateException(dataDir,
              "cannot access checkpoint directory.");
        StorageDirectory sd = new StorageDirectory(dataDir);
        StorageState curState;
        try {
          curState = sd.analyzeStorage(StartupOption.REGULAR);
          // sd is locked but not opened
          switch(curState) {
          case NON_EXISTENT:
            // fail if any of the configured checkpoint dirs are inaccessible
            throw new InconsistentFSStateException(sd.root,
                  "checkpoint directory does not exist or is not accessible.");
          case NOT_FORMATTED:
            break;  // it's ok since initially there is no current and VERSION
          case NORMAL:
            break;
          default:  // recovery is possible
            sd.doRecover(curState);
          }
        } catch (IOException ioe) {
          // release the lock taken by analyzeStorage before propagating
          sd.unlock();
          throw ioe;
        }
        // add to the storage list
        addStorageDir(sd);
        LOG.warn("Checkpoint directory " + sd.root + " is added.");
      }
    }
| |
    /**
     * Prepare directories for a new checkpoint.
     * <p>
     * Rename <code>current</code> to <code>lastcheckpoint.tmp</code>
     * and recreate <code>current</code>.
     * @throws IOException
     */
    void startCheckpoint() throws IOException {
      for(StorageDirectory sd : storageDirs) {
        File curDir = sd.getCurrentDir();
        File tmpCkptDir = sd.getLastCheckpointTmp();
        // a leftover tmp dir would mean an earlier checkpoint was never finalized
        assert !tmpCkptDir.exists() :
          tmpCkptDir.getName() + " directory must not exist.";
        if(curDir.exists()) {
          // rename current to tmp
          rename(curDir, tmpCkptDir);
        }
        if (!curDir.mkdir())
          throw new IOException("Cannot create directory " + curDir);
      }
    }
| |
| void endCheckpoint() throws IOException { |
| for(StorageDirectory sd : storageDirs) { |
| File tmpCkptDir = sd.getLastCheckpointTmp(); |
| File prevCkptDir = sd.getPreviousCheckpoint(); |
| // delete previous dir |
| if (prevCkptDir.exists()) |
| deleteDir(prevCkptDir); |
| // rename tmp to previous |
| if (tmpCkptDir.exists()) |
| rename(tmpCkptDir, prevCkptDir); |
| } |
| } |
| |
    /**
     * Merge image and edits, and verify consistency with the signature.
     * Loads the downloaded image from the first storage directory,
     * replays the downloaded edits on top of it, validates the result
     * against the name-node's signature, and saves the merged image.
     */
    private void doMerge(CheckpointSignature sig) throws IOException {
      getEditLog().open();
      StorageDirectory sd = getStorageDir(0);
      loadFSImage(FSImage.getImageFile(sd, NameNodeFile.IMAGE));
      loadFSEdits(sd);
      sig.validateStorageInfo(this);
      saveFSImage();
    }
| } |
| } |