/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.mapreduce;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import javax.tools.JavaCompiler;
import javax.tools.JavaFileObject;
import javax.tools.JavaFileObject.Kind;
import javax.tools.SimpleJavaFileObject;
import javax.tools.StandardJavaFileManager;
import javax.tools.StandardLocation;
import javax.tools.ToolProvider;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezClient;
import org.apache.tez.client.TezAppMasterStatus;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.DAGStatus.State;
import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.examples.BroadcastAndOneToOneExample;
import org.apache.tez.mapreduce.examples.MRRSleepJob;
import org.apache.tez.mapreduce.examples.MRRSleepJob.ISleepReducer;
import org.apache.tez.mapreduce.examples.MRRSleepJob.MRRSleepJobPartitioner;
import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepInputFormat;
import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepMapper;
import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepReducer;
import org.apache.tez.mapreduce.examples.UnionExample;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.processor.map.MapProcessor;
import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TezRootInputInitializer;
import org.apache.tez.runtime.api.TezRootInputInitializerContext;
import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
import org.apache.tez.runtime.library.output.OnFileSortedOutput;
import org.apache.tez.test.MiniTezCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.base.Preconditions;
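
/**
 * End-to-end tests that drive MRR sleep jobs through the Tez DAG API, both as
 * standalone app submissions and through a shared Tez session, against a
 * MiniDFSCluster and a MiniTezCluster.
 */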
public class TestMRRJobsDAGApi {
private static final Log LOG = LogFactory.getLog(TestMRRJobsDAGApi.class);
protected static MiniTezCluster mrrTezCluster;
protected static MiniDFSCluster dfsCluster;
private static Configuration conf = new Configuration();
private static FileSystem remoteFs;
private Random random = new Random();
  private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR
      + TestMRRJobsDAGApi.class.getName() + "-tmpDir";
@BeforeClass
public static void setup() throws IOException {
try {
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.format(true).racks(null).build();
remoteFs = dfsCluster.getFileSystem();
} catch (IOException io) {
throw new RuntimeException("problem starting mini dfs cluster", io);
}
if (mrrTezCluster == null) {
mrrTezCluster = new MiniTezCluster(TestMRRJobsDAGApi.class.getName(),
1, 1, 1);
      // Use a fresh Configuration for the mini cluster instead of mutating the
      // shared static conf.
      Configuration clusterConf = new Configuration();
      clusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
      clusterConf.setInt("yarn.nodemanager.delete.debug-delay-sec", 20000);
      mrrTezCluster.init(clusterConf);
mrrTezCluster.start();
}
}
@AfterClass
public static void tearDown() {
if (mrrTezCluster != null) {
mrrTezCluster.stop();
mrrTezCluster = null;
}
if (dfsCluster != null) {
dfsCluster.shutdown();
dfsCluster = null;
}
// TODO Add cleanup code.
}
  // Submits a simple 3-stage sleep job (map, intermediate reduce, reduce)
  // using the DAG submit API instead of the job client.
@Test(timeout = 60000)
public void testMRRSleepJobDagSubmit() throws IOException,
InterruptedException, TezException, ClassNotFoundException, YarnException {
State finalState = testMRRSleepJobDagSubmitCore(false, false, false, false);
Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
// TODO Add additional checks for tracking URL etc. - once it's exposed by
// the DAG API.
}
  // Submits a simple 3-stage sleep job using the DAG submit API, then kills it.
@Test(timeout = 60000)
public void testMRRSleepJobDagSubmitAndKill() throws IOException,
InterruptedException, TezException, ClassNotFoundException, YarnException {
State finalState = testMRRSleepJobDagSubmitCore(false, true, false, false);
Assert.assertEquals(DAGStatus.State.KILLED, finalState);
// TODO Add additional checks for tracking URL etc. - once it's exposed by
// the DAG API.
}
  // Submits a DAG to the AM via RPC after the AM has started.
@Test(timeout = 60000)
public void testMRRSleepJobViaSession() throws IOException,
InterruptedException, TezException, ClassNotFoundException, YarnException {
State finalState = testMRRSleepJobDagSubmitCore(true, false, false, false);
Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
}
  // Submits two jobs via RPC using a custom initializer. The second job is
  // submitted with additional local resources; the initializer verifies that
  // the test class shipped in one of them can be loaded.
@Test(timeout = 120000)
public void testAMRelocalization() throws Exception {
Path relocPath = new Path("/tmp/relocalizationfilefound");
if (remoteFs.exists(relocPath)) {
remoteFs.delete(relocPath, true);
}
TezClient tezSession = createTezSession();
State finalState = testMRRSleepJobDagSubmitCore(true, false, false,
tezSession, true, MRInputAMSplitGeneratorRelocalizationTest.class, null);
Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
    Assert.assertFalse(remoteFs.exists(relocPath));
// Start the second job with some additional resources.
    // Create a test jar directly in HDFS.
LOG.info("Creating jar for relocalization test");
Path relocFilePath = new Path("/tmp/test.jar");
relocFilePath = remoteFs.makeQualified(relocFilePath);
OutputStream os = remoteFs.create(relocFilePath, true);
createTestJar(os, RELOCALIZATION_TEST_CLASS_NAME);
    // Also upload one of Tez's own JARs to HDFS and add it as a resource; it should be ignored.
Path tezAppJar = new Path(MiniTezCluster.APPJAR);
Path tezAppJarRemote = remoteFs.makeQualified(new Path("/tmp/" + tezAppJar.getName()));
remoteFs.copyFromLocalFile(tezAppJar, tezAppJarRemote);
Map<String, LocalResource> additionalResources = new HashMap<String, LocalResource>();
additionalResources.put("test.jar", createLrObjFromPath(relocFilePath));
additionalResources.put("TezAppJar.jar", createLrObjFromPath(tezAppJarRemote));
Assert.assertEquals(TezAppMasterStatus.READY,
tezSession.getAppMasterStatus());
finalState = testMRRSleepJobDagSubmitCore(true, false, false,
tezSession, true, MRInputAMSplitGeneratorRelocalizationTest.class, additionalResources);
Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
Assert.assertEquals(TezAppMasterStatus.READY,
tezSession.getAppMasterStatus());
    Assert.assertTrue(remoteFs.exists(relocPath));
stopAndVerifyYarnApp(tezSession);
}
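
  // Stops the session, then polls YARN until the application reaches a
  // terminal state and asserts that it finished successfully.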
  private void stopAndVerifyYarnApp(TezClient tezSession) throws TezException,
      IOException, YarnException, InterruptedException {
ApplicationId appId = tezSession.getAppMasterApplicationId();
tezSession.stop();
Assert.assertEquals(TezAppMasterStatus.SHUTDOWN,
tezSession.getAppMasterStatus());
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(mrrTezCluster.getConfig());
yarnClient.start();
    while (true) {
      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
      YarnApplicationState appState = appReport.getYarnApplicationState();
      if (appState == YarnApplicationState.FINISHED
          || appState == YarnApplicationState.FAILED
          || appState == YarnApplicationState.KILLED) {
        break;
      }
      // Avoid a hot polling loop against the RM.
      Thread.sleep(100L);
    }
    ApplicationReport appReport = yarnClient.getApplicationReport(appId);
    Assert.assertEquals(YarnApplicationState.FINISHED,
        appReport.getYarnApplicationState());
    Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
        appReport.getFinalApplicationStatus());
    yarnClient.stop();
  }
@Test(timeout = 120000)
public void testAMRelocalizationConflict() throws Exception {
Path relocPath = new Path("/tmp/relocalizationfilefound");
if (remoteFs.exists(relocPath)) {
remoteFs.delete(relocPath, true);
}
    // Run a DAG without the test jar; the marker path should remain absent.
TezClient tezSession = createTezSession();
State finalState = testMRRSleepJobDagSubmitCore(true, false, false,
tezSession, true, MRInputAMSplitGeneratorRelocalizationTest.class, null);
Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
Assert.assertFalse(remoteFs.exists(relocPath));
    // Create a bogus TezAppJar directly in HDFS; it conflicts with the AM's own app jar.
LOG.info("Creating jar for relocalization test");
Path tezAppJar = new Path(MiniTezCluster.APPJAR);
Path tezAppJarRemote = remoteFs.makeQualified(new Path("/tmp/" + tezAppJar.getName()));
OutputStream os = remoteFs.create(tezAppJarRemote, true);
createTestJar(os, RELOCALIZATION_TEST_CLASS_NAME);
Map<String, LocalResource> additionalResources = new HashMap<String, LocalResource>();
additionalResources.put("TezAppJar.jar", createLrObjFromPath(tezAppJarRemote));
try {
testMRRSleepJobDagSubmitCore(true, false, false,
tezSession, true, MRInputAMSplitGeneratorRelocalizationTest.class, additionalResources);
Assert.fail("should have failed");
} catch (Exception ex) {
// expected
}
stopAndVerifyYarnApp(tezSession);
}
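
  // Builds a FILE-type, PRIVATE-visibility LocalResource for the given path.
  // Size and timestamp are left as 0.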
private LocalResource createLrObjFromPath(Path filePath) {
return LocalResource.newInstance(ConverterUtils.getYarnUrlFromPath(filePath),
LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, 0, 0);
}
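
  // Starts a session-mode TezClient using a random, freshly created staging
  // directory on the mini DFS.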
private TezClient createTezSession() throws IOException, TezException {
Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
.valueOf(new Random().nextInt(100000))));
remoteFs.mkdirs(remoteStagingDir);
TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
TezClient tezSession = new TezClient("testrelocalizationsession", tezConf, true);
tezSession.start();
Assert.assertEquals(TezAppMasterStatus.INITIALIZING, tezSession.getAppMasterStatus());
return tezSession;
}
  // Submits two DAGs to the AM via RPC after the AM has started.
@Test(timeout = 120000)
public void testMultipleMRRSleepJobViaSession() throws IOException,
InterruptedException, TezException, ClassNotFoundException, YarnException {
Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
.valueOf(new Random().nextInt(100000))));
remoteFs.mkdirs(remoteStagingDir);
TezConfiguration tezConf = new TezConfiguration(
mrrTezCluster.getConfig());
tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
remoteStagingDir.toString());
TezClient tezSession = new TezClient("testsession", tezConf, true);
tezSession.start();
Assert.assertEquals(TezAppMasterStatus.INITIALIZING,
tezSession.getAppMasterStatus());
State finalState = testMRRSleepJobDagSubmitCore(true, false, false,
tezSession, false, null, null);
Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
Assert.assertEquals(TezAppMasterStatus.READY,
tezSession.getAppMasterStatus());
finalState = testMRRSleepJobDagSubmitCore(true, false, false,
tezSession, false, null, null);
Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
Assert.assertEquals(TezAppMasterStatus.READY,
tezSession.getAppMasterStatus());
stopAndVerifyYarnApp(tezSession);
}
  // Submits a simple 3-stage sleep job via a Tez session, then kills it.
@Test(timeout = 60000)
public void testMRRSleepJobDagSubmitAndKillViaRPC() throws IOException,
InterruptedException, TezException, ClassNotFoundException, YarnException {
State finalState = testMRRSleepJobDagSubmitCore(true, true, false, false);
Assert.assertEquals(DAGStatus.State.KILLED, finalState);
// TODO Add additional checks for tracking URL etc. - once it's exposed by
// the DAG API.
}
  // Creates and closes a Tez session without submitting a job.
@Test(timeout = 60000)
public void testTezSessionShutdown() throws IOException,
InterruptedException, TezException, ClassNotFoundException, YarnException {
testMRRSleepJobDagSubmitCore(true, false, true, false);
}
@Test(timeout = 60000)
public void testAMSplitGeneration() throws IOException, InterruptedException,
TezException, ClassNotFoundException, YarnException {
testMRRSleepJobDagSubmitCore(true, false, false, true);
}
public State testMRRSleepJobDagSubmitCore(
boolean dagViaRPC,
boolean killDagWhileRunning,
boolean closeSessionBeforeSubmit,
boolean genSplitsInAM) throws IOException,
InterruptedException, TezException, ClassNotFoundException,
YarnException {
return testMRRSleepJobDagSubmitCore(dagViaRPC, killDagWhileRunning,
closeSessionBeforeSubmit, null, genSplitsInAM, null, null);
}
public State testMRRSleepJobDagSubmitCore(
boolean dagViaRPC,
boolean killDagWhileRunning,
boolean closeSessionBeforeSubmit,
TezClient reUseTezSession,
boolean genSplitsInAM,
Class<? extends TezRootInputInitializer> initializerClass,
Map<String, LocalResource> additionalLocalResources) throws IOException,
InterruptedException, TezException, ClassNotFoundException,
YarnException {
    LOG.info("\n\n\nStarting testMRRSleepJobDagSubmitCore().");
JobConf stage1Conf = new JobConf(mrrTezCluster.getConfig());
JobConf stage2Conf = new JobConf(mrrTezCluster.getConfig());
JobConf stage3Conf = new JobConf(mrrTezCluster.getConfig());
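
    // Configure the three stages: a sleep map, an intermediate reduce, and a
    // final reduce writing to a NullOutputFormat.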
stage1Conf.setLong(MRRSleepJob.MAP_SLEEP_TIME, 1);
stage1Conf.setInt(MRRSleepJob.MAP_SLEEP_COUNT, 1);
stage1Conf.setInt(MRJobConfig.NUM_MAPS, 1);
stage1Conf.set(MRJobConfig.MAP_CLASS_ATTR, SleepMapper.class.getName());
stage1Conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
IntWritable.class.getName());
stage1Conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
IntWritable.class.getName());
stage1Conf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
SleepInputFormat.class.getName());
stage1Conf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
MRRSleepJobPartitioner.class.getName());
stage2Conf.setLong(MRRSleepJob.REDUCE_SLEEP_TIME, 1);
stage2Conf.setInt(MRRSleepJob.REDUCE_SLEEP_COUNT, 1);
stage2Conf.setInt(MRJobConfig.NUM_REDUCES, 1);
    stage2Conf.set(MRJobConfig.REDUCE_CLASS_ATTR, ISleepReducer.class.getName());
stage2Conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
IntWritable.class.getName());
stage2Conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
IntWritable.class.getName());
stage2Conf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
MRRSleepJobPartitioner.class.getName());
stage3Conf.setLong(MRRSleepJob.REDUCE_SLEEP_TIME, 1);
stage3Conf.setInt(MRRSleepJob.REDUCE_SLEEP_COUNT, 1);
stage3Conf.setInt(MRJobConfig.NUM_REDUCES, 1);
stage3Conf.set(MRJobConfig.REDUCE_CLASS_ATTR, SleepReducer.class.getName());
stage3Conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
IntWritable.class.getName());
stage3Conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
IntWritable.class.getName());
stage3Conf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
NullOutputFormat.class.getName());
MRHelpers.translateVertexConfToTez(stage1Conf);
MRHelpers.translateVertexConfToTez(stage2Conf);
MRHelpers.translateVertexConfToTez(stage3Conf);
MRHelpers.doJobClientMagic(stage1Conf);
MRHelpers.doJobClientMagic(stage2Conf);
MRHelpers.doJobClientMagic(stage3Conf);
Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
.valueOf(new Random().nextInt(100000))));
TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
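
    // When splits are not generated in the AM, pre-generate them on the
    // client; the split files are shipped to the map vertex as local
    // resources below.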
InputSplitInfo inputSplitInfo = null;
if (!genSplitsInAM) {
inputSplitInfo = MRHelpers.generateInputSplits(stage1Conf,
remoteStagingDir);
}
byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
byte[] stage2Payload = MRHelpers.createUserPayloadFromConf(stage2Conf);
byte[] stage1InputPayload = MRHelpers.createMRInputPayload(stage1Payload, null);
byte[] stage3Payload = MRHelpers.createUserPayloadFromConf(stage3Conf);
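
    // Assemble the DAG: map -> ireduce -> reduce, connected later by
    // scatter-gather edges.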
DAG dag = new DAG("testMRRSleepJobDagSubmit-" + random.nextInt(1000));
int stage1NumTasks = genSplitsInAM ? -1 : inputSplitInfo.getNumTasks();
Class<? extends TezRootInputInitializer> inputInitializerClazz =
genSplitsInAM ? (initializerClass == null ? MRInputAMSplitGenerator.class : initializerClass)
: null;
LOG.info("Using initializer class: " + initializerClass);
Vertex stage1Vertex = new Vertex("map", new ProcessorDescriptor(
MapProcessor.class.getName()).setUserPayload(stage1Payload),
stage1NumTasks, Resource.newInstance(256, 1));
MRHelpers.addMRInput(stage1Vertex, stage1InputPayload, inputInitializerClazz);
Vertex stage2Vertex = new Vertex("ireduce", new ProcessorDescriptor(
ReduceProcessor.class.getName()).setUserPayload(stage2Payload),
1, Resource.newInstance(256, 1));
Vertex stage3Vertex = new Vertex("reduce", new ProcessorDescriptor(
ReduceProcessor.class.getName()).setUserPayload(stage3Payload),
1, Resource.newInstance(256, 1));
MRHelpers.addMROutputLegacy(stage3Vertex, stage3Payload);
if (!genSplitsInAM) {
// TODO Use utility method post TEZ-205.
Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
stage1LocalResources.put(
inputSplitInfo.getSplitsFile().getName(),
createLocalResource(remoteFs, inputSplitInfo.getSplitsFile(),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
stage1LocalResources.put(
inputSplitInfo.getSplitsMetaInfoFile().getName(),
createLocalResource(remoteFs, inputSplitInfo.getSplitsMetaInfoFile(),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
stage1Vertex.setTaskLocalFiles(stage1LocalResources);
stage1Vertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
}
// TODO env, resources
dag.addVertex(stage1Vertex);
dag.addVertex(stage2Vertex);
dag.addVertex(stage3Vertex);
Edge edge1 = new Edge(stage1Vertex, stage2Vertex, new EdgeProperty(
DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL, new OutputDescriptor(
OnFileSortedOutput.class.getName()).setUserPayload(stage2Payload), new InputDescriptor(
ShuffledMergedInputLegacy.class.getName()).setUserPayload(stage2Payload)));
Edge edge2 = new Edge(stage2Vertex, stage3Vertex, new EdgeProperty(
DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL, new OutputDescriptor(
OnFileSortedOutput.class.getName()).setUserPayload(stage3Payload), new InputDescriptor(
ShuffledMergedInputLegacy.class.getName()).setUserPayload(stage3Payload)));
dag.addEdge(edge1);
dag.addEdge(edge2);
TezConfiguration tezConf = new TezConfiguration(
mrrTezCluster.getConfig());
tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
remoteStagingDir.toString());
DAGClient dagClient = null;
boolean reuseSession = reUseTezSession != null;
TezClient tezSession = null;
    if (!dagViaRPC) {
      Preconditions.checkArgument(!reuseSession);
    }
    if (!reuseSession) {
      TezConfiguration tempTezconf = new TezConfiguration(tezConf);
      // Session mode is only needed when the DAG is submitted over RPC.
      tempTezconf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, dagViaRPC);
      tezSession = new TezClient("testsession", tempTezconf);
      tezSession.start();
} else {
tezSession = reUseTezSession;
}
    if (!dagViaRPC) {
// TODO Use utility method post TEZ-205 to figure out AM arguments etc.
dagClient = tezSession.submitDAG(dag);
}
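
    // Optionally shut the session down before submitting any DAG and verify
    // that the AM still exits with a SUCCEEDED final status.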
if (dagViaRPC && closeSessionBeforeSubmit) {
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(mrrTezCluster.getConfig());
yarnClient.start();
boolean sentKillSession = false;
      while (true) {
        Thread.sleep(500L);
ApplicationReport appReport =
yarnClient.getApplicationReport(tezSession.getAppMasterApplicationId());
if (appReport == null) {
continue;
}
YarnApplicationState appState = appReport.getYarnApplicationState();
if (!sentKillSession) {
if (appState == YarnApplicationState.RUNNING) {
tezSession.stop();
sentKillSession = true;
}
} else {
if (appState == YarnApplicationState.FINISHED
|| appState == YarnApplicationState.KILLED
|| appState == YarnApplicationState.FAILED) {
LOG.info("Application completed after sending session shutdown"
+ ", yarnApplicationState=" + appState
+ ", finalAppStatus=" + appReport.getFinalApplicationStatus());
Assert.assertEquals(YarnApplicationState.FINISHED,
appState);
Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
appReport.getFinalApplicationStatus());
break;
}
}
}
yarnClient.stop();
return null;
}
    if (dagViaRPC) {
LOG.info("Submitting dag to tez session with appId=" + tezSession.getAppMasterApplicationId()
+ " and Dag Name=" + dag.getName());
if (additionalLocalResources != null) {
tezSession.addAppMasterLocalResources(additionalLocalResources);
}
dagClient = tezSession.submitDAG(dag);
Assert.assertEquals(TezAppMasterStatus.RUNNING,
tezSession.getAppMasterStatus());
}
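
    // Poll the DAG until it reaches a terminal state, optionally killing the
    // DAG (or the whole session) once it is RUNNING.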
DAGStatus dagStatus = dagClient.getDAGStatus(null);
while (!dagStatus.isCompleted()) {
LOG.info("Waiting for job to complete. Sleeping for 500ms."
+ " Current state: " + dagStatus.getState());
      Thread.sleep(500L);
      if (killDagWhileRunning
          && dagStatus.getState() == DAGStatus.State.RUNNING) {
LOG.info("Killing running dag/session");
if (dagViaRPC) {
tezSession.stop();
} else {
dagClient.tryKillDAG();
}
}
dagStatus = dagClient.getDAGStatus(null);
}
if (!reuseSession) {
tezSession.stop();
}
return dagStatus.getState();
}
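
  // Builds a LocalResource whose URL, size, and timestamp are taken from the
  // file's current status on the given filesystem.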
  private static LocalResource createLocalResource(FileSystem fs, Path file,
      LocalResourceType type, LocalResourceVisibility visibility)
      throws IOException {
    FileStatus fstat = fs.getFileStatus(file);
    URL resourceURL = ConverterUtils.getYarnUrlFromPath(fs.resolvePath(fstat
        .getPath()));
long resourceSize = fstat.getLen();
long resourceModificationTime = fstat.getModificationTime();
return LocalResource.newInstance(resourceURL, type, visibility,
resourceSize, resourceModificationTime);
}
@Test(timeout = 60000)
public void testVertexGroups() throws Exception {
LOG.info("Running Group Test");
Path inPath = new Path(TEST_ROOT_DIR, "in-groups");
Path outPath = new Path(TEST_ROOT_DIR, "out-groups");
FSDataOutputStream out = remoteFs.create(inPath);
OutputStreamWriter writer = new OutputStreamWriter(out);
writer.write("abcd ");
writer.write("efgh ");
writer.write("abcd ");
writer.write("efgh ");
writer.close();
out.close();
UnionExample job = new UnionExample();
if (job.run(inPath.toString(), outPath.toString(), mrrTezCluster.getConfig())) {
LOG.info("Success VertexGroups Test");
} else {
throw new TezUncheckedException("VertexGroups Test Failed");
}
}
@Test(timeout = 60000)
public void testBroadcastAndOneToOne() throws Exception {
LOG.info("Running BroadcastAndOneToOne Test");
BroadcastAndOneToOneExample job = new BroadcastAndOneToOneExample();
if (job.run(mrrTezCluster.getConfig(), true)) {
LOG.info("Success BroadcastAndOneToOne Test");
} else {
throw new TezUncheckedException("BroadcastAndOneToOne Test Failed");
}
}
// This class should not be used by more than one test in a single run, since
// the path it writes to is not dynamic.
  private static final String RELOCALIZATION_TEST_CLASS_NAME = "AMClassloadTestDummyClass";
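
  // Split generator that additionally checks whether the relocalization test
  // class has been localized to the AM, creating a marker directory in HDFS
  // when the class is found.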
public static class MRInputAMSplitGeneratorRelocalizationTest extends MRInputAMSplitGenerator {
    @Override
    public List<Event> initialize(TezRootInputInitializerContext rootInputContext) throws Exception {
MRInputUserPayloadProto userPayloadProto = MRHelpers
.parseMRInputPayload(rootInputContext.getUserPayload());
Configuration conf = MRHelpers.createConfFromByteString(userPayloadProto
.getConfigurationBytes());
try {
ReflectionUtils.getClazz(RELOCALIZATION_TEST_CLASS_NAME);
LOG.info("Class found");
FileSystem fs = FileSystem.get(conf);
fs.mkdirs(new Path("/tmp/relocalizationfilefound"));
} catch (TezUncheckedException e) {
LOG.info("Class not found");
}
return super.initialize(rootInputContext);
}
}
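
  // Compiles the in-memory dummy class with the system Java compiler and
  // packages the resulting .class file into a jar written to outStream.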
private static void createTestJar(OutputStream outStream, String dummyClassName)
throws URISyntaxException, IOException {
JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
JavaFileObject srcFileObject = new SimpleJavaFileObjectImpl(
URI.create("string:///" + dummyClassName + Kind.SOURCE.extension), Kind.SOURCE);
StandardJavaFileManager fileManager = compiler.getStandardFileManager(null, null, null);
    Boolean compiled = compiler.getTask(null, fileManager, null, null, null,
        Collections.singletonList(srcFileObject)).call();
    Preconditions.checkState(Boolean.TRUE.equals(compiled),
        "Failed to compile " + dummyClassName);
JavaFileObject javaFileObject = fileManager.getJavaFileForOutput(StandardLocation.CLASS_OUTPUT,
dummyClassName, Kind.CLASS, null);
File classFile = new File(dummyClassName + Kind.CLASS.extension);
JarOutputStream jarOutputStream = new JarOutputStream(outStream);
JarEntry jarEntry = new JarEntry(classFile.getName());
jarEntry.setTime(classFile.lastModified());
jarOutputStream.putNextEntry(jarEntry);
InputStream in = javaFileObject.openInputStream();
    byte[] buffer = new byte[4096];
    int nRead;
    while ((nRead = in.read(buffer, 0, buffer.length)) > 0) {
      jarOutputStream.write(buffer, 0, nRead);
    }
in.close();
jarOutputStream.close();
javaFileObject.delete();
}
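
  // In-memory "source file" holding the dummy class compiled by
  // createTestJar().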
private static class SimpleJavaFileObjectImpl extends SimpleJavaFileObject {
static final String code = "public class AMClassloadTestDummyClass {}";
SimpleJavaFileObjectImpl(URI uri, Kind kind) {
super(uri, kind);
}
@Override
public CharSequence getCharContent(boolean ignoreEncodingErrors) {
return code;
}
}
}