/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.datanode;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.util.Daemon;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

/**
 * Manages storage for the set of BlockPoolSlices which share a particular
 * block pool id, on this DataNode.
 *
 * This class supports the following functionality:
 * <ol>
 * <li> Formatting a new block pool storage</li>
 * <li> Recovering a storage state to a consistent state (if possible)</li>
 * <li> Taking a snapshot of the block pool during upgrade</li>
 * <li> Rolling back a block pool to a previous snapshot</li>
 * <li> Finalizing block storage by deletion of a snapshot</li>
 * </ol>
 *
 * @see Storage
 */
@InterfaceAudience.Private
public class BlockPoolSliceStorage extends Storage {
  static final String TRASH_ROOT_DIR = "trash";

  /**
   * A marker file that is created on each root directory if a rolling upgrade
   * is in progress. The NN does not inform the DN when a rolling upgrade is
   * finalized. All the DN can infer is whether or not a rolling upgrade is
   * currently in progress. When the rolling upgrade is not in progress:
   * 1. If the marker file is present, then a rolling upgrade just completed.
   *    If a 'previous' directory exists, it can be deleted now.
   * 2. If the marker file is absent, then a regular upgrade may be in
   *    progress. Do not delete the 'previous' directory.
   */
  static final String ROLLING_UPGRADE_MARKER_FILE = "RollingUpgradeInProgress";

  private static final String BLOCK_POOL_ID_PATTERN_BASE =
      Pattern.quote(File.separator) +
      "BP-\\d+-\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}-\\d+" +
      Pattern.quote(File.separator);

  private static final Pattern BLOCK_POOL_PATH_PATTERN = Pattern.compile(
      "^(.*)(" + BLOCK_POOL_ID_PATTERN_BASE + ")(.*)$");

  private static final Pattern BLOCK_POOL_CURRENT_PATH_PATTERN = Pattern.compile(
      "^(.*)(" + BLOCK_POOL_ID_PATTERN_BASE + ")(" + STORAGE_DIR_CURRENT + ")(.*)$");

  private static final Pattern BLOCK_POOL_TRASH_PATH_PATTERN = Pattern.compile(
      "^(.*)(" + BLOCK_POOL_ID_PATTERN_BASE + ")(" + TRASH_ROOT_DIR + ")(.*)$");

  private String blockpoolID = ""; // id of the blockpool
  private Daemon trashCleaner;

  public BlockPoolSliceStorage(StorageInfo storageInfo, String bpid) {
    super(storageInfo);
    blockpoolID = bpid;
  }

  /**
   * These sets are used as an optimization to avoid one filesystem operation
   * per storage directory on each heartbeat response.
   */
  private static Set<String> storagesWithRollingUpgradeMarker;
  private static Set<String> storagesWithoutRollingUpgradeMarker;

  BlockPoolSliceStorage(int namespaceID, String bpID, long cTime,
      String clusterId) {
    super(NodeType.DATA_NODE);
    this.namespaceID = namespaceID;
    this.blockpoolID = bpID;
    this.cTime = cTime;
    this.clusterID = clusterId;
    storagesWithRollingUpgradeMarker = Collections.newSetFromMap(
        new ConcurrentHashMap<String, Boolean>());
    storagesWithoutRollingUpgradeMarker = Collections.newSetFromMap(
        new ConcurrentHashMap<String, Boolean>());
  }

  private BlockPoolSliceStorage() {
    super(NodeType.DATA_NODE);
    storagesWithRollingUpgradeMarker = Collections.newSetFromMap(
        new ConcurrentHashMap<String, Boolean>());
    storagesWithoutRollingUpgradeMarker = Collections.newSetFromMap(
        new ConcurrentHashMap<String, Boolean>());
  }

  // Expose visibility for VolumeBuilder#commit().
  public void addStorageDir(StorageDirectory sd) {
    super.addStorageDir(sd);
  }

  /**
   * Load one storage directory. Recover from previous transitions if required.
   *
   * @param nsInfo namespace information
   * @param location the root path of the storage directory
   * @param startOpt startup option
   * @param callables if non-null, long-running upgrade work is queued here
   *                  as a task instead of being executed inline
   * @param conf configuration
   * @return the StorageDirectory successfully loaded.
   * @throws IOException on error
   */
  private StorageDirectory loadStorageDirectory(NamespaceInfo nsInfo,
      StorageLocation location, StartupOption startOpt,
      List<Callable<StorageDirectory>> callables, Configuration conf)
      throws IOException {
    StorageDirectory sd = new StorageDirectory(
        nsInfo.getBlockPoolID(), null, true, location);
    try {
      StorageState curState = sd.analyzeStorage(startOpt, this, true);
      // sd is locked but not opened
      switch (curState) {
      case NORMAL:
        break;
      case NON_EXISTENT:
        LOG.info("Block pool storage directory for location {} and block pool"
            + " id {} does not exist", location, nsInfo.getBlockPoolID());
        throw new IOException("Storage directory for location " + location +
            " and block pool id " + nsInfo.getBlockPoolID() +
            " does not exist");
      case NOT_FORMATTED: // format
        LOG.info("Block pool storage directory for location {} and block pool"
            + " id {} is not formatted. Formatting ...", location,
            nsInfo.getBlockPoolID());
        format(sd, nsInfo);
        break;
      default: // recovery part is common
        sd.doRecover(curState);
      }

      // 2. Do transitions
      // Each storage directory is treated individually.
      // During startup some of them can upgrade or roll back
      // while others could be up-to-date for the regular startup.
      if (!doTransition(sd, nsInfo, startOpt, callables, conf)) {

        // 3. Check CTime and update successfully loaded storage.
        if (getCTime() != nsInfo.getCTime()) {
          throw new IOException("Datanode CTime (=" + getCTime()
              + ") is not equal to namenode CTime (=" + nsInfo.getCTime() + ")");
        }
        setServiceLayoutVersion(getServiceLayoutVersion());
        writeProperties(sd);
      }

      return sd;
    } catch (IOException ioe) {
      sd.unlock();
      throw ioe;
    }
  }

  /**
   * Analyze and load storage directories. Recover from previous transitions
   * if required.
   *
   * The block pool storages are either all analyzed or none of them are
   * loaded. Therefore, a failure to load any block pool storage results in a
   * faulty data volume.
   *
   * @param nsInfo namespace information
   * @param location storage location of the block pool
   * @param startOpt startup option
   * @param callables if non-null, long-running upgrade work is queued here
   * @param conf configuration
   * @return a list of the loaded block pool storage directories.
   * @throws IOException on error
   */
  List<StorageDirectory> loadBpStorageDirectories(NamespaceInfo nsInfo,
      StorageLocation location, StartupOption startOpt,
      List<Callable<StorageDirectory>> callables, Configuration conf)
      throws IOException {
    List<StorageDirectory> succeedDirs = Lists.newArrayList();
    try {
      if (containsStorageDir(location, nsInfo.getBlockPoolID())) {
        throw new IOException(
            "BlockPoolSliceStorage.recoverTransitionRead: " +
            "attempt to load a block storage that is already in use: " +
            location);
      }
      final StorageDirectory sd = loadStorageDirectory(
          nsInfo, location, startOpt, callables, conf);
      succeedDirs.add(sd);
    } catch (IOException e) {
      LOG.warn("Failed to analyze storage directories for block pool {}",
          nsInfo.getBlockPoolID(), e);
      throw e;
    }
    return succeedDirs;
  }

  /**
   * Analyze storage directories. Recover from previous transitions if
   * required.
   *
   * The block pool storages are either all analyzed or none of them are
   * loaded. Therefore, a failure to load any block pool storage results in a
   * faulty data volume.
   *
   * @param nsInfo namespace information
   * @param location storage location of the block pool
   * @param startOpt startup option
   * @param callables if non-null, long-running upgrade work is queued here
   * @param conf configuration
   * @return the loaded and registered block pool storage directories.
   * @throws IOException on error
   */
  List<StorageDirectory> recoverTransitionRead(NamespaceInfo nsInfo,
      StorageLocation location, StartupOption startOpt,
      List<Callable<StorageDirectory>> callables, Configuration conf)
      throws IOException {
    LOG.info("Analyzing storage directories for bpid {}",
        nsInfo.getBlockPoolID());
    final List<StorageDirectory> loaded = loadBpStorageDirectories(
        nsInfo, location, startOpt, callables, conf);
    for (StorageDirectory sd : loaded) {
      addStorageDir(sd);
    }
    return loaded;
  }

  /**
   * Format a block pool slice storage.
   * @param dnCurDir DataStorage current directory
   * @param nsInfo the namespace info
   * @throws IOException Signals that an I/O exception has occurred.
   */
  void format(File dnCurDir, NamespaceInfo nsInfo) throws IOException {
    File curBpDir = getBpRoot(nsInfo.getBlockPoolID(), dnCurDir);
    StorageDirectory bpSdir = new StorageDirectory(curBpDir);
    format(bpSdir, nsInfo);
  }

  /**
   * Format a block pool slice storage.
   * @param bpSdir the block pool storage
   * @param nsInfo the namespace info
   * @throws IOException Signals that an I/O exception has occurred.
   */
  private void format(StorageDirectory bpSdir, NamespaceInfo nsInfo)
      throws IOException {
    LOG.info("Formatting block pool {} directory {}", blockpoolID,
        bpSdir.getCurrentDir());
    bpSdir.clearDirectory(); // create directory
    this.layoutVersion = HdfsServerConstants.DATANODE_LAYOUT_VERSION;
    this.cTime = nsInfo.getCTime();
    this.namespaceID = nsInfo.getNamespaceID();
    this.blockpoolID = nsInfo.getBlockPoolID();
    writeProperties(bpSdir);
  }

  /**
   * Remove block pool level storage directory.
   * @param absPathToRemove the absolute path of the root for the block pool
   *                        level storage to remove.
   */
  void remove(File absPathToRemove) {
    Preconditions.checkArgument(absPathToRemove.isAbsolute());
    LOG.info("Removing block level storage: {}", absPathToRemove);
    for (Iterator<StorageDirectory> it = getStorageDirs().iterator();
         it.hasNext(); ) {
      StorageDirectory sd = it.next();
      if (sd.getRoot().getAbsoluteFile().equals(absPathToRemove)) {
        getStorageDirs().remove(sd);
        break;
      }
    }
  }

  /**
   * Set layoutVersion, namespaceID and blockpoolID into block pool storage
   * VERSION file.
   */
  @Override
  protected void setPropertiesFromFields(Properties props, StorageDirectory sd)
      throws IOException {
    props.setProperty("layoutVersion", String.valueOf(layoutVersion));
    props.setProperty("namespaceID", String.valueOf(namespaceID));
    props.setProperty("blockpoolID", blockpoolID);
    props.setProperty("cTime", String.valueOf(cTime));
  }

  /** Validate and set block pool ID. */
  private void setBlockPoolID(File storage, String bpid)
      throws InconsistentFSStateException {
    if (bpid == null || bpid.equals("")) {
      throw new InconsistentFSStateException(storage, "file "
          + STORAGE_FILE_VERSION + " is invalid.");
    }

    if (!blockpoolID.equals("") && !blockpoolID.equals(bpid)) {
      throw new InconsistentFSStateException(storage,
          "Unexpected blockpoolID " + bpid + ". Expected " + blockpoolID);
    }
    blockpoolID = bpid;
  }

  @Override
  protected void setFieldsFromProperties(Properties props, StorageDirectory sd)
      throws IOException {
    setLayoutVersion(props, sd);
    setNamespaceID(props, sd);
    setcTime(props, sd);

    String sbpid = props.getProperty("blockpoolID");
    setBlockPoolID(sd.getRoot(), sbpid);
  }

  /**
   * Analyze whether a transition of the BP state is required and
   * perform it if necessary.
   * <br>
   * Rollback if previousLV >= LAYOUT_VERSION && prevCTime <= namenode.cTime.
   * Upgrade if this.LV > LAYOUT_VERSION || this.cTime < namenode.cTime.
   * Regular startup if this.LV = LAYOUT_VERSION && this.cTime = namenode.cTime.
   *
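   * For example, with hypothetical values (note that layout versions are
   * negative, so a numerically greater stored LV means an older on-disk
   * layout): if the software DATANODE_LAYOUT_VERSION is -57 and the stored
   * LV is -56, or the stored cTime is behind the namenode's, an upgrade is
   * performed; if the stored LV is -57 and the cTimes match, this is a
   * regular startup.
   *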
   * @param sd storage directory <SD>/current/<bpid>
   * @param nsInfo namespace info
   * @param startOpt startup option
   * @return true if the new properties have been written.
   */
  private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
      StartupOption startOpt, List<Callable<StorageDirectory>> callables,
      Configuration conf) throws IOException {
    if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
      return false; // regular startup for PROVIDED storage directories
    }
    if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) {
      Preconditions.checkState(!getTrashRootDir(sd).exists(),
          sd.getPreviousDir() + " and " + getTrashRootDir(sd) +
          " should not both be present.");
      doRollback(sd, nsInfo); // rollback if applicable
    } else if (startOpt == StartupOption.ROLLBACK &&
        !sd.getPreviousDir().exists()) {
      // Restore all the files in the trash. The restored files are retained
      // during rolling upgrade rollback. They are deleted during rolling
      // upgrade downgrade.
      int restored = restoreBlockFilesFromTrash(getTrashRootDir(sd));
      LOG.info("Restored {} block files from trash.", restored);
    }
    readProperties(sd);
    checkVersionUpgradable(this.layoutVersion);
    assert this.layoutVersion >= HdfsServerConstants.DATANODE_LAYOUT_VERSION
        : "Future version is not allowed";
    if (getNamespaceID() != nsInfo.getNamespaceID()) {
      throw new IOException("Incompatible namespaceIDs in "
          + sd.getRoot().getCanonicalPath() + ": namenode namespaceID = "
          + nsInfo.getNamespaceID() + "; datanode namespaceID = "
          + getNamespaceID());
    }
    if (!blockpoolID.equals(nsInfo.getBlockPoolID())) {
      throw new IOException("Incompatible blockpoolIDs in "
          + sd.getRoot().getCanonicalPath() + ": namenode blockpoolID = "
          + nsInfo.getBlockPoolID() + "; datanode blockpoolID = "
          + blockpoolID);
    }
    if (this.layoutVersion == HdfsServerConstants.DATANODE_LAYOUT_VERSION
        && this.cTime == nsInfo.getCTime()) {
      return false; // regular startup
    }
    if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION) {
      int restored = restoreBlockFilesFromTrash(getTrashRootDir(sd));
      LOG.info("Restored {} block files from trash " +
          "before the layout upgrade. These blocks will be moved to " +
          "the previous directory during the upgrade", restored);
    }
    if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION
        || this.cTime < nsInfo.getCTime()) {
      doUpgrade(sd, nsInfo, callables, conf); // upgrade
      return true;
    }
    // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
    // must shutdown
    throw new IOException("Datanode state: LV = " + this.getLayoutVersion()
        + " CTime = " + this.getCTime()
        + " is newer than the namespace state: LV = "
        + nsInfo.getLayoutVersion() + " CTime = " + nsInfo.getCTime());
  }

  /**
   * Upgrade to any release after 0.22 (0.22 included), e.g. 0.22 => 0.23.
   * The upgrade procedure is as follows (see the sketch below):
   * <ol>
   * <li>If <SD>/current/<bpid>/previous exists then delete it</li>
   * <li>Rename <SD>/current/<bpid>/current to
   * <SD>/current/<bpid>/previous.tmp</li>
   * <li>Create a new <SD>/current/<bpid>/current directory:
   * <ol>
   * <li>Hard links for block files are created from previous.tmp to current</li>
   * <li>Save the new version file in the current directory</li>
   * </ol></li>
   * <li>Rename previous.tmp to previous</li>
   * </ol>
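   *
   * An illustrative sketch of one storage root across these steps; the
   * directory names below are the real fixed names (current, previous.tmp,
   * previous, VERSION, finalized, rbw), while <SD> and <bpid> are
   * placeholders:
   * <pre>
   * before:  <SD>/current/<bpid>/current/{VERSION,finalized,rbw}
   * step 2:  <SD>/current/<bpid>/previous.tmp/{VERSION,finalized,rbw}
   * step 3:  <SD>/current/<bpid>/current/{finalized,rbw}  (hard links)
   *          <SD>/current/<bpid>/current/VERSION          (new layout version)
   * step 4:  <SD>/current/<bpid>/previous/{VERSION,finalized,rbw}
   * </pre>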
   *
   * @param bpSd storage directory <SD>/current/<bpid>
   * @param nsInfo Namespace Info from the namenode
   * @throws IOException on error
   */
  private void doUpgrade(final StorageDirectory bpSd,
      final NamespaceInfo nsInfo,
      final List<Callable<StorageDirectory>> callables,
      final Configuration conf) throws IOException {
    // Upgrade is applicable only to releases with federation support or later
    if (!DataNodeLayoutVersion.supports(
        LayoutVersion.Feature.FEDERATION, layoutVersion)) {
      return;
    }
    // no upgrades for storage directories that are PROVIDED
    if (bpSd.getRoot() == null) {
      return;
    }
    final int oldLV = getLayoutVersion();
    LOG.info("Upgrading block pool storage directory {}.\n old LV = {}; old"
        + " CTime = {}.\n new LV = {}; new CTime = {}",
        bpSd.getRoot(), oldLV, this.getCTime(),
        HdfsServerConstants.DATANODE_LAYOUT_VERSION, nsInfo.getCTime());
    // get <SD>/previous directory
    String dnRoot = getDataNodeStorageRoot(bpSd.getRoot().getCanonicalPath());
    StorageDirectory dnSdStorage = new StorageDirectory(new File(dnRoot));
    File dnPrevDir = dnSdStorage.getPreviousDir();

    // If <SD>/previous directory exists delete it
    if (dnPrevDir.exists()) {
      deleteDir(dnPrevDir);
    }
    final File bpCurDir = bpSd.getCurrentDir();
    final File bpPrevDir = bpSd.getPreviousDir();
    assert bpCurDir.exists() : "BP level current directory must exist.";
    cleanupDetachDir(new File(bpCurDir, DataStorage.STORAGE_DIR_DETACHED));

    // 1. Delete <SD>/current/<bpid>/previous dir before upgrading
    if (bpPrevDir.exists()) {
      deleteDir(bpPrevDir);
    }
    final File bpTmpDir = bpSd.getPreviousTmp();
    assert !bpTmpDir.exists() : "previous.tmp directory must not exist.";

    // 2. Rename <SD>/current/<bpid>/current to
    //    <SD>/current/<bpid>/previous.tmp
    rename(bpCurDir, bpTmpDir);

    final String name = "block pool " + blockpoolID + " at " + bpSd.getRoot();
    if (callables == null) {
      doUpgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV, conf);
    } else {
      callables.add(new Callable<StorageDirectory>() {
        @Override
        public StorageDirectory call() throws Exception {
          doUpgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV,
              conf);
          return bpSd;
        }
      });
    }
  }

  private void doUpgrade(String name, final StorageDirectory bpSd,
      NamespaceInfo nsInfo, final File bpPrevDir, final File bpTmpDir,
      final File bpCurDir, final int oldLV, Configuration conf)
      throws IOException {
    // 3. Create new <SD>/current with hard links to block files and VERSION
    linkAllBlocks(bpTmpDir, bpCurDir, oldLV, conf);
    this.layoutVersion = HdfsServerConstants.DATANODE_LAYOUT_VERSION;
    assert this.namespaceID == nsInfo.getNamespaceID()
        : "Data-node and name-node namespace IDs must be the same.";
    this.cTime = nsInfo.getCTime();
    writeProperties(bpSd);

    // 4. Rename <SD>/current/<bpid>/previous.tmp to
    //    <SD>/current/<bpid>/previous
    rename(bpTmpDir, bpPrevDir);
    LOG.info("Upgrade of {} is complete", name);
  }

  /**
   * Clean up the detach directory.
   *
   * If the directory is not empty, report an error; otherwise remove the
   * directory.
   *
   * @param detachDir detach directory
   * @throws IOException if the directory is not empty or cannot be removed
   */
  private void cleanupDetachDir(File detachDir) throws IOException {
    if (!DataNodeLayoutVersion.supports(
        LayoutVersion.Feature.APPEND_RBW_DIR, layoutVersion)
        && detachDir.exists() && detachDir.isDirectory()) {

      if (FileUtil.list(detachDir).length != 0) {
        throw new IOException("Detached directory " + detachDir
            + " is not empty. Please manually move each file under this "
            + "directory to the finalized directory if the finalized "
            + "directory tree does not have the file.");
      } else if (!detachDir.delete()) {
        throw new IOException("Cannot remove directory " + detachDir);
      }
    }
  }

  /**
   * Restore all files from the trash directory to their corresponding
   * locations under current/.
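   *
   * For illustration (the block and subdirectory names here are made up): a
   * file at trash/finalized/subdir0/blk_123 is moved back to
   * current/finalized/subdir0/blk_123, creating intermediate directories as
   * needed, and the emptied trash tree is deleted once the walk completes.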
   */
  private int restoreBlockFilesFromTrash(File trashRoot)
      throws IOException {
    int filesRestored = 0;
    File[] children = trashRoot.exists() ? trashRoot.listFiles() : null;
    if (children == null) {
      return 0;
    }

    File restoreDirectory = null;
    for (File child : children) {
      if (child.isDirectory()) {
        // Recurse to process subdirectories.
        filesRestored += restoreBlockFilesFromTrash(child);
        continue;
      }

      if (restoreDirectory == null) {
        restoreDirectory = new File(getRestoreDirectory(child));
        if (!restoreDirectory.exists() && !restoreDirectory.mkdirs()) {
          throw new IOException("Failed to create directory "
              + restoreDirectory);
        }
      }

      final File newChild = new File(restoreDirectory, child.getName());

      if (newChild.exists() && newChild.length() >= child.length()) {
        // Failsafe - we should not hit this case but let's make sure
        // we never overwrite a newer version of a block file with an
        // older version.
        LOG.info("Not overwriting {} with smaller file from " +
            "trash directory. This message can be safely ignored.", newChild);
      } else if (!child.renameTo(newChild)) {
        throw new IOException("Failed to rename " + child + " to " + newChild);
      } else {
        ++filesRestored;
      }
    }
    FileUtil.fullyDelete(trashRoot);
    return filesRestored;
  }

  /*
   * Roll back to the old snapshot at the block pool level.
   * If the previous directory exists:
   * <ol>
   * <li>Rename <SD>/current/<bpid>/current to removed.tmp</li>
   * <li>Rename <SD>/current/<bpid>/previous to current</li>
   * <li>Remove removed.tmp</li>
   * </ol>
   *
   * Do nothing if the previous directory does not exist.
   * @param bpSd Block pool storage directory at <SD>/current/<bpid>
   */
  void doRollback(StorageDirectory bpSd, NamespaceInfo nsInfo)
      throws IOException {
    File prevDir = bpSd.getPreviousDir();
    // regular startup if previous dir does not exist
    if (prevDir == null || !prevDir.exists()) {
      return;
    }
    // read attributes out of the VERSION file of previous directory
    BlockPoolSliceStorage prevInfo = new BlockPoolSliceStorage();
    prevInfo.readPreviousVersionProperties(bpSd);

    // We allow rollback to a state, which is either consistent with
    // the namespace state or can be further upgraded to it.
    // In other words, we can only roll back when (storedLV >= software LV)
    // && (DN.previousCTime <= NN.ctime)
    if (!(prevInfo.getLayoutVersion() >= HdfsServerConstants.DATANODE_LAYOUT_VERSION &&
        prevInfo.getCTime() <= nsInfo.getCTime())) { // cannot rollback
      throw new InconsistentFSStateException(bpSd.getRoot(),
          "Cannot rollback to a newer state.\nDatanode previous state: LV = "
          + prevInfo.getLayoutVersion() + " CTime = " + prevInfo.getCTime()
          + " is newer than the namespace state: LV = "
          + HdfsServerConstants.DATANODE_LAYOUT_VERSION + " CTime = "
          + nsInfo.getCTime());
    }

    LOG.info("Rolling back storage directory {}.\n target LV = {}; target "
        + "CTime = {}", bpSd.getRoot(), nsInfo.getLayoutVersion(),
        nsInfo.getCTime());
    File tmpDir = bpSd.getRemovedTmp();
    assert !tmpDir.exists() : "removed.tmp directory must not exist.";
    // 1. rename current to tmp
    File curDir = bpSd.getCurrentDir();
    assert curDir.exists() : "Current directory must exist.";
    rename(curDir, tmpDir);

    // 2. rename previous to current
    rename(prevDir, curDir);

    // 3. delete removed.tmp dir
    deleteDir(tmpDir);
    LOG.info("Rollback of {} is complete", bpSd.getRoot());
  }

  /*
   * Finalize the block pool storage by deleting <BP>/previous directory
   * that holds the snapshot.
   */
  void doFinalize(File dnCurDir) throws IOException {
    if (dnCurDir == null) {
      return; // do nothing if the directory is null
    }
    File bpRoot = getBpRoot(blockpoolID, dnCurDir);
    StorageDirectory bpSd = new StorageDirectory(bpRoot);
    // block pool level previous directory
    File prevDir = bpSd.getPreviousDir();
    if (!prevDir.exists()) {
      return; // already finalized
    }
    final String dataDirPath = bpSd.getRoot().getCanonicalPath();
    LOG.info("Finalizing upgrade for storage directory {}.\n cur LV = {}; "
        + "cur CTime = {}", dataDirPath, this.getLayoutVersion(),
        this.getCTime());
    assert bpSd.getCurrentDir().exists() : "Current directory must exist.";

    // rename previous to finalized.tmp
    final File tmpDir = bpSd.getFinalizedTmp();
    rename(prevDir, tmpDir);

    // delete finalized.tmp dir in a separate thread
    new Daemon(new Runnable() {
      @Override
      public void run() {
        try {
          deleteDir(tmpDir);
        } catch (IOException ex) {
          LOG.error("Finalize upgrade for {} failed.", dataDirPath, ex);
        }
        LOG.info("Finalize upgrade for {} is complete.", dataDirPath);
      }

      @Override
      public String toString() {
        return "Finalize " + dataDirPath;
      }
    }).start();
  }

  /**
   * Hardlink all finalized and RBW blocks in fromDir to toDir.
   *
   * @param fromDir directory where the snapshot is stored
   * @param toDir the current data directory
   * @throws IOException if an error occurs during hardlinking
   */
  private static void linkAllBlocks(File fromDir, File toDir,
      int diskLayoutVersion, Configuration conf) throws IOException {
    // do the link
    // hardlink finalized blocks in tmpDir
    HardLink hardLink = new HardLink();
    DataStorage.linkBlocks(fromDir, toDir, DataStorage.STORAGE_DIR_FINALIZED,
        diskLayoutVersion, hardLink, conf);
    DataStorage.linkBlocks(fromDir, toDir, DataStorage.STORAGE_DIR_RBW,
        diskLayoutVersion, hardLink, conf);
    LOG.info("Linked blocks from {} to {}. {}", fromDir, toDir,
        hardLink.linkStats.report());
  }

  /**
   * Gets the DataNode storage root directory from a block pool storage path.
   */
  private static String getDataNodeStorageRoot(String bpRoot) {
    Matcher matcher = BLOCK_POOL_PATH_PATTERN.matcher(bpRoot);
    if (matcher.matches()) {
      // return the data node root directory
      return matcher.group(1);
    }
    return bpRoot;
  }

  @Override
  public String toString() {
    return super.toString() + ";bpid=" + blockpoolID;
  }

  /**
   * Get a block pool storage root based on the data node storage root.
   * @param bpID block pool ID
   * @param dnCurDir data node storage root directory
   * @return root directory for block pool storage
   */
  public static File getBpRoot(String bpID, File dnCurDir) {
    return new File(dnCurDir, bpID);
  }

  @Override
  public boolean isPreUpgradableLayout(StorageDirectory sd) throws IOException {
    return false;
  }

  private File getTrashRootDir(StorageDirectory sd) {
    return new File(sd.getRoot(), TRASH_ROOT_DIR);
  }

  /**
   * Determine whether we can use trash for the given blockFile. Trash
   * is disallowed if a 'previous' directory exists for the
   * storage directory containing the block.
   */
  @VisibleForTesting
  public boolean isTrashAllowed(File blockFile) {
    Matcher matcher =
        BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile.getParent());
    String previousDir = matcher.replaceFirst("$1$2" + STORAGE_DIR_PREVIOUS);
    return !(new File(previousDir)).exists();
  }

  /**
   * Get a target subdirectory under trash/ for a given block file that is
   * being deleted.
   *
   * The subdirectory structure under trash/ mirrors that under current/ to
   * keep implicit memory of where the files are to be restored (if necessary).
   *
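   * For example (hypothetical path, Unix separators shown): a block file under
   * /data/dn/current/BP-123456-192.168.1.1-1550000000000/current/finalized/subdir0
   * gets the trash directory
   * /data/dn/current/BP-123456-192.168.1.1-1550000000000/trash/finalized/subdir0.
   *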
   * @return the trash directory for a given block file that is being deleted.
   */
  public String getTrashDirectory(ReplicaInfo info) {
    URI blockURI = info.getBlockURI();
    try {
      File blockFile = new File(blockURI);
      return getTrashDirectory(blockFile);
    } catch (IllegalArgumentException e) {
      LOG.warn("Failed to get block file for replica {}", info, e);
    }
    return null;
  }

  private String getTrashDirectory(File blockFile) {
    if (isTrashAllowed(blockFile)) {
      Matcher matcher =
          BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile.getParent());
      String trashDirectory =
          matcher.replaceFirst("$1$2" + TRASH_ROOT_DIR + "$4");
      return trashDirectory;
    }
    return null;
  }

  /**
   * Get a target subdirectory under current/ for a given block file that is
   * being restored from the trash.
   *
   * The subdirectory structure under trash/ mirrors that under current/ to
   * keep implicit memory of where the files are to be restored.
   *
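   * For example (hypothetical path, Unix separators shown): a file found under
   * /data/dn/current/BP-123456-192.168.1.1-1550000000000/trash/finalized/subdir0
   * is restored to
   * /data/dn/current/BP-123456-192.168.1.1-1550000000000/current/finalized/subdir0.
   *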
   * @return the target directory to restore a previously deleted block file.
   */
  @VisibleForTesting
  String getRestoreDirectory(File blockFile) {
    Matcher matcher =
        BLOCK_POOL_TRASH_PATH_PATTERN.matcher(blockFile.getParent());
    String restoreDirectory =
        matcher.replaceFirst("$1$2" + STORAGE_DIR_CURRENT + "$4");
    LOG.info("Restoring {} to {}", blockFile, restoreDirectory);
    return restoreDirectory;
  }

  /**
   * Delete all files and directories in the trash directories.
   */
  public void clearTrash() {
    final List<File> trashRoots = new ArrayList<>();
    for (StorageDirectory sd : getStorageDirs()) {
      File trashRoot = getTrashRootDir(sd);
      if (trashRoot.exists() && sd.getPreviousDir().exists()) {
        LOG.error("Trash and PreviousDir shouldn't both exist for storage "
            + "directory {}", sd);
        assert false;
      } else {
        trashRoots.add(trashRoot);
      }
    }

    stopTrashCleaner();
    trashCleaner = new Daemon(new Runnable() {
      @Override
      public void run() {
        for (File trashRoot : trashRoots) {
          FileUtil.fullyDelete(trashRoot);
          LOG.info("Cleared trash for storage directory {}", trashRoot);
        }
      }

      @Override
      public String toString() {
        return "clearTrash() for " + blockpoolID;
      }
    });
    trashCleaner.start();
  }

  public void stopTrashCleaner() {
    if (trashCleaner != null) {
      trashCleaner.interrupt();
    }
  }

  /** Trash is enabled if at least one storage directory contains a trash root. */
  @VisibleForTesting
  public boolean trashEnabled() {
    for (StorageDirectory sd : getStorageDirs()) {
      if (getTrashRootDir(sd).exists()) {
        return true;
      }
    }
    return false;
  }

  /**
   * Create a rolling upgrade marker file for each BP storage root, if it
   * does not exist already.
   */
  public void setRollingUpgradeMarkers(List<StorageDirectory> dnStorageDirs)
      throws IOException {
    for (StorageDirectory sd : dnStorageDirs) {
      if (sd.getCurrentDir() == null) {
        // Skip storage directories without a current directory, but keep
        // processing the rest (mirrors clearRollingUpgradeMarkers below).
        continue;
      }
      File bpRoot = getBpRoot(blockpoolID, sd.getCurrentDir());
      File markerFile = new File(bpRoot, ROLLING_UPGRADE_MARKER_FILE);
      if (!storagesWithRollingUpgradeMarker.contains(bpRoot.toString())) {
        if (!markerFile.exists() && markerFile.createNewFile()) {
          LOG.info("Created {}", markerFile);
        } else {
          LOG.info("{} already exists.", markerFile);
        }
        storagesWithRollingUpgradeMarker.add(bpRoot.toString());
        storagesWithoutRollingUpgradeMarker.remove(bpRoot.toString());
      }
    }
  }

  /**
   * Check whether the rolling upgrade marker file exists for each BP storage
   * root. If it exists, the marker file is deleted and, more importantly,
   * the layout upgrade is finalized.
   */
  public void clearRollingUpgradeMarkers(List<StorageDirectory> dnStorageDirs)
      throws IOException {
    for (StorageDirectory sd : dnStorageDirs) {
      if (sd.getCurrentDir() == null) {
        continue;
      }
      File bpRoot = getBpRoot(blockpoolID, sd.getCurrentDir());
      File markerFile = new File(bpRoot, ROLLING_UPGRADE_MARKER_FILE);
      if (!storagesWithoutRollingUpgradeMarker.contains(bpRoot.toString())) {
        if (markerFile.exists()) {
          LOG.info("Deleting {}", markerFile);
          doFinalize(sd.getCurrentDir());
          if (!markerFile.delete()) {
            LOG.warn("Failed to delete {}", markerFile);
          }
        }
        storagesWithoutRollingUpgradeMarker.add(bpRoot.toString());
        storagesWithRollingUpgradeMarker.remove(bpRoot.toString());
      }
    }
  }
}