blob: cef7f015428dc87f926998dfcb9314dd556903a5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.common.Util.now;

import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.URI;
import java.net.UnknownHostException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.UpgradeManager;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.net.DNS;
/**
* NNStorage is responsible for management of the StorageDirectories used by
* the NameNode.
*/
@InterfaceAudience.Private
public class NNStorage extends Storage implements Closeable {
private static final Log LOG = LogFactory.getLog(NNStorage.class.getName());
/** Key under which the fsimage MD5 digest is stored in the VERSION file. */
static final String MESSAGE_DIGEST_PROPERTY = "imageMD5Digest";
//
// The filenames used for storing the images
//
/**
 * The well-known file names the NameNode keeps inside a storage
 * directory's current/ subdirectory.
 */
enum NameNodeFile {
  IMAGE("fsimage"),
  TIME("fstime"),
  EDITS("edits"),
  IMAGE_NEW("fsimage.ckpt"),
  EDITS_NEW("edits.new");

  /** On-disk file name; immutable once the constant is constructed. */
  private final String fileName;

  private NameNodeFile(String name) {
    this.fileName = name;
  }

  /** @return the on-disk file name for this kind of file. */
  String getName() {
    return fileName;
  }
}
/**
 * Implementation of StorageDirType specific to namenode storage.
 * A storage directory may hold only the fsimage (IMAGE), only the
 * edit log (EDITS), or both (IMAGE_AND_EDITS).
 */
static enum NameNodeDirType implements StorageDirType {
  UNDEFINED,
  IMAGE,
  EDITS,
  IMAGE_AND_EDITS;

  public StorageDirType getStorageDirType() {
    return this;
  }

  /** IMAGE_AND_EDITS answers true for IMAGE and EDITS queries as well. */
  public boolean isOfType(StorageDirType type) {
    return this == type
        || (this == IMAGE_AND_EDITS && (type == IMAGE || type == EDITS));
  }
}
/**
 * Interface to be implemented by classes which make use of storage
 * directories. They are notified when a StorageDirectory is causing errors,
 * becoming available or being formatted.
 *
 * This allows the implementors of the interface to take their own specific
 * action on the StorageDirectory when this occurs.
 */
interface NNStorageListener {
/**
 * An error has occurred with a StorageDirectory.
 * @param sd The storage directory causing the error.
 * @throws IOException
 */
void errorOccurred(StorageDirectory sd) throws IOException;
/**
 * A storage directory has been formatted.
 * @param sd The storage directory being formatted.
 * @throws IOException
 */
void formatOccurred(StorageDirectory sd) throws IOException;
/**
 * A storage directory is now available for use.
 * @param sd The storage directory which has become available.
 * @throws IOException
 */
void directoryAvailable(StorageDirectory sd) throws IOException;
}
// Listeners notified when a directory fails, recovers, or is formatted.
final private List<NNStorageListener> listeners;
// Manager for a distributed (cross-daemon) upgrade; null until set.
private UpgradeManager upgradeManager = null;
// MD5 digest of the most recent fsimage; persisted in the VERSION file.
protected MD5Hash imageDigest = null;
protected String blockpoolID = ""; // id of the block pool
/**
 * flag that controls if we try to restore failed storages
 */
private boolean restoreFailedStorage = false;
// Serializes restoration attempts; see attemptRestoreRemovedStorage().
private Object restorationLock = new Object();
// Set true by BackupImage to skip the pre-upgrade layout check.
private boolean disablePreUpgradableLayoutCheck = false;
private long checkpointTime = -1L; // The age of the image
/**
 * list of failed (and thus removed) storages
 */
final protected List<StorageDirectory> removedStorageDirs
= new CopyOnWriteArrayList<StorageDirectory>();
/**
 * Construct the NNStorage.
 * @param conf Namenode configuration.
 */
public NNStorage(Configuration conf) {
super(NodeType.NAME_NODE);
// NOTE(review): conf is currently unused in this constructor.
storageDirs = new CopyOnWriteArrayList<StorageDirectory>();
this.listeners = new CopyOnWriteArrayList<NNStorageListener>();
}
/**
 * Construct the NNStorage.
 * @param storageInfo storage information
 * @param bpid block pool Id
 */
public NNStorage(StorageInfo storageInfo, String bpid) {
super(NodeType.NAME_NODE, storageInfo);
storageDirs = new CopyOnWriteArrayList<StorageDirectory>();
this.listeners = new CopyOnWriteArrayList<NNStorageListener>();
this.blockpoolID = bpid;
}
@Override // Storage
public boolean isPreUpgradableLayout(StorageDirectory sd) throws IOException {
  if (disablePreUpgradableLayoutCheck) {
    return false;
  }
  // Pre-upgrade layouts kept the image under an "image" subdirectory.
  File oldImageDir = new File(sd.getRoot(), "image");
  if (!oldImageDir.exists()) {
    return false;
  }
  // check the layout version inside the image file
  File oldF = new File(oldImageDir, "fsimage");
  // Open read-only: this method only reads the version; the previous
  // "rws" mode needlessly demanded write access and synchronous I/O.
  RandomAccessFile oldFile = new RandomAccessFile(oldF, "r");
  try {
    oldFile.seek(0);
    int oldVersion = oldFile.readInt();
    if (oldVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION) {
      return false;
    }
  } finally {
    oldFile.close();
  }
  return true;
}
@Override // Closeable
public void close() throws IOException {
// Drop listeners, release all directory locks, then forget the dirs.
listeners.clear();
unlockAll();
storageDirs.clear();
}
/**
 * Set flag whether an attempt should be made to restore failed storage
 * directories at the next available opportunity.
 *
 * @param val Whether restoration attempt should be made.
 */
void setRestoreFailedStorage(boolean val) {
LOG.warn("set restore failed storage to " + val);
restoreFailedStorage=val;
}
/**
 * @return Whether failed storage directories are to be restored.
 */
boolean getRestoreFailedStorage() {
return restoreFailedStorage;
}
/**
 * See if any of the removed storage directories are "writable" again and,
 * if so, return them to service: clear their contents, notify listeners,
 * and move them from the removed list back to the active list.
 * No-op unless restore-failed-storage is enabled and something is removed.
 */
void attemptRestoreRemovedStorage() {
  // if directory is "alive" - copy the images there...
  if (!restoreFailedStorage || removedStorageDirs.isEmpty()) {
    return; // nothing to restore
  }
  /* We don't want more than one thread trying to restore at a time */
  synchronized (this.restorationLock) {
    LOG.info("NNStorage.attemptRestoreRemovedStorage: check removed(failed) "+
        "storarge. removedStorages size = " + removedStorageDirs.size());
    // CopyOnWriteArrayList iteration sees a snapshot, so removing
    // entries inside the loop is safe.
    for (StorageDirectory sd : removedStorageDirs) {
      File root = sd.getRoot();
      LOG.info("currently disabled dir " + root.getAbsolutePath() +
          "; type=" + sd.getStorageDirType()
          + ";canwrite=" + root.canWrite());
      try {
        if (root.exists() && root.canWrite()) {
          // when we try to restore we just need to remove all the data
          // without saving current in-memory state (which could've changed).
          sd.clearDirectory();
          LOG.info("restoring dir " + sd.getRoot().getAbsolutePath());
          for (NNStorageListener listener : listeners) {
            listener.directoryAvailable(sd);
          }
          this.addStorageDir(sd); // restore
          this.removedStorageDirs.remove(sd);
        }
      } catch (IOException e) {
        LOG.warn("failed to restore " + sd.getRoot().getAbsolutePath(), e);
      }
    }
  }
}
/**
 * @return The (live, thread-safe) list of storage directories which are
 *         currently in the errored state.
 */
List<StorageDirectory> getRemovedStorageDirs() {
return this.removedStorageDirs;
}
/**
 * Set the storage directories which will be used. NNStorage.close() should
 * be called before this to ensure any previous storage directories have been
 * freed.
 *
 * Neither input collection is modified by this method.
 *
 * Synchronized due to initialization of storageDirs and removedStorageDirs.
 *
 * @param fsNameDirs Locations to store images.
 * @param fsEditsDirs Locations to store edit logs.
 * @throws IOException if a URI has a missing or unsupported scheme
 */
synchronized void setStorageDirectories(Collection<URI> fsNameDirs,
                                        Collection<URI> fsEditsDirs)
    throws IOException {
  this.storageDirs.clear();
  this.removedStorageDirs.clear();
  // Work on a private copy: the previous implementation removed matched
  // entries from the caller's fsEditsDirs collection as a side effect.
  List<URI> editsDirs = new ArrayList<URI>(fsEditsDirs);
  // Locale.ROOT keeps "FILE" -> "file" stable under e.g. Turkish locales.
  String fileScheme = JournalType.FILE.name().toLowerCase(Locale.ROOT);
  // Add all name dirs with appropriate NameNodeDirType
  for (URI dirName : fsNameDirs) {
    checkSchemeConsistency(dirName);
    // A dir listed as both a name dir and an edits dir stores both;
    // remove the first matching entry so it is not added again below.
    boolean isAlsoEdits = editsDirs.remove(dirName);
    NameNodeDirType dirType = (isAlsoEdits) ?
        NameNodeDirType.IMAGE_AND_EDITS :
        NameNodeDirType.IMAGE;
    // Add to the list of storage directories, only if the
    // URI is of type file://
    if (fileScheme.equals(dirName.getScheme())) {
      this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
                                              dirType));
    }
  }
  // Add edits dirs if they are different from name dirs
  for (URI dirName : editsDirs) {
    checkSchemeConsistency(dirName);
    // Add to the list of storage directories, only if the
    // URI is of type file://
    if (fileScheme.equals(dirName.getScheme())) {
      this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
                                              NameNodeDirType.EDITS));
    }
  }
}
/**
 * Checks the consistency of a URI, in particular if the scheme
 * is specified and is supported by a concrete implementation.
 * @param u URI whose consistency is being checked.
 * @throws IOException if the scheme is missing or not a JournalType
 */
private static void checkSchemeConsistency(URI u) throws IOException {
  String scheme = u.getScheme();
  // the URI should have a proper scheme
  if (scheme == null) {
    throw new IOException("Undefined scheme for " + u);
  }
  try {
    // The scheme must be enumerated in JournalType. Locale.ROOT keeps
    // the upper-casing locale-independent (e.g. Turkish dotless i).
    JournalType.valueOf(scheme.toUpperCase(Locale.ROOT));
  } catch (IllegalArgumentException iae) {
    // Preserve the cause for diagnosis instead of discarding it.
    throw new IOException("Unknown scheme " + scheme +
        ". It should correspond to a JournalType enumeration value", iae);
  }
}
/**
 * Retrieve current directories of type IMAGE
 * @return Collection of URI representing image directories
 * @throws IOException in case of URI processing error
 */
Collection<URI> getImageDirectories() throws IOException {
return getDirectories(NameNodeDirType.IMAGE);
}
/**
 * Retrieve current directories of type EDITS
 * @return Collection of URI representing edits directories
 * @throws IOException in case of URI processing error
 */
Collection<URI> getEditsDirectories() throws IOException {
return getDirectories(NameNodeDirType.EDITS);
}
/**
 * Return the number of storage directories of the given type.
 * @param dirType directory type; null counts all directories
 * @return number of storage directories of type dirType
 */
int getNumStorageDirs(NameNodeDirType dirType) {
  if (dirType == null) {
    return getNumStorageDirs();
  }
  int count = 0;
  Iterator<StorageDirectory> dirs = dirIterator(dirType);
  while (dirs.hasNext()) {
    dirs.next();
    count++;
  }
  return count;
}
/**
 * Return the list of locations being used for a specific purpose,
 * i.e. image or edit log storage.
 *
 * @param dirType Purpose of locations requested; null means all.
 * @return URIs of the matching storage directories.
 * @throws IOException if a directory root cannot be expressed as a URI
 */
Collection<URI> getDirectories(NameNodeDirType dirType)
    throws IOException {
  ArrayList<URI> result = new ArrayList<URI>();
  Iterator<StorageDirectory> dirs =
      (dirType == null) ? dirIterator() : dirIterator(dirType);
  while (dirs.hasNext()) {
    StorageDirectory sd = dirs.next();
    try {
      result.add(Util.fileAsURI(sd.getRoot()));
    } catch (IOException e) {
      // Re-wrap with the offending directory named, keeping the cause.
      throw new IOException("Exception while processing " +
          "StorageDirectory " + sd.getRoot(), e);
    }
  }
  return result;
}
/**
 * Determine the checkpoint time recorded in the specified StorageDirectory.
 *
 * @param sd StorageDirectory to check
 * @return If the fstime file exists and can be read, the last checkpoint
 *         time; otherwise 0L.
 * @throws IOException On errors reading the fstime file
 */
long readCheckpointTime(StorageDirectory sd) throws IOException {
  File timeFile = getStorageFile(sd, NameNodeFile.TIME);
  if (!timeFile.exists() || !timeFile.canRead()) {
    return 0L;
  }
  DataInputStream in = new DataInputStream(new FileInputStream(timeFile));
  try {
    return in.readLong();
  } finally {
    in.close();
  }
}
/**
 * Write the last checkpoint time into a separate fstime file in the
 * given storage directory. A negative checkpoint time is never written.
 *
 * @param sd storage directory to write the fstime file into
 * @throws IOException on write failure
 */
public void writeCheckpointTime(StorageDirectory sd) throws IOException {
  if (checkpointTime < 0L) {
    return; // do not write negative time
  }
  File timeFile = getStorageFile(sd, NameNodeFile.TIME);
  if (timeFile.exists() && !timeFile.delete()) {
    // Not fatal: the FileOutputStream below truncates the file anyway.
    LOG.error("Cannot delete checkpoint time file: "
        + timeFile.getCanonicalPath());
  }
  FileOutputStream fos = new FileOutputStream(timeFile);
  DataOutputStream out = new DataOutputStream(fos);
  try {
    out.writeLong(checkpointTime);
    out.flush();
    // Force to disk so the recorded time survives a crash.
    fos.getChannel().force(true);
  } finally {
    out.close();
  }
}
/**
 * Record new checkpoint time in order to
 * distinguish healthy directories from the removed ones.
 * Write errors are logged by setCheckpointTimeInStorage; the failing
 * directory is removed from service elsewhere (see reportErrorsOnDirectory).
 */
public void incrementCheckpointTime() {
setCheckpointTimeInStorage(checkpointTime + 1);
}
/**
 * The age of the namespace state.<p>
 * Reflects the latest time the image was saved.
 * Modified with every save or a checkpoint.
 * Persisted in VERSION file.
 *
 * @return the current checkpoint time.
 */
public long getCheckpointTime() {
return checkpointTime;
}
/**
 * Set the checkpoint time.
 *
 * This method does not persist the checkpoint time to storage immediately.
 *
 * @see #setCheckpointTimeInStorage
 * @param newCpT the new checkpoint time.
 */
public void setCheckpointTime(long newCpT) {
checkpointTime = newCpT;
}
/**
 * Set the current checkpoint time and write it to all available
 * storage directories. A directory that fails the write is only
 * logged here; removal from service happens elsewhere.
 * @param newCpT The new checkpoint time.
 */
public void setCheckpointTimeInStorage(long newCpT) {
  checkpointTime = newCpT;
  // Write new checkpoint time in all storage directories
  for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
    StorageDirectory sd = it.next();
    try {
      writeCheckpointTime(sd);
    } catch (IOException e) {
      // Log the actual failing operation and keep the stack trace.
      // (Previously mislabeled as "incrementCheckpointTime failed".)
      LOG.warn("setCheckpointTimeInStorage failed on "
          + sd.getRoot().getPath() + ";type=" + sd.getStorageDirType(), e);
    }
  }
}
/**
 * Return the names of the image files that are uploaded by periodic
 * checkpointing, one per IMAGE-type storage directory.
 *
 * @return List of filenames to save checkpoints to.
 */
public File[] getFsImageNameCheckpoint() {
  List<File> checkpointFiles = new ArrayList<File>();
  Iterator<StorageDirectory> dirs = dirIterator(NameNodeDirType.IMAGE);
  while (dirs.hasNext()) {
    checkpointFiles.add(getStorageFile(dirs.next(), NameNodeFile.IMAGE_NEW));
  }
  return checkpointFiles.toArray(new File[checkpointFiles.size()]);
}
/**
 * Return the name of the image file.
 * @return The first existing, readable image file, or null if none.
 */
public File getFsImageName() {
  Iterator<StorageDirectory> dirs = dirIterator(NameNodeDirType.IMAGE);
  while (dirs.hasNext()) {
    StorageDirectory sd = dirs.next();
    File fsImage = getStorageFile(sd, NameNodeFile.IMAGE);
    if (sd.getRoot().canRead() && fsImage.exists()) {
      return fsImage;
    }
  }
  return null;
}
/**
 * @return The edit log file in the first readable EDITS directory,
 *         or null if no such directory exists.
 */
public File getFsEditName() throws IOException {
  Iterator<StorageDirectory> dirs = dirIterator(NameNodeDirType.EDITS);
  while (dirs.hasNext()) {
    StorageDirectory sd = dirs.next();
    if (sd.getRoot().canRead()) {
      return getEditFile(sd);
    }
  }
  return null;
}
/**
 * @return The fstime file of the last storage directory, or null if no
 *         storage directories are configured. (NameNodeFile.TIME should
 *         be the same in all directories.)
 */
public File getFsTimeName() {
  StorageDirectory sd = null;
  // NameNodeFile.TIME should be same on all directories
  for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
    sd = it.next();
  }
  // Guard against an empty directory list, which previously caused an NPE.
  return (sd == null) ? null : getStorageFile(sd, NameNodeFile.TIME);
}
/** Create new dfs name directory. Caution: this destroys all files
 * in this filesystem's directory. Listeners are notified after the
 * directory is cleared, then the VERSION file is written. */
private void format(StorageDirectory sd) throws IOException {
sd.clearDirectory(); // create current dir
for (NNStorageListener listener : listeners) {
listener.formatOccurred(sd);
}
sd.write();
LOG.info("Storage directory " + sd.getRoot()
+ " has been successfully formatted.");
}
/**
 * Format all available storage directories: reset the version fields,
 * generate fresh namespace/blockpool IDs, then format each directory.
 * @param clusterId cluster ID to record in the new storage
 */
public void format(String clusterId) throws IOException {
  this.layoutVersion = FSConstants.LAYOUT_VERSION;
  this.namespaceID = newNamespaceID();
  this.clusterID = clusterId;
  this.blockpoolID = newBlockPoolID();
  this.cTime = 0L;
  this.setCheckpointTime(now());
  Iterator<StorageDirectory> dirs = dirIterator();
  while (dirs.hasNext()) {
    format(dirs.next());
  }
}
/**
 * Generate a new namespaceID.
 *
 * namespaceID is a persistent attribute of the namespace.
 * It is generated when the namenode is formatted and remains the same
 * during the life cycle of the namenode.
 * When datanodes register they receive it as the registrationID,
 * which is checked every time the datanode is communicating with the
 * namenode. Datanodes that do not 'know' the namespaceID are rejected.
 *
 * @return new, non-zero namespaceID
 */
private int newNamespaceID() {
  Random random = new Random(now());
  int newID;
  do {
    newID = random.nextInt(0x7FFFFFFF); // use 31 bits only
  } while (newID == 0); // 0 is reserved / invalid
  return newID;
}
/**
 * Move {@code current} to {@code lastcheckpoint.tmp} and
 * recreate empty {@code current}.
 * {@code current} is moved only if it is well formatted,
 * that is contains VERSION file.
 *
 * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getLastCheckpointTmp()
 * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getPreviousCheckpoint()
 */
protected void moveCurrent(StorageDirectory sd)
throws IOException {
File curDir = sd.getCurrentDir();
File tmpCkptDir = sd.getLastCheckpointTmp();
// mv current -> lastcheckpoint.tmp
// only if current is formatted - has VERSION file
if(sd.getVersionFile().exists()) {
assert curDir.exists() : curDir + " directory must exist.";
assert !tmpCkptDir.exists() : tmpCkptDir + " directory must not exist.";
rename(curDir, tmpCkptDir);
}
// recreate current (also covers the case where current never existed)
if(!curDir.exists() && !curDir.mkdir())
throw new IOException("Cannot create directory " + curDir);
}
/**
 * Move {@code lastcheckpoint.tmp} to {@code previous.checkpoint},
 * replacing any existing previous checkpoint.
 *
 * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getPreviousCheckpoint()
 * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getLastCheckpointTmp()
 */
protected void moveLastCheckpoint(StorageDirectory sd)
    throws IOException {
  File lastCkpt = sd.getLastCheckpointTmp();
  File prevCkpt = sd.getPreviousCheckpoint();
  // First drop any stale previous.checkpoint ...
  if (prevCkpt.exists()) {
    deleteDir(prevCkpt);
  }
  // ... then promote lastcheckpoint.tmp, if one was made.
  if (lastCkpt.exists()) {
    rename(lastCkpt, prevCkpt);
  }
}
/**
 * Read NameNode-specific fields (block pool ID, distributed upgrade
 * state, image MD5 digest, checkpoint time) from the VERSION properties
 * of the given storage directory, after the common Storage fields.
 */
@Override // Storage
protected void getFields(Properties props,
StorageDirectory sd
) throws IOException {
super.getFields(props, sd);
// A zero layout version means the VERSION file carried no version,
// i.e. the directory was never formatted.
if (layoutVersion == 0) {
throw new IOException("NameNode directory "
+ sd.getRoot() + " is not formatted.");
}
// Set Block pool ID in version with federation support
if (LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
String sbpid = props.getProperty("blockpoolID");
setBlockPoolID(sd.getRoot(), sbpid);
}
String sDUS, sDUV;
sDUS = props.getProperty("distributedUpgradeState");
sDUV = props.getProperty("distributedUpgradeVersion");
// Missing properties default to "no upgrade" at the current version.
setDistributedUpgradeState(
sDUS == null? false : Boolean.parseBoolean(sDUS),
sDUV == null? getLayoutVersion() : Integer.parseInt(sDUV));
String sMd5 = props.getProperty(MESSAGE_DIGEST_PROPERTY);
if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM, layoutVersion)) {
// A checksum-capable layout must carry the image digest ...
if (sMd5 == null) {
throw new InconsistentFSStateException(sd.getRoot(),
"file " + STORAGE_FILE_VERSION
+ " does not have MD5 image digest.");
}
this.imageDigest = new MD5Hash(sMd5);
} else if (sMd5 != null) {
// ... and a pre-checksum layout must not.
throw new InconsistentFSStateException(sd.getRoot(),
"file " + STORAGE_FILE_VERSION +
" has image MD5 digest when version is " + layoutVersion);
}
this.setCheckpointTime(readCheckpointTime(sd));
}
/**
 * Write last checkpoint time and version file into the storage directory.
 *
 * The version file should always be written last.
 * Missing or corrupted version file indicates that
 * the checkpoint is not valid.
 *
 * @param props VERSION properties being populated
 * @param sd storage directory
 * @throws IOException on write failure or if the image cannot be digested
 */
@Override // Storage
protected void setFields(Properties props,
                         StorageDirectory sd
                         ) throws IOException {
  super.setFields(props, sd);
  // Set blockpoolID in version with federation support
  if (LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
    props.setProperty("blockpoolID", blockpoolID);
  }
  boolean uState = getDistributedUpgradeState();
  int uVersion = getDistributedUpgradeVersion();
  // Only record upgrade state while an upgrade is actually in progress.
  if (uState && uVersion != getLayoutVersion()) {
    props.setProperty("distributedUpgradeState", Boolean.toString(uState));
    props.setProperty("distributedUpgradeVersion",
        Integer.toString(uVersion));
  }
  if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM, layoutVersion)) {
    // Though the current NN supports this feature, this function
    // is called with old layoutVersions from the upgrade tests.
    if (imageDigest == null) {
      // May be null on the first save after an upgrade.
      FileInputStream fin =
          new FileInputStream(getStorageFile(sd, NameNodeFile.IMAGE));
      try {
        imageDigest = MD5Hash.digest(fin);
      } finally {
        // Previously this stream was never closed - a descriptor leak.
        fin.close();
      }
    }
    props.setProperty(MESSAGE_DIGEST_PROPERTY, imageDigest.toString());
  }
  writeCheckpointTime(sd);
}
/**
 * @return A File of 'type' in storage directory 'sd' (under current/).
 */
static File getStorageFile(StorageDirectory sd, NameNodeFile type) {
return new File(sd.getCurrentDir(), type.getName());
}
/**
 * @return The editlog File in storage directory 'sd'.
 */
File getEditFile(StorageDirectory sd) {
return getStorageFile(sd, NameNodeFile.EDITS);
}
/**
 * @return The temporary (edits.new) editlog File in storage directory 'sd'.
 */
File getEditNewFile(StorageDirectory sd) {
return getStorageFile(sd, NameNodeFile.EDITS_NEW);
}
/**
 * @return A list of all Files of 'type' in available storage directories
 *         of type 'dirType' (all directories when dirType is null).
 */
Collection<File> getFiles(NameNodeFile type, NameNodeDirType dirType) {
ArrayList<File> list = new ArrayList<File>();
Iterator<StorageDirectory> it =
(dirType == null) ? dirIterator() : dirIterator(dirType);
for ( ;it.hasNext(); ) {
list.add(getStorageFile(it.next(), type));
}
return list;
}
/**
 * Set the upgrade manager for use in a distributed upgrade.
 * @param um The upgrade manager
 */
void setUpgradeManager(UpgradeManager um) {
upgradeManager = um;
}
/**
 * @return The current distributed upgrade state; false when no
 *         upgrade manager has been set.
 */
boolean getDistributedUpgradeState() {
return upgradeManager == null ? false : upgradeManager.getUpgradeState();
}
/**
 * @return The current upgrade version; 0 when no upgrade manager is set.
 */
int getDistributedUpgradeVersion() {
return upgradeManager == null ? 0 : upgradeManager.getUpgradeVersion();
}
/**
 * Set the upgrade state and version.
 * NOTE(review): unlike the getters above, this does NOT null-check
 * upgradeManager; callers (e.g. getFields) rely on it being set first.
 * @param uState the new state.
 * @param uVersion the new version.
 */
private void setDistributedUpgradeState(boolean uState, int uVersion) {
upgradeManager.setUpgradeState(uState, uVersion);
}
/**
 * Verify that the distributed upgrade state is valid.
 * Skipped entirely for ROLLBACK and IMPORT startups.
 * @param startOpt the option the namenode was started with.
 * @throws IOException if an upgrade is pending or required but the
 *         namenode was not started with -upgrade
 */
void verifyDistributedUpgradeProgress(StartupOption startOpt
) throws IOException {
if(startOpt == StartupOption.ROLLBACK || startOpt == StartupOption.IMPORT)
return;
assert upgradeManager != null : "FSNameSystem.upgradeManager is null.";
if(startOpt != StartupOption.UPGRADE) {
// An unfinished upgrade from an earlier run must be resumed ...
if(upgradeManager.getUpgradeState())
throw new IOException(
"\n   Previous distributed upgrade was not completed. "
+ "\n   Please restart NameNode with -upgrade option.");
// ... and a required-but-unstarted upgrade must be initiated.
if(upgradeManager.getDistributedUpgrades() != null)
throw new IOException("\n   Distributed upgrade for NameNode version "
+ upgradeManager.getUpgradeVersion()
+ " to current LV " + FSConstants.LAYOUT_VERSION
+ " is required.\n   Please restart NameNode"
+ " with -upgrade option.");
}
}
/**
 * Initialize a distributed upgrade; no-op when the upgrade manager
 * reports there is nothing to initialize.
 */
void initializeDistributedUpgrade() throws IOException {
if(! upgradeManager.initializeUpgrade())
return;
// write new upgrade state into disk
writeAll();
LOG.info("\n   Distributed upgrade for NameNode version "
+ upgradeManager.getUpgradeVersion() + " to current LV "
+ FSConstants.LAYOUT_VERSION + " is initialized.");
}
/**
 * Set the digest for the latest image stored by NNStorage.
 * @param digest The digest for the image.
 */
void setImageDigest(MD5Hash digest) {
this.imageDigest = digest;
}
/**
 * Get the digest for the latest image stored by NNStorage.
 * @return The digest for the latest image; may be null before first save.
 */
MD5Hash getImageDigest() {
return imageDigest;
}
/**
 * Register a listener. The listener will be notified of changes to the list
 * of available storage directories.
 *
 * @see NNStorageListener
 * @param sel A storage listener.
 */
void registerListener(NNStorageListener sel) {
listeners.add(sel);
}
/**
 * Disable the check for pre-upgradable layouts. Needed for BackupImage.
 * @param val Whether to disable the preupgradeable layout check.
 */
void setDisablePreUpgradableLayoutCheck(boolean val) {
disablePreUpgradableLayoutCheck = val;
}
/**
 * Marks a list of directories as having experienced an error.
 *
 * @param sds A list of storage directories to mark as errored.
 * @throws IOException
 */
void reportErrorsOnDirectories(List<StorageDirectory> sds) throws IOException {
for (StorageDirectory sd : sds) {
reportErrorsOnDirectory(sd);
}
}
/**
 * Reports that a directory has experienced an error.
 * Notifies listeners that the directory is no longer
 * available, unlocks it (best effort), moves it to the removed list,
 * and bumps the checkpoint time in the remaining healthy directories.
 *
 * @param sd A storage directory to mark as errored.
 * @throws IOException
 */
void reportErrorsOnDirectory(StorageDirectory sd)
throws IOException {
LOG.warn("Error reported on storage directory " + sd);
String lsd = listStorageDirectories();
LOG.debug("current list of storage dirs:" + lsd);
for (NNStorageListener listener : listeners) {
listener.errorOccurred(sd);
}
LOG.info("About to remove corresponding storage: "
+ sd.getRoot().getAbsolutePath());
try {
sd.unlock();
} catch (Exception e) {
// Best effort: the directory is being abandoned either way.
LOG.info("Unable to unlock bad storage directory: "
+ sd.getRoot().getPath(), e);
}
if (this.storageDirs.remove(sd)) {
this.removedStorageDirs.add(sd);
}
// Advance checkpoint time so healthy dirs outdate the failed one.
incrementCheckpointTime();
lsd = listStorageDirectories();
LOG.debug("at the end current list of storage dirs:" + lsd);
}
/**
 * Processes the startup options for the clusterid and blockpoolid
 * for the upgrade.
 * @param startOpt Startup options
 * @param layoutVersion Layout version for the upgrade
 * @throws IOException
 */
void processStartupOptionsForUpgrade(StartupOption startOpt, int layoutVersion)
throws IOException {
if (startOpt == StartupOption.UPGRADE) {
// If upgrade from a release that does not support federation,
// if clusterId is provided in the startupOptions use it.
// Else generate a new cluster ID
if (!LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
if (startOpt.getClusterId() == null) {
startOpt.setClusterId(newClusterID());
}
setClusterID(startOpt.getClusterId());
// Pre-federation storage had no block pool; mint one now.
setBlockPoolID(newBlockPoolID());
} else {
// Upgrade from one version of federation to another supported
// version of federation doesn't require clusterID.
// Warn the user if the current clusterid didn't match with the input
// clusterid.
if (startOpt.getClusterId() != null
&& !startOpt.getClusterId().equals(getClusterID())) {
LOG.warn("Clusterid mismatch - current clusterid: " + getClusterID()
+ ", Ignoring given clusterid: " + startOpt.getClusterId());
}
}
LOG.info("Using clusterid: " + getClusterID());
}
}
/**
 * Generate a new clusterID.
 *
 * clusterID is a persistent attribute of the cluster.
 * It is generated when the cluster is created and remains the same
 * during the life cycle of the cluster. When a new name node is formatted,
 * if this is a new cluster, a new clusterID is generated and stored.
 * Subsequent name nodes must be given the same ClusterID during their
 * format to be in the same cluster.
 * When a datanode registers it receives the clusterID and sticks with it.
 * If at any point, name node or data node tries to join another cluster,
 * it will be rejected.
 *
 * @return new clusterID
 */
public static String newClusterID() {
return "CID-" + UUID.randomUUID().toString();
}
/** Set the cluster ID (no validation is performed here). */
void setClusterID(String cid) {
clusterID = cid;
}
/**
 * Try to find the current cluster id in the VERSION files;
 * returns the first cluster id found in any VERSION file,
 * or null in case none is found.
 * @return clusterId or null in case no cluster id found
 */
public String determineClusterId() {
String cid = null;
Iterator<StorageDirectory> sdit = dirIterator(NameNodeDirType.IMAGE);
while(sdit.hasNext()) {
StorageDirectory sd = sdit.next();
try {
Properties props = sd.readFrom(sd.getVersionFile());
cid = props.getProperty("clusterID");
LOG.info("current cluster id for sd="+sd.getCurrentDir() +
";lv=" + layoutVersion + ";cid=" + cid);
if(cid != null && !cid.equals(""))
return cid;
} catch (Exception e) {
// Best effort: an unreadable directory just means we try the next one.
LOG.warn("this sd not available: " + e.getLocalizedMessage());
} //ignore
}
LOG.warn("couldn't find any VERSION file containing valid ClusterId");
return null;
}
/**
 * Generate a new blockpoolID of the form
 * "BP-&lt;random&gt;-&lt;ip&gt;-&lt;timestamp&gt;".
 *
 * @return new blockpoolID
 * @throws UnknownHostException if the IP address of the "default"
 *         interface cannot be determined
 */
String newBlockPoolID() throws UnknownHostException {
  String ip = "unknownIP";
  try {
    ip = DNS.getDefaultIP("default");
  } catch (UnknownHostException e) {
    LOG.warn("Could not find ip address of \"default\" interface.");
    throw e;
  }
  int rand = 0;
  try {
    rand = SecureRandom.getInstance("SHA1PRNG").nextInt(Integer.MAX_VALUE);
  } catch (NoSuchAlgorithmException e) {
    // Fall back to a plain PRNG: we only need uniqueness, not secrecy.
    LOG.warn("Could not use SecureRandom");
    rand = new Random().nextInt(Integer.MAX_VALUE);
  }
  String bpid = "BP-" + rand + "-" + ip + "-" + System.currentTimeMillis();
  return bpid;
}
/** Set block pool ID without validation; see the (File, String)
 * overload for the validating variant. */
void setBlockPoolID(String bpid) {
blockpoolID = bpid;
}
/**
 * Validate and set the block pool ID read from a VERSION file.
 * Rejects a missing/empty id and an id that conflicts with one
 * already in use.
 * @param storage storage root the id was read from (for error reporting)
 * @param bpid candidate block pool id
 * @throws InconsistentFSStateException on a missing or mismatched id
 */
private void setBlockPoolID(File storage, String bpid)
    throws InconsistentFSStateException {
  if (bpid == null || bpid.equals("")) {
    throw new InconsistentFSStateException(storage, "file "
        + Storage.STORAGE_FILE_VERSION + " has no block pool Id.");
  }
  if (!blockpoolID.equals("") && !blockpoolID.equals(bpid)) {
    throw new InconsistentFSStateException(storage,
        "Unexpected blockpoolID " + bpid + " . Expected " + blockpoolID);
  }
  setBlockPoolID(bpid);
}
/** @return the current block pool ID ("" if not yet set). */
public String getBlockPoolID() {
return blockpoolID;
}
}