/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.action.hadoop;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.StringReader;
import java.io.Writer;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Scanner;
import java.util.jar.JarOutputStream;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.streaming.StreamJob;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowAction.Status;
import org.apache.oozie.command.wf.StartXCommand;
import org.apache.oozie.command.wf.SubmitXCommand;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
import org.apache.oozie.local.LocalOozie;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.util.ClassUtils;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
private static final XLog LOG = XLog.getLog(TestMapReduceActionExecutor.class);
private static final String PIPES = "pipes";
private static final String MAP_REDUCE = "map-reduce";
@Override
protected void setSystemProps() throws Exception {
super.setSystemProps();
setSystemProperty("oozie.service.ActionService.executor.classes",
String.join(",", MapReduceActionExecutor.class.getName(), JavaActionExecutor.class.getName()));
setSystemProperty("oozie.credentials.credentialclasses", "cred=org.apache.oozie.action.hadoop.CredentialForTest");
}
public Element createUberJarActionXML(String uberJarPath, String additional) throws Exception {
return XmlUtils.parseXml("<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
+ "<name-node>" + getNameNodeUri() + "</name-node>" + additional + "<configuration>"
+ "<property><name>oozie.mapreduce.uber.jar</name><value>" + uberJarPath + "</value></property>"
+ "</configuration>" + "</map-reduce>");
}
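/**
 * Verifies how {@code config-default.xml}, the workflow {@code <global>} section and the inline action
 * {@code <configuration>} are merged into the effective action configuration: defaults fill in
 * properties that are otherwise unset but never overwrite values from the action or from
 * {@code <global>}, and a parameter referring to itself (cc = ${cc}/action_cc) is resolved once
 * against the default value instead of recursing.
 */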
public void testConfigDefaultPropsToAction() throws Exception {
String actionXml = "<map-reduce>"
+ " <prepare>"
+ " <delete path=\"${nameNode}/user/${wf:user()}/mr/${outputDir}\"/>"
+ " </prepare>"
+ " <configuration>"
+ " <property><name>bb</name><value>BB</value></property>"
+ " <property><name>cc</name><value>from_action</value></property>"
+ " </configuration>"
+ " </map-reduce>";
String wfXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.5\" name=\"map-reduce-wf\">"
+ "<global>"
+ "<job-tracker>${jobTracker}</job-tracker>"
+ "<name-node>${nameNode}</name-node>"
+ "<configuration><property><name>aa</name><value>AA</value></property></configuration>"
+ "</global>"
+ " <start to=\"mr-node\"/>"
+ " <action name=\"mr-node\">"
+ actionXml
+ " <ok to=\"end\"/>"
+ " <error to=\"fail\"/>"
+ "</action>"
+ "<kill name=\"fail\">"
+ " <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>"
+ "</kill>"
+ "<end name=\"end\"/>"
+ "</workflow-app>";
Writer writer = new OutputStreamWriter(new FileOutputStream(getTestCaseDir() + "/workflow.xml"),
StandardCharsets.UTF_8);
IOUtils.copyCharStream(new StringReader(wfXml), writer);
Configuration conf = new XConfiguration();
conf.set("nameNode", getNameNodeUri());
conf.set("jobTracker", getJobTrackerUri());
conf.set(OozieClient.USER_NAME, getTestUser());
conf.set(OozieClient.APP_PATH, new File(getTestCaseDir(), "workflow.xml").toURI().toString());
conf.set(OozieClient.LOG_TOKEN, "t");
OutputStream os = new FileOutputStream(getTestCaseDir() + "/config-default.xml");
XConfiguration defaultConf = new XConfiguration();
defaultConf.set("outputDir", "default-output-dir");
defaultConf.set("mapred.mapper.class", "MM");
defaultConf.set("mapred.reducer.class", "RR");
defaultConf.set("cc", "from_default");
defaultConf.writeXml(os);
os.close();
String wfId = new SubmitXCommand(conf).call();
new StartXCommand(wfId).call();
waitForWorkflowAction(wfId + "@mr-node");
WorkflowActionBean mrAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION,
wfId + "@mr-node");
// check NN and JT settings
Element eAction = XmlUtils.parseXml(mrAction.getConf());
Element eConf = eAction.getChild("name-node", eAction.getNamespace());
assertEquals(getNameNodeUri(), eConf.getText());
eConf = eAction.getChild("job-tracker", eAction.getNamespace());
assertEquals(getJobTrackerUri(), eConf.getText());
// check other m-r settings
eConf = eAction.getChild("configuration", eAction.getNamespace());
Configuration actionConf = new XConfiguration(new StringReader(XmlUtils.prettyPrint(eConf).toString()));
assertEquals("default-output-dir", actionConf.get("outputDir"));
assertEquals("MM", actionConf.get("mapred.mapper.class"));
assertEquals("RR", actionConf.get("mapred.reducer.class"));
// check that default did not overwrite same property explicit in action conf
assertEquals("from_action", actionConf.get("cc"));
// check that original conf and from global was not deleted
assertEquals("AA", actionConf.get("aa"));
assertEquals("BB", actionConf.get("bb"));
// test that a param referring to itself (e.g. path = ${path}/sub-path) does not cause infinite recursion
actionXml = "<map-reduce>"
+ " <prepare>"
+ " <delete path=\"${nameNode}/user/${wf:user()}/mr/${outputDir}\"/>"
+ " </prepare>"
+ " <configuration>"
+ " <property><name>cc</name><value>${cc}/action_cc</value></property>"
+ " </configuration>"
+ " </map-reduce>";
wfXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.5\" name=\"map-reduce-wf\">"
+ "<global>"
+ "<job-tracker>${jobTracker}</job-tracker>"
+ "<name-node>${nameNode}</name-node>"
+ "<configuration><property><name>outputDir</name><value>global-output-dir</value></property></configuration>"
+ "</global>"
+ " <start to=\"mr-node\"/>"
+ " <action name=\"mr-node\">"
+ actionXml
+ " <ok to=\"end\"/>"
+ " <error to=\"fail\"/>"
+ "</action>"
+ "<kill name=\"fail\">"
+ " <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>"
+ "</kill>"
+ "<end name=\"end\"/>"
+ "</workflow-app>";
writer = new OutputStreamWriter(new FileOutputStream(getTestCaseDir() + "/workflow.xml"),
StandardCharsets.UTF_8);
IOUtils.copyCharStream(new StringReader(wfXml), writer);
wfId = new SubmitXCommand(conf).call();
new StartXCommand(wfId).call();
waitForWorkflowAction(wfId + "@mr-node");
mrAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION,
wfId + "@mr-node");
// check param
eAction = XmlUtils.parseXml(mrAction.getConf());
eConf = eAction.getChild("configuration", eAction.getNamespace());
actionConf = new XConfiguration(new StringReader(XmlUtils.prettyPrint(eConf).toString()));
// action param referring to same param name given in defaults cc = ${cc}/action_cc
assertEquals("from_default/action_cc", actionConf.get("cc"));
// check global is retained and has precedence over config-default
eConf = eAction.getChild("name-node", eAction.getNamespace());
assertEquals(getNameNodeUri(), eConf.getText());
assertEquals("global-output-dir", actionConf.get("outputDir"));
}
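/**
 * Verifies the precedence chain when the same property is defined in several places:
 * the inline action {@code <configuration>} beats action-level {@code <job-xml>} (where a later
 * job-xml overrides an earlier one), which beats the global {@code <configuration>}, which
 * beats global {@code <job-xml>} files; config-default.xml only fills otherwise unset properties.
 */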
public void testGlobalOverrideJobXml() throws Exception {
FileSystem fs = getFileSystem();
String jobXml1 = createJobXml("aa", "from_jobXml1", "bb", "from_jobXml1", "jobXml1", fs);
String jobXml2 = createJobXml("bb", "from_jobXml2", "cc", "from_jobXml2", "jobXml2", fs);
String jobXml3 = createJobXml("cc", "from_jobXml3", "dd", "from_jobXml3", "jobXml3", fs);
String jobXml4 = createJobXml("dd", "from_jobXml4", "ee", "from_jobXml4", "jobXml4", fs);
String actionXml = "<map-reduce>"
+ " <prepare>"
+ " <delete path=\"${nameNode}/user/${wf:user()}/mr/${outputDir}\"/>"
+ " </prepare>"
+ " <job-xml>" + jobXml3 + "</job-xml>"
+ " <job-xml>" + jobXml4 + "</job-xml>"
+ " <configuration>"
+ " <property><name>ee</name><value>from_action_config</value></property>"
+ " </configuration>"
+ " </map-reduce>";
String wfXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.5\" name=\"map-reduce-wf\">"
+ "<global>"
+ "<job-tracker>${jobTracker}</job-tracker>"
+ "<name-node>${nameNode}</name-node>"
+ "<job-xml>" + jobXml1 + "</job-xml>"
+ "<job-xml>" + jobXml2 + "</job-xml>"
+ "<configuration>"
+ "<property><name>aa</name><value>from_global_config</value></property>"
+ "<property><name>ee</name><value>from_global_config</value></property>"
+ "</configuration>"
+ "</global>"
+ " <start to=\"mr-node\"/>"
+ " <action name=\"mr-node\">"
+ actionXml
+ " <ok to=\"end\"/>"
+ " <error to=\"fail\"/>"
+ "</action>"
+ "<kill name=\"fail\">"
+ " <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>"
+ "</kill>"
+ "<end name=\"end\"/>"
+ "</workflow-app>";
Writer writer = new OutputStreamWriter(new FileOutputStream(getTestCaseDir() + "/workflow.xml"),
StandardCharsets.UTF_8);
IOUtils.copyCharStream(new StringReader(wfXml), writer);
Configuration conf = new XConfiguration();
conf.set("nameNode", getNameNodeUri());
conf.set("jobTracker", getJobTrackerUri());
conf.set(OozieClient.USER_NAME, getTestUser());
conf.set(OozieClient.APP_PATH, "file://" + getTestCaseDir() + File.separator + "workflow.xml");
OutputStream os = new FileOutputStream(getTestCaseDir() + "/config-default.xml");
XConfiguration defaultConf = new XConfiguration();
defaultConf.set("aa", "from_config_default");
defaultConf.set("ff", "from_config_default");
defaultConf.writeXml(os);
os.close();
defaultConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
String wfId = new SubmitXCommand(conf).call();
new StartXCommand(wfId).call();
waitForWorkflowAction(wfId + "@mr-node");
WorkflowActionBean mrAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION,
wfId + "@mr-node");
JavaActionExecutor ae = new JavaActionExecutor();
WorkflowJobBean wf = createBaseWorkflow(defaultConf, "mr-action", wfXml);
Context context = new Context(wf, mrAction);
Element mrActionXml = XmlUtils.parseXml(mrAction.getConf());
Configuration actionConfig = ae.setupActionConf(conf, context, mrActionXml, getAppPath());
// Check attribute values
assertEquals("from_global_config", actionConfig.get("aa"));
assertEquals("from_jobXml2", actionConfig.get("bb"));
assertEquals("from_jobXml3", actionConfig.get("cc"));
assertEquals("from_jobXml4", actionConfig.get("dd"));
assertEquals("from_action_config", actionConfig.get("ee"));
assertEquals("from_config_default", actionConfig.get("ff"));
}
protected String createJobXml(String key1, String value1, String key2, String value2, String filename, FileSystem fs)
throws Exception {
String content = "<configuration>"
+ "<property><name>" + key1 + "</name><value>" + value1 + "</value></property>"
+ "<property><name>" + key2 + "</name><value>" + value2 + "</value></property>"
+ "</configuration>";
Path path = new Path(getAppPath(), filename);
Writer writer = new OutputStreamWriter(fs.create(path, true), StandardCharsets.UTF_8);
writer.write(content);
writer.close();
return path.toString();
}
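/**
 * Exercises MapReduceActionExecutor setup logic without submitting a job: launcher classes,
 * propagation of the action configuration, resolution of oozie.mapreduce.uber.jar (absolute
 * with or without the namenode, relative to the app dir, ignored for streaming and pipes,
 * rejected when uber jars are disabled), and the mapping of {@code <streaming>} and
 * {@code <pipes>} elements onto oozie.streaming.* and oozie.pipes.* properties.
 */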
public void testSetupMethods() throws Exception {
MapReduceActionExecutor ae = new MapReduceActionExecutor();
List<Class<?>> classes = Arrays.<Class<?>>asList(StreamingMain.class);
assertEquals(classes, ae.getLauncherClasses());
Element actionXml = XmlUtils.parseXml("<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
+ "<name-node>" + getNameNodeUri() + "</name-node>" + "<configuration>"
+ "<property><name>mapred.input.dir</name><value>IN</value></property>"
+ "<property><name>mapred.output.dir</name><value>OUT</value></property>" + "</configuration>"
+ "</map-reduce>");
XConfiguration protoConf = new XConfiguration();
protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
WorkflowJobBean wf = createBaseWorkflow(protoConf, "mr-action");
WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
action.setType(ae.getType());
Context context = new Context(wf, action);
Configuration conf = ae.createBaseHadoopConf(context, actionXml);
ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
assertEquals("IN", conf.get("mapred.input.dir"));
Configuration launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf);
assertFalse(launcherJobConf.getBoolean("mapreduce.job.complete.cancel.delegation.tokens", true));
assertTrue(conf.getBoolean("mapreduce.job.complete.cancel.delegation.tokens", false));
// Enable uber jars to test that MapReduceActionExecutor picks up the oozie.mapreduce.uber.jar property correctly
Services serv = Services.get();
boolean originalUberJarDisabled = serv.getConf().getBoolean("oozie.action.mapreduce.uber.jar.enable", false);
serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", true);
actionXml = createUberJarActionXML(getNameNodeUri() + "/app/job.jar", "");
conf = ae.createBaseHadoopConf(context, actionXml);
ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
// absolute path with namenode
assertEquals(getNameNodeUri() + "/app/job.jar", conf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR));
actionXml = createUberJarActionXML("/app/job.jar", "");
conf = ae.createBaseHadoopConf(context, actionXml);
ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
// absolute path without namenode
assertEquals(getNameNodeUri() + "/app/job.jar", conf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR));
actionXml = createUberJarActionXML("job.jar", "");
conf = ae.createBaseHadoopConf(context, actionXml);
ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
assertEquals(getFsTestCaseDir() + "/job.jar", conf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR)); // relative path
actionXml = createUberJarActionXML("job.jar", "<streaming></streaming>");
conf = ae.createBaseHadoopConf(context, actionXml);
ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
// ignored for streaming
assertEquals("", conf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR));
actionXml = createUberJarActionXML("job.jar", "<pipes></pipes>");
conf = ae.createBaseHadoopConf(context, actionXml);
ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
assertEquals("", conf.get("oozie.mapreduce.uber.jar")); // ignored for pipes
actionXml = XmlUtils.parseXml("<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
+ "<name-node>" + getNameNodeUri() + "</name-node>" + "</map-reduce>");
conf = ae.createBaseHadoopConf(context, actionXml);
ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
assertNull(conf.get("oozie.mapreduce.uber.jar")); // doesn't resolve if not set
// Disable uber jars to test that MapReduceActionExecutor won't allow the oozie.mapreduce.uber.jar property
serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", false);
try {
actionXml = createUberJarActionXML(getNameNodeUri() + "/app/job.jar", "");
conf = ae.createBaseHadoopConf(context, actionXml);
ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
fail("ActionExecutorException expected because uber jars are disabled");
} catch (ActionExecutorException aee) {
assertEquals("MR003", aee.getErrorCode());
assertEquals(ActionExecutorException.ErrorType.ERROR, aee.getErrorType());
assertTrue(aee.getMessage().contains("oozie.action.mapreduce.uber.jar.enable"));
assertTrue(aee.getMessage().contains("oozie.mapreduce.uber.jar"));
}
serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", originalUberJarDisabled);
actionXml = XmlUtils.parseXml("<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
+ "<name-node>" + getNameNodeUri() + "</name-node>" + "<streaming>" + "<mapper>M</mapper>"
+ "<reducer>R</reducer>" + "<record-reader>RR</record-reader>"
+ "<record-reader-mapping>RRM1=1</record-reader-mapping>"
+ "<record-reader-mapping>RRM2=2</record-reader-mapping>" + "<env>e=E</env>" + "<env>ee=EE</env>"
+ "</streaming>" + "<configuration>"
+ "<property><name>mapred.input.dir</name><value>IN</value></property>"
+ "<property><name>mapred.output.dir</name><value>OUT</value></property>" + "</configuration>"
+ "</map-reduce>");
conf = ae.createBaseHadoopConf(context, actionXml);
ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
assertEquals("M", conf.get("oozie.streaming.mapper"));
assertEquals("R", conf.get("oozie.streaming.reducer"));
assertEquals("RR", conf.get("oozie.streaming.record-reader"));
assertEquals("2", conf.get("oozie.streaming.record-reader-mapping.size"));
assertEquals("2", conf.get("oozie.streaming.env.size"));
actionXml = XmlUtils.parseXml("<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
+ "<name-node>" + getNameNodeUri() + "</name-node>" + "<pipes>" + "<map>M</map>" + "<reduce>R</reduce>"
+ "<inputformat>IF</inputformat>" + "<partitioner>P</partitioner>" + "<writer>W</writer>"
+ "<program>PP</program>" + "</pipes>" + "<configuration>"
+ "<property><name>mapred.input.dir</name><value>IN</value></property>"
+ "<property><name>mapred.output.dir</name><value>OUT</value></property>" + "</configuration>"
+ "</map-reduce>");
conf = ae.createBaseHadoopConf(context, actionXml);
ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
assertEquals("M", conf.get("oozie.pipes.map"));
assertEquals("R", conf.get("oozie.pipes.reduce"));
assertEquals("IF", conf.get("oozie.pipes.inputformat"));
assertEquals("P", conf.get("oozie.pipes.partitioner"));
assertEquals("W", conf.get("oozie.pipes.writer"));
assertEquals(getFsTestCaseDir() + "/PP", conf.get("oozie.pipes.program"));
}
protected Context createContext(String name, String actionXml) throws Exception {
JavaActionExecutor ae = new JavaActionExecutor();
Path appJarPath = new Path("lib/test.jar");
File jarFile = IOUtils.createJar(new File(getTestCaseDir()), "test.jar", MapperReducerForTest.class);
InputStream is = new FileInputStream(jarFile);
OutputStream os = getFileSystem().create(new Path(getAppPath(), "lib/test.jar"));
IOUtils.copyStream(is, os);
XConfiguration protoConf = new XConfiguration();
protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
protoConf.setStrings(WorkflowAppService.APP_LIB_PATH_LIST, appJarPath.toString());
WorkflowJobBean wf = createBaseWorkflow(protoConf, "mr-action");
WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
action.setName(name);
action.setType(ae.getType());
action.setConf(actionXml);
return new Context(wf, action);
}
protected Context createContextWithCredentials(String name, String actionXml) throws Exception {
JavaActionExecutor ae = new JavaActionExecutor();
Path appJarPath = new Path("lib/test.jar");
File jarFile = IOUtils.createJar(new File(getTestCaseDir()), "test.jar", MapperReducerForTest.class);
InputStream is = new FileInputStream(jarFile);
OutputStream os = getFileSystem().create(new Path(getAppPath(), "lib/test.jar"));
IOUtils.copyStream(is, os);
XConfiguration protoConf = new XConfiguration();
protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
protoConf.setStrings(WorkflowAppService.APP_LIB_PATH_LIST, appJarPath.toString());
WorkflowJobBean wf = createBaseWorkflowWithCredentials(protoConf, "mr-action");
WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
action.setName(name);
action.setType(ae.getType());
action.setConf(actionXml);
action.setCred("testcred");
return new Context(wf, action);
}
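// Prepares the action directory and submits only the launcher AM; returns its external ID.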
protected String submitAction(Context context) throws Exception {
MapReduceActionExecutor ae = new MapReduceActionExecutor();
WorkflowAction action = context.getAction();
ae.prepareActionDir(getFileSystem(), context);
ae.submitLauncher(getFileSystem(), context, action);
return context.getAction().getExternalId();
}
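/**
 * Drives a full action lifecycle: submits the launcher, waits for it and then for the child
 * MR job, and runs check()/end(), asserting external IDs, counters and the final OK status.
 * Returns the external child (MR job) ID.
 */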
private String _testSubmit(String name, String actionXml) throws Exception {
Context context = createContext(name, actionXml);
final String launcherId = submitAction(context);
waitUntilYarnAppDoneAndAssertSuccess(launcherId);
Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
context.getProtoActionConf());
assertTrue(LauncherHelper.hasIdSwap(actionData));
MapReduceActionExecutor ae = new MapReduceActionExecutor();
ae.check(context, context.getAction());
assertEquals(launcherId, context.getAction().getExternalId());
String externalChildIDs = context.getAction().getExternalChildIDs();
waitUntilYarnAppDoneAndAssertSuccess(externalChildIDs);
ae.check(context, context.getAction());
assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus());
assertNull(context.getAction().getData());
ae.end(context, context.getAction());
assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());
// hadoop.counters is always set for MR actions.
assertNotNull(context.getVar("hadoop.counters"));
String counters = context.getVar("hadoop.counters");
assertTrue(counters.contains("Counter"));
assertTrue(counters.contains("\"MAP_OUTPUT_RECORDS\":2"));
// External child IDs used to be null; since Oozie 4.0 they are non-null for MR actions.
assertNotNull(context.getExternalChildIDs());
return externalChildIDs;
}
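// Like _testSubmit(), but the child job is expected to fail; asserts the FAILED/KILLED
// external status, the ERROR action status and the given error message.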
private void _testSubmitError(String actionXml, String errorMessage) throws Exception {
Context context = createContext(MAP_REDUCE, actionXml);
final String launcherId = submitAction(context);
waitUntilYarnAppDoneAndAssertSuccess(launcherId);
MapReduceActionExecutor ae = new MapReduceActionExecutor();
ae.check(context, context.getAction());
assertEquals(JavaActionExecutor.FAILED_KILLED, context.getAction().getExternalStatus());
ae.end(context, context.getAction());
assertEquals(WorkflowAction.Status.ERROR, context.getAction().getStatus());
assertTrue(context.getAction().getErrorMessage().contains(errorMessage));
}
private void _testSubmitWithCredentials(String name, String actionXml) throws Exception {
Context context = createContextWithCredentials(name, actionXml);
final String launcherId = submitAction(context);
waitUntilYarnAppDoneAndAssertSuccess(launcherId);
Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
context.getProtoActionConf());
assertTrue(LauncherHelper.hasIdSwap(actionData));
MapReduceActionExecutor ae = new MapReduceActionExecutor();
ae.check(context, context.getAction());
assertEquals(launcherId, context.getAction().getExternalId());
String externalChildIDs = context.getAction().getExternalChildIDs();
waitUntilYarnAppDoneAndAssertSuccess(externalChildIDs);
ae.check(context, context.getAction());
assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus());
assertNull(context.getAction().getData());
ae.end(context, context.getAction());
assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());
Configuration conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml));
String user = conf.get("user.name");
JobClient jobClient = getHadoopAccessorService().createJobClient(user, conf);
org.apache.hadoop.mapreduce.JobID jobID = TypeConverter.fromYarn(
ConverterUtils.toApplicationId(externalChildIDs));
final RunningJob mrJob = jobClient.getJob(JobID.downgrade(jobID));
assertTrue(MapperReducerCredentialsForTest.hasCredentials(mrJob));
}
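// Same as getMapReduceConfig() but with a blocking mapper, so the child job stays alive
// long enough for kill-propagation tests.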
protected XConfiguration getSleepMapReduceConfig(String inputDir, String outputDir) {
XConfiguration conf = getMapReduceConfig(inputDir, outputDir);
conf.set("mapred.mapper.class", BlockingMapper.class.getName());
return conf;
}
protected XConfiguration getMapReduceConfig(String inputDir, String outputDir) {
XConfiguration conf = new XConfiguration();
conf.set("mapred.mapper.class", MapperReducerForTest.class.getName());
conf.set("mapred.reducer.class", MapperReducerForTest.class.getName());
conf.set("mapred.input.dir", inputDir);
conf.set("mapred.output.dir", outputDir);
return conf;
}
protected XConfiguration getMapReduceCredentialsConfig(String inputDir, String outputDir) {
XConfiguration conf = new XConfiguration();
conf.set("mapred.mapper.class", MapperReducerCredentialsForTest.class.getName());
conf.set("mapred.reducer.class", MapperReducerForTest.class.getName());
conf.set("mapred.input.dir", inputDir);
conf.set("mapred.output.dir", outputDir);
return conf;
}
protected XConfiguration getMapReduceUberJarConfig(String inputDir, String outputDir) throws Exception {
XConfiguration conf = new XConfiguration();
conf.set("mapred.mapper.class", MapperReducerUberJarForTest.class.getName());
conf.set("mapred.reducer.class", MapperReducerUberJarForTest.class.getName());
conf.set("mapred.input.dir", inputDir);
conf.set("mapred.output.dir", outputDir);
conf.set("oozie.mapreduce.uber.jar", createAndUploadUberJar().toUri().toString());
return conf;
}
public void testMapReduce() throws Exception {
FileSystem fs = getFileSystem();
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
writeDummyInput(fs, inputDir);
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
+ getMapReduceConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>";
_testSubmit(MAP_REDUCE, actionXml);
}
public void testMapReduceActionError() throws Exception {
FileSystem fs = getFileSystem();
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output1");
Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")), StandardCharsets.UTF_8);
w.write("dummy\n");
w.write("dummy\n");
writeDummyInput(fs, outputDir);
String actionXml = "<map-reduce>" +
"<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
"<name-node>" + getNameNodeUri() + "</name-node>" +
"<configuration>" +
"<property><name>mapred.mapper.class</name><value>" + MapperReducerForTest.class.getName() +
"</value></property>" +
"<property><name>mapred.reducer.class</name><value>" + MapperReducerForTest.class.getName() +
"</value></property>" +
"<property><name>mapred.input.dir</name><value>" + inputDir + "</value></property>" +
"<property><name>mapred.output.dir</name><value>" + outputDir + "</value></property>" +
"</configuration>" +
"</map-reduce>";
_testSubmitError(actionXml, "already exists");
}
public void testMapReduceWithConfigClass() throws Exception {
FileSystem fs = getFileSystem();
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
writeDummyInput(fs, inputDir);
Path jobXml = new Path(getFsTestCaseDir(), "action.xml");
XConfiguration conf = getMapReduceConfig(inputDir.toString(), outputDir.toString());
conf.set(MapperReducerForTest.JOB_XML_OUTPUT_LOCATION, jobXml.toUri().toString());
conf.set("B", "b");
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
+ conf.toXmlString(false)
+ "<config-class>" + OozieActionConfiguratorForTest.class.getName() + "</config-class>" + "</map-reduce>";
_testSubmit(MAP_REDUCE, actionXml);
Configuration conf2 = new Configuration(false);
conf2.addResource(fs.open(jobXml));
assertEquals("a", conf2.get("A"));
assertEquals("c", conf2.get("B"));
}
public void testMapReduceWithConfigClassNotFound() throws Exception {
FileSystem fs = getFileSystem();
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
writeDummyInput(fs, inputDir);
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
+ getMapReduceConfig(inputDir.toString(), outputDir.toString()).toXmlString(false)
+ "<config-class>org.apache.oozie.does.not.exist</config-class>" + "</map-reduce>";
Context context = createContext(MAP_REDUCE, actionXml);
final String launcherId = submitAction(context);
waitUntilYarnAppDoneAndAssertSuccess(launcherId);
final Map<String, String> actionData = LauncherHelper.getActionData(fs, context.getActionDir(),
context.getProtoActionConf());
Properties errorProps = PropertiesUtils.stringToProperties(actionData.get(LauncherAMUtils.ACTION_DATA_ERROR_PROPS));
assertEquals("An Exception occurred while instantiating the action config class",
errorProps.getProperty("exception.message"));
assertTrue(errorProps.getProperty("exception.stacktrace").startsWith(OozieActionConfiguratorException.class.getName()));
}
public void testMapReduceWithConfigClassThrowException() throws Exception {
FileSystem fs = getFileSystem();
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
writeDummyInput(fs, inputDir);
XConfiguration conf = getMapReduceConfig(inputDir.toString(), outputDir.toString());
conf.setBoolean("oozie.test.throw.exception", true); // causes OozieActionConfiguratorForTest to throw an exception
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
+ conf.toXmlString(false)
+ "<config-class>" + OozieActionConfiguratorForTest.class.getName() + "</config-class>" + "</map-reduce>";
Context context = createContext(MAP_REDUCE, actionXml);
final String launcherId = submitAction(context);
waitUntilYarnAppDoneAndAssertSuccess(launcherId);
final Map<String, String> actionData = LauncherHelper.getActionData(fs, context.getActionDir(),
context.getProtoActionConf());
Properties errorProps = PropertiesUtils.stringToProperties(actionData.get(LauncherAMUtils.ACTION_DATA_ERROR_PROPS));
assertEquals("doh", errorProps.getProperty("exception.message"));
assertTrue(errorProps.getProperty("exception.stacktrace").startsWith(OozieActionConfiguratorException.class.getName()));
}
public void testMapReduceActionKill() throws Exception {
FileSystem fs = getFileSystem();
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
writeDummyInput(fs, inputDir);
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
+ getSleepMapReduceConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>";
Context context = createContext(MAP_REDUCE, actionXml);
final String launcherId = submitAction(context);
// wait until LauncherAM terminates - the MR job keeps running in the background
waitUntilYarnAppDoneAndAssertSuccess(launcherId);
MapReduceActionExecutor mae = new MapReduceActionExecutor();
mae.check(context, context.getAction()); // must be called so that externalChildIDs are read from HDFS
mae.kill(context, context.getAction());
waitUntilYarnAppKilledAndAssertSuccess(context.getAction().getExternalChildIDs());
}
public void testMapReduceWithCredentials() throws Exception {
FileSystem fs = getFileSystem();
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
writeDummyInput(fs, inputDir);
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
+ getMapReduceCredentialsConfig(inputDir.toString(), outputDir.toString()).toXmlString(false)
+ "</map-reduce>";
_testSubmitWithCredentials(MAP_REDUCE, actionXml);
}
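// Builds the uber jar locally, moves it next to the workflow app on the test filesystem
// and makes sure no local copy is left behind.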
protected Path createAndUploadUberJar() throws Exception {
Path localJobJarPath = makeUberJarWithLib(getTestCaseDir());
Path remoteJobJarPath = new Path(getAppPath(), localJobJarPath.getName());
getFileSystem().moveFromLocalFile(localJobJarPath, remoteJobJarPath);
File localJobJarFile = new File(localJobJarPath.toUri().toString());
if (localJobJarFile.exists()) { // just to make sure
localJobJarFile.delete();
}
return remoteJobJarPath;
}
private Path makeUberJarWithLib(String testDir) throws Exception {
Path jobJarPath = new Path(testDir, "uber.jar");
FileOutputStream fos = new FileOutputStream(new File(jobJarPath.toUri().getPath()));
JarOutputStream jos = new JarOutputStream(fos);
// Have to put in real jar files or it will complain
createAndAddJarToJar(jos, new File(new Path(testDir, "lib1.jar").toUri().getPath()));
createAndAddJarToJar(jos, new File(new Path(testDir, "lib2.jar").toUri().getPath()));
jos.close();
return jobJarPath;
}
private void createAndAddJarToJar(JarOutputStream jos, File jarFile) throws Exception {
FileOutputStream fos2 = new FileOutputStream(jarFile);
JarOutputStream jos2 = new JarOutputStream(fos2);
// Have to have at least one entry or it will complain
ZipEntry ze = new ZipEntry(jarFile.getName() + ".inside");
jos2.putNextEntry(ze);
jos2.closeEntry();
jos2.close();
ze = new ZipEntry("lib/" + jarFile.getName());
jos.putNextEntry(ze);
FileInputStream in = new FileInputStream(jarFile);
byte[] buf = new byte[1024];
int numRead;
while ((numRead = in.read(buf)) != -1) {
jos.write(buf, 0, numRead);
}
in.close();
jos.closeEntry();
jarFile.delete();
}
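// Prefixed with '_' so JUnit does not run it directly; invoked by the uber-jar
// enabled/disabled tests below. Runs an MR job with an uber jar and scans the job output
// for evidence that lib/lib1.jar and lib/lib2.jar were unpacked onto the task classpath.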
public void _testMapReduceWithUberJar() throws Exception {
FileSystem fs = getFileSystem();
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
writeDummyInput(fs, inputDir);
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
+ getMapReduceUberJarConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>";
String appID = _testSubmit(MAP_REDUCE, actionXml);
boolean containsLib1Jar = false;
String lib1JarStr = "jobcache/" + appID + "/jars/lib/lib1.jar";
Pattern lib1JarPatYarn = Pattern.compile(
".*appcache/" + appID + "/filecache/.*/uber.jar/lib/lib1.jar.*");
boolean containsLib2Jar = false;
String lib2JarStr = "jobcache/" + appID + "/jars/lib/lib2.jar";
Pattern lib2JarPatYarn = Pattern.compile(
".*appcache/" + appID + "/filecache/.*/uber.jar/lib/lib2.jar.*");
FileStatus[] fstats = getFileSystem().listStatus(outputDir);
for (FileStatus fstat : fstats) {
Path p = fstat.getPath();
if (getFileSystem().isFile(p) && p.getName().startsWith("part-")) {
InputStream is = getFileSystem().open(p);
Scanner sc = new Scanner(is, StandardCharsets.UTF_8.name());
while (sc.hasNextLine()) {
String line = sc.nextLine();
containsLib1Jar = (containsLib1Jar || line.contains(lib1JarStr) || lib1JarPatYarn.matcher(line).matches());
containsLib2Jar = (containsLib2Jar || line.contains(lib2JarStr) || lib2JarPatYarn.matcher(line).matches());
}
sc.close();
is.close();
}
}
assertTrue("lib/lib1.jar should have been unzipped from the uber jar and added to the classpath but was not",
containsLib1Jar);
assertTrue("lib/lib2.jar should have been unzipped from the uber jar and added to the classpath but was not",
containsLib2Jar);
}
// With the oozie.action.mapreduce.uber.jar.enable property set to false, a workflow with an uber jar should fail
public void testMapReduceWithUberJarDisabled() throws Exception {
Services serv = Services.get();
boolean originalUberJarDisabled = serv.getConf().getBoolean("oozie.action.mapreduce.uber.jar.enable", false);
try {
serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", false);
_testMapReduceWithUberJar();
fail("ActionExecutorException expected because uber jars are disabled");
} catch (ActionExecutorException aee) {
assertEquals("MR003", aee.getErrorCode());
assertEquals(ActionExecutorException.ErrorType.ERROR, aee.getErrorType());
assertTrue(aee.getMessage().contains("oozie.action.mapreduce.uber.jar.enable"));
assertTrue(aee.getMessage().contains("oozie.mapreduce.uber.jar"));
} finally {
serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", originalUberJarDisabled);
}
}
public void testJobNameSetForMapReduceChild() throws Exception {
Services serv = Services.get();
serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", true);
final FileSystem fs = getFileSystem();
final Path inputDir = new Path(getFsTestCaseDir(), "input");
final Path outputDir = new Path(getFsTestCaseDir(), "output");
writeDummyInput(fs, inputDir);
final String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
+ getMapReduceUberJarConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>";
final String extId = _testSubmit(MAP_REDUCE, actionXml);
final ApplicationId appId = ConverterUtils.toApplicationId(extId);
final Configuration conf = getHadoopAccessorService().createConfiguration(getJobTrackerUri());
final String name = getHadoopAccessorService().createYarnClient(getTestUser(), conf).getApplicationReport(appId).getName();
assertTrue(name.contains("oozie:action"));
}
private void writeDummyInput(FileSystem fs, Path inputDir) throws IOException {
Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")), StandardCharsets.UTF_8);
w.write("dummy\n");
w.write("dummy\n");
w.close();
}
public void testMapReduceWithUberJarEnabled() throws Exception {
Services serv = Services.get();
boolean originalUberJarDisabled = serv.getConf().getBoolean("oozie.action.mapreduce.uber.jar.enable", false);
try {
serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", true);
_testMapReduceWithUberJar();
} finally {
serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", originalUberJarDisabled);
}
}
protected XConfiguration getStreamingConfig(String inputDir, String outputDir) {
XConfiguration conf = new XConfiguration();
conf.set("mapred.input.dir", inputDir);
conf.set("mapred.output.dir", outputDir);
return conf;
}
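// Stages the hadoop-streaming jar next to the workflow app and submits a streaming job
// that pipes the dummy input through 'cat' (mapper) and 'wc' (reducer).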
private void runStreamingWordCountJob(Path inputDir, Path outputDir, XConfiguration streamingConf) throws Exception {
FileSystem fs = getFileSystem();
Path streamingJar = new Path(getFsTestCaseDir(), "jar/hadoop-streaming.jar");
InputStream is = new FileInputStream(ClassUtils.findContainingJar(StreamJob.class));
OutputStream os = fs.create(new Path(getAppPath(), streamingJar));
IOUtils.copyStream(is, os);
writeDummyInput(fs, inputDir);
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>" + " <streaming>" + " <mapper>cat</mapper>"
+ " <reducer>wc</reducer>" + " </streaming>"
+ streamingConf.toXmlString(false) + "<file>"
+ streamingJar + "</file>" + "</map-reduce>";
_testSubmit("streaming", actionXml);
}
public void testStreaming() throws Exception {
FileSystem fs = getFileSystem();
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
final XConfiguration streamingConf = getStreamingConfig(inputDir.toString(), outputDir.toString());
runStreamingWordCountJob(inputDir, outputDir, streamingConf);
final FSDataInputStream dis = fs.open(getOutputFile(outputDir, fs));
final List<String> lines = org.apache.commons.io.IOUtils.readLines(dis);
dis.close();
assertEquals(1, lines.size());
// 14 bytes rather than 12, likely because streaming delivers each record to the reducer as
// "dummy\t\n" (key + tab separator + empty value): wc then counts 2 lines, 2 words, 14 bytes.
assertEquals("2 2 14", lines.get(0).trim());
}
public void testStreamingConfOverride() throws Exception {
FileSystem fs = getFileSystem();
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
final XConfiguration streamingConf = getStreamingConfig(inputDir.toString(), outputDir.toString());
streamingConf.set("mapred.output.format.class", "org.apache.hadoop.mapred.SequenceFileOutputFormat");
runStreamingWordCountJob(inputDir, outputDir, streamingConf);
SequenceFile.Reader seqFile = new SequenceFile.Reader(fs, getOutputFile(outputDir, fs), getFileSystem().getConf());
Text key = new Text(), value = new Text();
assertTrue(seqFile.next(key, value));
assertEquals("2 2 14", key.toString().trim());
assertEquals("", value.toString());
assertFalse(seqFile.next(key, value));
seqFile.close();
}
private Path getOutputFile(Path outputDir, FileSystem fs) throws FileNotFoundException, IOException {
final FileStatus[] files = fs.listStatus(outputDir, new PathFilter() {
@Override
public boolean accept(Path path) {
return path.getName().startsWith("part");
}
});
return files[0].getPath(); //part-[m/r]-00000
}
protected XConfiguration getPipesConfig(String inputDir, String outputDir) {
XConfiguration conf = new XConfiguration();
conf.setBoolean("hadoop.pipes.java.recordreader", true);
conf.setBoolean("hadoop.pipes.java.recordwriter", true);
conf.set("mapred.input.dir", inputDir);
conf.set("mapred.output.dir", outputDir);
return conf;
}
private XConfiguration getOozieActionExternalStatsWriteProperty(String inputDir, String outputDir,
String oozieProperty) {
XConfiguration conf = new XConfiguration();
conf.set("mapred.input.dir", inputDir);
conf.set("mapred.output.dir", outputDir);
conf.set("oozie.action.external.stats.write", oozieProperty);
return conf;
}
public void testPipes() throws Exception {
Path programPath = new Path(getFsTestCaseDir(), "wordcount-simple");
FileSystem fs = getFileSystem();
InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream("wordcount-simple");
if (is != null) {
OutputStream os = fs.create(programPath);
IOUtils.copyStream(is, os);
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
writeDummyInput(fs, inputDir);
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>" + " <pipes>" + " <program>" + programPath
+ "#wordcount-simple" + "</program>" + " </pipes>"
+ getPipesConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "<file>"
+ programPath + "</file>" + "</map-reduce>";
_testSubmit(PIPES, actionXml);
}
else {
System.out.println(
"SKIPPING TEST: TestMapReduceActionExecutor.testPipes(), " +
"binary 'wordcount-simple' not available in the classpath");
}
}
// Test to assert that executionStats is set when user has specified stats
// write property as true.
public void testSetExecutionStats_when_user_has_specified_stats_write_TRUE() throws Exception {
FileSystem fs = getFileSystem();
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
writeDummyInput(fs, inputDir);
// set user stats write property as true explicitly in the
// configuration.
String actionXml = "<map-reduce>"
+ "<job-tracker>"
+ getJobTrackerUri()
+ "</job-tracker>"
+ "<name-node>"
+ getNameNodeUri()
+ "</name-node>"
+ getOozieActionExternalStatsWriteProperty(inputDir.toString(), outputDir.toString(), "true")
.toXmlString(false) + "</map-reduce>";
Context context = createContext(MAP_REDUCE, actionXml);
final String launcherId = submitAction(context);
waitUntilYarnAppDoneAndAssertSuccess(launcherId);
MapReduceActionExecutor ae = new MapReduceActionExecutor();
Configuration conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml));
Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
conf);
assertTrue(LauncherHelper.hasIdSwap(actionData));
ae.check(context, context.getAction());
assertEquals(launcherId, context.getAction().getExternalId());
waitUntilYarnAppDoneAndAssertSuccess(context.getAction().getExternalChildIDs());
ae.check(context, context.getAction());
assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus());
assertNull(context.getAction().getData());
ae.end(context, context.getAction());
assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());
// Assert for stats info stored in the context.
assertNotNull(context.getExecutionStats());
assertTrue(context.getExecutionStats().contains("ACTION_TYPE"));
assertTrue(context.getExecutionStats().contains("Counter"));
// External child IDs used to be null; since Oozie 4.0 they are non-null for MR actions.
assertNotNull(context.getExternalChildIDs());
// hadoop.counters is always set for MR actions.
assertNotNull(context.getVar("hadoop.counters"));
String counters = context.getVar("hadoop.counters");
assertTrue(counters.contains("Counter"));
}
// Test to assert that executionStats is not set when user has specified
// stats write property as false.
public void testSetExecutionStats_when_user_has_specified_stats_write_FALSE() throws Exception {
FileSystem fs = getFileSystem();
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
writeDummyInput(fs, inputDir);
// set user stats write property as false explicitly in the
// configuration.
String actionXml = "<map-reduce>"
+ "<job-tracker>"
+ getJobTrackerUri()
+ "</job-tracker>"
+ "<name-node>"
+ getNameNodeUri()
+ "</name-node>"
+ getOozieActionExternalStatsWriteProperty(inputDir.toString(), outputDir.toString(), "false")
.toXmlString(false) + "</map-reduce>";
Context context = createContext(MAP_REDUCE, actionXml);
final String launcherId = submitAction(context);
waitUntilYarnAppDoneAndAssertSuccess(launcherId);
Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
context.getProtoActionConf());
assertTrue(LauncherHelper.hasIdSwap(actionData));
MapReduceActionExecutor ae = new MapReduceActionExecutor();
ae.check(context, context.getAction());
assertEquals(launcherId, context.getAction().getExternalId());
waitUntilYarnAppDoneAndAssertSuccess(context.getAction().getExternalChildIDs());
ae.check(context, context.getAction());
assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus());
assertNull(context.getAction().getData());
ae.end(context, context.getAction());
assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());
// Assert for stats info stored in the context.
assertNull(context.getExecutionStats());
// External child IDs used to be null; since Oozie 4.0 they are non-null for MR actions.
assertNotNull(context.getExternalChildIDs());
// hadoop.counters is always set for MR actions.
assertNotNull(context.getVar("hadoop.counters"));
String counters = context.getVar("hadoop.counters");
assertTrue(counters.contains("Counter"));
}
public void testEndWithoutConfiguration() throws Exception {
FileSystem fs = getFileSystem();
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
writeDummyInput(fs, inputDir);
// set user stats write property as false explicitly in the
// configuration.
String actionXml = "<map-reduce>"
+ "<job-tracker>"
+ getJobTrackerUri()
+ "</job-tracker>"
+ "<name-node>"
+ getNameNodeUri()
+ "</name-node>"
+ getOozieActionExternalStatsWriteProperty(inputDir.toString(), outputDir.toString(), "false")
.toXmlString(false) + "</map-reduce>";
Context context = createContext(MAP_REDUCE, actionXml);
final String launcherId = submitAction(context);
waitUntilYarnAppDoneAndAssertSuccess(launcherId);
Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
context.getProtoActionConf());
assertTrue(LauncherHelper.hasIdSwap(actionData));
MapReduceActionExecutor ae = new MapReduceActionExecutor();
ae.check(context, context.getAction());
assertEquals(launcherId, context.getAction().getExternalId());
waitUntilYarnAppDoneAndAssertSuccess(context.getAction().getExternalChildIDs());
ae.check(context, context.getAction());
assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus());
assertNull(context.getAction().getData());
actionXml = "<map-reduce>"
+ "<job-tracker>"
+ getJobTrackerUri()
+ "</job-tracker>"
+ "<name-node>"
+ getNameNodeUri()
+ "</name-node></map-reduce>";
WorkflowActionBean action = (WorkflowActionBean) context.getAction();
action.setConf(actionXml);
try {
ae.end(context, context.getAction());
}
catch (Exception e) {
fail("unexpected exception thrown: " + e);
}
}
/**
* Test "oozie.launcher.mapred.job.name" and "mapred.job.name" can be set in
* the action configuration and not overridden by the action executor
*
* @throws Exception
*/
public void testSetMapredJobName() throws Exception {
final String launcherJobName = "MapReduceLauncherTest";
final String mapredJobName = "MapReduceTest";
FileSystem fs = getFileSystem();
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
writeDummyInput(fs, inputDir);
XConfiguration mrConfig = getMapReduceConfig(inputDir.toString(),
outputDir.toString());
mrConfig.set("oozie.launcher.mapred.job.name", launcherJobName);
mrConfig.set("mapred.job.name", mapredJobName);
StringBuilder sb = new StringBuilder("<map-reduce>")
.append("<job-tracker>").append(getJobTrackerUri())
.append("</job-tracker>").append("<name-node>")
.append(getNameNodeUri()).append("</name-node>")
.append(mrConfig.toXmlString(false)).append("</map-reduce>");
String actionXml = sb.toString();
Context context = createContext(MAP_REDUCE, actionXml);
final String launcherId = submitAction(context);
waitUntilYarnAppDoneAndAssertSuccess(launcherId);
Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
context.getProtoActionConf());
assertTrue(LauncherHelper.hasIdSwap(actionData));
MapReduceActionExecutor ae = new MapReduceActionExecutor();
ae.check(context, context.getAction());
assertEquals(launcherId, context.getAction().getExternalId());
String externalChildIDs = context.getAction().getExternalChildIDs();
waitUntilYarnAppDoneAndAssertSuccess(externalChildIDs);
ae.check(context, context.getAction());
assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus());
assertNull(context.getAction().getData());
ae.end(context, context.getAction());
assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());
Configuration conf = getHadoopAccessorService().createConfiguration(getJobTrackerUri());
final YarnClient yarnClient = getHadoopAccessorService().createYarnClient(getTestUser(), conf);
ApplicationReport report = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(externalChildIDs));
// Assert Mapred job name has been set
assertEquals(mapredJobName, report.getName());
// Assert for stats info stored in the context.
assertNull(context.getExecutionStats());
// External child IDs used to be null; since Oozie 4.0 they are non-null for MR actions.
assertNotNull(context.getExternalChildIDs());
// hadoop.counters is always set for MR actions.
assertNotNull(context.getVar("hadoop.counters"));
String counters = context.getVar("hadoop.counters");
assertTrue(counters.contains("Counter"));
}
public void testDefaultShareLibName() {
MapReduceActionExecutor ae = new MapReduceActionExecutor();
Element e = new Element("mapreduce");
assertNull(ae.getDefaultShareLibName(e));
e.addContent(new Element("streaming"));
assertEquals("mapreduce-streaming", ae.getDefaultShareLibName(e));
}
/**
* https://issues.apache.org/jira/browse/OOZIE-87
* This test covers the map-reduce action.
* @throws Exception
*/
public void testCommaSeparatedFilesAndArchives() throws Exception {
Path root = new Path(getFsTestCaseDir(), "root");
Path jar = new Path("jar.jar");
getFileSystem().create(new Path(getAppPath(), jar)).close();
Path rootJar = new Path(root, "rootJar.jar");
getFileSystem().create(rootJar).close();
Path file = new Path("file");
getFileSystem().create(new Path(getAppPath(), file)).close();
Path rootFile = new Path(root, "rootFile");
getFileSystem().create(rootFile).close();
Path so = new Path("soFile.so");
getFileSystem().create(new Path(getAppPath(), so)).close();
Path rootSo = new Path(root, "rootSoFile.so");
getFileSystem().create(rootSo).close();
Path so1 = new Path("soFile.so.1");
getFileSystem().create(new Path(getAppPath(), so1)).close();
Path rootSo1 = new Path(root, "rootSoFile.so.1");
getFileSystem().create(rootSo1).close();
Path archive = new Path("archive.tar");
getFileSystem().create(new Path(getAppPath(), archive)).close();
Path rootArchive = new Path(root, "rootArchive.tar");
getFileSystem().create(rootArchive).close();
String actionXml = "<map-reduce>" +
" <job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
" <name-node>" + getNameNodeUri() + "</name-node>" +
" <main-class>CLASS</main-class>" +
" <file>" + jar.toString() +
"," + rootJar.toString() +
"," + file.toString() +
", " + rootFile.toString() + // with leading and trailing spaces
" ," + so.toString() +
"," + rootSo.toString() +
"," + so1.toString() +
"," + rootSo1.toString() + "</file>\n" +
" <archive>" + archive.toString() + ", "
+ rootArchive.toString() + " </archive>\n" + // with leading and trailing spaces
"</map-reduce>";
Element eActionXml = XmlUtils.parseXml(actionXml);
Context context = createContext(MAP_REDUCE, actionXml);
Path appPath = getAppPath();
MapReduceActionExecutor ae = new MapReduceActionExecutor();
Configuration jobConf = ae.createBaseHadoopConf(context, eActionXml);
ae.setupActionConf(jobConf, context, eActionXml, appPath);
ae.setLibFilesArchives(context, eActionXml, appPath, jobConf);
assertTrue(DistributedCache.getSymlink(jobConf));
Path[] filesInClasspath = DistributedCache.getFileClassPaths(jobConf);
for (Path p : new Path[]{new Path(getAppPath(), jar), rootJar}) {
boolean found = false;
for (Path c : filesInClasspath) {
if (!found && p.toUri().getPath().equals(c.toUri().getPath())) {
found = true;
}
}
assertTrue("file " + p.toUri().getPath() + " not found in classpath", found);
}
for (Path p : new Path[]{new Path(getAppPath(), file), rootFile, new Path(getAppPath(), so), rootSo,
new Path(getAppPath(), so1), rootSo1}) {
boolean found = false;
for (Path c : filesInClasspath) {
if (!found && p.toUri().getPath().equals(c.toUri().getPath())) {
found = true;
}
}
assertFalse("file " + p.toUri().getPath() + " found in classpath", found);
}
URI[] filesInCache = DistributedCache.getCacheFiles(jobConf);
for (Path p : new Path[]{new Path(getAppPath(), jar), rootJar, new Path(getAppPath(), file), rootFile,
new Path(getAppPath(), so), rootSo, new Path(getAppPath(), so1), rootSo1}) {
boolean found = false;
for (URI c : filesInCache) {
if (!found && p.toUri().getPath().equals(c.getPath())) {
found = true;
}
}
assertTrue("file " + p.toUri().getPath() + " not found in cache", found);
}
URI[] archivesInCache = DistributedCache.getCacheArchives(jobConf);
for (Path p : new Path[]{new Path(getAppPath(), archive), rootArchive}) {
boolean found = false;
for (URI c : archivesInCache) {
if (!found && p.toUri().getPath().equals(c.getPath())) {
found = true;
}
}
assertTrue("archive " + p.toUri().getPath() + " not found in cache", found);
}
}
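// Waits up to 3 minutes for the action to reach a terminal state or start RUNNING.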
private void waitForWorkflowAction(final String actionId) {
waitFor(3 * 60 * 1000, new Predicate() {
public boolean evaluate() throws Exception {
WorkflowActionBean mrAction = WorkflowActionQueryExecutor.getInstance()
.get(WorkflowActionQuery.GET_ACTION, actionId);
return mrAction.inTerminalState() || mrAction.getStatus() == Status.RUNNING;
}
});
}
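/**
 * Kills the child MR job behind a running map-reduce action and asserts that the Oozie
 * launcher AM, and with it the whole workflow, ends up KILLED as well.
 */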
public void testFailingMapReduceJobCausesOozieLauncherAMToFail() throws Exception {
final String workflowUri = createWorkflowWithMapReduceAction();
startWorkflowAndFailChildMRJob(workflowUri);
}
private String createWorkflowWithMapReduceAction() throws IOException {
final String workflowUri = getTestCaseFileUri("workflow.xml");
final String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:1.0\" name=\"workflow\">" +
" <start to=\"map-reduce\"/>" +
" <action name=\"map-reduce\">" +
" <map-reduce>" +
" <resource-manager>" + getJobTrackerUri() + "</resource-manager>" +
" <name-node>" + getNameNodeUri() + "</name-node>" +
" <configuration>\n" +
" <property>\n" +
" <name>mapred.job.queue.name</name>\n" +
" <value>default</value>\n" +
" </property>\n" +
" <property>\n" +
" <name>mapred.mapper.class</name>\n" +
" <value>org.apache.oozie.action.hadoop.SleepMapperReducerForTest</value>\n" +
" </property>\n" +
" <property>\n" +
" <name>mapred.reducer.class</name>\n" +
" <value>org.apache.oozie.action.hadoop.SleepMapperReducerForTest</value>\n" +
" </property>\n" +
" <property>\n" +
" <name>mapred.input.dir</name>\n" +
" <value>" + getFsTestCaseDir() + "/input</value>\n" +
" </property>\n" +
" <property>\n" +
" <name>mapred.output.dir</name>\n" +
" <value>" + getFsTestCaseDir() + "/output</value>\n" +
" </property>\n" +
" </configuration>\n" +
" </map-reduce>" +
" <ok to=\"end\"/>" +
" <error to=\"fail\"/>" +
" </action>" +
" <kill name=\"fail\">" +
" <message>Sub workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>" +
" </kill>" +
" <end name=\"end\"/>" +
"</workflow-app>";
writeToFile(appXml, workflowUri);
return workflowUri;
}
private void startWorkflowAndFailChildMRJob(final String workflowUri) throws Exception {
try {
LocalOozie.start();
final OozieClient wfClient = LocalOozie.getClient();
final String workflowId = submitWorkflow(workflowUri, wfClient);
final Configuration conf = Services.get().get(HadoopAccessorService.class).createConfiguration(getJobTrackerUri());
final Path inputFolder = createInputFolder(conf);
waitForWorkflowToStart(wfClient, workflowId);
waitForChildYarnApplication(getHadoopAccessorService().createYarnClient(getTestUser(), conf), workflowId);
assertAndWriteNextMRJobId(workflowId, conf, inputFolder);
final ApplicationId externalChildJobId = getChildMRJobApplicationId(conf);
killYarnApplication(conf, externalChildJobId);
waitUntilYarnAppKilledAndAssertSuccess(externalChildJobId.toString());
waitForWorkflowToKill(wfClient, workflowId);
} finally {
LocalOozie.stop();
}
}
/**
* Get all YARN application IDs, select the one of type {@code MAPREDUCE} that is relevant to {@code workflowId},
* and write to {@code inputFolder/jobID.txt}.
* <p>
* Simulates the functional parts of {@link LauncherMain#writeExternalChildIDs(String, Pattern[], String)} so that
* {@link MapReduceActionExecutor#check(ActionExecutor.Context, WorkflowAction)} can find it later in the call chain.
* <p>
* We need to write our own file to {@link LauncherMainTester#JOB_ID_FILE_NAME} so that
* {@link ActionExecutorTestCase#getChildMRJobApplicationId(Configuration)} can find it. We unfortunately cannot rely on
* the original file written by {@link LauncherMain#writeExternalChildIDs(String, Pattern[], String)} because we don't
* hold a reference to the original {@link ActionExecutor.Context} as {@link MapReduceActionExecutor} does.
* @param workflowId the workflow ID
* @param conf the {@link Configuration} used for Hadoop Common / YARN API calls
* @param inputFolder where to write the output text file
* @throws IOException when the output text file cannot be written
* @throws YarnException when the list of YARN applications cannot be queried
* @throws HadoopAccessorException when {@link YarnClient} cannot be created
*/
private void assertAndWriteNextMRJobId(final String workflowId, final Configuration conf, final Path inputFolder)
throws IOException, YarnException, HadoopAccessorException {
final Path wfIDFile = new Path(inputFolder, LauncherMainTester.JOB_ID_FILE_NAME);
try (final FileSystem fs = FileSystem.get(conf);
final Writer w = new OutputStreamWriter(fs.create(wfIDFile), StandardCharsets.UTF_8)) {
final List<ApplicationReport> allApplications =
getHadoopAccessorService().createYarnClient(getTestUser(), conf).getApplications();
assertTrue("YARN applications number mismatch", allApplications.size() >= 2);
ApplicationReport mapReduce = null;
for (final ApplicationReport candidate : allApplications) {
if (candidate.getApplicationType().equals(MapReduceActionExecutor.YARN_APPLICATION_TYPE_MAPREDUCE)
&& candidate.getName().contains(workflowId)) {
mapReduce = candidate;
}
}
assertNotNull("MAPREDUCE YARN application not found", mapReduce);
final String applicationId = mapReduce.getApplicationId().toString();
final String nextMRJobId = applicationId.replace("application", "job");
LOG.debug("Writing next MapReduce job ID: {0}", nextMRJobId);
w.write(nextMRJobId);
}
}
private Path createInputFolder(final Configuration conf) throws IOException {
final Path inputDir = new Path(getFsTestCaseDir(), "input");
try (final FileSystem fs = FileSystem.get(conf)) {
fs.mkdirs(inputDir);
}
return inputDir;
}
private void waitForChildYarnApplication(final YarnClient yarnClient, final String workflowId) {
waitFor(JOB_TIMEOUT, new ChildYarnApplicationPresentPredicate(yarnClient, workflowId));
}
private class ChildYarnApplicationPresentPredicate implements Predicate {
private final YarnClient yarnClient;
private final String workflowId;
ChildYarnApplicationPresentPredicate(final YarnClient yarnClient, final String workflowId) {
this.yarnClient = yarnClient;
this.workflowId = workflowId;
}
@Override
public boolean evaluate() throws Exception {
if (yarnClient.getApplications().isEmpty()) {
return false;
}
for (final ApplicationReport applicationReport : yarnClient.getApplications()) {
final String name = applicationReport.getName();
final String type = applicationReport.getApplicationType();
if (type.equals(MapReduceActionExecutor.YARN_APPLICATION_TYPE_MAPREDUCE) && name.contains(workflowId)) {
return true;
}
}
return false;
}
}
}