/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.lib.IdentityReducer;
/**
 * Test node blacklisting. This testcase tests:
 *   - node blacklisting along with node refresh (decommissioning)
 */
public class TestNodeBlacklisting extends TestCase {
public static final Log LOG = LogFactory.getLog(TestNodeBlacklisting.class);
private static final Path TEST_DIR =
new Path(System.getProperty("test.build.data", "/tmp"), "node-bklisting");
  // Mapper that fails exactly once: the first attempt of the first map task throws
static class FailOnceMapper extends MapReduceBase implements
Mapper<WritableComparable, Writable, WritableComparable, Writable> {
private boolean shouldFail = false;
public void map(WritableComparable key, Writable value,
OutputCollector<WritableComparable, Writable> out, Reporter reporter)
throws IOException {
if (shouldFail) {
throw new RuntimeException("failing map");
}
}
@Override
public void configure(JobConf conf) {
TaskAttemptID id = TaskAttemptID.forName(conf.get("mapred.task.id"));
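      // fail only the first attempt (attempt 0) of the first map task (task 0);
      // the retry succeeds, so the job completes while the original tracker
      // still records one task failure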
shouldFail = id.getId() == 0 && id.getTaskID().getId() == 0;
}
}
  /**
   * Check that decommissioning (refreshNodes) removes a graylisted node and
   * clears it from the cluster status.
   */
public void testBlacklistedNodeDecommissioning() throws Exception {
LOG.info("Testing blacklisted node decommissioning");
MiniMRCluster mr = null;
JobTracker jt = null;
try {
// start mini mr
JobConf jtConf = new JobConf();
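      // with the limit set to 1, a single per-job blacklisting is enough to
      // flag the tracker as faulty cluster-wide (seen as graylisted below)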
jtConf.set("mapred.max.tracker.blacklists", "1");
mr = new MiniMRCluster(0, 0, 2, "file:///", 1, null, null, null, jtConf);
jt = mr.getJobTrackerRunner().getJobTracker();
assertEquals("Trackers not up", 2, jt.taskTrackers().size());
// validate the total tracker count
assertEquals("Active tracker count mismatch",
2, jt.getClusterStatus(false).getTaskTrackers());
// validate blacklisted count
assertEquals("Blacklisted tracker count mismatch",
0, jt.getClusterStatus(false).getBlacklistedTrackers());
      // run a job that fails once on a tracker so that the tracker gets graylisted
JobConf jConf = mr.createJobConf();
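      // a single task failure blacklists the tracker for this job, which in
      // turn counts toward the cluster-level graylisting asserted below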
jConf.set("mapred.max.tracker.failures", "1");
jConf.setJobName("test-job-fail-once");
jConf.setMapperClass(FailOnceMapper.class);
jConf.setReducerClass(IdentityReducer.class);
jConf.setNumMapTasks(1);
jConf.setNumReduceTasks(0);
RunningJob job =
UtilsForTests.runJob(jConf, new Path(TEST_DIR, "in"),
new Path(TEST_DIR, "out"));
job.waitForCompletion();
// validate the total tracker count
// (graylisted trackers remain active, unlike blacklisted ones)
assertEquals("Active tracker count mismatch",
2, jt.getClusterStatus(false).getTaskTrackers());
// validate graylisted count
assertEquals("Graylisted tracker count mismatch",
1, jt.getClusterStatus(false).getGraylistedTrackers());
// find the graylisted tracker
String trackerName = null;
for (TaskTrackerStatus status : jt.taskTrackers()) {
if (jt.isGraylisted(status.getTrackerName())) {
trackerName = status.getTrackerName();
break;
}
}
      // get the hostname of the graylisted tracker
      assertNotNull("No graylisted tracker found", trackerName);
      String hostToDecommission =
          JobInProgress.convertTrackerNameToHostName(trackerName);
LOG.info("Decommissioning tracker " + hostToDecommission);
// decommission the node
HashSet<String> decom = new HashSet<String>(1);
decom.add(hostToDecommission);
jt.decommissionNodes(decom);
      // validate the cluster status and tracker count after decommissioning
assertEquals("Tracker is not lost upon host decommissioning",
1, jt.getClusterStatus(false).getTaskTrackers());
assertEquals("Graylisted tracker count incorrect in cluster status "
+ "after decommissioning",
0, jt.getClusterStatus(false).getGraylistedTrackers());
assertEquals("Tracker is not lost upon host decommissioning",
1, jt.taskTrackers().size());
} finally {
if (mr != null) {
mr.shutdown();
mr = null;
jt = null;
FileUtil.fullyDelete(new File(TEST_DIR.toString()));
}
}
}
}