/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskStatus.State;
import org.apache.hadoop.mapreduce.TaskType;

import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.*;
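
/**
 * Tests {@link ZombieJob}: reads a small Rumen trace through
 * {@link ZombieJobProducer} and checks per-job map/reduce counts,
 * locality-adjusted task attempt info, recorded I/O statistics, and the
 * distributions of made-up (synthesized) task attempts.
 */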
public class TestZombieJob {
final double epsilon = 0.01;
private final int[] attemptTimesPercentiles = new int[] { 10, 50, 90 };
private long[] succeededCDF = new long[] { 5268, 5268, 5268, 5268, 5268 };
private long[] failedCDF = new long[] { 18592, 18592, 18592, 18592, 18592 };
private double[] expectedPs = new double[] { 0.000001, 0.18707660239708182,
0.0013027618551328818, 2.605523710265763E-4 };
private final long[] mapTaskCounts = new long[] { 7838525L, 342277L, 100228L,
1564L, 1234L };
private final long[] reduceTaskCounts = new long[] { 4405338L, 139391L,
1514383L, 139391, 1234L };
List<LoggedJob> loggedJobs = new ArrayList<LoggedJob>();
List<JobStory> jobStories = new ArrayList<JobStory>();
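
  // Reads the first four jobs from the rumen/zombie test trace, prints their
  // attempt CDFs for manual inspection, and caches each job's LoggedJob and
  // JobStory for use by the test methods below.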
@Before
public void setUp() throws Exception {
final Configuration conf = new Configuration();
final FileSystem lfs = FileSystem.getLocal(conf);
final Path rootInputDir = new Path(
System.getProperty("test.tools.input.dir", "")).makeQualified(lfs);
final Path rootInputFile = new Path(rootInputDir, "rumen/zombie");
ZombieJobProducer parser = new ZombieJobProducer(new Path(rootInputFile,
"input-trace.json"), new ZombieCluster(new Path(rootInputFile,
"input-topology.json"), null, conf), conf);
JobStory job = null;
for (int i = 0; i < 4; i++) {
job = parser.getNextJob();
ZombieJob zJob = (ZombieJob) job;
LoggedJob loggedJob = zJob.getLoggedJob();
System.out.println(i + ":" + job.getNumberMaps() + "m, "
+ job.getNumberReduces() + "r");
System.out
.println(loggedJob.getOutcome() + ", " + loggedJob.getJobtype());
System.out.println("Input Splits -- " + job.getInputSplits().length
+ ", " + job.getNumberMaps());
System.out.println("Successful Map CDF -------");
for (LoggedDiscreteCDF cdf : loggedJob.getSuccessfulMapAttemptCDFs()) {
System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum()
+ "--" + cdf.getMaximum());
for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
System.out.println(" " + ranking.getRelativeRanking() + ":"
+ ranking.getDatum());
}
}
System.out.println("Failed Map CDF -----------");
for (LoggedDiscreteCDF cdf : loggedJob.getFailedMapAttemptCDFs()) {
System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum()
+ "--" + cdf.getMaximum());
for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
System.out.println(" " + ranking.getRelativeRanking() + ":"
+ ranking.getDatum());
}
}
System.out.println("Successful Reduce CDF ----");
LoggedDiscreteCDF cdf = loggedJob.getSuccessfulReduceAttemptCDF();
System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--"
+ cdf.getMaximum());
for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
System.out.println(" " + ranking.getRelativeRanking() + ":"
+ ranking.getDatum());
}
System.out.println("Failed Reduce CDF --------");
cdf = loggedJob.getFailedReduceAttemptCDF();
System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--"
+ cdf.getMaximum());
for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
System.out.println(" " + ranking.getRelativeRanking() + ":"
+ ranking.getDatum());
}
System.out.print("map attempts to success -- ");
for (double p : loggedJob.getMapperTriesToSucceed()) {
System.out.print(p + ", ");
}
System.out.println();
System.out.println("===============");
loggedJobs.add(loggedJob);
jobStories.add(job);
}
}
@Test
public void testFirstJob() {
// 20th job seems reasonable: "totalMaps":329,"totalReduces":101
// successful map: 80 node-local, 196 rack-local, 53 rack-remote, 2 unknown
// failed map: 0-0-0-1
// successful reduce: 99 failed reduce: 13
// map attempts to success -- 0.9969879518072289, 0.0030120481927710845,
JobStory job = jobStories.get(0);
assertEquals(1, job.getNumberMaps());
assertEquals(1, job.getNumberReduces());
// get splits
TaskAttemptInfo taInfo = null;
long expectedRuntime = 2423;
// get a succeeded map task attempt, expect the exact same task attempt
taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 1);
assertEquals(expectedRuntime, taInfo.getRuntime());
assertEquals(State.SUCCEEDED, taInfo.getRunState());
// get a succeeded map attempt, but reschedule with different locality.
taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 2);
assertEquals(State.SUCCEEDED, taInfo.getRunState());
taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 0);
assertEquals(State.SUCCEEDED, taInfo.getRunState());
expectedRuntime = 97502;
// get a succeeded reduce task attempt, expect the exact same task attempt
taInfo = job.getTaskAttemptInfo(TaskType.REDUCE, 14, 0);
assertEquals(State.SUCCEEDED, taInfo.getRunState());
    // fetch the same reduce task attempt again; expect the same succeeded attempt
taInfo = job.getTaskAttemptInfo(TaskType.REDUCE, 14, 0);
assertEquals(State.SUCCEEDED, taInfo.getRunState());
    // get a non-existent reduce task attempt, expect a made-up task attempt
    // TODO: fill in test case
}
@Test
public void testSecondJob() {
// 7th job has many failed tasks.
// 3204 m, 0 r
// successful maps 497-586-23-1, failed maps 0-0-0-2714
// map attempts to success -- 0.8113600833767587, 0.18707660239708182,
// 0.0013027618551328818, 2.605523710265763E-4,
JobStory job = jobStories.get(1);
assertEquals(20, job.getNumberMaps());
assertEquals(1, job.getNumberReduces());
TaskAttemptInfo taInfo = null;
// get a succeeded map task attempt
taInfo = job.getMapTaskAttemptInfoAdjusted(17, 1, 1);
assertEquals(State.SUCCEEDED, taInfo.getRunState());
// get a succeeded map task attempt, with different locality
taInfo = job.getMapTaskAttemptInfoAdjusted(17, 1, 2);
assertEquals(State.SUCCEEDED, taInfo.getRunState());
taInfo = job.getMapTaskAttemptInfoAdjusted(17, 1, 0);
assertEquals(State.SUCCEEDED, taInfo.getRunState());
    // get a map task attempt for a different task (task 14) and check its runtime
taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 1);
assertEquals(1927, taInfo.getRuntime());
assertEquals(State.SUCCEEDED, taInfo.getRunState());
// get a failed map task attempt, with different locality
    // TODO: cannot be exercised here; the test trace has no suitable data for
    // this case.
}
@Test
public void testFourthJob() {
    // 4th job in the trace: 131 maps, 47 reduces (see assertions below);
    // checks locality-adjusted runtimes for succeeded and failed map attempts.
JobStory job = jobStories.get(3);
assertEquals(131, job.getNumberMaps());
assertEquals(47, job.getNumberReduces());
TaskAttemptInfo taInfo = null;
// get a succeeded map task attempt
long runtime = 5268;
taInfo = job.getMapTaskAttemptInfoAdjusted(113, 1, 1);
assertEquals(State.SUCCEEDED, taInfo.getRunState());
assertEquals(runtime, taInfo.getRuntime());
// get a succeeded map task attempt, with different locality
taInfo = job.getMapTaskAttemptInfoAdjusted(113, 1, 2);
assertEquals(State.SUCCEEDED, taInfo.getRunState());
assertEquals(runtime, taInfo.getRuntime() / 2);
taInfo = job.getMapTaskAttemptInfoAdjusted(113, 1, 0);
assertEquals(State.SUCCEEDED, taInfo.getRunState());
assertEquals((long) (runtime / 1.5), taInfo.getRuntime());
// get a failed map task attempt
taInfo = job.getMapTaskAttemptInfoAdjusted(113, 0, 1);
assertEquals(18592, taInfo.getRuntime());
assertEquals(State.FAILED, taInfo.getRunState());
}
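
  // Verifies that per-task I/O and memory statistics recorded in the trace are
  // exposed through TaskInfo for a map task and a reduce task of the 4th job.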
@Test
public void testRecordIOInfo() {
JobStory job = jobStories.get(3);
TaskInfo mapTask = job.getTaskInfo(TaskType.MAP, 113);
TaskInfo reduceTask = job.getTaskInfo(TaskType.REDUCE, 0);
assertEquals(mapTaskCounts[0], mapTask.getInputBytes());
assertEquals(mapTaskCounts[1], mapTask.getInputRecords());
assertEquals(mapTaskCounts[2], mapTask.getOutputBytes());
assertEquals(mapTaskCounts[3], mapTask.getOutputRecords());
assertEquals(mapTaskCounts[4], mapTask.getTaskMemory());
assertEquals(reduceTaskCounts[0], reduceTask.getInputBytes());
assertEquals(reduceTaskCounts[1], reduceTask.getInputRecords());
assertEquals(reduceTaskCounts[2], reduceTask.getOutputBytes());
assertEquals(reduceTaskCounts[3], reduceTask.getOutputRecords());
assertEquals(reduceTaskCounts[4], reduceTask.getTaskMemory());
}
@Test
public void testMakeUpInfo() {
    // repeatedly sample made-up attempts for a map task of the 4th job and
    // check the generated attempt-count and runtime distributions
checkMakeUpTask(jobStories.get(3), 113, 1);
}
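
  // Samples 100,000 synthetic executions of the given map task, requesting
  // attempts until one succeeds. The number of attempts needed and the
  // succeeded/failed runtimes are then checked against the expected
  // probabilities (expectedPs) and runtime CDFs (succeededCDF, failedCDF).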
private void checkMakeUpTask(JobStory job, int taskNumber, int locality) {
TaskAttemptInfo taInfo = null;
Histogram sampleSucceeded = new Histogram();
Histogram sampleFailed = new Histogram();
List<Integer> sampleAttempts = new ArrayList<Integer>();
for (int i = 0; i < 100000; i++) {
int attemptId = 0;
while (true) {
        taInfo = job.getMapTaskAttemptInfoAdjusted(taskNumber, attemptId, locality);
if (taInfo.getRunState() == State.SUCCEEDED) {
sampleSucceeded.enter(taInfo.getRuntime());
break;
}
sampleFailed.enter(taInfo.getRuntime());
attemptId++;
}
sampleAttempts.add(attemptId);
}
// check state distribution
int[] countTries = new int[] { 0, 0, 0, 0 };
for (int attempts : sampleAttempts) {
assertTrue(attempts < 4);
countTries[attempts]++;
}
    /*
     * System.out.print("Generated map attempts to success -- ");
     * for (int count : countTries) {
     *   System.out.print((double) count / sampleAttempts.size() + ", ");
     * }
     * System.out.println();
     * System.out.println("===============");
     */
for (int i = 0; i < 4; i++) {
int count = countTries[i];
double p = (double) count / sampleAttempts.size();
assertTrue(expectedPs[i] - p < epsilon);
}
// check succeeded attempts runtime distribution
long[] expectedCDF = succeededCDF;
LoggedDiscreteCDF cdf = new LoggedDiscreteCDF();
cdf.setCDF(sampleSucceeded, attemptTimesPercentiles, 100);
    /*
     * System.out.println("generated succeeded map runtime distribution");
     * System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--"
     *     + cdf.getMaximum());
     * for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
     *   System.out.println("   " + ranking.getRelativeRanking() + ":"
     *       + ranking.getDatum());
     * }
     */
assertRuntimeEqual(cdf.getMinimum(), expectedCDF[0]);
assertRuntimeEqual(cdf.getMaximum(), expectedCDF[4]);
for (int i = 0; i < 3; i++) {
LoggedSingleRelativeRanking ranking = cdf.getRankings().get(i);
assertRuntimeEqual(expectedCDF[i + 1], ranking.getDatum());
}
// check failed attempts runtime distribution
expectedCDF = failedCDF;
cdf = new LoggedDiscreteCDF();
cdf.setCDF(sampleFailed, attemptTimesPercentiles, 100);
System.out.println("generated failed map runtime distribution");
System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--"
+ cdf.getMaximum());
for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
System.out.println(" " + ranking.getRelativeRanking() + ":"
+ ranking.getDatum());
}
assertRuntimeEqual(cdf.getMinimum(), expectedCDF[0]);
assertRuntimeEqual(cdf.getMaximum(), expectedCDF[4]);
for (int i = 0; i < 3; i++) {
LoggedSingleRelativeRanking ranking = cdf.getRankings().get(i);
assertRuntimeEqual(expectedCDF[i + 1], ranking.getDatum());
}
}
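
  // Asserts that a generated runtime is close to the expected one: within
  // max(expected / 10, 5000) when expected is non-zero, or within 1000 of zero
  // otherwise.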
private void assertRuntimeEqual(long expected, long generated) {
if (expected == 0) {
assertTrue(generated > -1000 && generated < 1000);
} else {
long epsilon = Math.max(expected / 10, 5000);
assertTrue(expected - generated > -epsilon);
assertTrue(expected - generated < epsilon);
}
}
}