blob: d8fa8d88045d20f6607b65bd643ca4b750369ec7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.action.hadoop;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.json.simple.JSONValue;
import org.json.simple.parser.JSONParser;
/**
 * Tests for {@link PigActionExecutor}: action configuration setup, launcher submission of Pig scripts,
 * retrieval of Pig stats and external child IDs, and distributed-cache handling of comma-separated
 * {@code <file>} / {@code <archive>} entries.
 */
public class TestPigActionExecutor extends ActionExecutorTestCase {

    /** Load/project/store script; {@code $IN} and {@code $OUT} are supplied as {@code <param>} values. */
    private static final String PIG_SCRIPT = "set job.name 'test'\n" + "set debug on\n" +
            "A = load '$IN' using PigStorage(':');\n" +
            "B = foreach A generate $0 as id;\n" +
            "store B into '$OUT' USING PigStorage();\n";

    /** Script whose last statement is not valid Pig, used to exercise the failure path. */
    private static final String ERROR_PIG_SCRIPT = "set job.name 'test'\n" + "set debug on\n" +
            "A = load '$IN' using PigStorage(':');\n" +
            "ERROR @#$@#$;\n";

    /** Like {@link #PIG_SCRIPT} but registers {@code udf.jar} and applies {@code UDFTester} to column 0. */
    private static final String UDF_PIG_SCRIPT = "register udf.jar\n" +
            "set job.name 'test'\n" + "set debug on\n" +
            "A = load '$IN' using PigStorage(':');\n" +
            "B = foreach A generate" +
            " org.apache.oozie.action.hadoop.UDFTester($0) as id;\n" +
            "store B into '$OUT' USING PigStorage();\n";

    /** Calls {@code PigTestCase.resetPigStats()} after the base set-up so each test starts from a clean state. */
    @Override
    protected void setUp() throws Exception {
        super.setUp();
        PigTestCase.resetPigStats();
    }

    /** Calls {@code PigTestCase.resetPigStats()} before the base tear-down. */
    @Override
    protected void tearDown() throws Exception {
        PigTestCase.resetPigStats();
        super.tearDown();
    }

    /** Registers {@link PigActionExecutor} as the action executor class via a system property. */
    @Override
    protected void setSystemProps() throws Exception {
        super.setSystemProps();
        setSystemProperty("oozie.service.ActionService.executor.classes", PigActionExecutor.class.getName());
    }

    /**
     * Checks the launcher classes and that {@code setupActionConf} copies the script name and the
     * {@code <param>} entries into the action configuration.
     */
    public void testSetupMethods() throws Exception {
        PigActionExecutor ae = new PigActionExecutor();
        List<Class<?>> classes = new ArrayList<>();
        classes.add(PigMain.class);
        classes.add(JSONParser.class);
        assertEquals(classes, ae.getLauncherClasses());

        Element actionXml = XmlUtils.parseXml("<pig>" +
                "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
                "<name-node>" + getNameNodeUri() + "</name-node>" +
                "<script>SCRIPT</script>" +
                "<param>a=A</param>" +
                "<param>b=B</param>" +
                "</pig>");

        XConfiguration protoConf = new XConfiguration();
        protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
        WorkflowJobBean wf = createBaseWorkflow(protoConf, "pig-action");
        WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
        action.setType(ae.getType());

        Context context = new Context(wf, action);
        Configuration conf = ae.createBaseHadoopConf(context, actionXml);
        ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());

        assertEquals("SCRIPT", conf.get("oozie.pig.script"));
        assertEquals("2", conf.get("oozie.pig.params.size"));
        assertEquals("a=A", conf.get("oozie.pig.params.0"));
        assertEquals("b=B", conf.get("oozie.pig.params.1"));
    }

    /**
     * Builds a test {@link Context} for a "pig-action" workflow whose single action has
     * {@code actionXml} as its configuration. The "pig" sharelib is added to the distributed cache
     * of the workflow's proto configuration.
     */
    private Context createContext(String actionXml) throws Exception {
        PigActionExecutor ae = new PigActionExecutor();
        FileSystem fs = getFileSystem();

        XConfiguration protoConf = new XConfiguration();
        protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
        SharelibUtils.addToDistributedCache("pig", fs, getFsTestCaseDir(), protoConf);

        WorkflowJobBean wf = createBaseWorkflow(protoConf, "pig-action");
        WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
        action.setType(ae.getType());
        action.setConf(actionXml);
        return new Context(wf, action);
    }

    /**
     * Prepares the action directory and submits the launcher for the context's action.
     *
     * @return the action's external ID after submission (the launcher application ID)
     */
    private String submitAction(Context context) throws Exception {
        PigActionExecutor ae = new PigActionExecutor();
        WorkflowAction action = context.getAction();
        ae.prepareActionDir(getFileSystem(), context);
        ae.submitLauncher(getFileSystem(), context, action);
        return action.getExternalId();
    }

    /**
     * Submits {@code actionXml}, waits for the launcher, runs {@code check}/{@code end} and asserts the
     * resulting action state.
     *
     * @param checkForSuccess {@code true} to expect a SUCCEEDED/OK action with stats and child IDs,
     *                        {@code false} to expect a FAILED/KILLED/ERROR action with an error message
     */
    private void _testSubmit(String actionXml, boolean checkForSuccess) throws Exception {
        Context context = createContext(actionXml);
        final String launcherId = submitAction(context);
        waitUntilYarnAppDoneAndAssertSuccess(launcherId);

        PigActionExecutor ae = new PigActionExecutor();
        WorkflowAction action = context.getAction();
        ae.check(context, action);
        ae.end(context, action);

        assertEquals(launcherId, action.getExternalId());
        if (checkForSuccess) {
            String childIds = context.getExternalChildIDs();
            assertNotNull(childIds);
            assertFalse("external child IDs must differ from the launcher ID", launcherId.equals(childIds));
            assertNotNull(action.getStats());
            assertEquals("SUCCEEDED", action.getExternalStatus());
            assertNull(action.getData());
            assertEquals(WorkflowAction.Status.OK, action.getStatus());
        } else {
            assertEquals("FAILED/KILLED", action.getExternalStatus());
            assertNotNull(action.getErrorMessage());
            assertEquals(WorkflowAction.Status.ERROR, action.getStatus());
        }
    }

    /*
     * Test the stats retrieved from a Pig job
     */
    public void testExecutionStats() throws Exception {
        // Set the action xml with the option for retrieving stats to true
        String actionXml = setPigActionXml(PIG_SCRIPT, true);
        Context context = createContext(actionXml);
        final String launcherId = submitAction(context);
        waitUntilYarnAppDoneAndAssertSuccess(launcherId);

        Configuration conf = new XConfiguration();
        conf.set("user.name", getTestUser());
        Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                conf);
        assertTrue(LauncherHelper.hasStatsData(actionData));

        PigActionExecutor ae = new PigActionExecutor();
        WorkflowAction wfAction = context.getAction();
        ae.check(context, wfAction);
        ae.end(context, wfAction);

        assertEquals(JavaActionExecutor.SUCCEEDED, wfAction.getExternalStatus());
        String stats = wfAction.getStats();
        assertNotNull(stats);

        // check for some of the expected key values in the stats
        Map<?, ?> statsJson = (Map<?, ?>) JSONValue.parse(stats);
        // check for expected 1st level JSON keys
        assertTrue(statsJson.containsKey("PIG_VERSION"));

        String[] childIds = wfAction.getExternalChildIDs().split(",");
        assertTrue(statsJson.containsKey(childIds[0]));

        Map<?, ?> childStats = (Map<?, ?>) statsJson.get(childIds[0]);
        // check for expected 2nd level JSON keys
        assertTrue(childStats.containsKey("HADOOP_COUNTERS"));
    }

    /**
     * With loading of default resources disabled, the action configuration written to the action
     * directory must not contain the {@code oozie.HadoopAccessorService.created} marker.
     */
    public void testActionConfLoadDefaultResources() throws Exception {
        ConfigurationService.setBoolean(
                "oozie.service.HadoopAccessorService.action.configurations.load.default.resources", false);
        String actionXml = setPigActionXml(PIG_SCRIPT, true);
        Context context = createContext(actionXml);
        submitAction(context);

        FileSystem fs = getFileSystem();
        Path actionConfPath = new Path(context.getActionDir(), LauncherAMUtils.ACTION_CONF_XML);
        // Hadoop's Configuration may read stream resources lazily (on first lookup), so the
        // lookup stays inside the try block, before the stream is closed.
        try (FSDataInputStream is = fs.open(actionConfPath)) {
            XConfiguration conf = new XConfiguration();
            conf.addResource(is);
            assertNull(conf.get("oozie.HadoopAccessorService.created"));
        }
    }

    /*
     * Test the Hadoop IDs obtained from the Pig job
     */
    public void testExternalChildIds() throws Exception {
        // Set the action xml with the option for retrieving stats to false
        String actionXml = setPigActionXml(PIG_SCRIPT, false);
        Context context = createContext(actionXml);
        final String launcherId = submitAction(context);
        waitUntilYarnAppDoneAndAssertSuccess(launcherId);

        PigActionExecutor ae = new PigActionExecutor();
        WorkflowAction wfAction = context.getAction();
        ae.check(context, wfAction);
        ae.end(context, wfAction);

        assertEquals("SUCCEEDED", wfAction.getExternalStatus());
        String externalIds = wfAction.getExternalChildIDs();
        assertNotNull(externalIds);
        // Compare contents, not references: assertNotSame("", ...) would pass for any non-interned "".
        assertFalse("external child IDs must not be empty", externalIds.isEmpty());
        // check for the expected prefix of hadoop jobIDs
        assertTrue(externalIds.contains("job_"));
    }

    /*
     * Test the stats after setting the maximum allowed size of stats to a small
     * value
     */
    public void testExecutionStatsWithMaxStatsSizeLimit() throws Exception {
        Services.get().destroy();
        // Set a very small value for max size of stats
        setSystemProperty(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE, "1");
        new Services().init();

        // Set the action xml with the option for retrieving stats to true
        String actionXml = setPigActionXml(PIG_SCRIPT, true);
        Context context = createContext(actionXml);
        final String launcherId = submitAction(context);
        waitUntilYarnAppDoneAndAssertSuccess(launcherId);

        PigActionExecutor ae = new PigActionExecutor();
        WorkflowAction wfAction = context.getAction();
        ae.check(context, wfAction);
        ae.end(context, wfAction);

        // action should fail as the size of pig stats will always be greater
        // than 1 byte
        assertEquals("FAILED/KILLED", wfAction.getExternalStatus());
        assertNull(wfAction.getStats());
    }

    /*
     * Test the stats with retrieve stats option set to false
     */
    public void testExecutionStatsWithRetrieveStatsFalse() throws Exception {
        // Set the action xml with the option for retrieving stats to false
        String actionXml = setPigActionXml(PIG_SCRIPT, false);
        Context context = createContext(actionXml);
        final String launcherId = submitAction(context);
        waitUntilYarnAppDoneAndAssertSuccess(launcherId);

        Configuration conf = new XConfiguration();
        conf.set("user.name", getTestUser());
        Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                conf);
        assertFalse(LauncherHelper.hasStatsData(actionData));

        PigActionExecutor ae = new PigActionExecutor();
        WorkflowAction wfAction = context.getAction();
        ae.check(context, wfAction);
        ae.end(context, wfAction);

        assertEquals("SUCCEEDED", wfAction.getExternalStatus());
        assertNotNull(wfAction.getExternalChildIDs());
    }

    /**
     * Builds the {@code <configuration>} used by the Pig actions in this test.
     *
     * @param writeStats value for {@link PigMain#EXTERNAL_STATS_WRITE}
     * @return a configuration with the Pig log level set to INFO and the stats flag set
     */
    protected XConfiguration setPigConfig(boolean writeStats) {
        XConfiguration conf = new XConfiguration();
        conf.set("oozie.pig.log.level", "INFO");
        conf.set(PigMain.EXTERNAL_STATS_WRITE, String.valueOf(writeStats));
        return conf;
    }

    public void testPig() throws Exception {
        // Set the action xml with the option for retrieving stats to true
        String actionXml = setPigActionXml(PIG_SCRIPT, true);
        _testSubmit(actionXml, true);
    }

    public void testPigError() throws Exception {
        // Set the action xml with the option for retrieving stats to true
        String actionXml = setPigActionXml(ERROR_PIG_SCRIPT, true);
        _testSubmit(actionXml, false);
    }

    /**
     * Writes {@code pigScript} and a two-line input file to the test file system and returns the
     * matching {@code <pig>} action XML.
     *
     * @param pigScript  the Pig script body, written to {@code script.pig} under the app path
     * @param writeStats whether the action should write external stats
     * @return the action XML string
     */
    private String setPigActionXml(String pigScript, boolean writeStats) throws IOException {
        return createPigActionXml(pigScript, writeStats, "");
    }

    /**
     * Writes {@code pigScript} to {@code <appPath>/script.pig} and two "dummy" lines to
     * {@code <fsTestCaseDir>/input/data.txt}, then builds a {@code <pig>} action XML that references
     * the script and passes {@code IN}/{@code OUT} params.
     *
     * @param extraElements raw XML appended just before {@code </pig>} (for example {@code <file>}
     *                      entries); pass {@code ""} for none
     */
    private String createPigActionXml(String pigScript, boolean writeStats, String extraElements)
            throws IOException {
        FileSystem fs = getFileSystem();
        Path script = new Path(getAppPath(), "script.pig");
        writeFile(fs, script, pigScript);

        Path inputDir = new Path(getFsTestCaseDir(), "input");
        Path outputDir = new Path(getFsTestCaseDir(), "output");
        writeFile(fs, new Path(inputDir, "data.txt"), "dummy\ndummy\n");

        return "<pig>" +
                "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
                "<name-node>" + getNameNodeUri() + "</name-node>" +
                setPigConfig(writeStats).toXmlString(false) +
                "<script>" + script.getName() + "</script>" +
                "<param>IN=" + inputDir.toUri().getPath() + "</param>" +
                "<param>OUT=" + outputDir.toUri().getPath() + "</param>" +
                extraElements +
                "</pig>";
    }

    /** Writes {@code content} as UTF-8 to {@code path}, closing the stream even if the write fails. */
    private static void writeFile(FileSystem fs, Path path, String content) throws IOException {
        try (Writer w = new OutputStreamWriter(fs.create(path), StandardCharsets.UTF_8)) {
            w.write(content);
        }
    }

    /** Runs a script that registers a UDF jar shipped to the action through a {@code <file>} entry. */
    public void testUdfPig() throws Exception {
        FileSystem fs = getFileSystem();
        Path udfJar = new Path(getFsTestCaseDir(), "udf.jar");
        File jarFile = IOUtils.createJar(new File(getTestCaseDir()), "udf.jar", UDFTester.class);
        try (InputStream is = new FileInputStream(jarFile); OutputStream os = fs.create(udfJar)) {
            IOUtils.copyStream(is, os);
        }

        // The "#udf.jar" fragment names the shipped file; presumably this is what lets the script's
        // "register udf.jar" resolve.
        String fileElement = "<file>" + udfJar.toString() + "#" + udfJar.getName() + "</file>";
        String actionXml = createPigActionXml(UDF_PIG_SCRIPT, true, fileElement);
        _testSubmit(actionXml, true);
    }

    /**
     * https://issues.apache.org/jira/browse/OOZIE-87
     * This test covers pig action: comma-separated {@code <file>} and {@code <archive>} values, with
     * stray spaces around entries, must all be placed in the distributed cache. Only {@code .jar}
     * files are also expected on the classpath.
     * @throws Exception
     */
    public void testCommaSeparatedFilesAndArchives() throws Exception {
        FileSystem fs = getFileSystem();
        Path appPath = getAppPath();
        Path root = new Path(getFsTestCaseDir(), "root");

        Path jar = new Path("jar.jar");
        fs.create(new Path(appPath, jar)).close();
        Path rootJar = new Path(root, "rootJar.jar");
        fs.create(rootJar).close();
        Path file = new Path("file");
        fs.create(new Path(appPath, file)).close();
        Path rootFile = new Path(root, "rootFile");
        fs.create(rootFile).close();
        Path so = new Path("soFile.so");
        fs.create(new Path(appPath, so)).close();
        Path rootSo = new Path(root, "rootSoFile.so");
        fs.create(rootSo).close();
        Path so1 = new Path("soFile.so.1");
        fs.create(new Path(appPath, so1)).close();
        Path rootSo1 = new Path(root, "rootSoFile.so.1");
        fs.create(rootSo1).close();
        Path archive = new Path("archive.tar");
        fs.create(new Path(appPath, archive)).close();
        Path rootArchive = new Path(root, "rootArchive.tar");
        fs.create(rootArchive).close();

        String actionXml = "<pig>" +
                " <job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
                " <name-node>" + getNameNodeUri() + "</name-node>" +
                " <script>id.pig</script>" +
                " <file>" + jar.toString() +
                "," + rootJar.toString() +
                "," + file.toString() +
                ", " + rootFile.toString() + // with leading and trailing spaces
                " ," + so.toString() +
                "," + rootSo.toString() +
                "," + so1.toString() +
                "," + rootSo1.toString() + "</file>\n" +
                " <archive>" + archive.toString() + ", "
                + rootArchive.toString() + " </archive>\n" + // with leading and trailing spaces
                "</pig>";

        Element eActionXml = XmlUtils.parseXml(actionXml);
        Context context = createContext(actionXml);
        PigActionExecutor ae = new PigActionExecutor();
        Configuration jobConf = ae.createBaseHadoopConf(context, eActionXml);
        ae.setupActionConf(jobConf, context, eActionXml, appPath);
        ae.setLibFilesArchives(context, eActionXml, appPath, jobConf);

        assertTrue(DistributedCache.getSymlink(jobConf));

        Path[] filesInClasspath = DistributedCache.getFileClassPaths(jobConf);
        for (Path p : new Path[]{new Path(appPath, jar), rootJar}) {
            assertTrue("file " + p.toUri().getPath() + " not found in classpath",
                    containsPath(filesInClasspath, p));
        }
        for (Path p : new Path[]{new Path(appPath, file), rootFile, new Path(appPath, so), rootSo,
                new Path(appPath, so1), rootSo1}) {
            assertFalse("file " + p.toUri().getPath() + " found in classpath",
                    containsPath(filesInClasspath, p));
        }

        URI[] filesInCache = DistributedCache.getCacheFiles(jobConf);
        for (Path p : new Path[]{new Path(appPath, jar), rootJar, new Path(appPath, file), rootFile,
                new Path(appPath, so), rootSo, new Path(appPath, so1), rootSo1}) {
            assertTrue("file " + p.toUri().getPath() + " not found in cache",
                    containsUriPath(filesInCache, p));
        }

        URI[] archivesInCache = DistributedCache.getCacheArchives(jobConf);
        for (Path p : new Path[]{new Path(appPath, archive), rootArchive}) {
            assertTrue("archive " + p.toUri().getPath() + " not found in cache",
                    containsUriPath(archivesInCache, p));
        }
    }

    /**
     * Returns {@code true} if any element of {@code candidates} has the same URI path component as
     * {@code target}. A {@code null} array (nothing configured) contains nothing.
     */
    private static boolean containsPath(Path[] candidates, Path target) {
        if (candidates == null) {
            return false;
        }
        String targetPath = target.toUri().getPath();
        for (Path candidate : candidates) {
            if (targetPath.equals(candidate.toUri().getPath())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns {@code true} if any URI in {@code candidates} has the same path component as
     * {@code target}. A {@code null} array (nothing configured) contains nothing.
     */
    private static boolean containsUriPath(URI[] candidates, Path target) {
        if (candidates == null) {
            return false;
        }
        String targetPath = target.toUri().getPath();
        for (URI candidate : candidates) {
            if (targetPath.equals(candidate.getPath())) {
                return true;
            }
        }
        return false;
    }
}