| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.oozie.action.hadoop; |
| |
import java.io.File;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Shell;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.junit.Assert;
| |
| public class TestShellActionExecutor extends ActionExecutorTestCase { |
| |
| private static final String SHELL_EXEC = Shell.WINDOWS ? "cmd.exe" : "sh"; |
| private static final String SHELL_PARAM = Shell.WINDOWS ? "/c" : "-c"; |
| private static final String SHELL_SCRIPTNAME = Shell.WINDOWS ? "script.cmd" : "script.sh"; |
| private static final String SHELL_SCRIPT_CONTENT = Shell.WINDOWS |
| ? "dir /s /b\necho %1 %2\necho %PATH%\ntype %0" |
| : "ls -ltr\necho $1 $2\necho $PATH\npwd\ntype sh"; |
| private static final String SHELL_SCRIPT_CONTENT_ENVVAR = Shell.WINDOWS |
| ? "dir /s /b\necho var1=%var1%\necho var2=%var2%" |
| : "ls -ltr\necho var1=$var1\necho var2=$var2"; |
| private static final String SHELL_SCRIPT_CONTENT_ERROR = Shell.WINDOWS |
| ? "dir /s /b\necho %1 %2\nexit 1" |
| : "ls -ltr\necho $1 $2\nexit 1"; |
| private static final String PERL_SCRIPT_CONTENT = "print \"MY_VAR=TESTING\";"; |
| private static final String SHELL_SCRIPT_HADOOP_CONF_DIR_CONTENT = Shell.WINDOWS |
| ? "echo OOZIE_ACTION_CONF_XML=%OOZIE_ACTION_CONF_XML%\necho HADOOP_CONF_DIR=%HADOOP_CONF_DIR%\n" |
| : "echo OOZIE_ACTION_CONF_XML=$OOZIE_ACTION_CONF_XML\necho HADOOP_CONF_DIR=$HADOOP_CONF_DIR\n"; |
| private static final String SHELL_SCRIPT_YARN_CONF_DIR_CONTENT = Shell.WINDOWS |
| ? "echo YARN_CONF_DIR=%YARN_CONF_DIR%\n" |
| : "echo YARN_CONF_DIR=$YARN_CONF_DIR\n"; |
| private static final String SHELL_SCRIPT_LOG4J_EXISTENCE_CHECKER = Shell.WINDOWS |
| ? "IF EXIST %HADOOP_CONF_DIR%\\log4j.properties echo L4J_EXISTS=yes\n" |
| : "if [ -f $HADOOP_CONF_DIR/log4j.properties ]; then echo L4J_EXISTS=yes; fi\n"; |
| private static final String SHELL_SCRIPT_LOG4J_CONTENT_COUNTER = Shell.WINDOWS |
| ? "FOR /f %%i IN ('TYPE %HADOOP_CONF_DIR%\\log4j.properties " + |
| "^| FIND /c /v \"~DOESNOTMATCH~\"') DO " + |
| "SET L4J_LC=%%i\necho L4J_LC=%L4J_LC%\n" + |
| "FOR /f %%i IN ('FINDSTR \"CLA CLRA\" %HADOOP_CONF_DIR%\\log4j.properties " + |
| "^| FIND /c /v \"~DOESNOTMATCH~\"') DO " + |
| "SET L4J_APPENDER=%%i\necho L4J_APPENDER=%L4J_APPENDER%\n" |
| : "echo L4J_LC=$(cat $HADOOP_CONF_DIR/log4j.properties | wc -l)\n" + |
| "echo L4J_APPENDER=$(grep -e 'CLA' -e 'CLRA' -c " + |
| "$HADOOP_CONF_DIR/log4j.properties)\n"; |
| |
| /** |
| * Verify if the ShellActionExecutor indeed setups the basic stuffs |
| * |
| * @throws Exception |
| */ |
| public void testSetupMethods() throws Exception { |
| ShellActionExecutor ae = new ShellActionExecutor(); |
| assertNull(ae.getLauncherClasses()); |
| Element actionXml = XmlUtils.parseXml("<shell>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" |
| + "<name-node>" + getNameNodeUri() + "</name-node>" + "<exec>SCRIPT</exec>" |
| + "<argument>a=A</argument>" + "<argument>b=B</argument>" + "</shell>"); |
| |
| XConfiguration protoConf = new XConfiguration(); |
| protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser()); |
| |
| |
| WorkflowJobBean wf = createBaseWorkflow(protoConf, "pig-action"); |
| WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0); |
| action.setType(ae.getType()); |
| |
| Context context = new Context(wf, action); |
| |
| Configuration conf = ae.createBaseHadoopConf(context, actionXml); |
| ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir()); |
| assertEquals("SCRIPT", conf.get("oozie.shell.exec")); |
| assertEquals("2", conf.get("oozie.shell.args.size")); |
| assertEquals("a=A", conf.get("oozie.shell.args.0")); |
| assertEquals("b=B", conf.get("oozie.shell.args.1")); |
| assertEquals("Expected HADOOP_CONF_DIR setup switch to be disabled", |
| "false", conf.get("oozie.action.shell.setup.hadoop.conf.dir")); |
| assertEquals("Expected log4j.properties write switch to be enabled", |
| "true", conf.get("oozie.action.shell.setup.hadoop.conf.dir.write.log4j.properties")); |
| assertNotNull("Expected a default config to exist for log4j.properties", |
| conf.get("oozie.action.shell.setup.hadoop.conf.dir.log4j.content")); |
| } |
| |
| /** |
| * test if a sample shell script could run successfully |
| * |
| * @throws Exception |
| */ |
| public void testShellScript() throws Exception { |
| FileSystem fs = getFileSystem(); |
| // Create the script file with canned shell command |
| Path script = new Path(getAppPath(), SHELL_SCRIPTNAME); |
| Writer w = new OutputStreamWriter(fs.create(script), StandardCharsets.UTF_8); |
| w.write(SHELL_SCRIPT_CONTENT); |
| w.close(); |
| |
| // Create sample Shell action xml |
| String actionXml = "<shell>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" |
| + getNameNodeUri() + "</name-node>" + "<exec>" + SHELL_EXEC + "</exec>" + "<argument>" + SHELL_PARAM + "</argument>" |
| + "<argument>" + SHELL_SCRIPTNAME + "</argument>" + "<argument>A</argument>" + "<argument>B</argument>" |
| + "<env-var>var1=val1</env-var>" + "<env-var>var2=val2</env-var>" + "<file>" + script.toString() |
| + "#" + script.getName() + "</file>" + "</shell>"; |
| // Submit and verify the job's status |
| _testSubmit(actionXml, true, ""); |
| } |
| |
| /** |
| * Test if a shell script could run successfully with {@link ShellMain#CONF_OOZIE_SHELL_SETUP_HADOOP_CONF_DIR} enabled. |
| * |
| * @throws Exception |
| */ |
| public void testShellScriptHadoopConfDir() throws Exception { |
| FileSystem fs = getFileSystem(); |
| // Create the script file with canned shell command |
| Path script = new Path(getAppPath(), SHELL_SCRIPTNAME); |
| Writer w = new OutputStreamWriter(fs.create(script), StandardCharsets.UTF_8); |
| w.write(SHELL_SCRIPT_HADOOP_CONF_DIR_CONTENT); |
| w.write(SHELL_SCRIPT_YARN_CONF_DIR_CONTENT); |
| w.write(SHELL_SCRIPT_LOG4J_EXISTENCE_CHECKER); |
| w.write(SHELL_SCRIPT_LOG4J_CONTENT_COUNTER); |
| w.close(); |
| |
| // Create sample Shell action xml |
| String actionXml = "<shell>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" |
| + getNameNodeUri() + "</name-node>" + "<configuration>" |
| + "<property><name>oozie.action.shell.setup.hadoop.conf.dir</name><value>true</value></property>" |
| + "</configuration>" + "<exec>" + SHELL_EXEC + "</exec>" + "<argument>" + SHELL_PARAM + "</argument>" |
| + "<argument>" + SHELL_SCRIPTNAME + "</argument>" + "<file>" + script.toString() |
| + "#" + script.getName() + "</file>" + "<capture-output/>" + "</shell>"; |
| // Submit and verify the job's status |
| WorkflowAction action = _testSubmit(actionXml, true, ""); |
| String oozieActionConfXml = PropertiesUtils.stringToProperties(action.getData()).getProperty("OOZIE_ACTION_CONF_XML"); |
| String hadoopConfDir = PropertiesUtils.stringToProperties(action.getData()).getProperty("HADOOP_CONF_DIR"); |
| String yarnConfDir = PropertiesUtils.stringToProperties(action.getData()).getProperty("YARN_CONF_DIR"); |
| String log4jExists = PropertiesUtils.stringToProperties(action.getData()).getProperty("L4J_EXISTS"); |
| String log4jFileLineCount = PropertiesUtils.stringToProperties(action.getData()).getProperty("L4J_LC"); |
| String log4BadAppenderCount = PropertiesUtils.stringToProperties(action.getData()).getProperty("L4J_APPENDER"); |
| assertNotNull(oozieActionConfXml); |
| assertNotNull(hadoopConfDir); |
| String s = new File(oozieActionConfXml).getParent() + File.separator + "oozie-hadoop-conf-"; |
| Assert.assertTrue( |
| "Expected HADOOP_CONF_DIR to start with " + s + " but was " + hadoopConfDir, |
| hadoopConfDir.startsWith(s)); |
| Assert.assertTrue( |
| "Expected YARN_CONF_DIR to start with " + s + " but was " + yarnConfDir, |
| yarnConfDir.startsWith(s)); |
| Assert.assertEquals( |
| "Expected log4j.properties file to exist", "yes", log4jExists); |
| Assert.assertTrue( |
| "Expected log4j.properties to have non-zero line count, but has: " + log4jFileLineCount, |
| Integer.parseInt(log4jFileLineCount) > 0); |
| Assert.assertEquals( |
| "Expected log4j.properties to have no container appender references (CLA/CLRA)", |
| 0, Integer.parseInt(log4BadAppenderCount)); |
| } |
| |
| /** |
| * Test if a shell script could run successfully with {@link ShellMain#CONF_OOZIE_SHELL_SETUP_HADOOP_CONF_DIR} enabled. |
| * Run with {@link ShellMain#CONF_OOZIE_SHELL_SETUP_HADOOP_CONF_DIR_WRITE_LOG4J_PROPERTIES} disabled. |
| * |
| * @throws Exception |
| */ |
| public void testShellScriptHadoopConfDirWithNoL4J() throws Exception { |
| FileSystem fs = getFileSystem(); |
| // Create the script file with canned shell command |
| Path script = new Path(getAppPath(), SHELL_SCRIPTNAME); |
| Writer w = new OutputStreamWriter(fs.create(script), StandardCharsets.UTF_8); |
| w.write(SHELL_SCRIPT_LOG4J_EXISTENCE_CHECKER); |
| w.close(); |
| |
| // Create sample Shell action xml |
| String actionXml = "<shell>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" |
| + getNameNodeUri() + "</name-node>" + "<configuration>" |
| + "<property><name>oozie.action.shell.setup.hadoop.conf.dir</name><value>true</value></property>" |
| + "<property><name>oozie.action.shell.setup.hadoop.conf.dir.write.log4j.properties" |
| + "</name><value>false</value></property>" |
| + "</configuration>" + "<exec>" + SHELL_EXEC + "</exec>" + "<argument>" + SHELL_PARAM + "</argument>" |
| + "<argument>" + SHELL_SCRIPTNAME + "</argument>" + "<file>" + script.toString() |
| + "#" + script.getName() + "</file>" + "<capture-output/>" + "</shell>"; |
| // Submit and verify the job's status |
| WorkflowAction action = _testSubmit(actionXml, true, ""); |
| String log4jExists = PropertiesUtils.stringToProperties(action.getData()).getProperty("L4J_EXISTS"); |
| Assert.assertNull( |
| "Expected no log4j.properties file to exist", log4jExists); |
| } |
| |
| /** |
| * test if a sample shell script could run with error when the script return |
| * non-zero exit code |
| * |
| * @throws Exception |
| */ |
| public void testShellScriptError() throws Exception { |
| FileSystem fs = getFileSystem(); |
| // Create the script file with canned shell command |
| Path script = new Path(getAppPath(), SHELL_SCRIPTNAME); |
| Writer w = new OutputStreamWriter(fs.create(script), StandardCharsets.UTF_8); |
| w.write(SHELL_SCRIPT_CONTENT_ERROR); |
| w.close(); |
| |
| // Create sample shell action xml |
| String actionXml = "<shell>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" |
| + getNameNodeUri() + "</name-node>" + "<exec>" + SHELL_EXEC + "</exec>" + "<argument>" + SHELL_PARAM + "</argument>" |
| + "<argument>" + SHELL_SCRIPTNAME + "</argument>" + "<argument>A</argument>" + "<argument>B</argument>" + "<file>" |
| + script.toString() + "#" + script.getName() + "</file>" + "</shell>"; |
| // Submit and verify the job's status |
| _testSubmit(actionXml, false, ""); |
| } |
| |
| /** |
| * test if a perl script could run successfully |
| * |
| * @throws Exception |
| */ |
| public void testPerlScript() throws Exception { |
| if (Shell.WINDOWS) { |
| System.out.println("Windows cannot natively execute perl. Skipping test"); |
| return; |
| } |
| |
| FileSystem fs = getFileSystem(); |
| // Create a sample perl script |
| Path script = new Path(getAppPath(), "script.pl"); |
| Writer w = new OutputStreamWriter(fs.create(script), StandardCharsets.UTF_8); |
| w.write(PERL_SCRIPT_CONTENT); |
| w.close(); |
| // Create a Sample Shell action using the perl script |
| String actionXml = "<shell>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" |
| + getNameNodeUri() + "</name-node>" + "<exec>perl</exec>" + "<argument>script.pl</argument>" |
| + "<argument>A</argument>" + "<argument>B</argument>" + "<env-var>my_var1=my_val1</env-var>" + "<file>" |
| + script.toString() + "#" + script.getName() + "</file>" + "<capture-output/>" + "</shell>"; |
| _testSubmit(actionXml, true, "TESTING"); |
| } |
| |
| /** |
| * Test that env variable can contain '=' symbol within value |
| * |
| * @throws Exception |
| */ |
| public void testEnvVar() throws Exception { |
| Services.get().destroy(); |
| Services services = new Services(); |
| services.getConf().setInt(LauncherAMUtils.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 8 * 1042); |
| services.init(); |
| |
| FileSystem fs = getFileSystem(); |
| // Create the script file with canned shell command |
| Path script = new Path(getAppPath(), SHELL_SCRIPTNAME); |
| Writer w = new OutputStreamWriter(fs.create(script), StandardCharsets.UTF_8); |
| w.write(SHELL_SCRIPT_CONTENT_ENVVAR); |
| w.close(); |
| |
| String envValueHavingEqualSign = "a=b;c=d"; |
| // Create sample shell action xml |
| String actionXml = "<shell>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" |
| + getNameNodeUri() + "</name-node>" + "<exec>" + SHELL_EXEC + "</exec>" + "<argument>" + SHELL_PARAM + "</argument>" |
| + "<argument>" + SHELL_SCRIPTNAME + "</argument>" + "<argument>A</argument>" + "<argument>B</argument>" |
| + "<env-var>var1=val1</env-var>" + "<env-var>var2=" + envValueHavingEqualSign + "</env-var>" + "<file>" |
| + script.toString() |
| + "#" + script.getName() + "</file>" + "<capture-output />" + "</shell>"; |
| |
| Context context = createContext(actionXml); |
| // Submit the action |
| final String launcherId = submitAction(context); |
| waitUntilYarnAppDoneAndAssertSuccess(launcherId); |
| ShellActionExecutor ae = new ShellActionExecutor(); |
| WorkflowAction action = context.getAction(); |
| ae.check(context, action); |
| ae.end(context, action); |
| |
| // Checking action data from shell script output |
| assertEquals(envValueHavingEqualSign, PropertiesUtils.stringToProperties(action.getData()) |
| .getProperty("var2")); |
| |
| } |
| |
| /** |
| * Submit the WF with a Shell action and very of the job succeeds |
| * |
| * @param actionXml |
| * @param checkForSuccess |
| * @throws Exception |
| */ |
| private WorkflowAction _testSubmit(String actionXml, boolean checkForSuccess, String capture_output) throws Exception { |
| |
| Context context = createContext(actionXml); |
| final String launcherId = submitAction(context);// Submit the action |
| waitUntilYarnAppDoneAndAssertSuccess(launcherId); |
| |
| Configuration conf = new XConfiguration(); |
| conf.set("user.name", getTestUser()); |
| Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), |
| conf); |
| assertFalse(LauncherHelper.hasIdSwap(actionData)); |
| |
| ShellActionExecutor ae = new ShellActionExecutor(); |
| ae.check(context, context.getAction()); |
| ae.end(context, context.getAction()); |
| assertTrue(launcherId.equals(context.getAction().getExternalId())); |
| |
| if (checkForSuccess) { // Postive test cases |
| assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); |
| // Testing capture output |
| if (capture_output != null && capture_output.length() > 0) { |
| assertEquals(capture_output, PropertiesUtils.stringToProperties(context.getAction().getData()) |
| .getProperty("MY_VAR")); |
| } |
| } |
| else { // Negative test cases |
| assertEquals("FAILED/KILLED", context.getAction().getExternalStatus()); |
| assertNotNull(context.getAction().getErrorMessage()); |
| } |
| if (checkForSuccess) { // Positive test cases |
| assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus()); |
| } |
| else {// Negative test cases |
| assertEquals(WorkflowAction.Status.ERROR, context.getAction().getStatus()); |
| } |
| return context.getAction(); |
| } |
| |
| /** |
| * Create WF using the Shell action and return the corresponding contest |
| * structure |
| * |
| * @param actionXml :Shell action |
| * @return :Context |
| * @throws Exception |
| */ |
| private Context createContext(String actionXml) throws Exception { |
| ShellActionExecutor ae = new ShellActionExecutor(); |
| |
| XConfiguration protoConf = new XConfiguration(); |
| protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser()); |
| // Make sure Kerbores prinicpal is in the conf |
| |
| |
| WorkflowJobBean wf = createBaseWorkflow(protoConf, "shell-action"); |
| WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0); |
| action.setType(ae.getType()); |
| action.setConf(actionXml); |
| |
| return new Context(wf, action); |
| } |
| |
| /** |
| * Submit the Shell action using Shell ActionExecutor |
| * |
| * @param context |
| * @return The RunningJob of the Launcher Mapper |
| * @throws Exception |
| */ |
| private String submitAction(Context context) throws Exception { |
| ShellActionExecutor ae = new ShellActionExecutor(); |
| |
| WorkflowAction action = context.getAction(); |
| |
| ae.prepareActionDir(getFileSystem(), context); |
| ae.submitLauncher(getFileSystem(), context, action); // Submit the action |
| |
| String jobId = action.getExternalId(); |
| String jobTracker = action.getTrackerUri(); |
| String consoleUrl = action.getConsoleUrl(); |
| |
| assertNotNull(jobId); |
| assertNotNull(jobTracker); |
| assertNotNull(consoleUrl); |
| |
| return jobId; |
| } |
| } |