| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.namenode; |
| |
| import java.io.BufferedInputStream; |
| import java.io.BufferedOutputStream; |
| import java.io.DataInput; |
| import java.io.DataInputStream; |
| import java.io.DataOutput; |
| import java.io.DataOutputStream; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.RandomAccessFile; |
| import java.net.URI; |
| import java.nio.ByteBuffer; |
| import java.text.SimpleDateFormat; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Random; |
| import java.util.Set; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.fs.permission.PermissionStatus; |
| import org.apache.hadoop.hdfs.DeprecatedUTF8; |
| import org.apache.hadoop.hdfs.protocol.Block; |
| import org.apache.hadoop.hdfs.protocol.DatanodeID; |
| import org.apache.hadoop.hdfs.protocol.FSConstants; |
| import org.apache.hadoop.hdfs.server.common.GenerationStamp; |
| import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException; |
| import org.apache.hadoop.hdfs.server.common.Storage; |
| import org.apache.hadoop.hdfs.server.common.StorageInfo; |
| import org.apache.hadoop.hdfs.server.common.UpgradeManager; |
| import org.apache.hadoop.hdfs.server.common.Util; |
| import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState; |
| import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole; |
| import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType; |
| import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption; |
| import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType; |
| import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand; |
| import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; |
| import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; |
| import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.io.Writable; |
| |
| /** |
| * FSImage handles checkpointing and logging of the namespace edits. |
| * |
| */ |
| public class FSImage extends Storage { |
| |
| private static final SimpleDateFormat DATE_FORM = |
| new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
| |
/**
 * File names used inside the current directory of a storage directory
 * (see {@code getImageFile}).
 */
enum NameNodeFile {
  IMAGE     ("fsimage"),
  TIME      ("fstime"),
  EDITS     ("edits"),
  IMAGE_NEW ("fsimage.ckpt"),
  EDITS_NEW ("edits.new");

  /** On-disk file name; fixed per constant, so it is final. */
  private final String fileName;

  private NameNodeFile(String name) {
    this.fileName = name;
  }

  /** @return the on-disk file name for this constant */
  String getName() {
    return fileName;
  }
}
| |
/** States of a checkpoint, as tracked by the {@code ckptState} field. */
enum CheckpointStates {
  START,
  ROLLED_EDITS,
  UPLOAD_START,
  UPLOAD_DONE
}
| /** |
| * Implementation of StorageDirType specific to namenode storage |
| * A Storage directory could be of type IMAGE which stores only fsimage, |
| * or of type EDITS which stores edits or of type IMAGE_AND_EDITS which |
| * stores both fsimage and edits. |
| */ |
| static enum NameNodeDirType implements StorageDirType { |
| UNDEFINED, |
| IMAGE, |
| EDITS, |
| IMAGE_AND_EDITS; |
| |
| public StorageDirType getStorageDirType() { |
| return this; |
| } |
| |
| public boolean isOfType(StorageDirType type) { |
| if ((this == IMAGE_AND_EDITS) && (type == IMAGE || type == EDITS)) |
| return true; |
| return this == type; |
| } |
| } |
| |
| protected FSNamesystem namesystem = null; |
| protected long checkpointTime = -1L; // The age of the image |
| protected FSEditLog editLog = null; |
| private boolean isUpgradeFinalized = false; |
| |
| /** |
| * flag that controls if we try to restore failed storages |
| */ |
| private boolean restoreFailedStorage = false; |
| |
| /** |
| * list of failed (and thus removed) storages |
| */ |
| protected List<StorageDirectory> removedStorageDirs = new ArrayList<StorageDirectory>(); |
| |
| /** |
| * URIs for importing an image from a checkpoint. In the default case, |
| * URIs will represent directories. |
| */ |
| private Collection<URI> checkpointDirs; |
| private Collection<URI> checkpointEditsDirs; |
| |
| /** |
| * Can fs-image be rolled? |
| */ |
| volatile protected CheckpointStates ckptState = FSImage.CheckpointStates.START; |
| |
| /** |
| * Used for saving the image to disk |
| */ |
| static private final FsPermission FILE_PERM = new FsPermission((short)0); |
| static private final byte[] PATH_SEPARATOR = INode.string2Bytes(Path.SEPARATOR); |
| |
| /** |
| */ |
| FSImage() { |
| this((FSNamesystem)null); |
| } |
| |
| FSImage(FSNamesystem ns) { |
| super(NodeType.NAME_NODE); |
| this.editLog = new FSEditLog(this); |
| setFSNamesystem(ns); |
| } |
| |
| /** |
| * @throws IOException |
| */ |
| FSImage(Collection<URI> fsDirs, Collection<URI> fsEditsDirs) |
| throws IOException { |
| this(); |
| setStorageDirectories(fsDirs, fsEditsDirs); |
| } |
| |
| public FSImage(StorageInfo storageInfo) { |
| super(NodeType.NAME_NODE, storageInfo); |
| } |
| |
| /** |
| * Represents an Image (image and edit file). |
| * @throws IOException |
| */ |
| FSImage(URI imageDir) throws IOException { |
| this(); |
| ArrayList<URI> dirs = new ArrayList<URI>(1); |
| ArrayList<URI> editsDirs = new ArrayList<URI>(1); |
| dirs.add(imageDir); |
| editsDirs.add(imageDir); |
| setStorageDirectories(dirs, editsDirs); |
| } |
| |
| protected FSNamesystem getFSNamesystem() { |
| return namesystem; |
| } |
| |
| void setFSNamesystem(FSNamesystem ns) { |
| namesystem = ns; |
| } |
| |
| public void setRestoreFailedStorage(boolean val) { |
| LOG.info("set restore failed storage to " + val); |
| restoreFailedStorage=val; |
| } |
| |
| public boolean getRestoreFailedStorage() { |
| return restoreFailedStorage; |
| } |
| |
| void setStorageDirectories(Collection<URI> fsNameDirs, |
| Collection<URI> fsEditsDirs) throws IOException { |
| this.storageDirs = new ArrayList<StorageDirectory>(); |
| this.removedStorageDirs = new ArrayList<StorageDirectory>(); |
| |
| // Add all name dirs with appropriate NameNodeDirType |
| for (URI dirName : fsNameDirs) { |
| checkSchemeConsistency(dirName); |
| boolean isAlsoEdits = false; |
| for (URI editsDirName : fsEditsDirs) { |
| if (editsDirName.compareTo(dirName) == 0) { |
| isAlsoEdits = true; |
| fsEditsDirs.remove(editsDirName); |
| break; |
| } |
| } |
| NameNodeDirType dirType = (isAlsoEdits) ? |
| NameNodeDirType.IMAGE_AND_EDITS : |
| NameNodeDirType.IMAGE; |
| // Add to the list of storage directories, only if the |
| // URI is of type file:// |
| if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase()) |
| == 0){ |
| this.addStorageDir(new StorageDirectory(new File(dirName.getPath()), |
| dirType)); |
| } |
| } |
| |
| // Add edits dirs if they are different from name dirs |
| for (URI dirName : fsEditsDirs) { |
| checkSchemeConsistency(dirName); |
| // Add to the list of storage directories, only if the |
| // URI is of type file:// |
| if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase()) |
| == 0) |
| this.addStorageDir(new StorageDirectory(new File(dirName.getPath()), |
| NameNodeDirType.EDITS)); |
| } |
| } |
| |
| /* |
| * Checks the consistency of a URI, in particular if the scheme |
| * is specified and is supported by a concrete implementation |
| */ |
| static void checkSchemeConsistency(URI u) throws IOException { |
| String scheme = u.getScheme(); |
| // the URI should have a proper scheme |
| if(scheme == null) |
| throw new IOException("Undefined scheme for " + u); |
| else { |
| try { |
| // the scheme should be enumerated as JournalType |
| JournalType.valueOf(scheme.toUpperCase()); |
| } catch (IllegalArgumentException iae){ |
| throw new IOException("Unknown scheme " + scheme + |
| ". It should correspond to a JournalType enumeration value"); |
| } |
| } |
| }; |
| |
| void setCheckpointDirectories(Collection<URI> dirs, |
| Collection<URI> editsDirs) { |
| checkpointDirs = dirs; |
| checkpointEditsDirs = editsDirs; |
| } |
| |
| static File getImageFile(StorageDirectory sd, NameNodeFile type) { |
| return new File(sd.getCurrentDir(), type.getName()); |
| } |
| |
| List<StorageDirectory> getRemovedStorageDirs() { |
| return this.removedStorageDirs; |
| } |
| |
| File getEditFile(StorageDirectory sd) { |
| return getImageFile(sd, NameNodeFile.EDITS); |
| } |
| |
| File getEditNewFile(StorageDirectory sd) { |
| return getImageFile(sd, NameNodeFile.EDITS_NEW); |
| } |
| |
| Collection<File> getFiles(NameNodeFile type, NameNodeDirType dirType) { |
| ArrayList<File> list = new ArrayList<File>(); |
| Iterator<StorageDirectory> it = (dirType == null) ? dirIterator() : |
| dirIterator(dirType); |
| for ( ;it.hasNext(); ) { |
| list.add(getImageFile(it.next(), type)); |
| } |
| return list; |
| } |
| |
| Collection<URI> getDirectories(NameNodeDirType dirType) |
| throws IOException { |
| ArrayList<URI> list = new ArrayList<URI>(); |
| Iterator<StorageDirectory> it = (dirType == null) ? dirIterator() : |
| dirIterator(dirType); |
| for ( ;it.hasNext(); ) { |
| StorageDirectory sd = it.next(); |
| try { |
| list.add(Util.fileAsURI(sd.getRoot())); |
| } catch (IOException e) { |
| throw new IOException("Exception while processing " + |
| "StorageDirectory " + sd.getRoot(), e); |
| } |
| } |
| return list; |
| } |
| |
| /** |
| * Retrieve current directories of type IMAGE |
| * @return Collection of URI representing image directories |
| * @throws IOException in case of URI processing error |
| */ |
| Collection<URI> getImageDirectories() throws IOException { |
| return getDirectories(NameNodeDirType.IMAGE); |
| } |
| |
| /** |
| * Retrieve current directories of type EDITS |
| * @return Collection of URI representing edits directories |
| * @throws IOException in case of URI processing error |
| */ |
| Collection<URI> getEditsDirectories() throws IOException { |
| return getDirectories(NameNodeDirType.EDITS); |
| } |
| |
| /** |
| * Return number of storage directories of the given type. |
| * @param dirType directory type |
| * @return number of storage directories of type dirType |
| */ |
| int getNumStorageDirs(NameNodeDirType dirType) { |
| if(dirType == null) |
| return getNumStorageDirs(); |
| Iterator<StorageDirectory> it = dirIterator(dirType); |
| int numDirs = 0; |
| for(; it.hasNext(); it.next()) |
| numDirs++; |
| return numDirs; |
| } |
| |
| /** |
| * Analyze storage directories. |
| * Recover from previous transitions if required. |
| * Perform fs state transition if necessary depending on the namespace info. |
| * Read storage info. |
| * |
| * @param dataDirs |
| * @param startOpt startup option |
| * @throws IOException |
| * @return true if the image needs to be saved or false otherwise |
| */ |
| boolean recoverTransitionRead(Collection<URI> dataDirs, |
| Collection<URI> editsDirs, |
| StartupOption startOpt |
| ) throws IOException { |
| assert startOpt != StartupOption.FORMAT : |
| "NameNode formatting should be performed before reading the image"; |
| |
| // none of the data dirs exist |
| if((dataDirs.size() == 0 || editsDirs.size() == 0) |
| && startOpt != StartupOption.IMPORT) |
| throw new IOException( |
| "All specified directories are not accessible or do not exist."); |
| |
| if(startOpt == StartupOption.IMPORT |
| && (checkpointDirs == null || checkpointDirs.isEmpty())) |
| throw new IOException("Cannot import image from a checkpoint. " |
| + "\"dfs.namenode.checkpoint.dir\" is not set." ); |
| |
| if(startOpt == StartupOption.IMPORT |
| && (checkpointEditsDirs == null || checkpointEditsDirs.isEmpty())) |
| throw new IOException("Cannot import image from a checkpoint. " |
| + "\"dfs.namenode.checkpoint.dir\" is not set." ); |
| |
| setStorageDirectories(dataDirs, editsDirs); |
| // 1. For each data directory calculate its state and |
| // check whether all is consistent before transitioning. |
| Map<StorageDirectory, StorageState> dataDirStates = |
| new HashMap<StorageDirectory, StorageState>(); |
| boolean isFormatted = false; |
| for (Iterator<StorageDirectory> it = |
| dirIterator(); it.hasNext();) { |
| StorageDirectory sd = it.next(); |
| StorageState curState; |
| try { |
| curState = sd.analyzeStorage(startOpt); |
| // sd is locked but not opened |
| switch(curState) { |
| case NON_EXISTENT: |
| // name-node fails if any of the configured storage dirs are missing |
| throw new InconsistentFSStateException(sd.getRoot(), |
| "storage directory does not exist or is not accessible."); |
| case NOT_FORMATTED: |
| break; |
| case NORMAL: |
| break; |
| default: // recovery is possible |
| sd.doRecover(curState); |
| } |
| if (curState != StorageState.NOT_FORMATTED |
| && startOpt != StartupOption.ROLLBACK) { |
| sd.read(); // read and verify consistency with other directories |
| isFormatted = true; |
| } |
| if (startOpt == StartupOption.IMPORT && isFormatted) |
| // import of a checkpoint is allowed only into empty image directories |
| throw new IOException("Cannot import image from a checkpoint. " |
| + " NameNode already contains an image in " + sd.getRoot()); |
| } catch (IOException ioe) { |
| sd.unlock(); |
| throw ioe; |
| } |
| dataDirStates.put(sd,curState); |
| } |
| |
| if (!isFormatted && startOpt != StartupOption.ROLLBACK |
| && startOpt != StartupOption.IMPORT) |
| throw new IOException("NameNode is not formatted."); |
| if (layoutVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION) { |
| checkVersionUpgradable(layoutVersion); |
| } |
| if (startOpt != StartupOption.UPGRADE |
| && layoutVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION |
| && layoutVersion != FSConstants.LAYOUT_VERSION) |
| throw new IOException( |
| "\nFile system image contains an old layout version " + layoutVersion |
| + ".\nAn upgrade to version " + FSConstants.LAYOUT_VERSION |
| + " is required.\nPlease restart NameNode with -upgrade option."); |
| // check whether distributed upgrade is reguired and/or should be continued |
| verifyDistributedUpgradeProgress(startOpt); |
| |
| // 2. Format unformatted dirs. |
| this.checkpointTime = 0L; |
| for (Iterator<StorageDirectory> it = |
| dirIterator(); it.hasNext();) { |
| StorageDirectory sd = it.next(); |
| StorageState curState = dataDirStates.get(sd); |
| switch(curState) { |
| case NON_EXISTENT: |
| throw new IOException(StorageState.NON_EXISTENT + |
| " state cannot be here"); |
| case NOT_FORMATTED: |
| LOG.info("Storage directory " + sd.getRoot() + " is not formatted."); |
| LOG.info("Formatting ..."); |
| sd.clearDirectory(); // create empty currrent dir |
| break; |
| default: |
| break; |
| } |
| } |
| |
| // 3. Do transitions |
| switch(startOpt) { |
| case UPGRADE: |
| doUpgrade(); |
| return false; // upgrade saved image already |
| case IMPORT: |
| doImportCheckpoint(); |
| return false; // import checkpoint saved image already |
| case ROLLBACK: |
| doRollback(); |
| break; |
| case REGULAR: |
| // just load the image |
| } |
| |
| boolean needToSave = loadFSImage(); |
| |
| assert editLog != null : "editLog must be initialized"; |
| if(!editLog.isOpen()) |
| editLog.open(); |
| |
| return needToSave; |
| } |
| |
| private void doUpgrade() throws IOException { |
| if(getDistributedUpgradeState()) { |
| // only distributed upgrade need to continue |
| // don't do version upgrade |
| this.loadFSImage(); |
| initializeDistributedUpgrade(); |
| return; |
| } |
| // Upgrade is allowed only if there are |
| // no previous fs states in any of the directories |
| for (Iterator<StorageDirectory> it = |
| dirIterator(); it.hasNext();) { |
| StorageDirectory sd = it.next(); |
| if (sd.getPreviousDir().exists()) |
| throw new InconsistentFSStateException(sd.getRoot(), |
| "previous fs state should not exist during upgrade. " |
| + "Finalize or rollback first."); |
| } |
| |
| // load the latest image |
| this.loadFSImage(); |
| |
| // Do upgrade for each directory |
| long oldCTime = this.getCTime(); |
| this.cTime = FSNamesystem.now(); // generate new cTime for the state |
| int oldLV = this.getLayoutVersion(); |
| this.layoutVersion = FSConstants.LAYOUT_VERSION; |
| this.checkpointTime = FSNamesystem.now(); |
| for (Iterator<StorageDirectory> it = |
| dirIterator(); it.hasNext();) { |
| StorageDirectory sd = it.next(); |
| LOG.info("Upgrading image directory " + sd.getRoot() |
| + ".\n old LV = " + oldLV |
| + "; old CTime = " + oldCTime |
| + ".\n new LV = " + this.getLayoutVersion() |
| + "; new CTime = " + this.getCTime()); |
| File curDir = sd.getCurrentDir(); |
| File prevDir = sd.getPreviousDir(); |
| File tmpDir = sd.getPreviousTmp(); |
| assert curDir.exists() : "Current directory must exist."; |
| assert !prevDir.exists() : "prvious directory must not exist."; |
| assert !tmpDir.exists() : "prvious.tmp directory must not exist."; |
| assert !editLog.isOpen() : "Edits log must not be open."; |
| // rename current to tmp |
| rename(curDir, tmpDir); |
| // save new image |
| saveCurrent(sd); |
| // rename tmp to previous |
| rename(tmpDir, prevDir); |
| isUpgradeFinalized = false; |
| LOG.info("Upgrade of " + sd.getRoot() + " is complete."); |
| } |
| initializeDistributedUpgrade(); |
| editLog.open(); |
| } |
| |
| private void doRollback() throws IOException { |
| // Rollback is allowed only if there is |
| // a previous fs states in at least one of the storage directories. |
| // Directories that don't have previous state do not rollback |
| boolean canRollback = false; |
| FSImage prevState = new FSImage(getFSNamesystem()); |
| prevState.layoutVersion = FSConstants.LAYOUT_VERSION; |
| for (Iterator<StorageDirectory> it = |
| dirIterator(); it.hasNext();) { |
| StorageDirectory sd = it.next(); |
| File prevDir = sd.getPreviousDir(); |
| if (!prevDir.exists()) { // use current directory then |
| LOG.info("Storage directory " + sd.getRoot() |
| + " does not contain previous fs state."); |
| sd.read(); // read and verify consistency with other directories |
| continue; |
| } |
| StorageDirectory sdPrev = prevState.new StorageDirectory(sd.getRoot()); |
| sdPrev.read(sdPrev.getPreviousVersionFile()); // read and verify consistency of the prev dir |
| canRollback = true; |
| } |
| if (!canRollback) |
| throw new IOException("Cannot rollback. " |
| + "None of the storage directories contain previous fs state."); |
| |
| // Now that we know all directories are going to be consistent |
| // Do rollback for each directory containing previous state |
| for (Iterator<StorageDirectory> it = |
| dirIterator(); it.hasNext();) { |
| StorageDirectory sd = it.next(); |
| File prevDir = sd.getPreviousDir(); |
| if (!prevDir.exists()) |
| continue; |
| |
| LOG.info("Rolling back storage directory " + sd.getRoot() |
| + ".\n new LV = " + prevState.getLayoutVersion() |
| + "; new CTime = " + prevState.getCTime()); |
| File tmpDir = sd.getRemovedTmp(); |
| assert !tmpDir.exists() : "removed.tmp directory must not exist."; |
| // rename current to tmp |
| File curDir = sd.getCurrentDir(); |
| assert curDir.exists() : "Current directory must exist."; |
| rename(curDir, tmpDir); |
| // rename previous to current |
| rename(prevDir, curDir); |
| |
| // delete tmp dir |
| deleteDir(tmpDir); |
| LOG.info("Rollback of " + sd.getRoot()+ " is complete."); |
| } |
| isUpgradeFinalized = true; |
| // check whether name-node can start in regular mode |
| verifyDistributedUpgradeProgress(StartupOption.REGULAR); |
| } |
| |
| private void doFinalize(StorageDirectory sd) throws IOException { |
| File prevDir = sd.getPreviousDir(); |
| if (!prevDir.exists()) { // already discarded |
| LOG.info("Directory " + prevDir + " does not exist."); |
| LOG.info("Finalize upgrade for " + sd.getRoot()+ " is not required."); |
| return; |
| } |
| LOG.info("Finalizing upgrade for storage directory " |
| + sd.getRoot() + "." |
| + (getLayoutVersion()==0 ? "" : |
| "\n cur LV = " + this.getLayoutVersion() |
| + "; cur CTime = " + this.getCTime())); |
| assert sd.getCurrentDir().exists() : "Current directory must exist."; |
| final File tmpDir = sd.getFinalizedTmp(); |
| // rename previous to tmp and remove |
| rename(prevDir, tmpDir); |
| deleteDir(tmpDir); |
| isUpgradeFinalized = true; |
| LOG.info("Finalize upgrade for " + sd.getRoot()+ " is complete."); |
| } |
| |
| /** |
| * Load image from a checkpoint directory and save it into the current one. |
| * @throws IOException |
| */ |
| void doImportCheckpoint() throws IOException { |
| FSNamesystem fsNamesys = getFSNamesystem(); |
| FSImage ckptImage = new FSImage(fsNamesys); |
| // replace real image with the checkpoint image |
| FSImage realImage = fsNamesys.getFSImage(); |
| assert realImage == this; |
| fsNamesys.dir.fsImage = ckptImage; |
| // load from the checkpoint dirs |
| try { |
| ckptImage.recoverTransitionRead(checkpointDirs, checkpointEditsDirs, |
| StartupOption.REGULAR); |
| } finally { |
| ckptImage.close(); |
| } |
| // return back the real image |
| realImage.setStorageInfo(ckptImage); |
| checkpointTime = ckptImage.checkpointTime; |
| fsNamesys.dir.fsImage = realImage; |
| // and save it but keep the same checkpointTime |
| saveNamespace(false); |
| } |
| |
| void finalizeUpgrade() throws IOException { |
| for (Iterator<StorageDirectory> it = |
| dirIterator(); it.hasNext();) { |
| doFinalize(it.next()); |
| } |
| } |
| |
| boolean isUpgradeFinalized() { |
| return isUpgradeFinalized; |
| } |
| |
| protected void getFields(Properties props, |
| StorageDirectory sd |
| ) throws IOException { |
| super.getFields(props, sd); |
| if (layoutVersion == 0) |
| throw new IOException("NameNode directory " |
| + sd.getRoot() + " is not formatted."); |
| String sDUS, sDUV; |
| sDUS = props.getProperty("distributedUpgradeState"); |
| sDUV = props.getProperty("distributedUpgradeVersion"); |
| setDistributedUpgradeState( |
| sDUS == null? false : Boolean.parseBoolean(sDUS), |
| sDUV == null? getLayoutVersion() : Integer.parseInt(sDUV)); |
| this.checkpointTime = readCheckpointTime(sd); |
| } |
| |
| /** |
| * Determine the checkpoint time of the specified StorageDirectory |
| * |
| * @param sd StorageDirectory to check |
| * @return If file exists and can be read, last checkpoint time. If not, 0L. |
| * @throws IOException On errors processing file pointed to by sd |
| */ |
| long readCheckpointTime(StorageDirectory sd) throws IOException { |
| File timeFile = getImageFile(sd, NameNodeFile.TIME); |
| long timeStamp = 0L; |
| if (timeFile.exists() && timeFile.canRead()) { |
| DataInputStream in = new DataInputStream(new FileInputStream(timeFile)); |
| try { |
| timeStamp = in.readLong(); |
| } finally { |
| in.close(); |
| } |
| } |
| return timeStamp; |
| } |
| |
| /** |
| * Write last checkpoint time and version file into the storage directory. |
| * |
| * The version file should always be written last. |
| * Missing or corrupted version file indicates that |
| * the checkpoint is not valid. |
| * |
| * @param sd storage directory |
| * @throws IOException |
| */ |
| protected void setFields(Properties props, |
| StorageDirectory sd |
| ) throws IOException { |
| super.setFields(props, sd); |
| boolean uState = getDistributedUpgradeState(); |
| int uVersion = getDistributedUpgradeVersion(); |
| if(uState && uVersion != getLayoutVersion()) { |
| props.setProperty("distributedUpgradeState", Boolean.toString(uState)); |
| props.setProperty("distributedUpgradeVersion", Integer.toString(uVersion)); |
| } |
| writeCheckpointTime(sd); |
| } |
| |
| /** |
| * Write last checkpoint time into a separate file. |
| * |
| * @param sd |
| * @throws IOException |
| */ |
| void writeCheckpointTime(StorageDirectory sd) throws IOException { |
| if (checkpointTime < 0L) |
| return; // do not write negative time |
| File timeFile = getImageFile(sd, NameNodeFile.TIME); |
| if (timeFile.exists() && ! timeFile.delete()) { |
| LOG.error("Cannot delete chekpoint time file: " |
| + timeFile.getCanonicalPath()); |
| } |
| DataOutputStream out = new DataOutputStream( |
| new FileOutputStream(timeFile)); |
| try { |
| out.writeLong(checkpointTime); |
| } finally { |
| out.close(); |
| } |
| } |
| |
| /** |
| * Record new checkpoint time in order to |
| * distinguish healthy directories from the removed ones. |
| * If there is an error writing new checkpoint time, the corresponding |
| * storage directory is removed from the list. |
| */ |
| void incrementCheckpointTime() { |
| setCheckpointTime(checkpointTime + 1); |
| } |
| |
| /** |
| * The age of the namespace state.<p> |
| * Reflects the latest time the image was saved. |
| * Modified with every save or a checkpoint. |
| * Persisted in VERSION file. |
| */ |
| long getCheckpointTime() { |
| return checkpointTime; |
| } |
| |
| void setCheckpointTime(long newCpT) { |
| checkpointTime = newCpT; |
| // Write new checkpoint time in all storage directories |
| for(Iterator<StorageDirectory> it = |
| dirIterator(); it.hasNext();) { |
| StorageDirectory sd = it.next(); |
| try { |
| writeCheckpointTime(sd); |
| } catch(IOException e) { |
| // Close any edits stream associated with this dir and remove directory |
| LOG.warn("incrementCheckpointTime failed on " + sd.getRoot().getPath() + ";type="+sd.getStorageDirType()); |
| } |
| } |
| } |
| |
| /** |
| * @param sds - array of SDs to process |
| * @param propagate - flag, if set - then call corresponding EditLog stream's |
| * processIOError function. |
| */ |
| void processIOError(ArrayList<StorageDirectory> sds, boolean propagate) { |
| ArrayList<EditLogOutputStream> al = null; |
| for(StorageDirectory sd:sds) { |
| // if has a stream assosiated with it - remove it too.. |
| if (propagate && sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) { |
| EditLogOutputStream eStream = editLog.getEditsStream(sd); |
| if(al == null) al = new ArrayList<EditLogOutputStream>(1); |
| al.add(eStream); |
| } |
| |
| for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) { |
| StorageDirectory sd1 = it.next(); |
| if (sd.equals(sd1)) { |
| //add storage to the removed list |
| LOG.warn("FSImage:processIOError: removing storage: " |
| + sd.getRoot().getPath()); |
| try { |
| sd1.unlock(); //unlock before removing (in case it will be restored) |
| } catch (Exception e) { |
| // nothing |
| } |
| removedStorageDirs.add(sd1); |
| it.remove(); |
| break; |
| } |
| } |
| } |
| // if there are some edit log streams to remove |
| if(propagate && al != null) |
| editLog.processIOError(al, false); |
| |
| //if called from edits log, the it will call increment from there |
| if(propagate) incrementCheckpointTime(); |
| } |
| |
| public FSEditLog getEditLog() { |
| return editLog; |
| } |
| |
| public boolean isConversionNeeded(StorageDirectory sd) throws IOException { |
| File oldImageDir = new File(sd.getRoot(), "image"); |
| if (!oldImageDir.exists()) { |
| if(sd.getVersionFile().exists()) |
| throw new InconsistentFSStateException(sd.getRoot(), |
| oldImageDir + " does not exist."); |
| return false; |
| } |
| // check the layout version inside the image file |
| File oldF = new File(oldImageDir, "fsimage"); |
| RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws"); |
| try { |
| oldFile.seek(0); |
| int odlVersion = oldFile.readInt(); |
| if (odlVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION) |
| return false; |
| } finally { |
| oldFile.close(); |
| } |
| return true; |
| } |
| |
| // |
| // Atomic move sequence, to recover from interrupted checkpoint |
| // |
| boolean recoverInterruptedCheckpoint(StorageDirectory nameSD, |
| StorageDirectory editsSD) |
| throws IOException { |
| boolean needToSave = false; |
| File curFile = getImageFile(nameSD, NameNodeFile.IMAGE); |
| File ckptFile = getImageFile(nameSD, NameNodeFile.IMAGE_NEW); |
| |
| // |
| // If we were in the midst of a checkpoint |
| // |
| if (ckptFile.exists()) { |
| needToSave = true; |
| if (getImageFile(editsSD, NameNodeFile.EDITS_NEW).exists()) { |
| // |
| // checkpointing migth have uploaded a new |
| // merged image, but we discard it here because we are |
| // not sure whether the entire merged image was uploaded |
| // before the namenode crashed. |
| // |
| if (!ckptFile.delete()) { |
| throw new IOException("Unable to delete " + ckptFile); |
| } |
| } else { |
| // |
| // checkpointing was in progress when the namenode |
| // shutdown. The fsimage.ckpt was created and the edits.new |
| // file was moved to edits. We complete that checkpoint by |
| // moving fsimage.new to fsimage. There is no need to |
| // update the fstime file here. renameTo fails on Windows |
| // if the destination file already exists. |
| // |
| if (!ckptFile.renameTo(curFile)) { |
| if (!curFile.delete()) |
| LOG.warn("Unable to delete dir " + curFile + " before rename"); |
| if (!ckptFile.renameTo(curFile)) { |
| throw new IOException("Unable to rename " + ckptFile + |
| " to " + curFile); |
| } |
| } |
| } |
| } |
| return needToSave; |
| } |
| |
| /** |
| * Choose latest image from one of the directories, |
| * load it and merge with the edits from that directory. |
| * |
| * @return whether the image should be saved |
| * @throws IOException |
| */ |
| boolean loadFSImage() throws IOException { |
| long latestNameCheckpointTime = Long.MIN_VALUE; |
| long latestEditsCheckpointTime = Long.MIN_VALUE; |
| boolean needToSave = false; |
| isUpgradeFinalized = true; |
| |
| StorageDirectory latestNameSD = null; |
| StorageDirectory latestEditsSD = null; |
| |
| Collection<String> imageDirs = new ArrayList<String>(); |
| Collection<String> editsDirs = new ArrayList<String>(); |
| |
| // Set to determine if all of storageDirectories share the same checkpoint |
| Set<Long> checkpointTimes = new HashSet<Long>(); |
| |
| // Process each of the storage directories to find the pair of |
| // newest image file and edit file |
| for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) { |
| StorageDirectory sd = it.next(); |
| |
| // Was the file just formatted? |
| if (!sd.getVersionFile().exists()) { |
| needToSave |= true; |
| continue; |
| } |
| |
| boolean imageExists = false; |
| boolean editsExists = false; |
| |
| // Determine if sd is image, edits or both |
| if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) { |
| imageExists = getImageFile(sd, NameNodeFile.IMAGE).exists(); |
| imageDirs.add(sd.getRoot().getCanonicalPath()); |
| } |
| |
| if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) { |
| editsExists = getImageFile(sd, NameNodeFile.EDITS).exists(); |
| editsDirs.add(sd.getRoot().getCanonicalPath()); |
| } |
| |
| checkpointTime = readCheckpointTime(sd); |
| |
| checkpointTimes.add(checkpointTime); |
| |
| if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE) && |
| (latestNameCheckpointTime < checkpointTime) && imageExists) { |
| latestNameCheckpointTime = checkpointTime; |
| latestNameSD = sd; |
| } |
| |
| if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS) && |
| (latestEditsCheckpointTime < checkpointTime) && editsExists) { |
| latestEditsCheckpointTime = checkpointTime; |
| latestEditsSD = sd; |
| } |
| |
| // check that we have a valid, non-default checkpointTime |
| if (checkpointTime <= 0L) |
| needToSave |= true; |
| |
| // set finalized flag |
| isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists(); |
| } |
| |
| // We should have at least one image and one edits dirs |
| if (latestNameSD == null) |
| throw new IOException("Image file is not found in " + imageDirs); |
| if (latestEditsSD == null) |
| throw new IOException("Edits file is not found in " + editsDirs); |
| |
| // Make sure we are loading image and edits from same checkpoint |
| if (latestNameCheckpointTime > latestEditsCheckpointTime |
| && latestNameSD != latestEditsSD |
| && latestNameSD.getStorageDirType() == NameNodeDirType.IMAGE |
| && latestEditsSD.getStorageDirType() == NameNodeDirType.EDITS) { |
| // This is a rare failure when NN has image-only and edits-only |
| // storage directories, and fails right after saving images, |
| // in some of the storage directories, but before purging edits. |
| // See -NOTE- in saveNamespace(). |
| LOG.error("This is a rare failure scenario!!!"); |
| LOG.error("Image checkpoint time " + latestNameCheckpointTime + |
| " > edits checkpoint time " + latestEditsCheckpointTime); |
| LOG.error("Name-node will treat the image as the latest state of " + |
| "the namespace. Old edits will be discarded."); |
| } else if (latestNameCheckpointTime != latestEditsCheckpointTime) |
| throw new IOException("Inconsistent storage detected, " + |
| "image and edits checkpoint times do not match. " + |
| "image checkpoint time = " + latestNameCheckpointTime + |
| "edits checkpoint time = " + latestEditsCheckpointTime); |
| |
| // If there was more than one checkpointTime recorded we should save |
| needToSave |= checkpointTimes.size() != 1; |
| |
| // Recover from previous interrupted checkpoint, if any |
| needToSave |= recoverInterruptedCheckpoint(latestNameSD, latestEditsSD); |
| |
| long startTime = FSNamesystem.now(); |
| long imageSize = getImageFile(latestNameSD, NameNodeFile.IMAGE).length(); |
| |
| // |
| // Load in bits |
| // |
| latestNameSD.read(); |
| needToSave |= loadFSImage(getImageFile(latestNameSD, NameNodeFile.IMAGE)); |
| LOG.info("Image file of size " + imageSize + " loaded in " |
| + (FSNamesystem.now() - startTime)/1000 + " seconds."); |
| |
| // Load latest edits |
| if (latestNameCheckpointTime > latestEditsCheckpointTime) |
| // the image is already current, discard edits |
| needToSave |= true; |
| else // latestNameCheckpointTime == latestEditsCheckpointTime |
| needToSave |= (loadFSEdits(latestEditsSD) > 0); |
| |
| return needToSave; |
| } |
| |
/**
 * Load in the filesystem image from file. It's a big list of
 * filenames and blocks.
 * <p>
 * The file starts with a header: layout version, namespaceID, number of
 * inode records, and (for layout -12 and newer) the last generation stamp.
 * Then come the inode records, the legacy datanode section (see
 * {@code loadDatanodes}) and the files under construction (see
 * {@code loadFilesUnderConstruction}). Most fields are read only for the
 * layout versions that contain them. Layout numbers are negative and get
 * more negative as the format evolves. The read order below therefore
 * mirrors exactly what writers of each layout emitted.
 *
 * @param curFile image file to read; must not be null
 * @return whether we should "re-save" and consolidate the edit-logs. This is
 *         true when the image's layout version differs from
 *         {@code FSConstants.LAYOUT_VERSION}.
 * @throws IOException if reading the file fails, or if a file-under-
 *         construction record refers to a missing file or to a directory
 */
boolean loadFSImage(File curFile) throws IOException {
  // Checks the layout version currently held by this object; the image's own
  // version has not been read yet at this point.
  assert this.getLayoutVersion() < 0 : "Negative layout version is expected.";
  assert curFile != null : "curFile is null";

  FSNamesystem fsNamesys = getFSNamesystem();
  FSDirectory fsDir = fsNamesys.dir;

  //
  // Load in bits
  //
  boolean needToSave = true;
  DataInputStream in = new DataInputStream(new BufferedInputStream(
                            new FileInputStream(curFile)));
  try {
    /*
     * Note: Remove any checks for version earlier than
     * Storage.LAST_UPGRADABLE_LAYOUT_VERSION since we should never get
     * to here with older images.
     */

    /*
     * TODO we need to change format of the image file
     * it should not contain version and namespace fields
     */
    // read image version: first appeared in version -1
    int imgVersion = in.readInt();
    // read namespaceID: first appeared in version -2
    this.namespaceID = in.readInt();

    // read number of files (a long since layout -16, an int before that)
    long numFiles;
    if (imgVersion <= -16) {
      numFiles = in.readLong();
    } else {
      numFiles = in.readInt();
    }

    this.layoutVersion = imgVersion;
    // read in the last generation stamp (present since layout -12).
    if (imgVersion <= -12) {
      long genstamp = in.readLong();
      fsNamesys.setGenerationStamp(genstamp);
    }

    // An image in any other layout must be re-saved in the current one.
    needToSave = (imgVersion != FSConstants.LAYOUT_VERSION);

    // read file info
    // (this initial value is overwritten by every inode record below)
    short replication = fsNamesys.getDefaultReplication();

    LOG.info("Number of files = " + numFiles);

    String path;
    // Parent of the previously added inode. The writer (saveImage) emits all
    // children of a directory consecutively, so consecutive records usually
    // share a parent and the returned parent INode can be reused.
    String parentPath = "";
    INodeDirectory parentINode = fsDir.rootDir;
    for (long i = 0; i < numFiles; i++) {
      long modificationTime = 0;
      long atime = 0;
      long blockSize = 0;
      path = readString(in);
      replication = in.readShort();
      replication = editLog.adjustReplication(replication);
      modificationTime = in.readLong();
      // access time: present since layout -17
      if (imgVersion <= -17) {
        atime = in.readLong();
      }
      // preferred block size: present since layout -8
      if (imgVersion <= -8) {
        blockSize = in.readLong();
      }
      int numBlocks = in.readInt();
      Block blocks[] = null;

      // for older versions, a blocklist of size 0
      // indicates a directory.
      // (For layouts newer than -9 a negative count marks a directory;
      // saveINode2Image writes -1. blocks stays null for directories.)
      if ((-9 <= imgVersion && numBlocks > 0) ||
          (imgVersion < -9 && numBlocks >= 0)) {
        blocks = new Block[numBlocks];
        for (int j = 0; j < numBlocks; j++) {
          blocks[j] = new Block();
          if (-14 < imgVersion) {
            // Images older than layout -14 store two longs per block
            // (presumably id and length) and no generation stamp.
            blocks[j].set(in.readLong(), in.readLong(),
                          GenerationStamp.GRANDFATHER_GENERATION_STAMP);
          } else {
            blocks[j].readFields(in);
          }
        }
      }
      // Older versions of HDFS does not store the block size in inode.
      // If the file has more than one block, use the size of the
      // first block as the blocksize. Otherwise use the default block size.
      // (For layout -8 itself, a stored block size of 0 is handled the same way.)
      //
      if (-8 <= imgVersion && blockSize == 0) {
        if (numBlocks > 1) {
          blockSize = blocks[0].getNumBytes();
        } else {
          long first = ((numBlocks == 1) ? blocks[0].getNumBytes(): 0);
          blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
        }
      }

      // get quota only when the node is a directory
      // (namespace quota since layout -16, diskspace quota since -18;
      // -1 means no quota was stored)
      long nsQuota = -1L;
      if (imgVersion <= -16 && blocks == null) {
        nsQuota = in.readLong();
      }
      long dsQuota = -1L;
      if (imgVersion <= -18 && blocks == null) {
        dsQuota = in.readLong();
      }

      // permissions: stored since layout -11; older images get the
      // namesystem's upgrade permission
      PermissionStatus permissions = fsNamesys.getUpgradePermission();
      if (imgVersion <= -11) {
        permissions = PermissionStatus.read(in);
      }
      if (path.length() == 0) { // it is the root
        // update the root's attributes
        if (nsQuota != -1 || dsQuota != -1) {
          fsDir.rootDir.setQuota(nsQuota, dsQuota);
        }
        fsDir.rootDir.setModificationTime(modificationTime);
        fsDir.rootDir.setPermissionStatus(permissions);
        continue;
      }
      // check if the new inode belongs to the same parent
      // (if not, pass a null parent; presumably addToParent then resolves
      // the parent from the path — TODO confirm)
      if(!isParent(path, parentPath)) {
        parentINode = null;
        parentPath = getParent(path);
      }
      // add new inode
      parentINode = fsDir.addToParent(path, parentINode, permissions,
                                      blocks, replication, modificationTime,
                                      atime, nsQuota, dsQuota, blockSize);
    }

    // load datanode info
    this.loadDatanodes(imgVersion, in);

    // load Files Under Construction
    this.loadFilesUnderConstruction(imgVersion, in, fsNamesys);

  } finally {
    in.close();
  }

  return needToSave;
}
| |
| /** |
| * Return string representing the parent of the given path. |
| */ |
| String getParent(String path) { |
| return path.substring(0, path.lastIndexOf(Path.SEPARATOR)); |
| } |
| |
| private boolean isParent(String path, String parent) { |
| return parent != null && path != null |
| && path.indexOf(parent) == 0 |
| && path.lastIndexOf(Path.SEPARATOR) == parent.length(); |
| } |
| |
| /** |
| * Load and merge edits from two edits files |
| * |
| * @param sd storage directory |
| * @return number of edits loaded |
| * @throws IOException |
| */ |
| int loadFSEdits(StorageDirectory sd) throws IOException { |
| int numEdits = 0; |
| EditLogFileInputStream edits = |
| new EditLogFileInputStream(getImageFile(sd, NameNodeFile.EDITS)); |
| |
| numEdits = editLog.loadFSEdits(edits); |
| edits.close(); |
| File editsNew = getImageFile(sd, NameNodeFile.EDITS_NEW); |
| |
| if (editsNew.exists() && editsNew.length() > 0) { |
| edits = new EditLogFileInputStream(editsNew); |
| numEdits += editLog.loadFSEdits(edits); |
| edits.close(); |
| } |
| |
| // update the counts. |
| getFSNamesystem().dir.updateCountForINodeWithQuota(); |
| |
| return numEdits; |
| } |
| |
| /** |
| * Save the contents of the FS image to the file. |
| */ |
| void saveFSImage(File newFile) throws IOException { |
| FSNamesystem fsNamesys = getFSNamesystem(); |
| FSDirectory fsDir = fsNamesys.dir; |
| long startTime = FSNamesystem.now(); |
| // |
| // Write out data |
| // |
| DataOutputStream out = new DataOutputStream( |
| new BufferedOutputStream( |
| new FileOutputStream(newFile))); |
| try { |
| out.writeInt(FSConstants.LAYOUT_VERSION); |
| out.writeInt(namespaceID); |
| out.writeLong(fsDir.rootDir.numItemsInTree()); |
| out.writeLong(fsNamesys.getGenerationStamp()); |
| byte[] byteStore = new byte[4*FSConstants.MAX_PATH_LENGTH]; |
| ByteBuffer strbuf = ByteBuffer.wrap(byteStore); |
| // save the root |
| saveINode2Image(strbuf, fsDir.rootDir, out); |
| // save the rest of the nodes |
| saveImage(strbuf, 0, fsDir.rootDir, out); |
| fsNamesys.saveFilesUnderConstruction(out); |
| strbuf = null; |
| } finally { |
| out.close(); |
| } |
| |
| LOG.info("Image file of size " + newFile.length() + " saved in " |
| + (FSNamesystem.now() - startTime)/1000 + " seconds."); |
| } |
| |
/**
 * Save the contents of the FS image and create empty edits.
 *
 * In order to minimize the recovery effort in case of failure during
 * saveNamespace the algorithm reduces discrepancy between directory states
 * by performing updates in the following order:
 * <ol>
 * <li> rename current to lastcheckpoint.tmp for all of them,</li>
 * <li> save image and recreate edits for all of them,</li>
 * <li> rename lastcheckpoint.tmp to previous.checkpoint.</li>
 * </ol>
 * On stage (2) we first save all images, then recreate edits.
 * Otherwise the name-node may purge all edits and fail,
 * in which case the journal will be lost.
 * <p>
 * A failure in one storage directory does not abort the procedure. The
 * failure is logged, the directory is collected, and all collected
 * directories are passed to {@code processIOError(errorSDs, false)} after
 * the last stage. The edit log is closed first and reopened at the end if it
 * is not already open. The checkpoint state is then set to
 * {@code UPLOAD_DONE}.
 *
 * @param renewCheckpointTime if true, the checkpoint time is set to the
 *        current time before anything is saved
 * @throws IOException propagated from closing/reopening the edit log or
 *         from processIOError
 */
void saveNamespace(boolean renewCheckpointTime) throws IOException {
  assert editLog != null : "editLog must be initialized";
  editLog.close();
  if(renewCheckpointTime)
    this.checkpointTime = FSNamesystem.now();
  // NOTE(review): a directory that fails in more than one stage is added to
  // errorSDs more than once; confirm processIOError tolerates duplicates.
  ArrayList<StorageDirectory> errorSDs = new ArrayList<StorageDirectory>();

  // mv current -> lastcheckpoint.tmp
  for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
    StorageDirectory sd = it.next();
    try {
      moveCurrent(sd);
    } catch(IOException ie) {
      LOG.error("Unable to move current for " + sd.getRoot(), ie);
      errorSDs.add(sd);
    }
  }

  // save images into current
  for (Iterator<StorageDirectory> it = dirIterator(NameNodeDirType.IMAGE);
                                                         it.hasNext();) {
    StorageDirectory sd = it.next();
    try {
      saveCurrent(sd);
    } catch(IOException ie) {
      LOG.error("Unable to save image for " + sd.getRoot(), ie);
      errorSDs.add(sd);
    }
  }

  // -NOTE-
  // If NN has image-only and edits-only storage directories and fails here
  // the image will have the latest namespace state.
  // During startup the image-only directories will recover by discarding
  // lastcheckpoint.tmp, while
  // the edits-only directories will recover by falling back
  // to the old state contained in their lastcheckpoint.tmp.
  // The edits directories should be discarded during startup because their
  // checkpointTime is older than that of image directories.

  // recreate edits in current
  // (directories of both types were already saved in the loop above, so
  // saveCurrent runs for them a second time here)
  for (Iterator<StorageDirectory> it = dirIterator(NameNodeDirType.EDITS);
                                                         it.hasNext();) {
    StorageDirectory sd = it.next();
    try {
      saveCurrent(sd);
    } catch(IOException ie) {
      LOG.error("Unable to save edits for " + sd.getRoot(), ie);
      errorSDs.add(sd);
    }
  }
  // mv lastcheckpoint.tmp -> previous.checkpoint
  for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
    StorageDirectory sd = it.next();
    try {
      moveLastCheckpoint(sd);
    } catch(IOException ie) {
      LOG.error("Unable to move last checkpoint for " + sd.getRoot(), ie);
      errorSDs.add(sd);
    }
  }
  processIOError(errorSDs, false);
  if(!editLog.isOpen()) editLog.open();
  ckptState = CheckpointStates.UPLOAD_DONE;
}
| |
| /** |
| * Save current image and empty journal into {@code current} directory. |
| */ |
| protected void saveCurrent(StorageDirectory sd) throws IOException { |
| File curDir = sd.getCurrentDir(); |
| NameNodeDirType dirType = (NameNodeDirType)sd.getStorageDirType(); |
| // save new image or new edits |
| if (!curDir.exists() && !curDir.mkdir()) |
| throw new IOException("Cannot create directory " + curDir); |
| if (dirType.isOfType(NameNodeDirType.IMAGE)) |
| saveFSImage(getImageFile(sd, NameNodeFile.IMAGE)); |
| if (dirType.isOfType(NameNodeDirType.EDITS)) |
| editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS)); |
| // write version and time files |
| sd.write(); |
| } |
| |
| /** |
| * Move {@code current} to {@code lastcheckpoint.tmp} and |
| * recreate empty {@code current}. |
| * {@code current} is moved only if it is well formatted, |
| * that is contains VERSION file. |
| * |
| * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getLastCheckpointTmp() |
| * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getPreviousCheckpoint() |
| */ |
| protected void moveCurrent(StorageDirectory sd) |
| throws IOException { |
| File curDir = sd.getCurrentDir(); |
| File tmpCkptDir = sd.getLastCheckpointTmp(); |
| // mv current -> lastcheckpoint.tmp |
| // only if current is formatted - has VERSION file |
| if(sd.getVersionFile().exists()) { |
| assert curDir.exists() : curDir + " directory must exist."; |
| assert !tmpCkptDir.exists() : tmpCkptDir + " directory must not exist."; |
| rename(curDir, tmpCkptDir); |
| } |
| // recreate current |
| if(!curDir.exists() && !curDir.mkdir()) |
| throw new IOException("Cannot create directory " + curDir); |
| } |
| |
| /** |
| * Move {@code lastcheckpoint.tmp} to {@code previous.checkpoint} |
| * |
| * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getPreviousCheckpoint() |
| * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getLastCheckpointTmp() |
| */ |
| protected void moveLastCheckpoint(StorageDirectory sd) |
| throws IOException { |
| File tmpCkptDir = sd.getLastCheckpointTmp(); |
| File prevCkptDir = sd.getPreviousCheckpoint(); |
| // remove previous.checkpoint |
| if (prevCkptDir.exists()) |
| deleteDir(prevCkptDir); |
| // rename lastcheckpoint.tmp -> previous.checkpoint |
| if(tmpCkptDir.exists()) |
| rename(tmpCkptDir, prevCkptDir); |
| } |
| |
| /** |
| * Generate new namespaceID. |
| * |
| * namespaceID is a persistent attribute of the namespace. |
| * It is generated when the namenode is formatted and remains the same |
| * during the life cycle of the namenode. |
| * When a datanodes register they receive it as the registrationID, |
| * which is checked every time the datanode is communicating with the |
| * namenode. Datanodes that do not 'know' the namespaceID are rejected. |
| * |
| * @return new namespaceID |
| */ |
| private int newNamespaceID() { |
| Random r = new Random(); |
| r.setSeed(FSNamesystem.now()); |
| int newID = 0; |
| while(newID == 0) |
| newID = r.nextInt(0x7FFFFFFF); // use 31 bits only |
| return newID; |
| } |
| |
| /** Create new dfs name directory. Caution: this destroys all files |
| * in this filesystem. */ |
| void format(StorageDirectory sd) throws IOException { |
| sd.clearDirectory(); // create currrent dir |
| sd.lock(); |
| try { |
| saveCurrent(sd); |
| } finally { |
| sd.unlock(); |
| } |
| LOG.info("Storage directory " + sd.getRoot() |
| + " has been successfully formatted."); |
| } |
| |
| public void format() throws IOException { |
| this.layoutVersion = FSConstants.LAYOUT_VERSION; |
| this.namespaceID = newNamespaceID(); |
| this.cTime = 0L; |
| this.checkpointTime = FSNamesystem.now(); |
| for (Iterator<StorageDirectory> it = |
| dirIterator(); it.hasNext();) { |
| StorageDirectory sd = it.next(); |
| format(sd); |
| } |
| } |
| |
/**
 * Save one inode's attributes to the image.
 * <p>
 * Each record contains, in order:
 * <ol>
 * <li>the path bytes held in {@code name} (from its start up to its current
 *     position), preceded by their length as a short;</li>
 * <li>replication;</li>
 * <li>modification time;</li>
 * <li>access time;</li>
 * <li>preferred block size;</li>
 * <li>the block count;</li>
 * <li>the type-specific part (see below);</li>
 * <li>the permission status.</li>
 * </ol>
 * For a file, the type-specific part is the blocks. A directory writes 0 for
 * replication, access time and preferred block size, -1 as the block count,
 * and its namespace and diskspace quotas as the type-specific part.
 *
 * @param name buffer holding the inode's full path; its position marks the end
 * @param node inode to serialize
 * @param out image output stream
 * @throws IOException if writing fails
 */
private static void saveINode2Image(ByteBuffer name,
                                    INode node,
                                    DataOutputStream out) throws IOException {
  int nameLen = name.position();
  out.writeShort(nameLen);
  out.write(name.array(), name.arrayOffset(), nameLen);
  if (!node.isDirectory()) {  // write file inode
    INodeFile fileINode = (INodeFile)node;
    out.writeShort(fileINode.getReplication());
    out.writeLong(fileINode.getModificationTime());
    out.writeLong(fileINode.getAccessTime());
    out.writeLong(fileINode.getPreferredBlockSize());
    Block[] blocks = fileINode.getBlocks();
    out.writeInt(blocks.length);
    for (Block blk : blocks)
      blk.write(out);
    // FILE_PERM (declared elsewhere) is mutated here via fromShort, so it
    // appears to be a shared scratch object.
    // NOTE(review): confirm image saves never run concurrently.
    FILE_PERM.fromShort(fileINode.getFsPermissionShort());
    PermissionStatus.write(out, fileINode.getUserName(),
                           fileINode.getGroupName(),
                           FILE_PERM);
  } else {   // write directory inode
    out.writeShort(0);  // replication
    out.writeLong(node.getModificationTime());
    out.writeLong(0);   // access time
    out.writeLong(0);   // preferred block size
    out.writeInt(-1);    // # of blocks
    out.writeLong(node.getNsQuota());
    out.writeLong(node.getDsQuota());
    FILE_PERM.fromShort(node.getFsPermissionShort());
    PermissionStatus.write(out, node.getUserName(),
                           node.getGroupName(),
                           FILE_PERM);
  }
}
| /** |
| * Save file tree image starting from the given root. |
| * This is a recursive procedure, which first saves all children of |
| * a current directory and then moves inside the sub-directories. |
| */ |
| private static void saveImage(ByteBuffer parentPrefix, |
| int prefixLength, |
| INodeDirectory current, |
| DataOutputStream out) throws IOException { |
| int newPrefixLength = prefixLength; |
| if (current.getChildrenRaw() == null) |
| return; |
| for(INode child : current.getChildren()) { |
| // print all children first |
| parentPrefix.position(prefixLength); |
| parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes()); |
| saveINode2Image(parentPrefix, child, out); |
| } |
| for(INode child : current.getChildren()) { |
| if(!child.isDirectory()) |
| continue; |
| parentPrefix.position(prefixLength); |
| parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes()); |
| newPrefixLength = parentPrefix.position(); |
| saveImage(parentPrefix, newPrefixLength, (INodeDirectory)child, out); |
| } |
| parentPrefix.position(prefixLength); |
| } |
| |
| void loadDatanodes(int version, DataInputStream in) throws IOException { |
| if (version > -3) // pre datanode image version |
| return; |
| if (version <= -12) { |
| return; // new versions do not store the datanodes any more. |
| } |
| int size = in.readInt(); |
| for(int i = 0; i < size; i++) { |
| DatanodeImage nodeImage = new DatanodeImage(); |
| nodeImage.readFields(in); |
| // We don't need to add these descriptors any more. |
| } |
| } |
| |
| private void loadFilesUnderConstruction(int version, DataInputStream in, |
| FSNamesystem fs) throws IOException { |
| |
| FSDirectory fsDir = fs.dir; |
| if (version > -13) // pre lease image version |
| return; |
| int size = in.readInt(); |
| |
| LOG.info("Number of files under construction = " + size); |
| |
| for (int i = 0; i < size; i++) { |
| INodeFileUnderConstruction cons = readINodeUnderConstruction(in); |
| |
| // verify that file exists in namespace |
| String path = cons.getLocalName(); |
| INode old = fsDir.getFileINode(path); |
| if (old == null) { |
| throw new IOException("Found lease for non-existent file " + path); |
| } |
| if (old.isDirectory()) { |
| throw new IOException("Found lease for directory " + path); |
| } |
| INodeFile oldnode = (INodeFile) old; |
| fsDir.replaceNode(path, oldnode, cons); |
| fs.leaseManager.addLease(cons.getClientName(), path); |
| } |
| } |
| |
/**
 * Reads in an INodeFileUnderConstruction from the input stream, in the
 * format written by {@code writeINodeUnderConstruction}.
 * <p>
 * Every block except the last becomes a {@code BlockInfo}. The last block,
 * if any, becomes a {@code BlockInfoUnderConstruction} in state
 * {@code UNDER_CONSTRUCTION}. The stored datanode locations are read only to
 * skip past them; they are not used.
 *
 * @param in image input stream positioned at the record
 * @return the reconstructed inode; its name is the stored path bytes
 * @throws IOException if reading fails
 */
static INodeFileUnderConstruction readINodeUnderConstruction(
                          DataInputStream in) throws IOException {
  byte[] name = readBytes(in);
  short blockReplication = in.readShort();
  long modificationTime = in.readLong();
  long preferredBlockSize = in.readLong();
  int numBlocks = in.readInt();
  BlockInfo[] blocks = new BlockInfo[numBlocks];
  // blk is reused as a read buffer for every block. This relies on the
  // BlockInfo constructors copying its fields rather than keeping the
  // reference. NOTE(review): confirm.
  Block blk = new Block();
  int i = 0;
  for (; i < numBlocks-1; i++) {
    blk.readFields(in);
    blocks[i] = new BlockInfo(blk, blockReplication);
  }
  // last block is UNDER_CONSTRUCTION
  // (after the loop, i == numBlocks-1 whenever numBlocks > 0)
  if(numBlocks > 0) {
    blk.readFields(in);
    blocks[i] = new BlockInfoUnderConstruction(
      blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null);
  }
  PermissionStatus perm = PermissionStatus.read(in);
  String clientName = readString(in);
  String clientMachine = readString(in);

  // These locations are not used at all
  int numLocs = in.readInt();
  DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocs];
  for (i = 0; i < numLocs; i++) {
    locations[i] = new DatanodeDescriptor();
    locations[i].readFields(in);
  }

  return new INodeFileUnderConstruction(name,
                                        blockReplication,
                                        modificationTime,
                                        preferredBlockSize,
                                        blocks,
                                        perm,
                                        clientName,
                                        clientMachine,
                                        null);
}
| |
| // Helper function that writes an INodeUnderConstruction |
| // into the input stream |
| // |
| static void writeINodeUnderConstruction(DataOutputStream out, |
| INodeFileUnderConstruction cons, |
| String path) |
| throws IOException { |
| writeString(path, out); |
| out.writeShort(cons.getReplication()); |
| out.writeLong(cons.getModificationTime()); |
| out.writeLong(cons.getPreferredBlockSize()); |
| int nrBlocks = cons.getBlocks().length; |
| out.writeInt(nrBlocks); |
| for (int i = 0; i < nrBlocks; i++) { |
| cons.getBlocks()[i].write(out); |
| } |
| cons.getPermissionStatus().write(out); |
| writeString(cons.getClientName(), out); |
| writeString(cons.getClientMachine(), out); |
| |
| out.writeInt(0); // do not store locations of last block |
| } |
| |
| /** |
| * Moves fsimage.ckpt to fsImage and edits.new to edits |
| * Reopens the new edits file. |
| */ |
| void rollFSImage() throws IOException { |
| rollFSImage(true); |
| } |
| |
| void rollFSImage(boolean renewCheckpointTime) throws IOException { |
| if (ckptState != CheckpointStates.UPLOAD_DONE |
| && !(ckptState == CheckpointStates.ROLLED_EDITS |
| && getNumStorageDirs(NameNodeDirType.IMAGE) == 0)) { |
| throw new IOException("Cannot roll fsImage before rolling edits log."); |
| } |
| |
| for (Iterator<StorageDirectory> it = |
| dirIterator(NameNodeDirType.IMAGE); it.hasNext();) { |
| StorageDirectory sd = it.next(); |
| File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW); |
| if (!ckpt.exists()) { |
| throw new IOException("Checkpoint file " + ckpt + |
| " does not exist"); |
| } |
| } |
| editLog.purgeEditLog(); // renamed edits.new to edits |
| LOG.debug("rollFSImage after purgeEditLog: storageList=" + listStorageDirectories()); |
| // |
| // Renames new image |
| // |
| renameCheckpoint(); |
| resetVersion(renewCheckpointTime); |
| } |
| |
| /** |
| * Renames new image |
| */ |
| void renameCheckpoint() { |
| ArrayList<StorageDirectory> al = null; |
| for (Iterator<StorageDirectory> it = |
| dirIterator(NameNodeDirType.IMAGE); it.hasNext();) { |
| StorageDirectory sd = it.next(); |
| File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW); |
| File curFile = getImageFile(sd, NameNodeFile.IMAGE); |
| // renameTo fails on Windows if the destination file |
| // already exists. |
| LOG.debug("renaming " + ckpt.getAbsolutePath() + " to " + curFile.getAbsolutePath()); |
| if (!ckpt.renameTo(curFile)) { |
| if (!curFile.delete() || !ckpt.renameTo(curFile)) { |
| LOG.warn("renaming " + ckpt.getAbsolutePath() + " to " + |
| curFile.getAbsolutePath() + " FAILED"); |
| |
| if(al == null) al = new ArrayList<StorageDirectory> (1); |
| al.add(sd); |
| } |
| } |
| } |
| if(al != null) processIOError(al, true); |
| } |
| |
| /** |
| * Updates version and fstime files in all directories (fsimage and edits). |
| */ |
| void resetVersion(boolean renewCheckpointTime) throws IOException { |
| this.layoutVersion = FSConstants.LAYOUT_VERSION; |
| if(renewCheckpointTime) |
| this.checkpointTime = FSNamesystem.now(); |
| |
| ArrayList<StorageDirectory> al = null; |
| for (Iterator<StorageDirectory> it = |
| dirIterator(); it.hasNext();) { |
| StorageDirectory sd = it.next(); |
| // delete old edits if sd is the image only the directory |
| if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) { |
| File editsFile = getImageFile(sd, NameNodeFile.EDITS); |
| if(editsFile.exists() && !editsFile.delete()) |
| throw new IOException("Cannot delete edits file " |
| + editsFile.getCanonicalPath()); |
| } |
| // delete old fsimage if sd is the edits only the directory |
| if (!sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) { |
| File imageFile = getImageFile(sd, NameNodeFile.IMAGE); |
| if(imageFile.exists() && !imageFile.delete()) |
| throw new IOException("Cannot delete image file " |
| + imageFile.getCanonicalPath()); |
| } |
| try { |
| sd.write(); |
| } catch (IOException e) { |
| LOG.error("Cannot write file " + sd.getRoot(), e); |
| |
| if(al == null) al = new ArrayList<StorageDirectory> (1); |
| al.add(sd); |
| } |
| } |
| if(al != null) processIOError(al, true); |
| ckptState = FSImage.CheckpointStates.START; |
| } |
| |
| CheckpointSignature rollEditLog() throws IOException { |
| getEditLog().rollEditLog(); |
| ckptState = CheckpointStates.ROLLED_EDITS; |
| // If checkpoint fails this should be the most recent image, therefore |
| incrementCheckpointTime(); |
| return new CheckpointSignature(this); |
| } |
| |
| /** |
| * This is called just before a new checkpoint is uploaded to the |
| * namenode. |
| */ |
| void validateCheckpointUpload(CheckpointSignature sig) throws IOException { |
| if (ckptState != CheckpointStates.ROLLED_EDITS) { |
| throw new IOException("Namenode is not expecting an new image " + |
| ckptState); |
| } |
| // verify token |
| long modtime = getEditLog().getFsEditTime(); |
| if (sig.editsTime != modtime) { |
| throw new IOException("Namenode has an edit log with timestamp of " + |
| DATE_FORM.format(new Date(modtime)) + |
| " but new checkpoint was created using editlog " + |
| " with timestamp " + |
| DATE_FORM.format(new Date(sig.editsTime)) + |
| ". Checkpoint Aborted."); |
| } |
| sig.validateStorageInfo(this); |
| ckptState = FSImage.CheckpointStates.UPLOAD_START; |
| } |
| |
/**
 * Start checkpoint.
 * <p>
 * If backup storage contains image that is newer than or incompatible with
 * what the active name-node has, then the backup node should shutdown.<br>
 * If the backup image is older than the active one then it should
 * be discarded and downloaded from the active node.<br>
 * If the images are the same then the backup image will be used as current.
 * <p>
 * When the checkpoint is allowed, the edit log is rolled via
 * {@code rollEditLog()}, and {@code getEditLog().logJSpoolStart(bnReg, nnReg)}
 * is called before the command is returned.
 *
 * @param bnReg the backup node registration.
 * @param nnReg this (active) name-node registration.
 * @return {@link NamenodeCommand} if backup node should shutdown or
 * {@link CheckpointCommand} prescribing what backup node should
 * do with its image.
 * @throws IOException
 */
NamenodeCommand startCheckpoint(NamenodeRegistration bnReg, // backup node
                                NamenodeRegistration nnReg) // active name-node
throws IOException {
  String msg = null;
  // Verify that checkpoint is allowed
  if(bnReg.getNamespaceID() != this.getNamespaceID())
    msg = "Name node " + bnReg.getAddress()
        + " has incompatible namespace id: " + bnReg.getNamespaceID()
        + " expected: " + getNamespaceID();
  else if(bnReg.isRole(NamenodeRole.ACTIVE))
    msg = "Name node " + bnReg.getAddress()
        + " role " + bnReg.getRole() + ": checkpoint is not allowed.";
  // Layout versions are negative and decrease as the format evolves, so a
  // smaller LV on the remote node means a newer layout. Ties are then broken
  // by cTime and finally by checkpoint time.
  else if(bnReg.getLayoutVersion() < this.getLayoutVersion()
      || (bnReg.getLayoutVersion() == this.getLayoutVersion()
          && bnReg.getCTime() > this.getCTime())
      || (bnReg.getLayoutVersion() == this.getLayoutVersion()
          && bnReg.getCTime() == this.getCTime()
          && bnReg.getCheckpointTime() > this.checkpointTime))
    // remote node has newer image age
    msg = "Name node " + bnReg.getAddress()
        + " has newer image layout version: LV = " +bnReg.getLayoutVersion()
        + " cTime = " + bnReg.getCTime()
        + " checkpointTime = " + bnReg.getCheckpointTime()
        + ". Current version: LV = " + getLayoutVersion()
        + " cTime = " + getCTime()
        + " checkpointTime = " + checkpointTime;
  if(msg != null) {
    LOG.error(msg);
    return new NamenodeCommand(NamenodeProtocol.ACT_SHUTDOWN);
  }
  // The remote image is reusable only if LV, cTime and checkpoint time all
  // match this node's.
  boolean isImgObsolete = true;
  if(bnReg.getLayoutVersion() == this.getLayoutVersion()
      && bnReg.getCTime() == this.getCTime()
      && bnReg.getCheckpointTime() == this.checkpointTime)
    isImgObsolete = false;
  boolean needToReturnImg = true;
  if(getNumStorageDirs(NameNodeDirType.IMAGE) == 0)
    // do not return image if there are no image directories
    needToReturnImg = false;
  CheckpointSignature sig = rollEditLog();
  getEditLog().logJSpoolStart(bnReg, nnReg);
  return new CheckpointCommand(sig, isImgObsolete, needToReturnImg);
}
| |
| /** |
| * End checkpoint. |
| * <p> |
| * Rename uploaded checkpoint to the new image; |
| * purge old edits file; |
| * rename edits.new to edits; |
| * redirect edit log streams to the new edits; |
| * update checkpoint time if the remote node is a checkpoint only node. |
| * |
| * @param sig |
| * @param remoteNNRole |
| * @throws IOException |
| */ |
| void endCheckpoint(CheckpointSignature sig, |
| NamenodeRole remoteNNRole) throws IOException { |
| sig.validateStorageInfo(this); |
| // Renew checkpoint time for the active if the other is a checkpoint-node. |
| // The checkpoint-node should have older image for the next checkpoint |
| // to take effect. |
| // The backup-node always has up-to-date image and will have the same |
| // checkpoint time as the active node. |
| boolean renewCheckpointTime = remoteNNRole.equals(NamenodeRole.CHECKPOINT); |
| rollFSImage(renewCheckpointTime); |
| } |
| |
| CheckpointStates getCheckpointState() { |
| return ckptState; |
| } |
| |
| void setCheckpointState(CheckpointStates cs) { |
| ckptState = cs; |
| } |
| |
| /** |
| * This is called when a checkpoint upload finishes successfully. |
| */ |
| synchronized void checkpointUploadDone() { |
| ckptState = CheckpointStates.UPLOAD_DONE; |
| } |
| |
| synchronized void close() throws IOException { |
| getEditLog().close(); |
| unlockAll(); |
| } |
| |
| /** |
| * Return the name of the image file. |
| */ |
| File getFsImageName() { |
| StorageDirectory sd = null; |
| for (Iterator<StorageDirectory> it = |
| dirIterator(NameNodeDirType.IMAGE); it.hasNext();) { |
| sd = it.next(); |
| if(sd.getRoot().canRead()) |
| return getImageFile(sd, NameNodeFile.IMAGE); |
| } |
| return null; |
| } |
| |
| /** |
| * See if any of removed storages iw "writable" again, and can be returned |
| * into service |
| */ |
| synchronized void attemptRestoreRemovedStorage() { |
| // if directory is "alive" - copy the images there... |
| if(!restoreFailedStorage || removedStorageDirs.size() == 0) |
| return; //nothing to restore |
| |
| LOG.info("FSImage.attemptRestoreRemovedStorage: check removed(failed) " + |
| "storarge. removedStorages size = " + removedStorageDirs.size()); |
| for(Iterator<StorageDirectory> it = this.removedStorageDirs.iterator(); it.hasNext();) { |
| StorageDirectory sd = it.next(); |
| File root = sd.getRoot(); |
| LOG.info("currently disabled dir " + root.getAbsolutePath() + |
| "; type="+sd.getStorageDirType() + ";canwrite="+root.canWrite()); |
| try { |
| |
| if(root.exists() && root.canWrite()) { |
| format(sd); |
| LOG.info("restoring dir " + sd.getRoot().getAbsolutePath()); |
| if(sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) { |
| File eFile = getEditFile(sd); |
| editLog.addNewEditLogStream(eFile); |
| } |
| this.addStorageDir(sd); // restore |
| it.remove(); |
| } |
| } catch(IOException e) { |
| LOG.warn("failed to restore " + sd.getRoot().getAbsolutePath(), e); |
| } |
| } |
| } |
| |
| public File getFsEditName() throws IOException { |
| return getEditLog().getFsEditName(); |
| } |
| |
| File getFsTimeName() { |
| StorageDirectory sd = null; |
| // NameNodeFile.TIME shoul be same on all directories |
| for (Iterator<StorageDirectory> it = |
| dirIterator(); it.hasNext();) |
| sd = it.next(); |
| return getImageFile(sd, NameNodeFile.TIME); |
| } |
| |
| /** |
| * Return the name of the image file that is uploaded by periodic |
| * checkpointing. |
| */ |
| File[] getFsImageNameCheckpoint() { |
| ArrayList<File> list = new ArrayList<File>(); |
| for (Iterator<StorageDirectory> it = |
| dirIterator(NameNodeDirType.IMAGE); it.hasNext();) { |
| list.add(getImageFile(it.next(), NameNodeFile.IMAGE_NEW)); |
| } |
| return list.toArray(new File[list.size()]); |
| } |
| |
| /** |
| * DatanodeImage is used to store persistent information |
| * about datanodes into the fsImage. |
| */ |
| static class DatanodeImage implements Writable { |
| DatanodeDescriptor node = new DatanodeDescriptor(); |
| |
| ///////////////////////////////////////////////// |
| // Writable |
| ///////////////////////////////////////////////// |
| /** |
| * Public method that serializes the information about a |
| * Datanode to be stored in the fsImage. |
| */ |
| public void write(DataOutput out) throws IOException { |
| new DatanodeID(node).write(out); |
| out.writeLong(node.getCapacity()); |
| out.writeLong(node.getRemaining()); |
| out.writeLong(node.getLastUpdate()); |
| out.writeInt(node.getXceiverCount()); |
| } |
| |
| /** |
| * Public method that reads a serialized Datanode |
| * from the fsImage. |
| */ |
| public void readFields(DataInput in) throws IOException { |
| DatanodeID id = new DatanodeID(); |
| id.readFields(in); |
| long capacity = in.readLong(); |
| long remaining = in.readLong(); |
| long lastUpdate = in.readLong(); |
| int xceiverCount = in.readInt(); |
| |
| // update the DatanodeDescriptor with the data we read in |
| node.updateRegInfo(id); |
| node.setStorageID(id.getStorageID()); |
| node.setCapacity(capacity); |
| node.setRemaining(remaining); |
| node.setLastUpdate(lastUpdate); |
| node.setXceiverCount(xceiverCount); |
| } |
| } |
| |
| protected void corruptPreUpgradeStorage(File rootDir) throws IOException { |
| File oldImageDir = new File(rootDir, "image"); |
| if (!oldImageDir.exists()) |
| if (!oldImageDir.mkdir()) |
| throw new IOException("Cannot create directory " + oldImageDir); |
| File oldImage = new File(oldImageDir, "fsimage"); |
| if (!oldImage.exists()) |
| // recreate old image file to let pre-upgrade versions fail |
| if (!oldImage.createNewFile()) |
| throw new IOException("Cannot create file " + oldImage); |
| RandomAccessFile oldFile = new RandomAccessFile(oldImage, "rws"); |
| // write new version into old image file |
| try { |
| writeCorruptedData(oldFile); |
| } finally { |
| oldFile.close(); |
| } |
| } |
| |
| private boolean getDistributedUpgradeState() { |
| FSNamesystem ns = getFSNamesystem(); |
| return ns == null ? false : ns.getDistributedUpgradeState(); |
| } |
| |
| private int getDistributedUpgradeVersion() { |
| FSNamesystem ns = getFSNamesystem(); |
| return ns == null ? 0 : ns.getDistributedUpgradeVersion(); |
| } |
| |
| private void setDistributedUpgradeState(boolean uState, int uVersion) { |
| getFSNamesystem().upgradeManager.setUpgradeState(uState, uVersion); |
| } |
| |
| private void verifyDistributedUpgradeProgress(StartupOption startOpt |
| ) throws IOException { |
| if(startOpt == StartupOption.ROLLBACK || startOpt == StartupOption.IMPORT) |
| return; |
| UpgradeManager um = getFSNamesystem().upgradeManager; |
| assert um != null : "FSNameSystem.upgradeManager is null."; |
| if(startOpt != StartupOption.UPGRADE) { |
| if(um.getUpgradeState()) |
| throw new IOException( |
| "\n Previous distributed upgrade was not completed. " |
| + "\n Please restart NameNode with -upgrade option."); |
| if(um.getDistributedUpgrades() != null) |
| throw new IOException("\n Distributed upgrade for NameNode version " |
| + um.getUpgradeVersion() + " to current LV " + FSConstants.LAYOUT_VERSION |
| + " is required.\n Please restart NameNode with -upgrade option."); |
| } |
| } |
| |
| private void initializeDistributedUpgrade() throws IOException { |
| UpgradeManagerNamenode um = getFSNamesystem().upgradeManager; |
| if(! um.initializeUpgrade()) |
| return; |
| // write new upgrade state into disk |
| writeAll(); |
| NameNode.LOG.info("\n Distributed upgrade for NameNode version " |
| + um.getUpgradeVersion() + " to current LV " |
| + FSConstants.LAYOUT_VERSION + " is initialized."); |
| } |
| |
| /** |
| * Retrieve checkpoint dirs from configuration. |
| * |
| * @param conf the Configuration |
| * @param defaultValue a default value for the attribute, if null |
| * @return a Collection of URIs representing the values in |
| * fs.checkpoint.dir configuration property |
| */ |
| static Collection<URI> getCheckpointDirs(Configuration conf, |
| String defaultValue) { |
| Collection<String> dirNames = conf.getStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY); |
| if (dirNames.size() == 0 && defaultValue != null) { |
| dirNames.add(defaultValue); |
| } |
| return Util.stringCollectionAsURIs(dirNames); |
| } |
| |
| static Collection<URI> getCheckpointEditsDirs(Configuration conf, |
| String defaultName) { |
| Collection<String> dirNames = |
| conf.getStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY); |
| if (dirNames.size() == 0 && defaultName != null) { |
| dirNames.add(defaultName); |
| } |
| return Util.stringCollectionAsURIs(dirNames); |
| } |
| |
| static private final DeprecatedUTF8 U_STR = new DeprecatedUTF8(); |
| // This should be reverted to package private once the ImageLoader |
| // code is moved into this package. This method should not be called |
| // by other code. |
| public static String readString(DataInputStream in) throws IOException { |
| U_STR.readFields(in); |
| return U_STR.toString(); |
| } |
| |
| static String readString_EmptyAsNull(DataInputStream in) throws IOException { |
| final String s = readString(in); |
| return s.isEmpty()? null: s; |
| } |
| |
| // Same comments apply for this method as for readString() |
| public static byte[] readBytes(DataInputStream in) throws IOException { |
| U_STR.readFields(in); |
| int len = U_STR.getLength(); |
| byte[] bytes = new byte[len]; |
| System.arraycopy(U_STR.getBytes(), 0, bytes, 0, len); |
| return bytes; |
| } |
| |
| static void writeString(String str, DataOutputStream out) throws IOException { |
| U_STR.set(str); |
| U_STR.write(out); |
| } |
| } |