| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileContext; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.UnsupportedFileSystemException; |
| import org.apache.hadoop.security.AccessControlException; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.server.MiniYARNCluster; |
| import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; |
| import org.apache.hadoop.yarn.server.nodemanager.NodeManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; |
| |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.util.Iterator; |
| import java.util.List; |
| |
| import org.junit.AfterClass; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| import junit.framework.Assert; |
| |
| /** |
| * Verify if NodeManager's in-memory good local dirs list and good log dirs list |
| * get updated properly when disks(nm-local-dirs and nm-log-dirs) fail. Also |
| * verify if the overall health status of the node gets updated properly when |
| * specified percentage of disks fail. |
| */ |
| public class TestDiskFailures { |
| |
| private static final Log LOG = LogFactory.getLog(TestDiskFailures.class); |
| |
| private static final long DISK_HEALTH_CHECK_INTERVAL = 1000;//1 sec |
| |
| private static FileContext localFS = null; |
| private static final File testDir = new File("target", |
| TestDiskFailures.class.getName()).getAbsoluteFile(); |
| private static final File localFSDirBase = new File(testDir, |
| TestDiskFailures.class.getName() + "-localDir"); |
| private static final int numLocalDirs = 4; |
| private static final int numLogDirs = 4; |
| |
| private static MiniYARNCluster yarnCluster; |
| LocalDirsHandlerService dirsHandler; |
| |
| @BeforeClass |
| public static void setup() throws AccessControlException, |
| FileNotFoundException, UnsupportedFileSystemException, IOException { |
| localFS = FileContext.getLocalFSFileContext(); |
| localFS.delete(new Path(localFSDirBase.getAbsolutePath()), true); |
| localFSDirBase.mkdirs(); |
| // Do not start cluster here |
| } |
| |
| @AfterClass |
| public static void teardown() { |
| if (yarnCluster != null) { |
| yarnCluster.stop(); |
| yarnCluster = null; |
| } |
| FileUtil.fullyDelete(localFSDirBase); |
| } |
| |
| /** |
| * Make local-dirs fail/inaccessible and verify if NodeManager can |
| * recognize the disk failures properly and can update the list of |
| * local-dirs accordingly with good disks. Also verify the overall |
| * health status of the node. |
| * @throws IOException |
| */ |
| @Test |
| public void testLocalDirsFailures() throws IOException { |
| testDirsFailures(true); |
| } |
| |
| /** |
| * Make log-dirs fail/inaccessible and verify if NodeManager can |
| * recognize the disk failures properly and can update the list of |
| * log-dirs accordingly with good disks. Also verify the overall health |
| * status of the node. |
| * @throws IOException |
| */ |
| @Test |
| public void testLogDirsFailures() throws IOException { |
| testDirsFailures(false); |
| } |
| |
| /** |
| * Make a local and log directory inaccessible during initialization |
| * and verify those bad directories are recognized and removed from |
| * the list of available local and log directories. |
| * @throws IOException |
| */ |
| @Test |
| public void testDirFailuresOnStartup() throws IOException { |
| Configuration conf = new YarnConfiguration(); |
| String localDir1 = new File(testDir, "localDir1").getPath(); |
| String localDir2 = new File(testDir, "localDir2").getPath(); |
| String logDir1 = new File(testDir, "logDir1").getPath(); |
| String logDir2 = new File(testDir, "logDir2").getPath(); |
| conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir1 + "," + localDir2); |
| conf.set(YarnConfiguration.NM_LOG_DIRS, logDir1 + "," + logDir2); |
| |
| prepareDirToFail(localDir1); |
| prepareDirToFail(logDir2); |
| |
| LocalDirsHandlerService dirSvc = new LocalDirsHandlerService(); |
| dirSvc.init(conf); |
| List<String> localDirs = dirSvc.getLocalDirs(); |
| Assert.assertEquals(1, localDirs.size()); |
| Assert.assertEquals(localDir2, localDirs.get(0)); |
| List<String> logDirs = dirSvc.getLogDirs(); |
| Assert.assertEquals(1, logDirs.size()); |
| Assert.assertEquals(logDir1, logDirs.get(0)); |
| } |
| |
| private void testDirsFailures(boolean localORLogDirs) throws IOException { |
| String dirType = localORLogDirs ? "local" : "log"; |
| String dirsProperty = localORLogDirs ? YarnConfiguration.NM_LOCAL_DIRS |
| : YarnConfiguration.NM_LOG_DIRS; |
| |
| Configuration conf = new Configuration(); |
| // set disk health check interval to a small value (say 1 sec). |
| conf.setLong(YarnConfiguration.NM_DISK_HEALTH_CHECK_INTERVAL_MS, |
| DISK_HEALTH_CHECK_INTERVAL); |
| |
| // If 2 out of the total 4 local-dirs fail OR if 2 Out of the total 4 |
| // log-dirs fail, then the node's health status should become unhealthy. |
| conf.setFloat(YarnConfiguration.NM_MIN_HEALTHY_DISKS_FRACTION, 0.60F); |
| |
| if (yarnCluster != null) { |
| yarnCluster.stop(); |
| FileUtil.fullyDelete(localFSDirBase); |
| localFSDirBase.mkdirs(); |
| } |
| LOG.info("Starting up YARN cluster"); |
| yarnCluster = new MiniYARNCluster(TestDiskFailures.class.getName(), |
| 1, numLocalDirs, numLogDirs); |
| yarnCluster.init(conf); |
| yarnCluster.start(); |
| |
| NodeManager nm = yarnCluster.getNodeManager(0); |
| LOG.info("Configured nm-" + dirType + "-dirs=" |
| + nm.getConfig().get(dirsProperty)); |
| dirsHandler = nm.getNodeHealthChecker().getDiskHandler(); |
| List<String> list = localORLogDirs ? dirsHandler.getLocalDirs() |
| : dirsHandler.getLogDirs(); |
| String[] dirs = list.toArray(new String[list.size()]); |
| Assert.assertEquals("Number of nm-" + dirType + "-dirs is wrong.", |
| numLocalDirs, dirs.length); |
| String expectedDirs = StringUtils.join(",", list); |
| // validate the health of disks initially |
| verifyDisksHealth(localORLogDirs, expectedDirs, true); |
| |
| // Make 1 nm-local-dir fail and verify if "the nodemanager can identify |
| // the disk failure(s) and can update the list of good nm-local-dirs. |
| prepareDirToFail(dirs[2]); |
| expectedDirs = dirs[0] + "," + dirs[1] + "," |
| + dirs[3]; |
| verifyDisksHealth(localORLogDirs, expectedDirs, true); |
| |
| // Now, make 1 more nm-local-dir/nm-log-dir fail and verify if "the |
| // nodemanager can identify the disk failures and can update the list of |
| // good nm-local-dirs/nm-log-dirs and can update the overall health status |
| // of the node to unhealthy". |
| prepareDirToFail(dirs[0]); |
| expectedDirs = dirs[1] + "," + dirs[3]; |
| verifyDisksHealth(localORLogDirs, expectedDirs, false); |
| |
| // Fail the remaining 2 local-dirs/log-dirs and verify if NM remains with |
| // empty list of local-dirs/log-dirs and the overall health status is |
| // unhealthy. |
| prepareDirToFail(dirs[1]); |
| prepareDirToFail(dirs[3]); |
| expectedDirs = ""; |
| verifyDisksHealth(localORLogDirs, expectedDirs, false); |
| } |
| |
| /** |
| * Wait for the NodeManger to go for the disk-health-check at least once. |
| */ |
| private void waitForDiskHealthCheck() { |
| long lastDisksCheckTime = dirsHandler.getLastDisksCheckTime(); |
| long time = lastDisksCheckTime; |
| for (int i = 0; i < 10 && (time <= lastDisksCheckTime); i++) { |
| try { |
| Thread.sleep(1000); |
| } catch(InterruptedException e) { |
| LOG.error( |
| "Interrupted while waiting for NodeManager's disk health check."); |
| } |
| time = dirsHandler.getLastDisksCheckTime(); |
| } |
| } |
| |
| /** |
| * Verify if the NodeManager could identify disk failures. |
| * @param localORLogDirs <em>true</em> represent nm-local-dirs and <em>false |
| * </em> means nm-log-dirs |
| * @param expectedDirs expected nm-local-dirs/nm-log-dirs as a string |
| * @param isHealthy <em>true</em> if the overall node should be healthy |
| */ |
| private void verifyDisksHealth(boolean localORLogDirs, String expectedDirs, |
| boolean isHealthy) { |
| // Wait for the NodeManager to identify disk failures. |
| waitForDiskHealthCheck(); |
| |
| List<String> list = localORLogDirs ? dirsHandler.getLocalDirs() |
| : dirsHandler.getLogDirs(); |
| String seenDirs = StringUtils.join(",", list); |
| LOG.info("ExpectedDirs=" + expectedDirs); |
| LOG.info("SeenDirs=" + seenDirs); |
| Assert.assertTrue("NodeManager could not identify disk failure.", |
| expectedDirs.equals(seenDirs)); |
| |
| Assert.assertEquals("Node's health in terms of disks is wrong", |
| isHealthy, dirsHandler.areDisksHealthy()); |
| for (int i = 0; i < 10; i++) { |
| Iterator<RMNode> iter = yarnCluster.getResourceManager().getRMContext() |
| .getRMNodes().values().iterator(); |
| if (iter.next().getNodeHealthStatus().getIsNodeHealthy() == isHealthy) { |
| break; |
| } |
| // wait for the node health info to go to RM |
| try { |
| Thread.sleep(1000); |
| } catch(InterruptedException e) { |
| LOG.error("Interrupted while waiting for NM->RM heartbeat."); |
| } |
| } |
| Iterator<RMNode> iter = yarnCluster.getResourceManager().getRMContext() |
| .getRMNodes().values().iterator(); |
| Assert.assertEquals("RM is not updated with the health status of a node", |
| isHealthy, iter.next().getNodeHealthStatus().getIsNodeHealthy()); |
| } |
| |
| /** |
| * Prepare directory for a failure: Replace the given directory on the |
| * local FileSystem with a regular file with the same name. |
| * This would cause failure of creation of directory in DiskChecker.checkDir() |
| * with the same name. |
| * @param dir the directory to be failed |
| * @throws IOException |
| */ |
| private void prepareDirToFail(String dir) throws IOException { |
| File file = new File(dir); |
| FileUtil.fullyDelete(file); |
| file.createNewFile(); |
| LOG.info("Prepared " + dir + " to fail."); |
| } |
| } |