/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.junit.Test;

import static org.junit.Assert.*;

public class TestRumenJobTraces {
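  /**
   * Runs the analyzer end-to-end on a small gzipped JobTracker log and
   * checks the generated topology and trace against the gold files.
   */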
@Test
public void testSmallTrace() throws Exception {
performSingleTest("sample-job-tracker-logs.gz",
"job-tracker-logs-topology-output", "job-tracker-logs-trace-output.gz");
}
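
  /**
   * Runs the analyzer on a truncated JobTracker log and checks the output
   * against the gold files.
   */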
@Test
public void testTruncatedTask() throws Exception {
performSingleTest("truncated-job-tracker-log", "truncated-topology-output",
"truncated-trace-output");
}
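
  /**
   * Runs {@link HadoopLogsAnalyzer} over the given JobTracker log and
   * deep-compares the topology and trace JSON it writes against the given
   * gold files.
   */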
private void performSingleTest(String jtLogName, String goldTopology,
String goldTrace) throws Exception {
final Configuration conf = new Configuration();
final FileSystem lfs = FileSystem.getLocal(conf);
final Path rootInputDir =
new Path(System.getProperty("test.tools.input.dir", "")).makeQualified(
lfs.getUri(), lfs.getWorkingDirectory());
final Path rootTempDir =
new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
lfs.getUri(), lfs.getWorkingDirectory());
final Path rootInputFile = new Path(rootInputDir, "rumen/small-trace-test");
final Path tempDir = new Path(rootTempDir, "TestRumenJobTraces");
lfs.delete(tempDir, true);
final Path topologyFile = new Path(tempDir, jtLogName + "-topology.json");
final Path traceFile = new Path(tempDir, jtLogName + "-trace.json");
final Path inputFile = new Path(rootInputFile, jtLogName);
System.out.println("topology result file = " + topologyFile);
System.out.println("trace result file = " + traceFile);
    String[] args =
        { "-v1", "-write-topology", topologyFile.toString(),
          "-write-job-trace", traceFile.toString(), inputFile.toString() };
final Path topologyGoldFile = new Path(rootInputFile, goldTopology);
final Path traceGoldFile = new Path(rootInputFile, goldTrace);
@SuppressWarnings("deprecation")
HadoopLogsAnalyzer analyzer = new HadoopLogsAnalyzer();
int result = ToolRunner.run(analyzer, args);
assertEquals("Non-zero exit", 0, result);
TestRumenJobTraces
.<LoggedNetworkTopology> jsonFileMatchesGold(conf, topologyFile,
topologyGoldFile, LoggedNetworkTopology.class, "topology");
TestRumenJobTraces.<LoggedJob> jsonFileMatchesGold(conf, traceFile,
traceGoldFile, LoggedJob.class, "trace");
}
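
  /**
   * Runs {@link TraceBuilder} with an explicit input demuxer on a v20
   * JobTracker log and compares the resulting topology and trace with the
   * gold files.
   */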
@Test
public void testRumenViaDispatch() throws Exception {
final Configuration conf = new Configuration();
final FileSystem lfs = FileSystem.getLocal(conf);
final Path rootInputDir =
new Path(System.getProperty("test.tools.input.dir", "")).makeQualified(
lfs.getUri(), lfs.getWorkingDirectory());
final Path rootTempDir =
new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
lfs.getUri(), lfs.getWorkingDirectory());
final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test");
final Path tempDir = new Path(rootTempDir, "TestRumenViaDispatch");
lfs.delete(tempDir, true);
final Path topologyPath = new Path(tempDir, "dispatch-topology.json");
final Path tracePath = new Path(tempDir, "dispatch-trace.json");
final Path inputPath =
new Path(rootInputPath, "dispatch-sample-v20-jt-log.gz");
System.out.println("topology result file = " + topologyPath);
System.out.println("testRumenViaDispatch() trace result file = " + tracePath);
String demuxerClassName = ConcatenatedInputFilesDemuxer.class.getName();
String[] args =
{ "-demuxer", demuxerClassName, tracePath.toString(),
topologyPath.toString(), inputPath.toString() };
final Path topologyGoldFile =
new Path(rootInputPath, "dispatch-topology-output.json.gz");
final Path traceGoldFile =
new Path(rootInputPath, "dispatch-trace-output.json.gz");
Tool analyzer = new TraceBuilder();
int result = ToolRunner.run(analyzer, args);
assertEquals("Non-zero exit", 0, result);
TestRumenJobTraces
.<LoggedNetworkTopology> jsonFileMatchesGold(conf, topologyPath,
topologyGoldFile, LoggedNetworkTopology.class, "topology");
TestRumenJobTraces.<LoggedJob> jsonFileMatchesGold(conf, tracePath,
traceGoldFile, LoggedJob.class, "trace");
}
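
  /**
   * Verifies that {@link TraceBuilder} handles the bracketed counter format
   * by comparing the generated trace against a gold file.
   */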
@Test
public void testBracketedCounters() throws Exception {
final Configuration conf = new Configuration();
final FileSystem lfs = FileSystem.getLocal(conf);
final Path rootInputDir =
new Path(System.getProperty("test.tools.input.dir", "")).makeQualified(
lfs.getUri(), lfs.getWorkingDirectory());
final Path rootTempDir =
new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
lfs.getUri(), lfs.getWorkingDirectory());
final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test");
final Path tempDir = new Path(rootTempDir, "TestBracketedCounters");
lfs.delete(tempDir, true);
final Path topologyPath = new Path(tempDir, "dispatch-topology.json");
final Path tracePath = new Path(tempDir, "dispatch-trace.json");
final Path inputPath = new Path(rootInputPath, "counters-format-test-logs");
System.out.println("topology result file = " + topologyPath);
System.out.println("testBracketedCounters() trace result file = " + tracePath);
final Path goldPath =
new Path(rootInputPath, "counters-test-trace.json.gz");
String[] args =
{ tracePath.toString(), topologyPath.toString(), inputPath.toString() };
Tool analyzer = new TraceBuilder();
int result = ToolRunner.run(analyzer, args);
assertEquals("Non-zero exit", 0, result);
TestRumenJobTraces.<LoggedJob> jsonFileMatchesGold(conf, tracePath,
goldPath, LoggedJob.class, "trace");
}
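
  /**
   * Feeds a single v20 job history log through {@link Hadoop20JHParser} and
   * compares the sequence of emitted event classes against a gold file.
   */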
@Test
public void testHadoop20JHParser() throws Exception {
final Configuration conf = new Configuration();
final FileSystem lfs = FileSystem.getLocal(conf);
boolean success = false;
final Path rootInputDir =
new Path(System.getProperty("test.tools.input.dir", "")).makeQualified(
lfs.getUri(), lfs.getWorkingDirectory());
final Path rootTempDir =
new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
lfs.getUri(), lfs.getWorkingDirectory());
final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test");
final Path tempDir = new Path(rootTempDir, "TestRumenViaDispatch");
lfs.delete(tempDir, true);
final Path inputPath = new Path(rootInputPath, "v20-single-input-log.gz");
final Path goldPath =
new Path(rootInputPath, "v20-single-input-log-event-classes.text.gz");
InputStream inputLogStream =
new PossiblyDecompressedInputStream(inputPath, conf);
InputStream inputGoldStream =
new PossiblyDecompressedInputStream(goldPath, conf);
BufferedInputStream bis = new BufferedInputStream(inputLogStream);
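    // Mark the buffered stream so it can be rewound to the beginning of the
    // log before parsing.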
bis.mark(10000);
Hadoop20JHParser parser = new Hadoop20JHParser(bis);
final Path resultPath = new Path(tempDir, "result.text");
System.out.println("testHadoop20JHParser sent its output to " + resultPath);
Compressor compressor;
FileSystem fs = resultPath.getFileSystem(conf);
CompressionCodec codec =
new CompressionCodecFactory(conf).getCodec(resultPath);
OutputStream output;
if (codec != null) {
compressor = CodecPool.getCompressor(codec);
output = codec.createOutputStream(fs.create(resultPath), compressor);
} else {
output = fs.create(resultPath);
}
PrintStream printStream = new PrintStream(output);
try {
assertEquals("Hadoop20JHParser can't parse the test file", true,
Hadoop20JHParser.canParse(inputLogStream));
bis.reset();
HistoryEvent event = parser.nextEvent();
while (event != null) {
printStream.println(event.getClass().getCanonicalName());
event = parser.nextEvent();
}
printStream.close();
LineReader goldLines = new LineReader(inputGoldStream);
LineReader resultLines =
new LineReader(new PossiblyDecompressedInputStream(resultPath, conf));
int lineNumber = 1;
try {
Text goldLine = new Text();
Text resultLine = new Text();
int goldRead = goldLines.readLine(goldLine);
int resultRead = resultLines.readLine(resultLine);
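        // readLine() returns 0 at end of stream, so the product is nonzero
        // only while both files still have lines.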
while (goldRead * resultRead != 0) {
          assertEquals("Type mismatch detected", goldLine, resultLine);
goldRead = goldLines.readLine(goldLine);
resultRead = resultLines.readLine(resultLine);
++lineNumber;
}
if (goldRead != resultRead) {
assertEquals("the " + (goldRead > resultRead ? "gold" : resultRead)
+ " file contains more text at line " + lineNumber, goldRead,
resultRead);
}
success = true;
} finally {
goldLines.close();
resultLines.close();
if (success) {
lfs.delete(resultPath, false);
}
}
} finally {
if (parser == null) {
inputLogStream.close();
} else {
        parser.close();
}
if (inputGoldStream != null) {
inputGoldStream.close();
}
// it's okay to do this twice [if we get an error on input]
printStream.close();
}
}
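
  /**
   * Parses a sample job configuration twice with different property lists
   * and verifies that exactly the requested properties are extracted.
   */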
@Test
public void testJobConfigurationParser() throws Exception {
String[] list1 =
{ "mapred.job.queue.name", "mapreduce.job.name",
"mapred.child.java.opts" };
String[] list2 = { "mapred.job.queue.name", "mapred.child.java.opts" };
    List<String> interested1 = new ArrayList<String>(Arrays.asList(list1));
    List<String> interested2 = new ArrayList<String>(Arrays.asList(list2));
JobConfigurationParser jcp1 = new JobConfigurationParser(interested1);
JobConfigurationParser jcp2 = new JobConfigurationParser(interested2);
final Configuration conf = new Configuration();
final FileSystem lfs = FileSystem.getLocal(conf);
@SuppressWarnings("deprecation")
final Path rootInputDir =
new Path(System.getProperty("test.tools.input.dir", ""))
.makeQualified(lfs);
final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test");
final Path inputPath = new Path(rootInputPath, "sample-conf.file.xml");
InputStream inputConfStream =
new PossiblyDecompressedInputStream(inputPath, conf);
try {
Properties props1 = jcp1.parse(inputConfStream);
inputConfStream.close();
inputConfStream = new PossiblyDecompressedInputStream(inputPath, conf);
Properties props2 = jcp2.parse(inputConfStream);
assertEquals("testJobConfigurationParser: wrong number of properties", 3,
props1.size());
assertEquals("testJobConfigurationParser: wrong number of properties", 2,
props2.size());
assertEquals("prop test 1", "TheQueue", props1
.get("mapred.job.queue.name"));
assertEquals("prop test 2", "job_0001", props1.get("mapreduce.job.name"));
assertEquals("prop test 3",
"-server -Xmx640m -Djava.net.preferIPv4Stack=true", props1
.get("mapred.child.java.opts"));
assertEquals("prop test 4", "TheQueue", props2
.get("mapred.job.queue.name"));
assertEquals("prop test 5",
"-server -Xmx640m -Djava.net.preferIPv4Stack=true", props2
.get("mapred.child.java.opts"));
} finally {
inputConfStream.close();
}
}
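
  /**
   * Feeds hand-built task attempt events into a {@link TopologyBuilder} and
   * checks that the resulting network topology contains the expected racks
   * and node counts.
   */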
@Test
public void testTopologyBuilder() throws Exception {
final TopologyBuilder subject = new TopologyBuilder();
// currently we extract no host names from the Properties
subject.process(new Properties());
subject.process(new TaskAttemptFinishedEvent(TaskAttemptID
.forName("attempt_200904211745_0003_m_000004_0"), TaskType
.valueOf("MAP"), "STATUS", 1234567890L,
"/194\\.6\\.134\\.64/cluster50261\\.secondleveldomain\\.com",
"SUCCESS", null));
subject.process(new TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID
.forName("attempt_200904211745_0003_m_000004_1"), TaskType
.valueOf("MAP"), "STATUS", 1234567890L,
"/194\\.6\\.134\\.80/cluster50262\\.secondleveldomain\\.com",
"MACHINE_EXPLODED"));
subject.process(new TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID
.forName("attempt_200904211745_0003_m_000004_2"), TaskType
.valueOf("MAP"), "STATUS", 1234567890L,
"/194\\.6\\.134\\.80/cluster50263\\.secondleveldomain\\.com",
"MACHINE_EXPLODED"));
subject.process(new TaskStartedEvent(TaskID
.forName("task_200904211745_0003_m_000004"), 1234567890L, TaskType
.valueOf("MAP"),
"/194\\.6\\.134\\.80/cluster50263\\.secondleveldomain\\.com"));
final LoggedNetworkTopology topology = subject.build();
List<LoggedNetworkTopology> racks = topology.getChildren();
assertEquals("Wrong number of racks", 2, racks.size());
boolean sawSingleton = false;
boolean sawDoubleton = false;
for (LoggedNetworkTopology rack : racks) {
List<LoggedNetworkTopology> nodes = rack.getChildren();
if (rack.getName().endsWith(".64")) {
assertEquals("The singleton rack has the wrong number of elements", 1,
nodes.size());
sawSingleton = true;
} else if (rack.getName().endsWith(".80")) {
assertEquals("The doubleton rack has the wrong number of elements", 2,
nodes.size());
sawDoubleton = true;
} else {
assertTrue("Unrecognized rack name", false);
}
}
assertTrue("Did not see singleton rack", sawSingleton);
assertTrue("Did not see doubleton rack", sawDoubleton);
}
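
  /**
   * Deserializes the result and gold JSON files record by record and
   * deep-compares each pair, failing with the offending path on the first
   * mismatch.
   */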
  private static <T extends DeepCompare> void jsonFileMatchesGold(
Configuration conf, Path result, Path gold, Class<? extends T> clazz,
String fileDescription) throws IOException {
JsonObjectMapperParser<T> goldParser =
new JsonObjectMapperParser<T>(gold, clazz, conf);
JsonObjectMapperParser<T> resultParser =
new JsonObjectMapperParser<T>(result, clazz, conf);
try {
while (true) {
DeepCompare goldJob = goldParser.getNext();
DeepCompare resultJob = resultParser.getNext();
if ((goldJob == null) || (resultJob == null)) {
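          // Both files should be exhausted at the same time.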
assertTrue(goldJob == resultJob);
break;
}
try {
resultJob.deepCompare(goldJob, new TreePath(null, "<root>"));
} catch (DeepInequalityException e) {
          fail(fileDescription + " mismatches: " + e.path.toString());
}
}
} finally {
IOUtils.cleanup(null, goldParser, resultParser);
}
}
}