blob: 903e8d79c82b78a8a547878a3ac8a3f7fe856708 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TestNoJobSetupCleanup.MyOutputFormat;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.ProgressSplitsBlock;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.jobhistory.JobHistory.JobHistoryRecordRetriever;
import org.apache.hadoop.tools.rumen.TraceBuilder;
import org.apache.hadoop.tools.rumen.ZombieJobProducer;
import org.apache.hadoop.tools.rumen.ZombieJob;
import org.apache.hadoop.tools.rumen.LoggedJob;
import org.apache.hadoop.tools.rumen.LoggedTask;
import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
import org.apache.hadoop.tools.rumen.ZombieCluster;
import org.apache.hadoop.tools.rumen.MachineNode;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestTaskPerformanceSplitTranscription {
// This testcase runs a job in a mini cluster, and then it verifies
// that splits are stored in the resulting trace, and also
// retrievable from the ZombieJob resulting from reading the trace.
//
// We can't test for any particular values, unfortunately.
@Test
public void testTranscription() throws Exception {
final Configuration conf = new Configuration();
final FileSystem lfs = FileSystem.getLocal(conf);
final Path rootTempDir =
new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
lfs.getUri(), lfs.getWorkingDirectory());
final Path tempDir = new Path(rootTempDir, "testTranscription");
lfs.delete(tempDir, true);
// Run a MR job
// create a MR cluster
conf.setInt(TTConfig.TT_MAP_SLOTS, 1);
conf.setInt(TTConfig.TT_REDUCE_SLOTS, 1);
final MiniMRCluster mrCluster
= new MiniMRCluster(1, "file:///", 1, null, null,
new JobConf(conf));
final JobTracker tracker = mrCluster.getJobTrackerRunner().getJobTracker();
final JobHistory history = tracker.getJobHistory();
// run a job
Path inDir = new Path(tempDir, "input");
Path outDir = new Path(tempDir, "output");
ZombieJobProducer story = null;
boolean success = false;
try {
JobConf jConf = mrCluster.createJobConf();
// disable uberization (MR-1220) until JobHistory supports uber-specific
// events with complete split info (for Rumen network topology)
jConf.setBoolean(JobContext.JOB_UBERTASK_ENABLE, false);
// construct a job with 1 map and 1 reduce task.
Job job = MapReduceTestUtil.createJob(jConf, inDir, outDir, 1, 1);
// disable setup/cleanup
job.setJobSetupCleanupNeeded(false);
// set the output format to take care of the _temporary folder
job.setOutputFormatClass(MyOutputFormat.class);
// wait for the job to complete
job.waitForCompletion(false);
assertTrue("Job failed", job.isSuccessful());
JobID id = job.getJobID();
Path inputPath = null;
// wait for 10 secs for the jobhistory file to move into the done folder
for (int i = 0; i < 100; ++i) {
JobHistoryRecordRetriever retriever
= history.getMatchingJobs(null, "", null, id.toString());
if (retriever.hasNext()) {
inputPath = retriever.next().getPath();
System.out.println("History file = " + inputPath);
break;
}
Thread.currentThread().sleep(100);
}
assertTrue("Missing job history file", lfs.exists(inputPath));
System.out.println("testTranscription() input history file is "
+ inputPath.toString());
final Path topologyPath = new Path(tempDir, "dispatch-topology.json");
final Path tracePath = new Path(tempDir, "dispatch-trace.json");
System.err.println("testTranscription() output .json file is "
+ tracePath.toString());
String[] args =
{ tracePath.toString(), topologyPath.toString(), inputPath.toString() };
Tool analyzer = new TraceBuilder();
int result = ToolRunner.run(analyzer, args);
assertEquals("Non-zero exit", 0, result);
MachineNode.Builder builder = new MachineNode.Builder("node.megacorp.com", 0);
MachineNode node = builder.build();
ZombieCluster cluster = new ZombieCluster(topologyPath, node, jConf);
story = new ZombieJobProducer(tracePath, cluster, jConf);
// test that the logged* has everything down to the split vector
ZombieJob theZombieJob = story.getNextJob();
LoggedJob theJob = theZombieJob.getLoggedJob();
LoggedTask firstMapTask = theJob.getMapTasks().get(0);
LoggedTaskAttempt firstAttempt = firstMapTask.getAttempts().get(0);
assertTrue("No clock splits were stored",
firstAttempt.getClockSplits().size() > 0);
TaskAttemptInfo attemptInfo
= theZombieJob.getTaskAttemptInfo(TaskType.MAP, 0, 0);
assertEquals("Can't retrieve clock splits from the LoggedTaskAttempt",
attemptInfo.getSplitVector
(LoggedTaskAttempt.SplitVectorKind.WALLCLOCK_TIME)
.size(),
ProgressSplitsBlock.DEFAULT_NUMBER_PROGRESS_SPLITS);
// test that the ZombieJob can deliver splits vectors
TaskAttemptInfo tinfo = theZombieJob.getTaskAttemptInfo(TaskType.MAP, 0, 0);
List<Integer> splitVector
= tinfo.getSplitVector(LoggedTaskAttempt.SplitVectorKind.WALLCLOCK_TIME);
assertEquals("Can't retrieve clock splits from the ZombieJob",
splitVector.size(), ProgressSplitsBlock.DEFAULT_NUMBER_PROGRESS_SPLITS);
success = true;
} finally {
if (success) {
lfs.delete(tempDir, true);
}
if (story != null) {
story.close();
}
}
}
}