// blob: 36483e44b416a0efc8752257c284390cc5eafb6c
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.Map.Entry;
import javax.security.auth.login.LoginException;
import junit.extensions.TestSetup;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
import org.apache.hadoop.mapred.JobTracker.ReasonForBlackListing;
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
public class TestTaskTrackerBlacklisting extends TestCase {
static String trackers[] = new String[] { "tracker_tracker1:1000",
"tracker_tracker2:1000", "tracker_tracker3:1000" };
static String hosts[] = new String[] { "tracker1", "tracker2", "tracker3" };
private static FakeJobTracker jobTracker;
private static FakeJobTrackerClock clock;
private static short responseId;
private static final Set<ReasonForBlackListing> nodeUnHealthyReasonSet =
EnumSet.of(ReasonForBlackListing.NODE_UNHEALTHY);
private static final Set<ReasonForBlackListing> exceedsFailuresReasonSet =
EnumSet.of(ReasonForBlackListing.EXCEEDING_FAILURES);
private static final Set<ReasonForBlackListing>
unhealthyAndExceedsFailure = EnumSet.of(
ReasonForBlackListing.NODE_UNHEALTHY,
ReasonForBlackListing.EXCEEDING_FAILURES);
// Add extra millisecond where timer granularity is too coarse
private static final long aDay = 24 * 60 * 60 * 1000 + 1;
/**
 * Clock that returns the superclass time, shifted forward by {@code aDay}
 * milliseconds while {@code jumpADay} is set.
 */
private static class FakeJobTrackerClock extends Clock {
  // When true, getTime() adds aDay to the value read from the superclass.
  boolean jumpADay = false;

  @Override
  long getTime() {
    final long now = super.getTime();
    return jumpADay ? now + aDay : now;
  }
}
/**
 * Job tracker used by these tests. Its {@code finalizeJob} does nothing
 * except record one fault for every entry the job reports through
 * {@code getBlackListedTrackers()}.
 */
static class FakeJobTracker extends
    org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker {

  FakeJobTracker(JobConf conf, Clock clock, String[] tts) throws IOException,
      InterruptedException, LoginException {
    super(conf, clock, tts);
  }

  @Override
  synchronized void finalizeJob(JobInProgress job) {
    for (String blackListed : job.getBlackListedTrackers()) {
      incrementFaults(blackListed);
    }
  }
}
/**
 * Job stub that counts task failures per tracker name and reports the
 * trackers whose count has reached the job's per-tracker failure limit.
 */
static class FakeJobInProgress extends
    org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress {
  // Tracker name -> number of failed task attempts recorded for it.
  HashMap<String, Integer> trackerToFailureMap;

  FakeJobInProgress(JobConf jobConf, JobTracker tracker) throws IOException {
    super(jobConf, tracker);
    trackerToFailureMap = new HashMap<String, Integer>();
  }

  /**
   * Fails the attempt through the superclass, then charges the failure to
   * the machine on which the attempt ran.
   */
  public void failTask(TaskAttemptID taskId) {
    super.failTask(taskId);
    String failedOn =
        jobtracker.taskidToTIPMap.get(taskId).machineWhereTaskRan(taskId);
    addFailuresToTrackers(failedOn);
  }

  /** Adds one failure to the count kept for {@code trackerName}. */
  public void addFailuresToTrackers(String trackerName) {
    Integer previous = trackerToFailureMap.get(trackerName);
    int updated = (previous == null) ? 1 : previous.intValue() + 1;
    trackerToFailureMap.put(trackerName, updated);
  }

  /**
   * Returns the host names, as produced by
   * {@code JobInProgress.convertTrackerNameToHostName}, of all trackers
   * whose failure count is at least
   * {@code getJobConf().getMaxTaskFailuresPerTracker()}.
   */
  public List<String> getBlackListedTrackers() {
    ArrayList<String> overLimit = new ArrayList<String>();
    for (Entry<String, Integer> counted : trackerToFailureMap.entrySet()) {
      boolean belowLimit = counted.getValue().intValue()
          < getJobConf().getMaxTaskFailuresPerTracker();
      if (belowLimit) {
        continue;
      }
      overLimit.add(
          JobInProgress.convertTrackerNameToHostName(counted.getKey()));
    }
    return overLimit;
  }
}
/**
 * Wraps all tests of this class in a {@link TestSetup} whose setUp creates
 * the shared fake clock and job tracker and sends the initial-contact
 * heartbeat for every tracker.
 */
public static Test suite() {
  TestSuite allTests = new TestSuite(TestTaskTrackerBlacklisting.class);
  return new TestSetup(allTests) {
    protected void setUp() throws Exception {
      JobConf conf = new JobConf();
      conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
      conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
      conf.setInt(JTConfig.JT_MAX_TRACKER_BLACKLISTS, 1);
      clock = new FakeJobTrackerClock();
      jobTracker = new FakeJobTracker(conf, clock, trackers);
      sendHeartBeat(null, true);
    }

    protected void tearDown() throws Exception {
      // Nothing is cleaned up here; the build/test/logs/ dir is not deleted.
    }
  };
}
/**
 * Sends one heartbeat per configured tracker to the shared job tracker and
 * then advances the shared response id.
 *
 * @param status health values to copy into each tracker's status, or
 *        {@code null} to leave the health status of a freshly created
 *        {@code TaskTrackerStatus} untouched
 * @param initialContact forwarded to {@code JobTracker.heartbeat}
 * @throws IOException if a heartbeat call fails
 */
private static void sendHeartBeat(TaskTrackerHealthStatus status,
    boolean initialContact) throws IOException {
  for (int i = 0; i < trackers.length; i++) {
    String name = trackers[i];
    TaskTrackerStatus report = new TaskTrackerStatus(name,
        JobInProgress.convertTrackerNameToHostName(name));
    if (status != null) {
      // Copy the supplied health values onto this tracker's own status.
      TaskTrackerHealthStatus reported = report.getHealthStatus();
      reported.setNodeHealthy(status.isNodeHealthy());
      reported.setHealthReport(status.getHealthReport());
      reported.setLastReported(status.getLastReported());
    }
    jobTracker.heartbeat(report, false, initialContact, false, responseId);
  }
  responseId++;
}
/**
 * Checks that the first tracker is blacklisted for exceeding failures after
 * the blacklisting job runs, and that a heartbeat sent with the clock moved
 * forward by a day leaves no tracker blacklisted.
 */
public void testTrackerBlacklistingForJobFailures() throws Exception {
  runBlackListingJob(jobTracker, trackers);
  // Expected value first, matching the other assertions in this class, so a
  // failure reports expected and actual the right way round.
  assertEquals("Tracker 1 not blacklisted", 1,
      jobTracker.getBlacklistedTrackerCount());
  checkReasonForBlackListing(hosts[0], exceedsFailuresReasonSet);
  // The tracker is blacklisted because of its failure count, so the clock
  // has to be forwarded by a day before the next heartbeat.
  clock.jumpADay = true;
  try {
    sendHeartBeat(null, false);
    assertEquals("Tracker 1 still blacklisted after a day", 0,
        jobTracker.getBlacklistedTrackerCount());
  } finally {
    // The clock is shared by all tests; restore it even when an assertion
    // above fails.
    clock.jumpADay = false;
  }
}
/**
 * Checks that an unhealthy heartbeat blacklists every host for
 * NODE_UNHEALTHY and that a following healthy heartbeat leaves no tracker
 * blacklisted.
 */
public void testNodeHealthBlackListing() throws Exception {
  TaskTrackerHealthStatus health = getUnhealthyNodeStatus("ERROR");
  // Report every tracker as unhealthy.
  sendHeartBeat(health, false);
  for (int i = 0; i < hosts.length; i++) {
    checkReasonForBlackListing(hosts[i], nodeUnHealthyReasonSet);
  }
  // Turn the same status object healthy and report it, so that the
  // remaining test cases can use the trackers again.
  health.setNodeHealthy(true);
  health.setLastReported(System.currentTimeMillis());
  health.setHealthReport("");
  sendHeartBeat(health, false);
  assertEquals("Trackers still blacklisted after healthy report", 0,
      jobTracker.getBlacklistedTrackerCount());
}
/**
 * Test case to check that the task tracker node health failure statistics
 * are populated correctly.
 *
 * Only the since-start value is checked; the other properties are assumed
 * to be populated in the same manner.
 */
public void testTaskTrackerNodeHealthFailureStatistics() throws Exception {
  // Start from the count already accumulated: the job tracker is brought up
  // only once for the whole suite, so earlier tests may have added to it.
  int baseline = getFailureCountSinceStart(jobTracker, trackers[0]);

  // A heartbeat without health information must not change the count.
  sendHeartBeat(null, false);
  assertHealthFailureCountOnAllTrackers(baseline);

  // The first unhealthy report increments the statistic.
  TaskTrackerHealthStatus unhealthy = getUnhealthyNodeStatus("ERROR");
  sendHeartBeat(unhealthy, false);
  int afterFailure = baseline + 1;
  assertHealthFailureCountOnAllTrackers(afterFailure);

  // A repeated unhealthy report is not counted again; the statistic only
  // grows when a node goes back to healthy and then becomes unhealthy.
  sendHeartBeat(unhealthy, false);
  assertHealthFailureCountOnAllTrackers(afterFailure);

  // Back to healthy: the accumulated count is carried forward.
  sendHeartBeat(null, false);
  assertHealthFailureCountOnAllTrackers(afterFailure);
}

/**
 * Asserts that every tracker's since-start health failure count equals
 * {@code expected}.
 */
private void assertHealthFailureCountOnAllTrackers(int expected) {
  for (int i = 0; i < trackers.length; i++) {
    String tracker = trackers[i];
    assertEquals("Failure count updated wrongly for tracker : " + tracker,
        expected, getFailureCountSinceStart(jobTracker, tracker));
  }
}
/**
 * Updates the job tracker's statistics collector and returns the
 * since-start value of the health-check-failed statistic kept for the
 * given tracker.
 *
 * @param jt job tracker whose statistics are read
 * @param tracker tracker name to look up
 * @return the value stored under {@code StatisticsCollector.SINCE_START}
 */
private int getFailureCountSinceStart(JobTracker jt, String tracker) {
  JobTrackerStatistics stats = jt.getStatistics();
  stats.collector.update();
  return stats.getTaskTrackerStat(tracker).healthCheckFailedStat
      .getValues().get(StatisticsCollector.SINCE_START).getValue();
}
/**
 * Combines both blacklisting reasons: the first tracker is blacklisted for
 * exceeding failures, then all trackers report unhealthy. With the clock
 * moved forward a day only the health reason is expected to remain, and a
 * heartbeat without health information is expected to leave no tracker
 * blacklisted.
 */
public void testBlackListingWithFailuresAndHealthStatus() throws Exception {
  runBlackListingJob(jobTracker, trackers);
  assertEquals("Tracker 1 not blacklisted", 1,
      jobTracker.getBlacklistedTrackerCount());
  checkReasonForBlackListing(hosts[0], exceedsFailuresReasonSet);

  // Now make every tracker unhealthy as well.
  TaskTrackerHealthStatus status = getUnhealthyNodeStatus("ERROR");
  sendHeartBeat(status, false);
  assertEquals("All trackers not blacklisted", 3,
      jobTracker.getBlacklistedTrackerCount());
  checkReasonForBlackListing(hosts[0], unhealthyAndExceedsFailure);
  checkReasonForBlackListing(hosts[1], nodeUnHealthyReasonSet);
  checkReasonForBlackListing(hosts[2], nodeUnHealthyReasonSet);

  clock.jumpADay = true;
  try {
    // A day later the trackers are still unhealthy, and NODE_UNHEALTHY is
    // the only reason expected on every host.
    sendHeartBeat(status, false);
    assertEquals("All trackers not blacklisted", 3,
        jobTracker.getBlacklistedTrackerCount());
    for (String host : hosts) {
      checkReasonForBlackListing(host, nodeUnHealthyReasonSet);
    }
    // Clear the blacklisting caused by node health.
    sendHeartBeat(null, false);
    assertEquals("All trackers not white listed", 0,
        jobTracker.getBlacklistedTrackerCount());
  } finally {
    // The clock is shared by all tests; restore it even when an assertion
    // above fails.
    clock.jumpADay = false;
  }
}
/**
 * Checks that the fault report of every host matches the health report
 * text most recently sent in an unhealthy heartbeat.
 */
public void testBlacklistingReasonString() throws Exception {
  String firstReport = "ERROR";
  String secondReport = "ERROR1";

  TaskTrackerHealthStatus unhealthy = getUnhealthyNodeStatus(firstReport);
  sendHeartBeat(unhealthy, false);
  assertEquals("All trackers not blacklisted", 3,
      jobTracker.getBlacklistedTrackerCount());
  assertUnhealthyWithFaultReport(firstReport);

  // Report unhealthy again with a different report text.
  unhealthy.setNodeHealthy(false);
  unhealthy.setLastReported(System.currentTimeMillis());
  unhealthy.setHealthReport(secondReport);
  sendHeartBeat(unhealthy, false);
  assertUnhealthyWithFaultReport(secondReport);

  // Clear the trackers blacklisted for node health reasons.
  sendHeartBeat(null, false);
}

/**
 * Asserts that the first three hosts are blacklisted for NODE_UNHEALTHY
 * only, then that every host's fault report, with new lines removed,
 * equals {@code expectedReport}.
 */
private void assertUnhealthyWithFaultReport(String expectedReport) {
  for (int i = 0; i < 3; i++) {
    checkReasonForBlackListing(hosts[i], nodeUnHealthyReasonSet);
  }
  for (int i = 0; i < hosts.length; i++) {
    // getFaultReport adds a new line, so strip it before comparing.
    String actualReport = jobTracker.getFaultReport(hosts[i]).replace("\n", "");
    assertEquals("Blacklisting reason string not correct for host " + i,
        expectedReport, actualReport);
  }
}
/**
 * Creates a health status marked unhealthy, with the current system time
 * as its last-reported time.
 *
 * @param error text stored as the health report
 * @return the newly created status
 */
private TaskTrackerHealthStatus getUnhealthyNodeStatus(String error) {
  final TaskTrackerHealthStatus unhealthy = new TaskTrackerHealthStatus();
  unhealthy.setNodeHealthy(false);
  unhealthy.setLastReported(System.currentTimeMillis());
  unhealthy.setHealthReport(error);
  return unhealthy;
}
/**
 * Reserves map and reduce slots for one job on the first two trackers and
 * checks the job's reserved-tracker counts as those trackers become
 * blacklisted: first tracker 1 through job failures, then all trackers
 * through an unhealthy heartbeat.
 */
public void testBlackListingWithTrackerReservation() throws Exception {
  JobConf conf = new JobConf();
  conf.setNumMapTasks(1);
  conf.setNumReduceTasks(1);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);

  // Reserve one map and one reduce slot on each of the first two trackers.
  TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
  TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
  tt1.reserveSlots(TaskType.MAP, job, 1);
  tt1.reserveSlots(TaskType.REDUCE, job, 1);
  tt2.reserveSlots(TaskType.MAP, job, 1);
  tt2.reserveSlots(TaskType.REDUCE, job, 1);
  assertEquals("Tracker 1 not reserved for the job 1", 2, job
      .getNumReservedTaskTrackersForMaps());
  assertEquals("Tracker 1 not reserved for the job 1", 2, job
      .getNumReservedTaskTrackersForReduces());

  // Blacklisting tracker 1 for failures is expected to drop its
  // reservations.
  runBlackListingJob(jobTracker, trackers);
  assertEquals("Tracker 1 not unreserved for the job 1", 1, job
      .getNumReservedTaskTrackersForMaps());
  assertEquals("Tracker 1 not unreserved for the job 1", 1, job
      .getNumReservedTaskTrackersForReduces());
  assertEquals("Tracker 1 not blacklisted", 1, jobTracker
      .getBlacklistedTrackerCount());
  checkReasonForBlackListing(hosts[0], exceedsFailuresReasonSet);

  // Making every tracker unhealthy is expected to drop the rest.
  TaskTrackerHealthStatus status = getUnhealthyNodeStatus("ERROR");
  sendHeartBeat(status, false);
  assertEquals("All trackers not blacklisted", 3,
      jobTracker.getBlacklistedTrackerCount());
  checkReasonForBlackListing(hosts[0], unhealthyAndExceedsFailure);
  checkReasonForBlackListing(hosts[1], nodeUnHealthyReasonSet);
  checkReasonForBlackListing(hosts[2], nodeUnHealthyReasonSet);
  assertEquals("Tracker 1 not unreserved for the job 1", 0, job
      .getNumReservedTaskTrackersForMaps());
  assertEquals("Tracker 1 not unreserved for the job 1", 0, job
      .getNumReservedTaskTrackersForReduces());

  // White list all trackers for health reasons and failure counts.
  clock.jumpADay = true;
  try {
    sendHeartBeat(null, false);
  } finally {
    // The clock is shared by all tests. Leaving it a day ahead would make
    // the outcome of other tests depend on execution order, so restore it
    // as the other clock-jumping tests do.
    clock.jumpADay = false;
  }
}
/**
 * Test case to test if the cluster status is populated with the right
 * blacklist information, which would be used by the {@link JobClient} to
 * display information on the Command Line interface.
 *
 * The test first makes every tracker unhealthy and checks the reason and
 * report of each {@code BlackListInfo}; it then additionally blacklists the
 * first tracker through job failures and checks that both reasons show up
 * for that tracker.
 */
public void testClusterStatusBlacklistedReason() throws Exception {
  String error = "ERROR";
  String errorWithNewLines = "ERROR\nERROR";
  String expectedErrorReport = "ERROR:ERROR";

  // No tracker may be blacklisted when the test starts.
  Collection<BlackListInfo> blackListedTrackerInfo = jobTracker
      .getBlackListedTrackers();
  assertTrue("The blacklisted tracker nodes is not empty.",
      blackListedTrackerInfo.isEmpty());

  // Create an unhealthy tracker health status and make all trackers
  // unhealthy.
  TaskTrackerHealthStatus status = getUnhealthyNodeStatus(errorWithNewLines);
  sendHeartBeat(status, false);
  assertEquals("All trackers not blacklisted", 3, jobTracker
      .getBlacklistedTrackerCount());

  // Verify getBlackListedTrackers(), which is used by the ClusterStatus to
  // set the list of blacklisted trackers: it must return information for
  // all of them.
  blackListedTrackerInfo = jobTracker.getBlackListedTrackers();
  assertEquals("Blacklist tracker info does not contain all trackers", 3,
      blackListedTrackerInfo.size());

  // Every tracker must be blacklisted for health reasons only, and the
  // string form must end with the expected health report.
  for (BlackListInfo bi : blackListedTrackerInfo) {
    assertEquals("Tracker not blacklisted for health reason",
        ReasonForBlackListing.NODE_UNHEALTHY.toString().trim(), bi
            .getReasonForBlackListing().trim());
    assertTrue("Tracker blacklist report does not match",
        bi.toString().endsWith(expectedErrorReport));
  }

  // Reset the tracker health status back to normal, blacklist the first
  // tracker through job failures, then make all trackers unhealthy again.
  sendHeartBeat(null, false);
  runBlackListingJob(jobTracker, trackers);
  sendHeartBeat(status, false);
  blackListedTrackerInfo = jobTracker.getBlackListedTrackers();
  for (BlackListInfo bi : blackListedTrackerInfo) {
    if (bi.getTrackerName().equals(trackers[0])) {
      // The first tracker must carry both reasons and the failure report.
      assertTrue(
          "Reason for blacklisting of tracker 1 does not contain Unhealthy reasons",
          bi.getReasonForBlackListing().contains(
              ReasonForBlackListing.NODE_UNHEALTHY.toString().trim()));
      assertTrue(
          "Reason for blacklisting of tracker 1 does not contain Exceeding failures reasons",
          bi.getReasonForBlackListing().contains(
              ReasonForBlackListing.EXCEEDING_FAILURES.toString().trim()));
      assertTrue("Blacklist failure does not contain failure report string",
          bi.getBlackListReport().contains("failures on the tracker"));
    } else {
      assertEquals("Tracker not blacklisted for health reason",
          ReasonForBlackListing.NODE_UNHEALTHY.toString().trim(), bi
              .getReasonForBlackListing().trim());
    }
    assertTrue("Tracker blacklist report does not match", bi
        .getBlackListReport().trim().contains(error));
  }

  // Send a heartbeat without health information a day later.
  clock.jumpADay = true;
  try {
    sendHeartBeat(null, false);
  } finally {
    // The clock is shared by all tests. Leaving it a day ahead would make
    // the outcome of other tests depend on execution order, so restore it
    // as the other clock-jumping tests do.
    clock.jumpADay = false;
  }
}
/**
 * Runs a five-reduce job in which both attempts placed on the first
 * tracker fail while every attempt placed on the second and third trackers
 * finishes, then asks the job tracker to finalize the job.
 *
 * @param jobTracker JobTracker instance
 * @param trackers array of trackers; the first three elements are used and
 *        the first one is the tracker whose tasks fail
 * @return the job in progress object that was run
 * @throws Exception if the job cannot be created, initialized or run
 */
static FakeJobInProgress runBlackListingJob(JobTracker jobTracker,
    String[] trackers) throws Exception {
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(false);
  conf.setNumMapTasks(0);
  conf.setNumReduceTasks(5);
  conf.set(JobContext.REDUCE_FAILURES_MAXPERCENT, ".70");
  conf.setBoolean(JobContext.SETUP_CLEANUP_NEEDED, false);
  conf.setMaxTaskFailuresPerTracker(1);

  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.setClusterSize(trackers.length);
  job.initTasks();

  // Round one: one attempt per tracker; only the first tracker's fails.
  TaskAttemptID onFirst = job.findReduceTask(trackers[0]);
  TaskAttemptID onSecond = job.findReduceTask(trackers[1]);
  TaskAttemptID onThird = job.findReduceTask(trackers[2]);
  job.finishTask(onSecond);
  job.finishTask(onThird);
  job.failTask(onFirst);

  // The first tracker gets another attempt, which fails as well.
  onFirst = job.findReduceTask(trackers[0]);
  job.failTask(onFirst);

  // Remaining attempts go to the second and third trackers and finish.
  onSecond = job.findReduceTask(trackers[1]);
  job.finishTask(onSecond);
  onSecond = job.findReduceTask(trackers[1]);
  onThird = job.findReduceTask(trackers[2]);
  job.finishTask(onSecond);
  job.finishTask(onThird);

  jobTracker.finalizeJob(job);
  return job;
}
/**
 * Asserts that the reasons the job tracker reports for blacklisting
 * {@code host} equal the expected set.
 *
 * @param host host name to look up
 * @param reasonsForBlackListing expected set of reasons
 */
private void checkReasonForBlackListing(String host,
    Set<ReasonForBlackListing> reasonsForBlackListing) {
  String failureMessage =
      "Reasons for blacklisting of " + host + " does not match";
  assertEquals(failureMessage, reasonsForBlackListing,
      jobTracker.getReasonForBlackList(host));
}
}