blob: ef25983ca2603eeff680c4fb0024a129ed209522 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import static org.apache.hadoop.test.MetricsAsserts.*;
import java.io.File;
import java.io.IOException;
import java.util.Properties;
/**
* Verify if TaskTracker's in-memory good mapred local dirs list gets updated
* properly when disks fail.
*/
public class TestDiskFailures extends ClusterMapReduceTestCase {
private static final Log LOG = LogFactory.getLog(TestDiskFailures.class);
private static String localPathRoot = System.getProperty(
"test.build.data", "/tmp").replace(' ', '+');
private String DISK_HEALTH_CHECK_INTERVAL = "1000";//1 sec
@Override
protected void setUp() throws Exception {
// Do not start cluster here
};
@Override
protected void tearDown() throws Exception {
super.tearDown();
FileUtil.fullyDelete(new File(localPathRoot));
};
/**
* Make some of the the mapred-local-dirs fail/inaccessible and verify if
* TaskTracker gets reinited properly.
* @throws Exception
*/
public void testDiskFailures() throws Exception {
FileSystem fs = FileSystem.get(new Configuration());
Path dir = new Path(localPathRoot, "mapred_local_dirs_base");
FileSystem.mkdirs(fs, dir, new FsPermission((short)0777));
Properties props = new Properties();
props.setProperty(JobConf.MAPRED_LOCAL_DIR_PROPERTY, dir.toUri().getPath());
// set disk health check interval to a small value (say 4 sec).
props.setProperty(TaskTracker.DISK_HEALTH_CHECK_INTERVAL_PROPERTY,
DISK_HEALTH_CHECK_INTERVAL);
// Let us have 4 mapred-local-dirs per tracker
final int numMapredLocalDirs = 4;
startCluster(true, props, numMapredLocalDirs);
MiniMRCluster cluster = getMRCluster();
TaskTracker tt = cluster.getTaskTrackerRunner(0).getTaskTracker();
MetricsRecordBuilder rb = getMetrics(new TaskTrackerMetricsSource(tt));
String[] localDirs = cluster.getTaskTrackerLocalDirs(0);
assertGauge("failedDirs", 0, rb);
// Make 1 disk fail and verify if TaskTracker gets re-inited or not and
// the good mapred local dirs list gets updated properly in TaskTracker.
prepareDirToFail(localDirs[2]);
String expectedMapredLocalDirs = localDirs[0] + "," + localDirs[1] + ","
+ localDirs[3];
verifyReinitTaskTrackerAfterDiskFailure(expectedMapredLocalDirs, cluster);
rb = getMetrics(new TaskTrackerMetricsSource(tt));
assertGauge("failedDirs", 1, rb);
// Make 2 more disks fail and verify if TaskTracker gets re-inited or not
// and the good mapred local dirs list gets updated properly in TaskTracker.
prepareDirToFail(localDirs[0]);
prepareDirToFail(localDirs[3]);
expectedMapredLocalDirs = localDirs[1];
verifyReinitTaskTrackerAfterDiskFailure(expectedMapredLocalDirs, cluster);
rb = getMetrics(new TaskTrackerMetricsSource(tt));
assertGauge("failedDirs", 3, rb);
// Fail the remaining single disk(i.e. the remaining good mapred-local-dir).
prepareDirToFail(localDirs[1]);
waitForDiskHealthCheck();
assertTrue(
"Tasktracker is not dead even though all mapred local dirs became bad.",
cluster.getTaskTrackerRunner(0).exited);
rb = getMetrics(new TaskTrackerMetricsSource(tt));
assertGauge("failedDirs", 4, rb);
}
/**
* Wait for the TaskTracker to go for the disk-health-check and (possibly)
* reinit.
* DiskHealthCheckInterval is 1 sec. So this wait time should be greater than
* [1 sec + TT_reinit_execution_time]. Let us have this as 4sec.
*/
private void waitForDiskHealthCheck() {
try {
Thread.sleep(4000);
} catch(InterruptedException e) {
LOG.error("Interrupted while waiting for TaskTracker reinit.");
}
}
/**
* Verify if TaskTracker gets reinited properly after disk failure.
* @param expectedMapredLocalDirs expected mapred local dirs
* @param cluster MiniMRCluster in which 1st TaskTracker is supposed to get
* reinited because of disk failure
* @throws IOException
*/
private void verifyReinitTaskTrackerAfterDiskFailure(
String expectedMapredLocalDirs, MiniMRCluster cluster)
throws IOException {
// Wait for the TaskTracker to get reinited. DiskHealthCheckInterval is
// 1 sec. So this wait time should be > [1 sec + TT_reinit_execution_time].
waitForDiskHealthCheck();
String[] updatedLocalDirs = cluster.getTaskTrackerRunner(0)
.getTaskTracker().getJobConf().getLocalDirs();
String seenMapredLocalDirs = StringUtils.arrayToString(updatedLocalDirs);
LOG.info("ExpectedMapredLocalDirs=" + expectedMapredLocalDirs);
assertTrue("TaskTracker could not reinit properly after disk failure.",
expectedMapredLocalDirs.equals(seenMapredLocalDirs));
}
/**
* Prepare directory for a failure. Replace the given directory on the
* local FileSystem with a regular file with the same name.
* This would cause failure of creation of directory in DiskChecker.checkDir()
* with the same name.
* @throws IOException
*/
private void prepareDirToFail(String dir)
throws IOException {
File file = new File(dir);
FileUtil.fullyDelete(file);
file.createNewFile();
}
}