/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileUtil.HardLink;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
/**
* Data storage information file.
* <p>
* @see Storage
*/
@InterfaceAudience.Private
public class DataStorage extends Storage {
// Constants
final static String BLOCK_SUBDIR_PREFIX = "subdir";
final static String BLOCK_FILE_PREFIX = "blk_";
final static String COPY_FILE_PREFIX = "dncp_";
final static String STORAGE_DIR_RBW = "rbw";
final static String STORAGE_DIR_FINALIZED = "finalized";
final static String STORAGE_DIR_DETACHED = "detach";
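/**
* Unique storage ID of this data-node; empty until it is read from the
* version file or assigned (typically during data-node registration).
*/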
private String storageID;
DataStorage() {
super(NodeType.DATA_NODE);
storageID = "";
}
DataStorage(int nsID, long cT, String strgID) {
super(NodeType.DATA_NODE, nsID, cT);
this.storageID = strgID;
}
public DataStorage(StorageInfo storageInfo, String strgID) {
super(NodeType.DATA_NODE, storageInfo);
this.storageID = strgID;
}
public String getStorageID() {
return storageID;
}
void setStorageID(String newStorageID) {
this.storageID = newStorageID;
}
/**
* Analyze storage directories.
* Recover from previous transitions if required.
* Perform fs state transition if necessary depending on the namespace info.
* Read storage info.
*
* @param nsInfo namespace information
* @param dataDirs array of data storage directories
* @param startOpt startup option
* @throws IOException
*/
void recoverTransitionRead(NamespaceInfo nsInfo,
Collection<File> dataDirs,
StartupOption startOpt
) throws IOException {
assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
"Data-node and name-node layout versions must be the same.";
// 1. For each data directory calculate its state and
// check whether all is consistent before transitioning.
// Format and recover.
this.storageID = "";
this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(dataDirs.size());
for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
File dataDir = it.next();
StorageDirectory sd = new StorageDirectory(dataDir);
StorageState curState;
try {
curState = sd.analyzeStorage(startOpt);
// sd is locked but not opened
switch(curState) {
case NORMAL:
break;
case NON_EXISTENT:
// ignore this storage
LOG.info("Storage directory " + dataDir + " does not exist.");
it.remove();
continue;
case NOT_FORMATTED: // format
LOG.info("Storage directory " + dataDir + " is not formatted.");
LOG.info("Formatting ...");
format(sd, nsInfo);
break;
default: // recovery part is common
sd.doRecover(curState);
}
} catch (IOException ioe) {
sd.unlock();
throw ioe;
}
// add to the storage list
addStorageDir(sd);
dataDirStates.add(curState);
}
if (dataDirs.size() == 0) // none of the data dirs exist
throw new IOException(
"All specified directories are not accessible or do not exist.");
// 2. Do transitions
// Each storage directory is treated individually.
// During startup some of them can upgrade or roll back
// while others can be up-to-date for the regular startup.
for(int idx = 0; idx < getNumStorageDirs(); idx++) {
doTransition(getStorageDir(idx), nsInfo, startOpt);
assert this.getLayoutVersion() == nsInfo.getLayoutVersion() :
"Data-node and name-node layout versions must be the same.";
assert this.getCTime() == nsInfo.getCTime() :
"Data-node and name-node CTimes must be the same.";
}
// 3. Update all storages. Some of them might have just been formatted.
this.writeAll();
}
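/**
* Format the storage directory: create an empty current directory and
* write a new version file for the given namespace. The storageID is
* stored as it currently is (possibly still empty).
*/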
void format(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException {
sd.clearDirectory(); // create directory
this.layoutVersion = FSConstants.LAYOUT_VERSION;
this.namespaceID = nsInfo.getNamespaceID();
this.cTime = 0;
// store storageID as it currently is
sd.write();
}
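/**
* Add the data-node specific field (storageID) to the version file
* properties, in addition to the common storage fields.
*/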
protected void setFields(Properties props,
StorageDirectory sd
) throws IOException {
super.setFields(props, sd);
props.setProperty("storageID", storageID);
}
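/**
* Read the common storage fields and the storageID from the version file.
* The on-disk storageID must match the one already known, unless either of
* them is still empty; an empty in-memory storageID is updated from disk.
*/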
protected void getFields(Properties props,
StorageDirectory sd
) throws IOException {
super.getFields(props, sd);
String ssid = props.getProperty("storageID");
if (ssid == null ||
!("".equals(storageID) || "".equals(ssid) ||
storageID.equals(ssid)))
throw new InconsistentFSStateException(sd.getRoot(),
"has incompatible storage Id.");
if ("".equals(storageID)) // update id only if it was empty
storageID = ssid;
}
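/**
* Check whether the directory still contains the pre-upgrade "storage"
* file with a layout version that requires conversion to the current layout.
*/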
public boolean isConversionNeeded(StorageDirectory sd) throws IOException {
File oldF = new File(sd.getRoot(), "storage");
if (!oldF.exists())
return false;
// check the layout version inside the storage file
// Lock and Read old storage file
RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
FileLock oldLock = oldFile.getChannel().tryLock();
try {
oldFile.seek(0);
int oldVersion = oldFile.readInt();
if (oldVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION)
return false;
} finally {
oldLock.release();
oldFile.close();
}
return true;
}
/**
* Analyze whether a transition of the fs state is required
* and perform it if necessary.
*
* Rollback if previousLV >= LAYOUT_VERSION && prevCTime <= namenode.cTime
* Upgrade if this.LV > LAYOUT_VERSION || this.cTime < namenode.cTime
* Regular startup if this.LV = LAYOUT_VERSION && this.cTime = namenode.cTime
*
* @param sd storage directory
* @param nsInfo namespace info
* @param startOpt startup option
* @throws IOException
*/
private void doTransition( StorageDirectory sd,
NamespaceInfo nsInfo,
StartupOption startOpt
) throws IOException {
if (startOpt == StartupOption.ROLLBACK)
doRollback(sd, nsInfo); // rollback if applicable
sd.read();
checkVersionUpgradable(this.layoutVersion);
assert this.layoutVersion >= FSConstants.LAYOUT_VERSION :
"Future version is not allowed";
if (getNamespaceID() != nsInfo.getNamespaceID())
throw new IOException(
"Incompatible namespaceIDs in " + sd.getRoot().getCanonicalPath()
+ ": namenode namespaceID = " + nsInfo.getNamespaceID()
+ "; datanode namespaceID = " + getNamespaceID());
if (this.layoutVersion == FSConstants.LAYOUT_VERSION
&& this.cTime == nsInfo.getCTime())
return; // regular startup
// verify necessity of a distributed upgrade
verifyDistributedUpgradeProgress(nsInfo);
if (this.layoutVersion > FSConstants.LAYOUT_VERSION
|| this.cTime < nsInfo.getCTime()) {
doUpgrade(sd, nsInfo); // upgrade
return;
}
// layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
// must shutdown
throw new IOException("Datanode state: LV = " + this.getLayoutVersion()
+ " CTime = " + this.getCTime()
+ " is newer than the namespace state: LV = "
+ nsInfo.getLayoutVersion()
+ " CTime = " + nsInfo.getCTime());
}
/**
* Move current storage into a backup directory,
* and hardlink all its blocks into the new current directory.
*
* @param sd storage directory
* @param nsInfo namespace information from the name-node
* @throws IOException
*/
void doUpgrade(StorageDirectory sd,
NamespaceInfo nsInfo
) throws IOException {
LOG.info("Upgrading storage directory " + sd.getRoot()
+ ".\n old LV = " + this.getLayoutVersion()
+ "; old CTime = " + this.getCTime()
+ ".\n new LV = " + nsInfo.getLayoutVersion()
+ "; new CTime = " + nsInfo.getCTime());
File curDir = sd.getCurrentDir();
File prevDir = sd.getPreviousDir();
assert curDir.exists() : "Current directory must exist.";
// Cleanup directory "detach"
cleanupDetachDir(new File(curDir, STORAGE_DIR_DETACHED));
// delete previous dir before upgrading
if (prevDir.exists())
deleteDir(prevDir);
File tmpDir = sd.getPreviousTmp();
assert !tmpDir.exists() : "previous.tmp directory must not exist.";
// rename current to tmp
rename(curDir, tmpDir);
// hard link finalized & rbw blocks
linkAllBlocks(tmpDir, curDir);
// create current directory if not exists
if (!curDir.exists() && !curDir.mkdirs())
throw new IOException("Cannot create directory " + curDir);
// write version file
this.layoutVersion = FSConstants.LAYOUT_VERSION;
assert this.namespaceID == nsInfo.getNamespaceID() :
"Data-node and name-node namespace IDs must be the same.";
this.cTime = nsInfo.getCTime();
sd.write();
// rename tmp to previous
rename(tmpDir, prevDir);
LOG.info("Upgrade of " + sd.getRoot()+ " is complete.");
}
/**
* Cleanup the detachDir.
*
* If the directory is not empty report an error;
* Otherwise remove the directory.
*
* @param detachDir detach directory
* @throws IOException if the directory is not empty or it can not be removed
*/
private void cleanupDetachDir(File detachDir) throws IOException {
if (layoutVersion >= PRE_RBW_LAYOUT_VERSION &&
detachDir.exists() && detachDir.isDirectory() ) {
if (detachDir.list().length != 0 ) {
throw new IOException("Detached directory " + detachDir +
" is not empty. Please manually move each file under this " +
"directory to the finalized directory if the finalized " +
"directory tree does not have the file.");
} else if (!detachDir.delete()) {
throw new IOException("Cannot remove directory " + detachDir);
}
}
}
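/**
* Roll back the storage directory to the previously saved state if one
* exists and it is not newer than the namespace state: rename current to
* removed.tmp, rename previous to current, then delete removed.tmp.
*
* @param sd storage directory
* @param nsInfo namespace information
* @throws IOException
*/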
void doRollback( StorageDirectory sd,
NamespaceInfo nsInfo
) throws IOException {
File prevDir = sd.getPreviousDir();
// regular startup if previous dir does not exist
if (!prevDir.exists())
return;
DataStorage prevInfo = new DataStorage();
StorageDirectory prevSD = prevInfo.new StorageDirectory(sd.getRoot());
prevSD.read(prevSD.getPreviousVersionFile());
// We allow rollback to a state, which is either consistent with
// the namespace state or can be further upgraded to it.
if (!(prevInfo.getLayoutVersion() >= FSConstants.LAYOUT_VERSION
&& prevInfo.getCTime() <= nsInfo.getCTime())) // cannot rollback
throw new InconsistentFSStateException(prevSD.getRoot(),
"Cannot rollback to a newer state.\nDatanode previous state: LV = "
+ prevInfo.getLayoutVersion() + " CTime = " + prevInfo.getCTime()
+ " is newer than the namespace state: LV = "
+ nsInfo.getLayoutVersion() + " CTime = " + nsInfo.getCTime());
LOG.info("Rolling back storage directory " + sd.getRoot()
+ ".\n target LV = " + nsInfo.getLayoutVersion()
+ "; target CTime = " + nsInfo.getCTime());
File tmpDir = sd.getRemovedTmp();
assert !tmpDir.exists() : "removed.tmp directory must not exist.";
// rename current to tmp
File curDir = sd.getCurrentDir();
assert curDir.exists() : "Current directory must exist.";
rename(curDir, tmpDir);
// rename previous to current
rename(prevDir, curDir);
// delete tmp dir
deleteDir(tmpDir);
LOG.info("Rollback of " + sd.getRoot() + " is complete.");
}
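/**
* Finalize a previously performed upgrade by discarding the previous
* state: rename previous to finalized.tmp and delete it in a background
* daemon thread.
*/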
void doFinalize(StorageDirectory sd) throws IOException {
File prevDir = sd.getPreviousDir();
if (!prevDir.exists())
return; // already discarded
final String dataDirPath = sd.getRoot().getCanonicalPath();
LOG.info("Finalizing upgrade for storage directory "
+ dataDirPath
+ ".\n cur LV = " + this.getLayoutVersion()
+ "; cur CTime = " + this.getCTime());
assert sd.getCurrentDir().exists() : "Current directory must exist.";
final File tmpDir = sd.getFinalizedTmp();
// rename previous to tmp
rename(prevDir, tmpDir);
// delete tmp dir in a separate thread
new Daemon(new Runnable() {
public void run() {
try {
deleteDir(tmpDir);
} catch(IOException ex) {
LOG.error("Finalize upgrade for " + dataDirPath + " failed.", ex);
}
LOG.info("Finalize upgrade for " + dataDirPath + " is complete.");
}
public String toString() { return "Finalize " + dataDirPath; }
}).start();
}
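/** Finalize the upgrade in every storage directory of this data-node. */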
void finalizeUpgrade() throws IOException {
for (Iterator<StorageDirectory> it = storageDirs.iterator(); it.hasNext();) {
doFinalize(it.next());
}
}
/**
* Hardlink all finalized and RBW blocks in fromDir to toDir
* @param fromDir directory where the snapshot is stored
* @param toDir the current data directory
* @throws IOException if error occurs during hardlink
*/
private void linkAllBlocks(File fromDir, File toDir) throws IOException {
// do the link
int diskLayoutVersion = this.getLayoutVersion();
if (diskLayoutVersion < PRE_RBW_LAYOUT_VERSION) { // RBW version
// hardlink finalized blocks in tmpDir/finalized
linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED),
new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion);
// hardlink rbw blocks in tmpDir/rbw
linkBlocks(new File(fromDir, STORAGE_DIR_RBW),
new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion);
} else { // pre-RBW version
// hardlink finalized blocks in tmpDir
linkBlocks(fromDir,
new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion);
}
}
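/**
* Recursively hard-link block files from {@code from} into {@code to}.
* Files with the {@link #COPY_FILE_PREFIX} prefix are copied rather than
* linked; when upgrading from a pre-generation-stamp layout, metadata
* files are linked under their new names with the default generation stamp.
*
* @param oldLV layout version of the on-disk data being upgraded
*/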
static void linkBlocks(File from, File to, int oldLV) throws IOException {
if (!from.exists()) {
return;
}
if (!from.isDirectory()) {
if (from.getName().startsWith(COPY_FILE_PREFIX)) {
FileInputStream in = new FileInputStream(from);
try {
FileOutputStream out = new FileOutputStream(to);
try {
IOUtils.copyBytes(in, out, 16*1024);
} finally {
out.close();
}
} finally {
in.close();
}
} else {
// check whether we are upgrading from a pre-generation-stamp version.
if (oldLV >= PRE_GENERATIONSTAMP_LAYOUT_VERSION) {
// Link to the new file name.
to = new File(convertMetatadataFileName(to.getAbsolutePath()));
}
HardLink.createHardLink(from, to);
}
return;
}
// from is a directory
if (!to.mkdirs())
throw new IOException("Cannot create directory " + to);
String[] blockNames = from.list(new java.io.FilenameFilter() {
public boolean accept(File dir, String name) {
return name.startsWith(BLOCK_SUBDIR_PREFIX)
|| name.startsWith(BLOCK_FILE_PREFIX)
|| name.startsWith(COPY_FILE_PREFIX);
}
});
for(int i = 0; i < blockNames.length; i++)
linkBlocks(new File(from, blockNames[i]),
new File(to, blockNames[i]), oldLV);
}
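/**
* Re-create the pre-upgrade "storage" file and fill it with data that
* pre-upgrade data-node versions cannot interpret, so that they fail fast
* instead of starting against an upgraded layout.
*/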
protected void corruptPreUpgradeStorage(File rootDir) throws IOException {
File oldF = new File(rootDir, "storage");
if (oldF.exists())
return;
// recreate old storage file to let pre-upgrade versions fail
if (!oldF.createNewFile())
throw new IOException("Cannot create file " + oldF);
RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
// write new version into old storage file
try {
writeCorruptedData(oldFile);
} finally {
oldFile.close();
}
}
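/**
* Reset the distributed-upgrade state for the current layout version and
* let the data-node's upgrade manager initialize an upgrade for this
* namespace if one is required.
*/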
private void verifyDistributedUpgradeProgress(
NamespaceInfo nsInfo
) throws IOException {
UpgradeManagerDatanode um = DataNode.getDataNode().upgradeManager;
assert um != null : "DataNode.upgradeManager is null.";
um.setUpgradeState(false, getLayoutVersion());
um.initializeUpgrade(nsInfo);
}
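// Matches pre-generation-stamp metadata file names of the form "blk_<id>.meta".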
private static final Pattern PRE_GENSTAMP_META_FILE_PATTERN =
Pattern.compile("(.*blk_[-]*\\d+)\\.meta$");
/**
* This is invoked on target file names when upgrading from a pre-generation-stamp
* version (version -13) to correct the metadata file name.
* @param oldFileName
* @return the new metadata file name with the default generation stamp.
*/
private static String convertMetatadataFileName(String oldFileName) {
Matcher matcher = PRE_GENSTAMP_META_FILE_PATTERN.matcher(oldFileName);
if (matcher.matches()) {
//return the current metadata file name
return FSDataset.getMetaFileName(matcher.group(1),
GenerationStamp.GRANDFATHER_GENERATION_STAMP);
}
return oldFileName;
}
}