blob: 86cbf31984d47c04c69158f641e907e413f035a9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import junit.extensions.TestSetup;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
public class TestClusterStatus extends TestCase {
private static String[] trackers = new String[] { "tracker_tracker1:1000",
"tracker_tracker2:1000", "tracker_tracker3:1000" };
private static JobTracker jobTracker;
private static int mapSlotsPerTracker = 4;
private static int reduceSlotsPerTracker = 2;
private static MiniMRCluster mr;
private static FakeJobInProgress fakeJob = null;
private static Cluster cluster;
// heartbeat responseId. increment this after sending a heartbeat
private static short responseId = 1;
public static Test suite() {
TestSetup setup = new TestSetup(new TestSuite(TestClusterStatus.class)) {
protected void setUp() throws Exception {
Configuration conf = new Configuration();
conf.setClass(JTConfig.JT_TASK_SCHEDULER, FakeTaskScheduler.class,
TaskScheduler.class);
mr = new MiniMRCluster(0, "file:///", 1, null, null, new JobConf(conf));
jobTracker = mr.getJobTrackerRunner().getJobTracker();
for (String tracker : trackers) {
FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
}
cluster = new Cluster(mr.createJobConf());
}
protected void tearDown() throws Exception {
cluster.close();
mr.shutdown();
}
};
return setup;
}
/**
* Fake scheduler to test reservations.
*
* The reservations are updated incrementally in each
* heartbeat to pass through the re-reservation logic.
*/
static class FakeTaskScheduler extends JobQueueTaskScheduler {
private Map<TaskTracker, Integer> reservedCounts
= new HashMap<TaskTracker, Integer>();
public FakeTaskScheduler() {
super();
}
public List<Task> assignTasks(TaskTracker tt) {
int currCount = 1;
if (reservedCounts.containsKey(tt)) {
currCount = reservedCounts.get(tt) + 1;
}
reservedCounts.put(tt, currCount);
tt.reserveSlots(TaskType.MAP, fakeJob, currCount);
tt.reserveSlots(TaskType.REDUCE, fakeJob, currCount);
return new ArrayList<Task>();
}
}
private TaskTrackerStatus getTTStatus(String trackerName,
List<TaskStatus> taskStatuses) {
return new TaskTrackerStatus(trackerName,
JobInProgress.convertTrackerNameToHostName(trackerName), 0,
taskStatuses, 0, mapSlotsPerTracker, reduceSlotsPerTracker);
}
public void testClusterMetrics() throws IOException, InterruptedException {
assertEquals("tasktracker count doesn't match", trackers.length,
cluster.getClusterStatus().getTaskTrackerCount());
List<TaskStatus> list = new ArrayList<TaskStatus>();
// create a map task status, which uses 2 slots.
int mapSlotsPerTask = 2;
addMapTaskAttemptToList(list, mapSlotsPerTask, TaskStatus.State.RUNNING);
// create a reduce task status, which uses 1 slot.
int reduceSlotsPerTask = 1;
addReduceTaskAttemptToList(list,
reduceSlotsPerTask, TaskStatus.State.RUNNING);
// create TaskTrackerStatus and send heartbeats
sendHeartbeats(list);
// assert ClusterMetrics
ClusterMetrics metrics = cluster.getClusterStatus();
assertEquals("occupied map slots do not match", mapSlotsPerTask,
metrics.getOccupiedMapSlots());
assertEquals("occupied reduce slots do not match", reduceSlotsPerTask,
metrics.getOccupiedReduceSlots());
assertEquals("map slot capacities do not match",
mapSlotsPerTracker * trackers.length,
metrics.getMapSlotCapacity());
assertEquals("reduce slot capacities do not match",
reduceSlotsPerTracker * trackers.length,
metrics.getReduceSlotCapacity());
assertEquals("running map tasks do not match", 1,
metrics.getRunningMaps());
assertEquals("running reduce tasks do not match", 1,
metrics.getRunningReduces());
// assert the values in ClusterStatus also
assertEquals("running map tasks do not match", 1,
jobTracker.getClusterStatus().getMapTasks());
assertEquals("running reduce tasks do not match", 1,
jobTracker.getClusterStatus().getReduceTasks());
assertEquals("map slot capacities do not match",
mapSlotsPerTracker * trackers.length,
jobTracker.getClusterStatus().getMaxMapTasks());
assertEquals("reduce slot capacities do not match",
reduceSlotsPerTracker * trackers.length,
jobTracker.getClusterStatus().getMaxReduceTasks());
// send a heartbeat finishing only a map and check
// counts are updated.
list.clear();
addMapTaskAttemptToList(list, mapSlotsPerTask, TaskStatus.State.SUCCEEDED);
addReduceTaskAttemptToList(list,
reduceSlotsPerTask, TaskStatus.State.RUNNING);
sendHeartbeats(list);
metrics = jobTracker.getClusterMetrics();
assertEquals(0, metrics.getOccupiedMapSlots());
assertEquals(reduceSlotsPerTask, metrics.getOccupiedReduceSlots());
// send a heartbeat finishing the reduce task also.
list.clear();
addReduceTaskAttemptToList(list,
reduceSlotsPerTask, TaskStatus.State.SUCCEEDED);
sendHeartbeats(list);
metrics = jobTracker.getClusterMetrics();
assertEquals(0, metrics.getOccupiedReduceSlots());
}
private void sendHeartbeats(List<TaskStatus> list) throws IOException {
TaskTrackerStatus[] status = new TaskTrackerStatus[trackers.length];
status[0] = getTTStatus(trackers[0], list);
status[1] = getTTStatus(trackers[1], new ArrayList<TaskStatus>());
status[2] = getTTStatus(trackers[2], new ArrayList<TaskStatus>());
for (int i = 0; i< trackers.length; i++) {
FakeObjectUtilities.sendHeartBeat(jobTracker, status[i], false, false,
trackers[i], responseId);
}
responseId++;
}
private void addReduceTaskAttemptToList(List<TaskStatus> list,
int reduceSlotsPerTask, TaskStatus.State state) {
TaskStatus ts = TaskStatus.createTaskStatus(false,
new TaskAttemptID("jt", 1, TaskType.REDUCE, 0, 0), 0.0f,
reduceSlotsPerTask,
state, "", "", trackers[0],
TaskStatus.Phase.REDUCE, null);
list.add(ts);
}
private void addMapTaskAttemptToList(List<TaskStatus> list,
int mapSlotsPerTask, TaskStatus.State state) {
TaskStatus ts = TaskStatus.createTaskStatus(true,
new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0), 0.0f, mapSlotsPerTask,
state, "", "", trackers[0],
TaskStatus.Phase.MAP, null);
list.add(ts);
}
public void testReservedSlots() throws Exception {
Configuration conf = mr.createJobConf();
conf.setInt(JobContext.NUM_MAPS, 1);
Job job = Job.getInstance(cluster, conf);
job.setNumReduceTasks(1);
job.setSpeculativeExecution(false);
job.setJobSetupCleanupNeeded(false);
//Set task tracker objects for reservation.
TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
TaskTrackerStatus status1 = new TaskTrackerStatus(
trackers[0],JobInProgress.convertTrackerNameToHostName(
trackers[0]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
TaskTrackerStatus status2 = new TaskTrackerStatus(
trackers[1],JobInProgress.convertTrackerNameToHostName(
trackers[1]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
tt1.setStatus(status1);
tt2.setStatus(status2);
fakeJob = new FakeJobInProgress(new JobConf(job.getConfiguration()),
jobTracker);
fakeJob.setClusterSize(3);
fakeJob.initTasks();
FakeObjectUtilities.sendHeartBeat(jobTracker, status1, false,
true, trackers[0], responseId);
FakeObjectUtilities.sendHeartBeat(jobTracker, status2, false,
true, trackers[1], responseId);
responseId++;
ClusterMetrics metrics = cluster.getClusterStatus();
assertEquals("reserved map slots do not match",
2, metrics.getReservedMapSlots());
assertEquals("reserved reduce slots do not match",
2, metrics.getReservedReduceSlots());
// redo to test re-reservations.
FakeObjectUtilities.sendHeartBeat(jobTracker, status1, false,
true, trackers[0], responseId);
FakeObjectUtilities.sendHeartBeat(jobTracker, status2, false,
true, trackers[1], responseId);
responseId++;
metrics = cluster.getClusterStatus();
assertEquals("reserved map slots do not match",
4, metrics.getReservedMapSlots());
assertEquals("reserved reduce slots do not match",
4, metrics.getReservedReduceSlots());
TaskAttemptID mTid = fakeJob.findMapTask(trackers[1]);
TaskAttemptID rTid = fakeJob.findReduceTask(trackers[1]);
fakeJob.finishTask(mTid);
fakeJob.finishTask(rTid);
assertEquals("Job didnt complete successfully complete",
fakeJob.getStatus().getRunState(), JobStatus.SUCCEEDED);
metrics = cluster.getClusterStatus();
assertEquals("reserved map slots do not match",
0, metrics.getReservedMapSlots());
assertEquals("reserved reduce slots do not match",
0, metrics.getReservedReduceSlots());
}
}