blob: e4c4f1c3ff8e654ec982054e7cf4b7cbf3f9c89e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.TaskType;
import junit.extensions.TestSetup;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
public class TestClusterStatus extends TestCase {
private static String[] trackers = new String[] { "tracker_tracker1:1000",
"tracker_tracker2:1000", "tracker_tracker3:1000" };
private static JobTracker jobTracker;
private static int mapSlotsPerTracker = 4;
private static int reduceSlotsPerTracker = 2;
private static MiniMRCluster mr;
private static Cluster cluster;
// heartbeat responseId. increment this after sending a heartbeat
private static short responseId = 1;
public static Test suite() {
TestSetup setup = new TestSetup(new TestSuite(TestClusterStatus.class)) {
protected void setUp() throws Exception {
mr = new MiniMRCluster(0, "file:///", 1);
jobTracker = mr.getJobTrackerRunner().getJobTracker();
for (String tracker : trackers) {
FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
}
cluster = new Cluster(mr.createJobConf());
}
protected void tearDown() throws Exception {
cluster.close();
mr.shutdown();
}
};
return setup;
}
private TaskTrackerStatus getTTStatus(String trackerName,
List<TaskStatus> taskStatuses) {
return new TaskTrackerStatus(trackerName,
JobInProgress.convertTrackerNameToHostName(trackerName), 0,
taskStatuses, 0, mapSlotsPerTracker, reduceSlotsPerTracker);
}
public void testClusterMetrics() throws IOException, InterruptedException {
assertEquals("tasktracker count doesn't match", trackers.length,
cluster.getClusterStatus().getTaskTrackerCount());
List<TaskStatus> list = new ArrayList<TaskStatus>();
// create a map task status, which uses 2 slots.
int mapSlotsPerTask = 2;
TaskStatus ts = TaskStatus.createTaskStatus(true,
new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0), 0.0f, mapSlotsPerTask,
TaskStatus.State.RUNNING, "", "", trackers[0],
TaskStatus.Phase.MAP, null);
list.add(ts);
// create a reduce task status, which uses 1 slot.
int reduceSlotsPerTask = 1;
ts = TaskStatus.createTaskStatus(false,
new TaskAttemptID("jt", 1, TaskType.REDUCE, 0, 0), 0.0f,
reduceSlotsPerTask,
TaskStatus.State.RUNNING, "", "", trackers[0],
TaskStatus.Phase.REDUCE, null);
list.add(ts);
// create TaskTrackerStatus and send heartbeats
TaskTrackerStatus[] status = new TaskTrackerStatus[trackers.length];
status[0] = getTTStatus(trackers[0], list);
status[1] = getTTStatus(trackers[1], new ArrayList<TaskStatus>());
status[2] = getTTStatus(trackers[2], new ArrayList<TaskStatus>());
for (int i = 0; i< trackers.length; i++) {
FakeObjectUtilities.sendHeartBeat(jobTracker, status[i], false,
trackers[i], responseId);
}
responseId++;
// assert ClusterMetrics
ClusterMetrics metrics = cluster.getClusterStatus();
assertEquals("occupied map slots do not match", mapSlotsPerTask,
metrics.getOccupiedMapSlots());
assertEquals("occupied reduce slots do not match", reduceSlotsPerTask,
metrics.getOccupiedReduceSlots());
assertEquals("map slot capacities do not match",
mapSlotsPerTracker * trackers.length,
metrics.getMapSlotCapacity());
assertEquals("reduce slot capacities do not match",
reduceSlotsPerTracker * trackers.length,
metrics.getReduceSlotCapacity());
// assert the values in ClusterStatus also
assertEquals("running map tasks do not match", 1,
jobTracker.getClusterStatus().getMapTasks());
assertEquals("running reduce tasks do not match", 1,
jobTracker.getClusterStatus().getReduceTasks());
assertEquals("map slot capacities do not match",
mapSlotsPerTracker * trackers.length,
jobTracker.getClusterStatus().getMaxMapTasks());
assertEquals("reduce slot capacities do not match",
reduceSlotsPerTracker * trackers.length,
jobTracker.getClusterStatus().getMaxReduceTasks());
cluster.close();
}
}