| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hdfs.server.datanode; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.util.concurrent.Futures; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.HardLink; |
| import org.apache.hadoop.fs.LocalFileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.HdfsConfiguration; |
| import org.apache.hadoop.hdfs.protocol.Block; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants; |
| import org.apache.hadoop.hdfs.protocol.LayoutVersion; |
| import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; |
| import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; |
| import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException; |
| import org.apache.hadoop.hdfs.server.common.Storage; |
| import org.apache.hadoop.hdfs.server.common.StorageInfo; |
| import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; |
| import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.io.nativeio.NativeIO; |
| import org.apache.hadoop.util.Daemon; |
| import org.apache.hadoop.util.DiskChecker; |
| |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.RandomAccessFile; |
| import java.nio.channels.FileLock; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| |
| /** |
| * Manages a DataNode's on-disk storage: formatting, upgrade, rollback |
| * and finalization of its storage directories, and the per-block-pool |
| * storage slices within them. |
| * <p> |
| * @see Storage |
| */ |
| @InterfaceAudience.Private |
| public class DataStorage extends Storage { |
| |
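| // Names used in the on-disk layout of a storage directory: "finalized" |
| // holds replicas whose writes have completed, "rbw" holds replicas |
| // being written, "tmp" holds transient replica copies discarded on |
| // restart, and "detach" is the pre-append-era copy-on-write staging |
| // area (see cleanupDetachDir). "blk_" prefixes block files, "dncp_" |
| // prefixes files that are copied (not hard-linked) during upgrades, |
| // and "subdir" prefixes block subdirectories. |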
| public final static String BLOCK_SUBDIR_PREFIX = "subdir"; |
| final static String BLOCK_FILE_PREFIX = "blk_"; |
| final static String COPY_FILE_PREFIX = "dncp_"; |
| final static String STORAGE_DIR_DETACHED = "detach"; |
| public final static String STORAGE_DIR_RBW = "rbw"; |
| public final static String STORAGE_DIR_FINALIZED = "finalized"; |
| public final static String STORAGE_DIR_TMP = "tmp"; |
| |
| // Set of bpids for which 'trash' is currently enabled. |
| // When trash is enabled block files are moved under a separate |
| // 'trash' folder instead of being deleted right away. This can |
| // be useful during rolling upgrades, for example. |
| // The set is backed by a ConcurrentHashMap so membership checks and |
| // updates are thread-safe. It is initialized inline so that both |
| // constructors see a usable (non-null) set. |
| private final Set<String> trashEnabledBpids = Collections.newSetFromMap( |
| new ConcurrentHashMap<String, Boolean>()); |
| |
| /** |
| * Datanode UUID that this storage is currently attached to. This |
| * is the same as the legacy StorageID for datanodes that were |
| * upgraded from a pre-UUID version. For compatibility with prior |
| * versions of Datanodes we cannot make this field a UUID. |
| */ |
| private String datanodeUuid = null; |
| |
| // Flag to ensure we only initialize storage once |
| private boolean initialized = false; |
| |
| // Maps block pool IDs to block pool storage |
| private final Map<String, BlockPoolSliceStorage> bpStorageMap |
| = Collections.synchronizedMap(new HashMap<String, BlockPoolSliceStorage>()); |
| |
| |
| DataStorage() { |
| super(NodeType.DATA_NODE); |
| } |
| |
| public BlockPoolSliceStorage getBPStorage(String bpid) { |
| return bpStorageMap.get(bpid); |
| } |
| |
| public DataStorage(StorageInfo storageInfo) { |
| super(storageInfo); |
| } |
| |
| public synchronized String getDatanodeUuid() { |
| return datanodeUuid; |
| } |
| |
| public synchronized void setDatanodeUuid(String newDatanodeUuid) { |
| this.datanodeUuid = newDatanodeUuid; |
| } |
| |
| /** Create an ID for this storage. */ |
| public synchronized void createStorageID(StorageDirectory sd) { |
| if (sd.getStorageUuid() == null) { |
| sd.setStorageUuid(DatanodeStorage.generateUuid()); |
| } |
| } |
| |
| /** |
| * Enable trash for the specified block pool storage. |
| */ |
| public void enableTrash(String bpid) { |
| if (trashEnabledBpids.add(bpid)) { |
| LOG.info("Enabled trash for bpid " + bpid); |
| } |
| } |
| |
| public void restoreTrash(String bpid) { |
| if (trashEnabledBpids.contains(bpid)) { |
| getBPStorage(bpid).restoreTrash(); |
| trashEnabledBpids.remove(bpid); |
| LOG.info("Restored trash for bpid " + bpid); |
| } |
| } |
| |
| public boolean trashEnabled(String bpid) { |
| return trashEnabledBpids.contains(bpid); |
| } |
| |
| /** |
| * If rolling upgrades are in progress then do not delete block files |
| * immediately. Instead we move the block files to an intermediate |
| * 'trash' directory. If there is a subsequent rollback, then the block |
| * files will be restored from trash. |
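| * <p> |
| * A minimal sketch of the intended call pattern during a rolling |
| * upgrade (the {@code storage}, {@code bpid} and {@code blockFile} |
| * names are illustrative): |
| * <pre>{@code |
| * storage.enableTrash(bpid); // rolling upgrade started |
| * String trash = storage.getTrashDirectoryForBlockFile(bpid, blockFile); |
| * if (trash != null) { |
| *   // move blockFile under trash instead of deleting it |
| * } |
| * storage.restoreTrash(bpid); // on rollback, put trashed blocks back |
| * }</pre> |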
| * |
| * @return trash directory if rolling upgrade is in progress, null |
| * otherwise. |
| */ |
| public String getTrashDirectoryForBlockFile(String bpid, File blockFile) { |
| if (trashEnabledBpids.contains(bpid)) { |
| return getBPStorage(bpid).getTrashDirectory(blockFile); |
| } |
| return null; |
| } |
| |
| /** |
| * Write the current layout version and storage properties into the |
| * VERSION file of each of the given storage directories. |
| * @see org.apache.hadoop.hdfs.server.common.Storage#writeAll() |
| */ |
| private void writeAll(Collection<StorageDirectory> dirs) throws IOException { |
| this.layoutVersion = getServiceLayoutVersion(); |
| for (StorageDirectory dir : dirs) { |
| writeProperties(dir); |
| } |
| } |
| |
| /** |
| * Add a list of volumes to be managed by DataStorage. If a volume is |
| * empty, format it; otherwise recover it from previous transitions if |
| * required. |
| * |
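| * A hedged sketch of a hot-add call (the {@code storage}, {@code nsInfo} |
| * and {@code newDirs} names are illustrative; {@code nsInfo} would come |
| * from the NameNode handshake): |
| * <pre>{@code |
| * // add volumes while the DataNode keeps running |
| * storage.addStorageLocations(datanode, nsInfo, newDirs, |
| *     StartupOption.REGULAR); |
| * }</pre> |
| * |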
| * @param datanode the reference to DataNode. |
| * @param nsInfo namespace information |
| * @param dataDirs collection of data storage directories |
| * @param startOpt startup option |
| * @throws IOException |
| */ |
| synchronized void addStorageLocations(DataNode datanode, |
| NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs, |
| StartupOption startOpt) |
| throws IOException { |
| // Similar to recoverTransitionRead, it first ensures the datanode level |
| // format is completed. |
| List<StorageLocation> tmpDataDirs = |
| new ArrayList<StorageLocation>(dataDirs); |
| addStorageLocations(datanode, nsInfo, tmpDataDirs, startOpt, false, true); |
| |
| Collection<File> bpDataDirs = new ArrayList<File>(); |
| String bpid = nsInfo.getBlockPoolID(); |
| for (StorageLocation dir : dataDirs) { |
| File dnRoot = dir.getFile(); |
| File bpRoot = BlockPoolSliceStorage.getBpRoot(bpid, new File(dnRoot, |
| STORAGE_DIR_CURRENT)); |
| bpDataDirs.add(bpRoot); |
| } |
| // mkdir for the list of BlockPoolStorage |
| makeBlockPoolDataDir(bpDataDirs, null); |
| BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid); |
| if (bpStorage == null) { |
| bpStorage = new BlockPoolSliceStorage( |
| nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(), |
| nsInfo.getClusterID()); |
| } |
| |
| bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt); |
| addBlockPoolStorage(bpid, bpStorage); |
| } |
| |
| /** |
| * Add a list of volumes to be managed by this DataStorage. If a volume |
| * is empty, it is formatted; otherwise it is recovered from previous |
| * transitions if required. |
| * |
| * If isInitialize is false, only the directories that have finished the |
| * doTransition() process will be added into DataStorage. |
| * |
| * @param datanode the reference to DataNode. |
| * @param nsInfo namespace information |
| * @param dataDirs array of data storage directories |
| * @param startOpt startup option |
| * @param isInitialize whether this call is part of DataNode startup; if |
| * so, directories are registered before their transitions run. |
| * @param ignoreExistingDirs if true, return silently instead of failing |
| * when none of the given directories could be loaded. |
| * @throws IOException |
| */ |
| private synchronized void addStorageLocations(DataNode datanode, |
| NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs, |
| StartupOption startOpt, boolean isInitialize, boolean ignoreExistingDirs) |
| throws IOException { |
| Set<String> existingStorageDirs = new HashSet<String>(); |
| for (int i = 0; i < getNumStorageDirs(); i++) { |
| existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath()); |
| } |
| |
| // 1. For each data directory calculate its state and check whether all is |
| // consistent before transitioning. Format and recover. |
| ArrayList<StorageState> dataDirStates = |
| new ArrayList<StorageState>(dataDirs.size()); |
| List<StorageDirectory> addedStorageDirectories = |
| new ArrayList<StorageDirectory>(); |
| for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) { |
| File dataDir = it.next().getFile(); |
| if (existingStorageDirs.contains(dataDir.getAbsolutePath())) { |
| LOG.info("Storage directory " + dataDir + " has already been used."); |
| it.remove(); |
| continue; |
| } |
| StorageDirectory sd = new StorageDirectory(dataDir); |
| StorageState curState; |
| try { |
| curState = sd.analyzeStorage(startOpt, this); |
| // sd is locked but not opened |
| switch (curState) { |
| case NORMAL: |
| break; |
| case NON_EXISTENT: |
| // ignore this storage |
| LOG.info("Storage directory " + dataDir + " does not exist"); |
| it.remove(); |
| continue; |
| case NOT_FORMATTED: // format |
| LOG.info("Storage directory " + dataDir + " is not formatted for " |
| + nsInfo.getBlockPoolID()); |
| LOG.info("Formatting ..."); |
| format(sd, nsInfo, datanode.getDatanodeUuid()); |
| break; |
| default: // recovery part is common |
| sd.doRecover(curState); |
| } |
| } catch (IOException ioe) { |
| sd.unlock(); |
| LOG.warn("Ignoring storage directory " + dataDir |
| + " due to an exception", ioe); |
| //continue with other good dirs |
| continue; |
| } |
| if (isInitialize) { |
| addStorageDir(sd); |
| } |
| addedStorageDirectories.add(sd); |
| dataDirStates.add(curState); |
| } |
| |
| if (dataDirs.size() == 0 || dataDirStates.size() == 0) { |
| // none of the data dirs could be loaded |
| if (ignoreExistingDirs) { |
| return; |
| } |
| throw new IOException( |
| "None of the specified directories are accessible or exist."); |
| } |
| |
| // 2. Do transitions |
| // Each storage directory is treated individually. |
| // During startup some of them can upgrade or rollback |
| // while others could be up-to-date for the regular startup. |
| for (Iterator<StorageDirectory> it = addedStorageDirectories.iterator(); |
| it.hasNext(); ) { |
| StorageDirectory sd = it.next(); |
| try { |
| doTransition(datanode, sd, nsInfo, startOpt); |
| createStorageID(sd); |
| } catch (IOException e) { |
| if (!isInitialize) { |
| sd.unlock(); |
| it.remove(); |
| continue; |
| } |
| unlockAll(); |
| throw e; |
| } |
| } |
| |
| // 3. Update all successfully loaded storages. Some of them might have just |
| // been formatted. |
| this.writeAll(addedStorageDirectories); |
| |
| // 4. Make newly loaded storage directories visible for service. |
| if (!isInitialize) { |
| this.storageDirs.addAll(addedStorageDirectories); |
| } |
| } |
| |
| /** |
| * Analyze storage directories. |
| * Recover from previous transitions if required. |
| * Perform fs state transition if necessary depending on the namespace info. |
| * Read storage info. |
| * <br> |
| * This method is synchronized so that only the first DN thread performs |
| * the DN-level storage recoverTransitionRead; once storage is |
| * initialized, subsequent calls return immediately. |
| * |
| * @param nsInfo namespace information |
| * @param dataDirs collection of data storage directories |
| * @param startOpt startup option |
| * @throws IOException |
| */ |
| synchronized void recoverTransitionRead(DataNode datanode, |
| NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs, |
| StartupOption startOpt) |
| throws IOException { |
| if (initialized) { |
| // DN storage has been initialized, no need to do anything |
| return; |
| } |
| LOG.info("DataNode version: " + HdfsConstants.DATANODE_LAYOUT_VERSION |
| + " and NameNode layout version: " + nsInfo.getLayoutVersion()); |
| |
| this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size()); |
| addStorageLocations(datanode, nsInfo, dataDirs, startOpt, true, false); |
| |
| // mark DN storage is initialized |
| this.initialized = true; |
| } |
| |
| /** |
| * recoverTransitionRead for a specific block pool |
| * |
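| * <p> |
| * A hedged sketch of the startup sequence (names are illustrative; |
| * {@code nsInfo} is typically the NameNode's versionRequest() reply): |
| * <pre>{@code |
| * storage.recoverTransitionRead(datanode, nsInfo.getBlockPoolID(), |
| *     nsInfo, dataDirs, StartupOption.REGULAR); |
| * }</pre> |
| * |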
| * @param datanode DataNode |
| * @param bpID Block pool Id |
| * @param nsInfo Namespace info of namenode corresponding to the block pool |
| * @param dataDirs Storage directories |
| * @param startOpt startup option |
| * @throws IOException on error |
| */ |
| void recoverTransitionRead(DataNode datanode, String bpID, NamespaceInfo nsInfo, |
| Collection<StorageLocation> dataDirs, StartupOption startOpt) throws IOException { |
| // First ensure datanode level format/snapshot/rollback is completed |
| recoverTransitionRead(datanode, nsInfo, dataDirs, startOpt); |
| |
| // Create list of storage directories for the block pool |
| Collection<File> bpDataDirs = new ArrayList<File>(); |
| for(StorageLocation dir : dataDirs) { |
| File dnRoot = dir.getFile(); |
| File bpRoot = BlockPoolSliceStorage.getBpRoot(bpID, new File(dnRoot, |
| STORAGE_DIR_CURRENT)); |
| bpDataDirs.add(bpRoot); |
| } |
| |
| // mkdir for the list of BlockPoolStorage |
| makeBlockPoolDataDir(bpDataDirs, null); |
| BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage( |
| nsInfo.getNamespaceID(), bpID, nsInfo.getCTime(), nsInfo.getClusterID()); |
| |
| bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt); |
| addBlockPoolStorage(bpID, bpStorage); |
| } |
| |
| /** |
| * Create physical directories for block pools on the data node |
| * |
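| * <p> |
| * A small illustrative call (the path is made up; passing {@code null} |
| * for conf falls back to a fresh HdfsConfiguration): |
| * <pre>{@code |
| * Collection<File> bpDirs = Collections.singletonList( |
| *     new File("/data/1/dn/current/BP-1-127.0.0.1-123/current")); |
| * DataStorage.makeBlockPoolDataDir(bpDirs, null); |
| * }</pre> |
| * |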
| * @param dataDirs |
| * List of data directories |
| * @param conf |
| * Configuration instance to use. |
| * @throws IOException on errors |
| */ |
| static void makeBlockPoolDataDir(Collection<File> dataDirs, |
| Configuration conf) throws IOException { |
| if (conf == null) |
| conf = new HdfsConfiguration(); |
| |
| LocalFileSystem localFS = FileSystem.getLocal(conf); |
| FsPermission permission = new FsPermission(conf.get( |
| DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY, |
| DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT)); |
| for (File data : dataDirs) { |
| try { |
| DiskChecker.checkDir(localFS, new Path(data.toURI()), permission); |
| } catch ( IOException e ) { |
| LOG.warn("Invalid directory in: " + data.getCanonicalPath() + ": " |
| + e.getMessage()); |
| } |
| } |
| } |
| |
| void format(StorageDirectory sd, NamespaceInfo nsInfo, |
| String datanodeUuid) throws IOException { |
| sd.clearDirectory(); // create directory |
| this.layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION; |
| this.clusterID = nsInfo.getClusterID(); |
| this.namespaceID = nsInfo.getNamespaceID(); |
| this.cTime = 0; |
| this.datanodeUuid = datanodeUuid; |
| |
| if (sd.getStorageUuid() == null) { |
| // Assign a new Storage UUID. |
| sd.setStorageUuid(DatanodeStorage.generateUuid()); |
| } |
| |
| writeProperties(sd); |
| } |
| |
| /* |
| * Set ClusterID, StorageID, StorageType, CTime into |
| * DataStorage VERSION file. |
| * Always called just before writing the properties to |
| * the VERSION file. |
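| * |
| * The resulting VERSION file looks roughly like this (all values are |
| * illustrative; layoutVersion depends on the software version): |
| * |
| *   storageType=DATA_NODE |
| *   clusterID=CID-7f2eff33-cf37-4aba-8bb1-1d7a7b87132c |
| *   cTime=0 |
| *   layoutVersion=-56 |
| *   storageID=DS-b6a93a07-ba6a-42d6-9c22-95e5f95b4a3c |
| *   datanodeUuid=d3ae22ce-5de8-4072-8bab-30bdf3f0a3a0 |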
| */ |
| @Override |
| protected void setPropertiesFromFields(Properties props, |
| StorageDirectory sd |
| ) throws IOException { |
| props.setProperty("storageType", storageType.toString()); |
| props.setProperty("clusterID", clusterID); |
| props.setProperty("cTime", String.valueOf(cTime)); |
| props.setProperty("layoutVersion", String.valueOf(layoutVersion)); |
| props.setProperty("storageID", sd.getStorageUuid()); |
| |
| String datanodeUuid = getDatanodeUuid(); |
| if (datanodeUuid != null) { |
| props.setProperty("datanodeUuid", datanodeUuid); |
| } |
| |
| // Set NamespaceID in version before federation |
| if (!DataNodeLayoutVersion.supports( |
| LayoutVersion.Feature.FEDERATION, layoutVersion)) { |
| props.setProperty("namespaceID", String.valueOf(namespaceID)); |
| } |
| } |
| |
| /* |
| * Read ClusterID, StorageID, StorageType, CTime from |
| * DataStorage VERSION file and verify them. |
| * Always called just after reading the properties from the VERSION file. |
| */ |
| @Override |
| protected void setFieldsFromProperties(Properties props, StorageDirectory sd) |
| throws IOException { |
| setFieldsFromProperties(props, sd, false, 0); |
| } |
| |
| private void setFieldsFromProperties(Properties props, StorageDirectory sd, |
| boolean overrideLayoutVersion, int toLayoutVersion) throws IOException { |
| if (overrideLayoutVersion) { |
| this.layoutVersion = toLayoutVersion; |
| } else { |
| setLayoutVersion(props, sd); |
| } |
| setcTime(props, sd); |
| checkStorageType(props, sd); |
| setClusterId(props, layoutVersion, sd); |
| |
| // Read NamespaceID in version before federation |
| if (!DataNodeLayoutVersion.supports( |
| LayoutVersion.Feature.FEDERATION, layoutVersion)) { |
| setNamespaceID(props, sd); |
| } |
| |
| // validate the storage id; note that it may legitimately be empty |
| String ssid = props.getProperty("storageID"); |
| if (ssid == null) { |
| throw new InconsistentFSStateException(sd.getRoot(), "file " |
| + STORAGE_FILE_VERSION + " is invalid."); |
| } |
| String sid = sd.getStorageUuid(); |
| if (!(sid == null || sid.equals("") || |
| ssid.equals("") || sid.equals(ssid))) { |
| throw new InconsistentFSStateException(sd.getRoot(), |
| "has incompatible storage Id."); |
| } |
| |
| if (sid == null) { // update id only if it was null |
| sd.setStorageUuid(ssid); |
| } |
| |
| // Update the datanode UUID if present. |
| if (props.getProperty("datanodeUuid") != null) { |
| String dnUuid = props.getProperty("datanodeUuid"); |
| |
| if (getDatanodeUuid() == null) { |
| setDatanodeUuid(dnUuid); |
| } else if (getDatanodeUuid().compareTo(dnUuid) != 0) { |
| throw new InconsistentFSStateException(sd.getRoot(), |
| "Root " + sd.getRoot() + ": DatanodeUuid=" + dnUuid + |
| ", does not match " + getDatanodeUuid() + " from other" + |
| " StorageDirectory."); |
| } |
| } |
| } |
| |
| @Override |
| public boolean isPreUpgradableLayout(StorageDirectory sd) throws IOException { |
| File oldF = new File(sd.getRoot(), "storage"); |
| if (!oldF.exists()) |
| return false; |
| // check the layout version inside the storage file |
| // Lock and Read old storage file |
| RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws"); |
| FileLock oldLock = oldFile.getChannel().tryLock(); |
| try { |
| oldFile.seek(0); |
| int oldVersion = oldFile.readInt(); |
| if (oldVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION) |
| return false; |
| } finally { |
| oldLock.release(); |
| oldFile.close(); |
| } |
| return true; |
| } |
| |
| /** Read VERSION file for rollback */ |
| void readProperties(StorageDirectory sd, int rollbackLayoutVersion) |
| throws IOException { |
| Properties props = readPropertiesFile(sd.getVersionFile()); |
| setFieldsFromProperties(props, sd, true, rollbackLayoutVersion); |
| } |
| |
| /** |
| * Analyze whether a transition of the fs state is required and |
| * perform it if necessary. |
| * <ul> |
| * <li>Rollback if the rollback startup option was specified.</li> |
| * <li>Upgrade if this.LV > LAYOUT_VERSION. Layout versions are |
| * negative and decrease over time, so a numerically greater stored LV |
| * means an older on-disk layout; e.g. stored LV = -55 with software |
| * LV = -56 triggers an upgrade.</li> |
| * <li>Regular startup if this.LV == LAYOUT_VERSION.</li> |
| * </ul> |
| * |
| * @param datanode Datanode to which this storage belongs |
| * @param sd storage directory |
| * @param nsInfo namespace info |
| * @param startOpt startup option |
| * @throws IOException |
| */ |
| private void doTransition( DataNode datanode, |
| StorageDirectory sd, |
| NamespaceInfo nsInfo, |
| StartupOption startOpt |
| ) throws IOException { |
| if (startOpt == StartupOption.ROLLBACK) { |
| doRollback(sd, nsInfo); // rollback if applicable |
| } |
| readProperties(sd); |
| checkVersionUpgradable(this.layoutVersion); |
| assert this.layoutVersion >= HdfsConstants.DATANODE_LAYOUT_VERSION : |
| "Future version is not allowed"; |
| |
| boolean federationSupported = |
| DataNodeLayoutVersion.supports( |
| LayoutVersion.Feature.FEDERATION, layoutVersion); |
| // For pre-federation version - validate the namespaceID |
| if (!federationSupported && |
| getNamespaceID() != nsInfo.getNamespaceID()) { |
| throw new IOException("Incompatible namespaceIDs in " |
| + sd.getRoot().getCanonicalPath() + ": namenode namespaceID = " |
| + nsInfo.getNamespaceID() + "; datanode namespaceID = " |
| + getNamespaceID()); |
| } |
| |
| // For version that supports federation, validate clusterID |
| if (federationSupported |
| && !getClusterID().equals(nsInfo.getClusterID())) { |
| throw new IOException("Incompatible clusterIDs in " |
| + sd.getRoot().getCanonicalPath() + ": namenode clusterID = " |
| + nsInfo.getClusterID() + "; datanode clusterID = " + getClusterID()); |
| } |
| |
| // After addition of the federation feature, ctime check is only |
| // meaningful at BlockPoolSliceStorage level. |
| |
| // regular start up. |
| if (this.layoutVersion == HdfsConstants.DATANODE_LAYOUT_VERSION) |
| return; // regular startup |
| |
| // do upgrade |
| if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION) { |
| doUpgrade(datanode, sd, nsInfo); // upgrade |
| return; |
| } |
| |
| // layoutVersion < DATANODE_LAYOUT_VERSION. I.e. stored layout version is newer |
| // than the version supported by datanode. This should have been caught |
| // in readProperties(), even if rollback was not carried out or somehow |
| // failed. |
| throw new IOException("BUG: The stored LV = " + this.getLayoutVersion() |
| + " is newer than the supported LV = " |
| + HdfsConstants.DATANODE_LAYOUT_VERSION); |
| } |
| |
| /** |
| * Upgrade -- Move current storage into a backup directory, |
| * and hardlink all its blocks into the new current directory. |
| * |
| * Upgrade from a pre-0.22 release to 0.22 or a later release, |
| * e.g. 0.19/0.20 => 0.22/0.23. |
| * <ul> |
| * <li> If <SD>/previous exists then delete it </li> |
| * <li> Rename <SD>/current to <SD>/previous.tmp </li> |
| * <li>Create new <SD>/current/<bpid>/current directory</li> |
| * <ul> |
| * <li> Hard links for block files are created from <SD>/previous.tmp |
| * to <SD>/current/<bpid>/current </li> |
| * <li> Saves new version file in <SD>/current/<bpid>/current directory </li> |
| * </ul> |
| * <li> Rename <SD>/previous.tmp to <SD>/previous </li> |
| * </ul> |
| * |
| * There should be only ONE namenode in the cluster when upgrading |
| * to 0.22 for the first time. |
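| * <p> |
| * An illustrative before/after sketch of the layout (block file names |
| * are made up): |
| * <pre> |
| * before: <SD>/current/{VERSION, blk_100, blk_100_1001.meta, ...} |
| * after:  <SD>/previous/{VERSION, blk_100, ...}  (the old tree) |
| *         <SD>/current/<bpid>/current/finalized/...  (hard links to |
| *         the same block data, so no extra disk space is consumed) |
| * </pre> |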
| * @param sd storage directory |
| * @throws IOException on error |
| */ |
| void doUpgrade(DataNode datanode, StorageDirectory sd, NamespaceInfo nsInfo) |
| throws IOException { |
| // If the existing on-disk layout version supports federation, simply |
| // update its layout version. |
| if (DataNodeLayoutVersion.supports( |
| LayoutVersion.Feature.FEDERATION, layoutVersion)) { |
| // The VERSION file is already read in. Override the layoutVersion |
| // field and overwrite the file. |
| LOG.info("Updating layout version from " + layoutVersion + " to " |
| + HdfsConstants.DATANODE_LAYOUT_VERSION + " for storage " |
| + sd.getRoot()); |
| layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION; |
| writeProperties(sd); |
| return; |
| } |
| |
| LOG.info("Upgrading storage directory " + sd.getRoot() |
| + ".\n old LV = " + this.getLayoutVersion() |
| + "; old CTime = " + this.getCTime() |
| + ".\n new LV = " + HdfsConstants.DATANODE_LAYOUT_VERSION |
| + "; new CTime = " + nsInfo.getCTime()); |
| |
| File curDir = sd.getCurrentDir(); |
| File prevDir = sd.getPreviousDir(); |
| File bbwDir = new File(sd.getRoot(), Storage.STORAGE_1_BBW); |
| |
| assert curDir.exists() : "Data node current directory must exist."; |
| // Cleanup directory "detach" |
| cleanupDetachDir(new File(curDir, STORAGE_DIR_DETACHED)); |
| |
| // 1. delete <SD>/previous dir before upgrading |
| if (prevDir.exists()) |
| deleteDir(prevDir); |
| // get previous.tmp directory, <SD>/previous.tmp |
| File tmpDir = sd.getPreviousTmp(); |
| assert !tmpDir.exists() : |
| "Data node previous.tmp directory must not exist."; |
| |
| // 2. Rename <SD>/current to <SD>/previous.tmp |
| rename(curDir, tmpDir); |
| |
| // 3. Format BP and hard link blocks from previous directory |
| File curBpDir = BlockPoolSliceStorage.getBpRoot(nsInfo.getBlockPoolID(), curDir); |
| BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(), |
| nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID()); |
| bpStorage.format(curDir, nsInfo); |
| linkAllBlocks(datanode, tmpDir, bbwDir, new File(curBpDir, |
| STORAGE_DIR_CURRENT)); |
| |
| // 4. Write version file under <SD>/current |
| layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION; |
| clusterID = nsInfo.getClusterID(); |
| writeProperties(sd); |
| |
| // 5. Rename <SD>/previous.tmp to <SD>/previous |
| rename(tmpDir, prevDir); |
| LOG.info("Upgrade of " + sd.getRoot()+ " is complete"); |
| addBlockPoolStorage(nsInfo.getBlockPoolID(), bpStorage); |
| } |
| |
| /** |
| * Cleanup the detachDir. |
| * |
| * If the directory is not empty, report an error; otherwise remove it. |
| * |
| * @param detachDir detach directory |
| * @throws IOException if the directory is not empty or cannot be removed |
| */ |
| private void cleanupDetachDir(File detachDir) throws IOException { |
| if (!DataNodeLayoutVersion.supports( |
| LayoutVersion.Feature.APPEND_RBW_DIR, layoutVersion) && |
| detachDir.exists() && detachDir.isDirectory() ) { |
| |
| if (FileUtil.list(detachDir).length != 0 ) { |
| throw new IOException("Detached directory " + detachDir + |
| " is not empty. Please manually move each file under this " + |
| "directory to the finalized directory if the finalized " + |
| "directory tree does not have the file."); |
| } else if (!detachDir.delete()) { |
| throw new IOException("Cannot remove directory " + detachDir); |
| } |
| } |
| } |
| |
| /** |
| * Roll back to a snapshot in the previous directory by moving it to |
| * the current directory. |
| * Rollback procedure: |
| * <br> |
| * If previous directory exists: |
| * <ol> |
| * <li> Rename current to removed.tmp </li> |
| * <li> Rename previous to current </li> |
| * <li> Remove removed.tmp </li> |
| * </ol> |
| * |
| * If previous directory does not exist and the current version supports |
| * federation, perform a simple rollback of layout version. This does not |
| * involve saving/restoration of actual data. |
| */ |
| void doRollback( StorageDirectory sd, |
| NamespaceInfo nsInfo |
| ) throws IOException { |
| File prevDir = sd.getPreviousDir(); |
| // This is a regular startup or a post-federation rollback |
| if (!prevDir.exists()) { |
| if (DataNodeLayoutVersion.supports(LayoutVersion.Feature.FEDERATION, |
| HdfsConstants.DATANODE_LAYOUT_VERSION)) { |
| readProperties(sd, HdfsConstants.DATANODE_LAYOUT_VERSION); |
| writeProperties(sd); |
| LOG.info("Layout version rolled back to " |
| + HdfsConstants.DATANODE_LAYOUT_VERSION + " for storage " |
| + sd.getRoot()); |
| } |
| return; |
| } |
| DataStorage prevInfo = new DataStorage(); |
| prevInfo.readPreviousVersionProperties(sd); |
| |
| // We allow rollback to a state, which is either consistent with |
| // the namespace state or can be further upgraded to it. |
| if (!(prevInfo.getLayoutVersion() >= HdfsConstants.DATANODE_LAYOUT_VERSION |
| && prevInfo.getCTime() <= nsInfo.getCTime())) // cannot rollback |
| throw new InconsistentFSStateException(sd.getRoot(), |
| "Cannot rollback to a newer state.\nDatanode previous state: LV = " |
| + prevInfo.getLayoutVersion() + " CTime = " + prevInfo.getCTime() |
| + " is newer than the namespace state: LV = " |
| + HdfsConstants.DATANODE_LAYOUT_VERSION + " CTime = " |
| + nsInfo.getCTime()); |
| LOG.info("Rolling back storage directory " + sd.getRoot() |
| + ".\n target LV = " + HdfsConstants.DATANODE_LAYOUT_VERSION |
| + "; target CTime = " + nsInfo.getCTime()); |
| File tmpDir = sd.getRemovedTmp(); |
| assert !tmpDir.exists() : "removed.tmp directory must not exist."; |
| // rename current to tmp |
| File curDir = sd.getCurrentDir(); |
| assert curDir.exists() : "Current directory must exist."; |
| rename(curDir, tmpDir); |
| // rename previous to current |
| rename(prevDir, curDir); |
| // delete tmp dir |
| deleteDir(tmpDir); |
| LOG.info("Rollback of " + sd.getRoot() + " is complete"); |
| } |
| |
| /** |
| * Finalize procedure deletes an existing snapshot. |
| * <ol> |
| * <li>Rename previous to finalized.tmp directory</li> |
| * <li>Fully delete the finalized.tmp directory</li> |
| * </ol> |
| * |
| * Do nothing if the previous directory does not exist. |
| */ |
| void doFinalize(StorageDirectory sd) throws IOException { |
| File prevDir = sd.getPreviousDir(); |
| if (!prevDir.exists()) |
| return; // already discarded |
| |
| final String dataDirPath = sd.getRoot().getCanonicalPath(); |
| LOG.info("Finalizing upgrade for storage directory " |
| + dataDirPath |
| + ".\n cur LV = " + this.getLayoutVersion() |
| + "; cur CTime = " + this.getCTime()); |
| assert sd.getCurrentDir().exists() : "Current directory must exist."; |
| final File tmpDir = sd.getFinalizedTmp();//finalized.tmp directory |
| final File bbwDir = new File(sd.getRoot(), Storage.STORAGE_1_BBW); |
| // 1. rename previous to finalized.tmp |
| rename(prevDir, tmpDir); |
| |
| // 2. delete finalized.tmp dir in a separate thread |
| // Also delete the blocksBeingWritten from HDFS 1.x and earlier, if |
| // it exists. |
| new Daemon(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| deleteDir(tmpDir); |
| if (bbwDir.exists()) { |
| deleteDir(bbwDir); |
| } |
| } catch(IOException ex) { |
| LOG.error("Finalize upgrade for " + dataDirPath + " failed", ex); |
| } |
| LOG.info("Finalize upgrade for " + dataDirPath + " is complete"); |
| } |
| @Override |
| public String toString() { return "Finalize " + dataDirPath; } |
| }).start(); |
| } |
| |
| |
| /* |
| * Finalize the upgrade for a block pool. |
| * This also empties trash created during rolling upgrade and disables |
| * trash functionality. |
| */ |
| void finalizeUpgrade(String bpID) throws IOException { |
| // To handle finalizing a snapshot taken at datanode level while |
| // upgrading to federation, if datanode level snapshot previous exists, |
| // then finalize it. Else finalize the corresponding BP. |
| for (StorageDirectory sd : storageDirs) { |
| File prevDir = sd.getPreviousDir(); |
| if (prevDir.exists()) { |
| // data node level storage finalize |
| doFinalize(sd); |
| } else { |
| // block pool storage finalize using specific bpID |
| BlockPoolSliceStorage bpStorage = bpStorageMap.get(bpID); |
| bpStorage.doFinalize(sd.getCurrentDir()); |
| } |
| } |
| } |
| |
| /** |
| * Hardlink all finalized and RBW blocks in fromDir to toDir |
| * |
| * @param fromDir The directory where the 'from' snapshot is stored |
| * @param fromBbwDir In HDFS 1.x, the directory where blocks |
| * that are under construction are stored. |
| * @param toDir The current data directory |
| * |
| * @throws IOException if an error occurs during hardlinking |
| */ |
| private void linkAllBlocks(DataNode datanode, File fromDir, File fromBbwDir, |
| File toDir) throws IOException { |
| HardLink hardLink = new HardLink(); |
| // do the link |
| int diskLayoutVersion = this.getLayoutVersion(); |
| if (DataNodeLayoutVersion.supports( |
| LayoutVersion.Feature.APPEND_RBW_DIR, diskLayoutVersion)) { |
| // hardlink finalized blocks in tmpDir/finalized |
| linkBlocks(datanode, new File(fromDir, STORAGE_DIR_FINALIZED), |
| new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink); |
| // hardlink rbw blocks in tmpDir/rbw |
| linkBlocks(datanode, new File(fromDir, STORAGE_DIR_RBW), |
| new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink); |
| } else { // pre-RBW version |
| // hardlink finalized blocks in tmpDir |
| linkBlocks(datanode, fromDir, new File(toDir, STORAGE_DIR_FINALIZED), |
| diskLayoutVersion, hardLink); |
| if (fromBbwDir.exists()) { |
| /* |
| * We need to put the 'blocksBeingWritten' from HDFS 1.x into the rbw |
| * directory. It's a little messy, because the blocksBeingWritten was |
| * NOT underneath the 'current' directory in those releases. See |
| * HDFS-3731 for details. |
| */ |
| linkBlocks(datanode, fromBbwDir, |
| new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink); |
| } |
| } |
| LOG.info( hardLink.linkStats.report() ); |
| } |
| |
| private static class LinkArgs { |
| public final File src; |
| public final File dst; |
| |
| public LinkArgs(File src, File dst) { |
| this.src = src; |
| this.dst = dst; |
| } |
| } |
| |
| static void linkBlocks(DataNode datanode, File from, File to, int oldLV, |
| HardLink hl) throws IOException { |
| boolean upgradeToIdBasedLayout = false; |
| // If we are upgrading from a version older than the one where we introduced |
| // block ID-based layout AND we're working with the finalized directory, |
| // we'll need to upgrade from the old flat layout to the block ID-based one |
| if (oldLV > DataNodeLayoutVersion.Feature.BLOCKID_BASED_LAYOUT.getInfo(). |
| getLayoutVersion() && to.getName().equals(STORAGE_DIR_FINALIZED)) { |
| upgradeToIdBasedLayout = true; |
| } |
| |
| final List<LinkArgs> idBasedLayoutSingleLinks = Lists.newArrayList(); |
| linkBlocksHelper(from, to, oldLV, hl, upgradeToIdBasedLayout, to, |
| idBasedLayoutSingleLinks); |
| int numLinkWorkers = datanode.getConf().getInt( |
| DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY, |
| DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS); |
| ExecutorService linkWorkers = Executors.newFixedThreadPool(numLinkWorkers); |
| final int step = idBasedLayoutSingleLinks.size() / numLinkWorkers + 1; |
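| // e.g. 1000 pending links with 4 workers gives step = 251, so the list |
| // below is dispatched in contiguous chunks of at most 251 links each. |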
| List<Future<Void>> futures = Lists.newArrayList(); |
| for (int i = 0; i < idBasedLayoutSingleLinks.size(); i += step) { |
| final int iCopy = i; |
| futures.add(linkWorkers.submit(new Callable<Void>() { |
| @Override |
| public Void call() throws IOException { |
| int upperBound = Math.min(iCopy + step, |
| idBasedLayoutSingleLinks.size()); |
| for (int j = iCopy; j < upperBound; j++) { |
| LinkArgs cur = idBasedLayoutSingleLinks.get(j); |
| NativeIO.link(cur.src, cur.dst); |
| } |
| return null; |
| } |
| })); |
| } |
| linkWorkers.shutdown(); |
| for (Future<Void> f : futures) { |
| Futures.get(f, IOException.class); |
| } |
| } |
| |
| static void linkBlocksHelper(File from, File to, int oldLV, HardLink hl, |
| boolean upgradeToIdBasedLayout, File blockRoot, |
| List<LinkArgs> idBasedLayoutSingleLinks) throws IOException { |
| if (!from.exists()) { |
| return; |
| } |
| if (!from.isDirectory()) { |
| if (from.getName().startsWith(COPY_FILE_PREFIX)) { |
| FileInputStream in = new FileInputStream(from); |
| try { |
| FileOutputStream out = new FileOutputStream(to); |
| try { |
| IOUtils.copyBytes(in, out, 16*1024); |
| hl.linkStats.countPhysicalFileCopies++; |
| } finally { |
| out.close(); |
| } |
| } finally { |
| in.close(); |
| } |
| } else { |
| HardLink.createHardLink(from, to); |
| hl.linkStats.countSingleLinks++; |
| } |
| return; |
| } |
| // from is a directory |
| hl.linkStats.countDirs++; |
| |
| String[] blockNames = from.list(new java.io.FilenameFilter() { |
| @Override |
| public boolean accept(File dir, String name) { |
| return name.startsWith(BLOCK_FILE_PREFIX); |
| } |
| }); |
| if (blockNames == null) { |
| throw new IOException("Cannot list contents of " + from); |
| } |
| |
| // If we are upgrading to block ID-based layout, we don't want to recreate |
| // any subdirs from the source that contain blocks, since we have a new |
| // directory structure |
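| // (each block instead maps to a fixed subdir pair derived from its |
| // block ID; see DatanodeUtil.idToBlockDir, used below) |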
| if (!upgradeToIdBasedLayout || !to.getName().startsWith( |
| BLOCK_SUBDIR_PREFIX)) { |
| if (!to.mkdirs()) |
| throw new IOException("Cannot create directory " + to); |
| } |
| |
| // Block files just need hard links with the same file names |
| // but a different directory |
| if (blockNames.length > 0) { |
| if (upgradeToIdBasedLayout) { |
| for (String blockName : blockNames) { |
| long blockId = Block.getBlockId(blockName); |
| File blockLocation = DatanodeUtil.idToBlockDir(blockRoot, blockId); |
| if (!blockLocation.exists()) { |
| if (!blockLocation.mkdirs()) { |
| throw new IOException("Failed to mkdirs " + blockLocation); |
| } |
| } |
| idBasedLayoutSingleLinks.add(new LinkArgs(new File(from, blockName), |
| new File(blockLocation, blockName))); |
| hl.linkStats.countSingleLinks++; |
| } |
| } else { |
| HardLink.createHardLinkMult(from, blockNames, to); |
| hl.linkStats.countMultLinks++; |
| hl.linkStats.countFilesMultLinks += blockNames.length; |
| } |
| } else { |
| hl.linkStats.countEmptyDirs++; |
| } |
| |
| // Now take care of the rest of the files and subdirectories |
| String[] otherNames = from.list(new java.io.FilenameFilter() { |
| @Override |
| public boolean accept(File dir, String name) { |
| return name.startsWith(BLOCK_SUBDIR_PREFIX) |
| || name.startsWith(COPY_FILE_PREFIX); |
| } |
| }); |
| if (otherNames == null) { |
| throw new IOException("Cannot list contents of " + from); |
| } |
| for (int i = 0; i < otherNames.length; i++) { |
| linkBlocksHelper(new File(from, otherNames[i]), |
| new File(to, otherNames[i]), oldLV, hl, upgradeToIdBasedLayout, |
| blockRoot, idBasedLayoutSingleLinks); |
| } |
| } |
| |
| /** |
| * Add bpStorage to bpStorageMap only if no storage is already |
| * registered for bpID. |
| */ |
| private void addBlockPoolStorage(String bpID, BlockPoolSliceStorage bpStorage |
| ) { |
| if (!this.bpStorageMap.containsKey(bpID)) { |
| this.bpStorageMap.put(bpID, bpStorage); |
| } |
| } |
| |
| synchronized void removeBlockPoolStorage(String bpId) { |
| bpStorageMap.remove(bpId); |
| } |
| } |