/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.TaskStatus.Phase;
import org.apache.hadoop.mapred.TaskStatus.State;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.junit.Assert;
import org.junit.Test;
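
/**
 * Tests the simulated JobTracker/TaskTracker heartbeat protocol: a fake job
 * client submits a job, and a fake task tracker heartbeats until every map
 * and reduce task has been handed out.
 */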
public class TestSimulatorJobTracker {
SimulatorTaskTracker taskTracker;
public static final Log LOG = LogFactory
.getLog(TestSimulatorJobTracker.class);
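
  /**
   * Builds a minimal JobConf for the simulated JobTracker: a local tracker
   * address, small job-history buffers, a short tasktracker expiry interval,
   * and a system directory derived from hadoop.tmp.dir.
   */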
@SuppressWarnings("deprecation")
public JobConf createJobConf() {
JobConf jtConf = new JobConf();
jtConf.set("mapred.job.tracker", "localhost:8012");
jtConf.set("mapred.jobtracker.job.history.block.size", "512");
jtConf.set("mapred.jobtracker.job.history.buffer.size", "512");
jtConf.setLong("mapred.tasktracker.expiry.interval", 5000);
jtConf.setInt("mapred.reduce.copy.backoff", 4);
jtConf.setLong("mapred.job.reuse.jvm.num.tasks", -1);
jtConf.setUser("mumak");
jtConf.set("mapred.system.dir", jtConf.get("hadoop.tmp.dir", "/tmp/hadoop-"
+ jtConf.getUser())
+ "/mapred/system");
System.out.println("Created JobConf");
return jtConf;
}
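
  /**
   * Minimal stand-in for a job client that talks directly to the
   * JobTracker's ClientProtocol.
   */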
public static class FakeJobClient {
ClientProtocol jobTracker;
int numMaps;
int numReduces;
public FakeJobClient(ClientProtocol jobTracker, int numMaps,
int numReduces) {
this.jobTracker = jobTracker;
this.numMaps = numMaps;
this.numReduces = numReduces;
}
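
    /**
     * Obtains a new job id from the JobTracker, primes the SimulatorJobCache
     * with a FakeJobs entry, and submits the job.
     */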
public void submitNewJob() throws IOException, InterruptedException {
org.apache.hadoop.mapreduce.JobID jobId = jobTracker.getNewJobID();
LOG.info("Obtained from Jobtracker jobid = " + jobId);
FakeJobs job = new FakeJobs("job1", 0, numMaps, numReduces);
SimulatorJobCache.put(org.apache.hadoop.mapred.JobID.downgrade(jobId), job);
jobTracker.submitJob(jobId, "dummy-path", null);
}
}
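
  /**
   * SimulatorTaskTracker subclass that drives the heartbeat cycle by hand;
   * every task it is asked to launch is reported as SUCCEEDED on the
   * following heartbeat.
   */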
public static class FakeTaskTracker extends SimulatorTaskTracker {
boolean firstHeartbeat = true;
short responseId = 0;
int now = 0;
FakeTaskTracker(InterTrackerProtocol jobTracker, String taskTrackerName,
String hostName, int maxMapTasks, int maxReduceTasks) {
super(jobTracker, taskTrackerName, hostName, maxMapTasks, maxReduceTasks);
LOG.info("FakeTaskTracker constructor, taskTrackerName="
+ taskTrackerName);
}
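
    /**
     * Clones the status of every tracked task and drops tasks that have
     * already SUCCEEDED, so each completion is reported exactly once.
     */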
private List<TaskStatus> collectAndCloneTaskStatuses() {
ArrayList<TaskStatus> statuses = new ArrayList<TaskStatus>();
Set<TaskAttemptID> mark = new HashSet<TaskAttemptID>();
for (SimulatorTaskInProgress tip : tasks.values()) {
statuses.add((TaskStatus) tip.getTaskStatus().clone());
if (tip.getFinalRunState() == State.SUCCEEDED) {
mark.add(tip.getTaskStatus().getTaskID());
}
}
for (TaskAttemptID taskId : mark) {
tasks.remove(taskId);
}
return statuses;
}
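
    /**
     * Sends one heartbeat at simulated time {@code current} and returns the
     * number of launch-task actions contained in the response.
     */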
public int sendFakeHeartbeat(int current) throws IOException {
this.now = current;
List<TaskStatus> taskStatuses = collectAndCloneTaskStatuses();
TaskTrackerStatus taskTrackerStatus = new SimulatorTaskTrackerStatus(
taskTrackerName, hostName, httpPort, taskStatuses, 0, maxMapSlots,
maxReduceSlots, this.now);
      // Transmit the heartbeat.
      LOG.debug("sending heartbeat at time = " + this.now + " responseId = "
          + responseId);
      HeartbeatResponse response = jobTracker.heartbeat(taskTrackerStatus,
          false, firstHeartbeat, true, responseId);
firstHeartbeat = false;
responseId = response.getResponseId();
      return findLaunchTaskActions(response);
}
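
    /**
     * Counts the SimulatorLaunchTaskActions in a heartbeat response and
     * registers each newly seen task as a SimulatorTaskInProgress that has
     * already run to completion.
     */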
int findLaunchTaskActions(HeartbeatResponse response) {
TaskTrackerAction[] actions = response.getActions();
int numLaunchTaskActions = 0;
for (TaskTrackerAction action : actions) {
if (action instanceof SimulatorLaunchTaskAction) {
Task task = ((SimulatorLaunchTaskAction) action).getTask();
numLaunchTaskActions++;
TaskAttemptID taskId = task.getTaskID();
if (tasks.containsKey(taskId)) {
            // Already tracking this task; no need to generate a new status.
continue;
}
TaskStatus status;
if (task.isMapTask()) {
status = new MapTaskStatus(taskId, 0f, 1, State.RUNNING, "", "",
taskTrackerName, Phase.MAP, new Counters());
} else {
status = new ReduceTaskStatus(taskId, 0f, 1, State.RUNNING, "", "",
taskTrackerName, Phase.SHUFFLE, new Counters());
}
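          // Report the task as finished right away so the next heartbeat
          // carries a SUCCEEDED status back to the JobTracker.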
status.setRunState(State.SUCCEEDED);
status.setStartTime(this.now);
SimulatorTaskInProgress tip = new SimulatorTaskInProgress(
(SimulatorLaunchTaskAction) action, status, this.now);
tasks.put(taskId, tip);
}
}
return numLaunchTaskActions;
}
}
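
  /**
   * Exercises the full heartbeat exchange: submit one fake job, then
   * heartbeat a FakeTaskTracker until all map tasks and subsequently all
   * reduce tasks have been launched, asserting progress at each stage.
   */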
@Test
public void testTrackerInteraction() throws IOException, InterruptedException {
LOG.info("Testing Inter Tracker protocols");
int now = 0;
JobConf jtConf = createJobConf();
int NoMaps = 2;
int NoReduces = 10;
// jtConf.set("mapred.jobtracker.taskScheduler",
// DummyTaskScheduler.class.getName());
jtConf.set("fs.default.name", "file:///");
jtConf.set("mapred.jobtracker.taskScheduler", JobQueueTaskScheduler.class
.getName());
SimulatorJobTracker sjobTracker = SimulatorJobTracker.startTracker(jtConf,
0);
System.out.println("Created the SimulatorJobTracker successfully");
sjobTracker.offerService();
FakeJobClient jbc = new FakeJobClient(sjobTracker, NoMaps, NoReduces);
int NoJobs = 1;
for (int i = 0; i < NoJobs; i++) {
jbc.submitNewJob();
}
org.apache.hadoop.mapreduce.JobStatus[] allJobs = sjobTracker.getAllJobs();
Assert.assertTrue("allJobs queue length is " + allJobs.length, allJobs.length >= 1);
for (org.apache.hadoop.mapreduce.JobStatus js : allJobs) {
LOG.info("From JTQueue: job id = " + js.getJobID());
}
FakeTaskTracker fakeTracker = new FakeTaskTracker(sjobTracker,
"tracker_host1.foo.com:localhost/127.0.0.1:9010", "host1.foo.com", 10,
10);
int numLaunchTaskActions = 0;
    // We should be able to assign all map tasks within 2x NoMaps heartbeats.
    for (int i = 0; i < NoMaps * 2; ++i) {
numLaunchTaskActions += fakeTracker.sendFakeHeartbeat(now);
if (numLaunchTaskActions >= NoMaps) {
break;
}
now += 5;
LOG.debug("Number of MapLaunchTasks=" + numLaunchTaskActions + " now = "
+ now);
}
Assert.assertTrue("Failed to launch all maps: " + numLaunchTaskActions,
numLaunchTaskActions >= NoMaps);
    // Send the completed map statuses back in one more heartbeat.
LOG.info("Sending task completed status");
numLaunchTaskActions += fakeTracker.sendFakeHeartbeat(now);
    // Now for the reduce tasks: we should be able to assign them all within
    // 2x NoReduces heartbeats.
    for (int i = 0; i < NoReduces * 2; ++i) {
if (numLaunchTaskActions >= NoMaps + NoReduces) {
break;
}
numLaunchTaskActions += fakeTracker.sendFakeHeartbeat(now);
now += 5;
LOG.debug("Number of ReduceLaunchTasks=" + numLaunchTaskActions
+ " now = " + now);
}
Assert.assertTrue("Failed to launch all reduces: " + numLaunchTaskActions,
numLaunchTaskActions >= NoMaps + NoReduces);
    // Send the reduce-completion statuses in a final heartbeat.
numLaunchTaskActions += fakeTracker.sendFakeHeartbeat(now);
}
}