blob: 82e5b56f97e5c3026bd29041318fba07f897cb7f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import junit.extensions.TestSetup;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory;
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTrackerMetricsInst;
import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.split.JobSplit;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.StaticMapping;
import org.mortbay.log.Log;
/**
* A JUnit test to test configured task limits.
*/
public class TestRackAwareTaskPlacement extends TestCase {
static String trackers[] = new String[] {"tracker_tracker1.r1.com:1000",
"tracker_tracker2.r1.com:1000", "tracker_tracker3.r2.com:1000",
"tracker_tracker4.r3.com:1000"};
static String[] allHosts =
new String[] {"tracker1.r1.com", "tracker2.r1.com", "tracker3.r2.com",
"tracker4.r3.com"};
static String[] allRacks =
new String[] { "/r1", "/r1", "/r2", "/r3"};
static FakeJobTracker jobTracker;
static String jtIdentifier = "test";
private static int jobCounter;
private static FakeJobTrackerMetricsInst fakeInst;
public static Test suite() {
TestSetup setup =
new TestSetup(new TestSuite(TestRackAwareTaskPlacement.class)) {
protected void setUp() throws Exception {
JobConf conf = new JobConf();
conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
conf.setClass("topology.node.switch.mapping.impl",
StaticMapping.class, DNSToSwitchMapping.class);
conf.set(JTConfig.JT_INSTRUMENTATION,
FakeJobTrackerMetricsInst.class.getName());
jobTracker = new FakeJobTracker(conf, new FakeClock(), trackers);
fakeInst = (FakeJobTrackerMetricsInst) jobTracker.getInstrumentation();
// Set up the Topology Information
for (int i = 0; i < allHosts.length; i++) {
StaticMapping.addNodeToRack(allHosts[i], allRacks[i]);
}
for (String tracker : trackers) {
FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
}
}
};
return setup;
}
static class MyFakeJobInProgress extends JobInProgress {
static JobID jobid;
int numMaps;
MyFakeJobInProgress(JobConf jc, JobTracker jt) throws IOException {
super((jobid = new JobID(jtIdentifier, jobCounter ++)), jc, jt);
Path jobFile = new Path("Dummy");
this.profile = new JobProfile(jc.getUser(), jobid,
jobFile.toString(), null, jc.getJobName(),
jc.getQueueName());
this.jobHistory = new FakeJobHistory();
}
@Override
public void initTasks() throws IOException {
TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(jobId);
numMapTasks = taskSplitMetaInfo.length;
createMapTasks(null, taskSplitMetaInfo);
nonRunningMapCache = createCache(taskSplitMetaInfo, maxLevel);
tasksInited.set(true);
this.status.setRunState(JobStatus.RUNNING);
}
@Override
protected TaskSplitMetaInfo [] createSplits(
org.apache.hadoop.mapreduce.JobID jobId) throws IOException {
TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[numMaps];
// Hand code for now.
// M0,2,3 reside in Host1
// M1 resides in Host3
// M4 resides in Host4
String[] splitHosts0 = new String[] { allHosts[0] };
String[] splitHosts1 = new String[] { allHosts[2] };
String[] splitHosts2 = new String[] { allHosts[3] };
for (int i = 0; i < numMaps; i++) {
if (i == 0 || i == 2 || i == 3) {
splits[i] = new TaskSplitMetaInfo(splitHosts0, 0, 0);
} else if (i == 1) {
splits[i] = new TaskSplitMetaInfo(splitHosts1, 0, 0);
} else if (i == 4) {
splits[i] = new TaskSplitMetaInfo(splitHosts2, 0, 0);
}
}
return splits;
}
}
@SuppressWarnings("deprecation")
public void testTaskPlacement() throws IOException {
JobConf conf = new JobConf();
conf.setNumReduceTasks(0);
conf.setJobName("TestTaskPlacement");
MyFakeJobInProgress jip = new MyFakeJobInProgress(conf, jobTracker);
jip.numMaps = 5;
jip.initTasks();
// Tracker1 should get a rack local
TaskTrackerStatus tts = new TaskTrackerStatus(trackers[1], allHosts[1]);
jip.obtainNewMapTask(tts, 4, 4);
// Tracker0 should get a data local
tts = new TaskTrackerStatus(trackers[0], allHosts[0]);
jip.obtainNewMapTask(tts, 4, 4);
// Tracker2 should get a data local
tts = new TaskTrackerStatus(trackers[2], allHosts[2]);
jip.obtainNewMapTask(tts, 4, 4);
// Tracker0 should get a data local
tts = new TaskTrackerStatus(trackers[0], allHosts[0]);
jip.obtainNewMapTask(tts, 4, 4);
// Tracker1 should not get any locality at all
tts = new TaskTrackerStatus(trackers[1], allHosts[1]);
jip.obtainNewMapTask(tts, 4, 4);
Counters counters = jip.getCounters();
assertEquals("Number of data local maps", 3,
counters.getCounter(JobCounter.DATA_LOCAL_MAPS));
assertEquals("Number of Rack-local maps", 1 ,
counters.getCounter(JobCounter.RACK_LOCAL_MAPS));
assertEquals("Number of Other-local maps", 0,
counters.getCounter(JobCounter.OTHER_LOCAL_MAPS));
// Also verify jobtracker instrumentation
assertEquals("Number of data local maps", 3, fakeInst.numDataLocalMaps);
assertEquals("Number of rack local maps", 1, fakeInst.numRackLocalMaps);
}
}