blob: f84984346eadbfb5cbb885568b39a9be60cd01eb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.gridmix;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.JobStoryProducer;
import org.apache.hadoop.util.ExitUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.event.Level;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.InputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.zip.GZIPInputStream;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.junit.Assert.*;
import static org.slf4j.LoggerFactory.getLogger;
public class TestGridmixSubmission extends CommonJobTest {
private static File inSpace = new File("src" + File.separator + "test"
+ File.separator + "resources" + File.separator + "data");
static {
GenericTestUtils.setLogLevel(
getLogger("org.apache.hadoop.mapred.gridmix"), Level.DEBUG);
}
@BeforeClass
public static void init() throws IOException {
GridmixTestUtils.initCluster(TestGridmixSubmission.class);
System.setProperty("src.test.data", inSpace.getAbsolutePath());
}
@AfterClass
public static void shutDown() throws IOException {
GridmixTestUtils.shutdownCluster();
}
/**
* Verifies that the given {@code JobStory} corresponds to the checked-in
* WordCount {@code JobStory}. The verification is effected via JUnit
* assertions.
*
* @param js the candidate JobStory.
*/
private void verifyWordCountJobStory(JobStory js) {
assertNotNull("Null JobStory", js);
String expectedJobStory = "WordCount:johndoe:default:1285322645148:3:1";
String actualJobStory = js.getName() + ":" + js.getUser() + ":"
+ js.getQueueName() + ":" + js.getSubmissionTime() + ":"
+ js.getNumberMaps() + ":" + js.getNumberReduces();
assertEquals("Unexpected JobStory", expectedJobStory, actualJobStory);
}
/**
* Expands a file compressed using {@code gzip}.
*
* @param fs the {@code FileSystem} corresponding to the given file.
* @param in the path to the compressed file.
* @param out the path to the uncompressed output.
* @throws Exception if there was an error during the operation.
*/
private void expandGzippedTrace(FileSystem fs, Path in, Path out)
throws Exception {
byte[] buff = new byte[4096];
GZIPInputStream gis = new GZIPInputStream(fs.open(in));
FSDataOutputStream fsdOs = fs.create(out);
int numRead;
while ((numRead = gis.read(buff, 0, buff.length)) != -1) {
fsdOs.write(buff, 0, numRead);
}
gis.close();
fsdOs.close();
}
/**
* Tests the reading of traces in GridMix3. These traces are generated by
* Rumen and are in the JSON format. The traces can optionally be compressed
* and uncompressed traces can also be passed to GridMix3 via its standard
* input stream. The testing is effected via JUnit assertions.
*
* @throws Exception if there was an error.
*/
@Test (timeout=20000)
public void testTraceReader() throws Exception {
Configuration conf = new Configuration();
FileSystem lfs = FileSystem.getLocal(conf);
Path rootInputDir = new Path(System.getProperty("src.test.data"));
rootInputDir = rootInputDir.makeQualified(lfs.getUri(),
lfs.getWorkingDirectory());
Path rootTempDir = new Path(System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")), "testTraceReader");
rootTempDir = rootTempDir.makeQualified(lfs.getUri(),
lfs.getWorkingDirectory());
Path inputFile = new Path(rootInputDir, "wordcount.json.gz");
Path tempFile = new Path(rootTempDir, "gridmix3-wc.json");
InputStream origStdIn = System.in;
InputStream tmpIs = null;
try {
DebugGridmix dgm = new DebugGridmix();
JobStoryProducer jsp = dgm.createJobStoryProducer(inputFile.toString(),
conf);
LOG.info("Verifying JobStory from compressed trace...");
verifyWordCountJobStory(jsp.getNextJob());
expandGzippedTrace(lfs, inputFile, tempFile);
jsp = dgm.createJobStoryProducer(tempFile.toString(), conf);
LOG.info("Verifying JobStory from uncompressed trace...");
verifyWordCountJobStory(jsp.getNextJob());
tmpIs = lfs.open(tempFile);
System.setIn(tmpIs);
LOG.info("Verifying JobStory from trace in standard input...");
jsp = dgm.createJobStoryProducer("-", conf);
verifyWordCountJobStory(jsp.getNextJob());
} finally {
System.setIn(origStdIn);
if (tmpIs != null) {
tmpIs.close();
}
lfs.delete(rootTempDir, true);
}
}
@Test (timeout=500000)
public void testReplaySubmit() throws Exception {
policy = GridmixJobSubmissionPolicy.REPLAY;
LOG.info(" Replay started at " + System.currentTimeMillis());
doSubmission(null, false);
LOG.info(" Replay ended at " + System.currentTimeMillis());
}
@Test (timeout=500000)
public void testStressSubmit() throws Exception {
policy = GridmixJobSubmissionPolicy.STRESS;
LOG.info(" Stress started at " + System.currentTimeMillis());
doSubmission(null, false);
LOG.info(" Stress ended at " + System.currentTimeMillis());
}
// test empty request should be hint message
@Test (timeout=100000)
public void testMain() throws Exception {
SecurityManager securityManager = System.getSecurityManager();
final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
final PrintStream out = new PrintStream(bytes);
final PrintStream oldOut = System.out;
System.setErr(out);
ExitUtil.disableSystemExit();
try {
String[] argv = new String[0];
DebugGridmix.main(argv);
} catch (ExitUtil.ExitException e) {
assertExceptionContains(ExitUtil.EXIT_EXCEPTION_MESSAGE, e);
ExitUtil.resetFirstExitException();
} finally {
System.setErr(oldOut);
System.setSecurityManager(securityManager);
}
String print = bytes.toString();
// should be printed tip in std error stream
assertTrue(print
.contains("Usage: gridmix [-generate <MiB>] [-users URI] [-Dname=value ...] <iopath> <trace>"));
assertTrue(print.contains("e.g. gridmix -generate 100m foo -"));
}
}