| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.oozie.action.hadoop; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.mapred.JobID; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.util.ConverterUtils; |
| import org.apache.oozie.DagELFunctions; |
| import org.apache.oozie.WorkflowActionBean; |
| import org.apache.oozie.WorkflowJobBean; |
| import org.apache.oozie.action.ActionExecutor; |
| import org.apache.oozie.action.oozie.JavaSleepAction; |
| import org.apache.oozie.client.OozieClient; |
| import org.apache.oozie.client.OozieClientException; |
| import org.apache.oozie.client.WorkflowAction; |
| import org.apache.oozie.client.WorkflowJob; |
| import org.apache.oozie.command.CommandException; |
| import org.apache.oozie.command.wf.KillXCommand; |
| import org.apache.oozie.service.CallbackService; |
| import org.apache.oozie.service.ELService; |
| import org.apache.oozie.service.HadoopAccessorException; |
| import org.apache.oozie.service.HadoopAccessorService; |
| import org.apache.oozie.service.LiteWorkflowStoreService; |
| import org.apache.oozie.service.Services; |
| import org.apache.oozie.service.UUIDService; |
| import org.apache.oozie.service.ShareLibService; |
| import org.apache.oozie.service.WorkflowAppService; |
| import org.apache.oozie.service.WorkflowStoreService; |
| import org.apache.oozie.test.XHCatTestCase; |
| import org.apache.oozie.util.ELEvaluator; |
| import org.apache.oozie.util.IOUtils; |
| import org.apache.oozie.util.XConfiguration; |
| import org.apache.oozie.util.XmlUtils; |
| import org.apache.oozie.workflow.WorkflowApp; |
| import org.apache.oozie.workflow.WorkflowInstance; |
| import org.apache.oozie.workflow.WorkflowLib; |
| import org.apache.oozie.workflow.lite.EndNodeDef; |
| import org.apache.oozie.workflow.lite.LiteWorkflowApp; |
| import org.apache.oozie.workflow.lite.StartNodeDef; |
| |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.io.OutputStreamWriter; |
| import java.io.PrintWriter; |
| import java.io.Reader; |
| import java.io.StringReader; |
| import java.io.Writer; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Collection; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.io.FileInputStream; |
| |
| import java.text.SimpleDateFormat; |
| |
| public abstract class ActionExecutorTestCase extends XHCatTestCase { |
| protected static final int JOB_TIMEOUT = 100_000; |
| |
| @Override |
| protected void setUp() throws Exception { |
| beforeSetUp(); |
| super.setUp(); |
| setSystemProps(); |
| new Services().init(); |
| } |
| |
| protected void setSystemProps() throws Exception { |
| } |
| |
| protected void beforeSetUp() throws Exception { |
| } |
| |
| @Override |
| protected void tearDown() throws Exception { |
| if (Services.get() != null) { |
| Services.get().destroy(); |
| } |
| super.tearDown(); |
| } |
| |
| public class Context implements ActionExecutor.Context { |
| private WorkflowActionBean action; |
| private WorkflowJobBean workflow; |
| boolean started; |
| boolean executed; |
| boolean ended; |
| private Map<String, String> vars = new HashMap<String, String>(); |
| |
| public Context(WorkflowJobBean workflow, WorkflowActionBean action) { |
| this.workflow = workflow; |
| this.action = action; |
| } |
| |
| public String getCallbackUrl(String externalStatusVar) { |
| return Services.get().get(CallbackService.class).createCallBackUrl(action.getId(), externalStatusVar); |
| } |
| |
| public Configuration getProtoActionConf() { |
| String s = workflow.getProtoActionConf(); |
| try { |
| return new XConfiguration(new StringReader(s)); |
| } |
| catch (IOException ex) { |
| throw new RuntimeException(ex); |
| } |
| } |
| |
| public WorkflowJob getWorkflow() { |
| return workflow; |
| } |
| |
| public WorkflowAction getAction() { |
| return action; |
| } |
| |
| public ELEvaluator getELEvaluator() { |
| ELEvaluator evaluator = Services.get().get(ELService.class).createEvaluator("workflow"); |
| DagELFunctions.configureEvaluator(evaluator, workflow, action); |
| try { |
| XConfiguration xconf = new XConfiguration(new StringReader(action.getConf())); |
| for (Map.Entry<String, String> entry : xconf){ |
| evaluator.setVariable(entry.getKey(), entry.getValue()); |
| } |
| } |
| catch (IOException ex) { |
| throw new RuntimeException(ex); |
| } |
| return evaluator; |
| } |
| |
| public void setVar(String name, String value) { |
| if (value != null) { |
| vars.put(name, value); |
| } |
| else { |
| vars.remove(name); |
| } |
| } |
| |
| public String getVar(String name) { |
| return vars.get(name); |
| } |
| |
| public void setStartData(String externalId, String trackerUri, String consoleUrl) { |
| action.setStartData(externalId, trackerUri, consoleUrl); |
| started = true; |
| } |
| |
| public void setExecutionData(String externalStatus, Properties actionData) { |
| action.setExecutionData(externalStatus, actionData); |
| executed = true; |
| } |
| |
| public String getExecutionStats() { |
| return action.getExecutionStats(); |
| } |
| |
| public void setExecutionStats(String jsonStats) { |
| action.setExecutionStats(jsonStats); |
| } |
| |
| public String getExternalChildIDs() { |
| return action.getExternalChildIDs(); |
| } |
| |
| public void setExternalChildIDs(String externalChildIDs) { |
| action.setExternalChildIDs(externalChildIDs); |
| } |
| |
| public void setEndData(WorkflowAction.Status status, String signalValue) { |
| action.setEndData(status, signalValue); |
| ended = true; |
| } |
| |
| public boolean isRetry() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| public boolean isStarted() { |
| return started; |
| } |
| |
| public boolean isExecuted() { |
| return executed; |
| } |
| |
| public boolean isEnded() { |
| return ended; |
| } |
| |
| public void setExternalStatus(String externalStatus) { |
| action.setExternalStatus(externalStatus); |
| } |
| |
| @Override |
| public String getRecoveryId() { |
| return action.getId(); |
| } |
| |
| public Path getActionDir() throws URISyntaxException, IOException { |
| String name = getWorkflow().getId() + "/" + action.getName() + "--" + action.getType(); |
| FileSystem fs = getAppFileSystem(); |
| String actionDirPath = Services.get().getSystemId() + "/" + name; |
| Path fqActionDir = new Path(fs.getHomeDirectory(), actionDirPath); |
| return fqActionDir; |
| } |
| |
| public FileSystem getAppFileSystem() throws IOException, URISyntaxException { |
| return getFileSystem(); |
| } |
| |
| @Override |
| public void setErrorInfo(String str, String exMsg) { |
| action.setErrorInfo(str, exMsg); |
| } |
| } |
| |
| protected Path getAppPath() { |
| Path baseDir = getFsTestCaseDir(); |
| return new Path(baseDir, "app"); |
| } |
| |
| protected XConfiguration getBaseProtoConf() { |
| XConfiguration protoConf = new XConfiguration(); |
| protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser()); |
| |
| return protoConf; |
| } |
| |
| /** |
| * Return a workflow job which contains one action with no configuration. |
| * |
| * @param protoConf |
| * @param actionName |
| * @return workflow job bean |
| * @throws Exception |
| */ |
| protected WorkflowJobBean createBaseWorkflow(XConfiguration protoConf, String actionName) throws Exception { |
| String content = "<workflow-app xmlns='uri:oozie:workflow:1.0' xmlns:sla='uri:oozie:sla:0.1' name='no-op-wf'>"; |
| content += "<start to='end' />"; |
| content += "<end name='end' /></workflow-app>"; |
| |
| return createBaseWorkflow(protoConf, actionName, content); |
| } |
| |
| protected WorkflowJobBean createBaseWorkflow(XConfiguration protoConf, String actionName, String content) throws Exception { |
| Path appUri = new Path(getAppPath(), "workflow.xml"); |
| |
| writeToFile(content, getAppPath(), "workflow.xml"); |
| |
| WorkflowApp app = new LiteWorkflowApp("testApp", "<workflow-app/>", |
| new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, |
| "end")) |
| .addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class)); |
| XConfiguration wfConf = new XConfiguration(); |
| wfConf.set(OozieClient.USER_NAME, getTestUser()); |
| wfConf.set(OozieClient.APP_PATH, appUri.toString()); |
| |
| WorkflowJobBean workflow = createWorkflow(app, wfConf, protoConf); |
| |
| WorkflowActionBean action = new WorkflowActionBean(); |
| action.setName(actionName); |
| action.setId(Services.get().get(UUIDService.class).generateChildId(workflow.getId(), actionName)); |
| workflow.getActions().add(action); |
| return workflow; |
| } |
| |
| /** |
| * Return a workflow job which contains one action with no configuration and workflow contains credentials information. |
| * |
| * @param protoConf |
| * @param actionName |
| * @return workflow job bean |
| * @throws Exception |
| */ |
| protected WorkflowJobBean createBaseWorkflowWithCredentials(XConfiguration protoConf, String actionName) |
| throws Exception { |
| Path appUri = new Path(getAppPath(), "workflow.xml"); |
| Reader reader = IOUtils.getResourceAsReader("wf-credentials.xml", -1); |
| String wfxml = IOUtils.getReaderAsString(reader, -1); |
| |
| writeToFile(wfxml, getAppPath(), "workflow.xml"); |
| |
| WorkflowApp app = new LiteWorkflowApp("test-wf-cred", wfxml, |
| new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "start")). |
| addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class)); |
| XConfiguration wfConf = new XConfiguration(); |
| wfConf.set(OozieClient.USER_NAME, getTestUser()); |
| wfConf.set(OozieClient.APP_PATH, appUri.toString()); |
| |
| |
| WorkflowJobBean workflow = createWorkflow(app, wfConf, protoConf); |
| |
| WorkflowActionBean action = new WorkflowActionBean(); |
| action.setName(actionName); |
| action.setId(Services.get().get(UUIDService.class).generateChildId(workflow.getId(), actionName)); |
| workflow.getActions().add(action); |
| return workflow; |
| } |
| |
| private WorkflowJobBean createWorkflow(WorkflowApp app, Configuration conf, XConfiguration protoConf) |
| throws Exception { |
| WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB(); |
| WorkflowInstance wfInstance; |
| wfInstance = workflowLib.createInstance(app, conf); |
| WorkflowJobBean workflow = new WorkflowJobBean(); |
| workflow.setId(wfInstance.getId()); |
| workflow.setAppName(app.getName()); |
| workflow.setAppPath(conf.get(OozieClient.APP_PATH)); |
| workflow.setConf(XmlUtils.prettyPrint(conf).toString()); |
| workflow.setProtoActionConf(XmlUtils.prettyPrint(protoConf).toString()); |
| workflow.setCreatedTime(new Date()); |
| workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, "")); |
| workflow.setStatus(WorkflowJob.Status.PREP); |
| workflow.setRun(0); |
| workflow.setUser(conf.get(OozieClient.USER_NAME)); |
| workflow.setGroup(conf.get(OozieClient.GROUP_NAME)); |
| workflow.setWorkflowInstance(wfInstance); |
| return workflow; |
| } |
| |
| private void writeToFile(String content, Path appPath, String fileName) throws IOException { |
| FileSystem fs = getFileSystem(); |
| Writer writer = new OutputStreamWriter(fs.create(new Path(appPath, fileName), true), |
| StandardCharsets.UTF_8); |
| writer.write(content); |
| writer.close(); |
| } |
| |
| protected void writeToFile(final String appXml, final String appPath) throws IOException { |
| final File wf = new File(URI.create(appPath)); |
| PrintWriter out = null; |
| try { |
| out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(wf), StandardCharsets.UTF_8)); |
| out.println(appXml); |
| } |
| catch (final IOException iex) { |
| throw iex; |
| } |
| finally { |
| if (out != null) { |
| out.close(); |
| } |
| } |
| } |
| |
| protected String submitWorkflow(final String workflowUri, final OozieClient wfClient) throws OozieClientException { |
| final Properties conf = wfClient.createConfiguration(); |
| conf.setProperty(OozieClient.APP_PATH, workflowUri); |
| conf.setProperty(OozieClient.USER_NAME, getTestUser()); |
| conf.setProperty("appName", "var-app-name"); |
| |
| final String jobId = wfClient.submit(conf); |
| wfClient.start(jobId); |
| |
| return jobId; |
| } |
| |
| protected ApplicationId getChildMRJobApplicationId(final Configuration conf) throws IOException { |
| final List<ApplicationId> applicationIdList = new ArrayList<>(); |
| final Path inputDir = new Path(getFsTestCaseDir(), "input"); |
| final Path wfIDFile = new Path(inputDir, LauncherMainTester.JOB_ID_FILE_NAME); |
| final FileSystem fs = FileSystem.get(conf); |
| |
| // wait until we have the running child MR job's ID from HDFS |
| waitFor(JOB_TIMEOUT, new ApplicationIdExistsPredicate(fs, wfIDFile)); |
| if (!fs.exists(wfIDFile) || !fs.isFile(wfIDFile)) { |
| throw new IOException("Workflow ID file does not exist: " + wfIDFile.toString()); |
| } |
| |
| try (final BufferedReader reader = |
| new BufferedReader(new InputStreamReader(fs.open(wfIDFile) ,StandardCharsets.UTF_8))) { |
| final String line = reader.readLine(); |
| JobID.forName(line); |
| final String jobID = line; |
| final String appID = jobID.replace("job", "application"); |
| final ApplicationId id = ConverterUtils.toApplicationId(appID); |
| applicationIdList.add(id); |
| } |
| |
| assertTrue("Application ID should've been found. No external Child ID was found in " + wfIDFile.toString(), |
| applicationIdList.size() == 1); |
| |
| return applicationIdList.get(0); |
| } |
| |
| private static class ApplicationIdExistsPredicate implements Predicate { |
| private final FileSystem fs; |
| private final Path wfIDFile; |
| |
| ApplicationIdExistsPredicate(final FileSystem fs, final Path wfIDFile) { |
| this.fs = fs; |
| this.wfIDFile = wfIDFile; |
| } |
| |
| @Override |
| public boolean evaluate() throws Exception { |
| return fs.exists(wfIDFile) && fs.getFileStatus(wfIDFile).getLen() > 0; |
| } |
| } |
| |
| protected static class WorkflowActionRunningPredicate extends WorkflowActionStatusPredicate { |
| WorkflowActionRunningPredicate(final OozieClient wfClient, final String jobId) { |
| super(wfClient, jobId, WorkflowJob.Status.RUNNING, WorkflowAction.Status.RUNNING); |
| } |
| } |
| |
| protected static class WorkflowActionKilledPredicate extends WorkflowActionStatusPredicate { |
| WorkflowActionKilledPredicate(final OozieClient wfClient, final String jobId) { |
| super(wfClient, jobId, WorkflowJob.Status.KILLED, WorkflowAction.Status.KILLED); |
| } |
| } |
| |
| private static abstract class WorkflowActionStatusPredicate implements Predicate { |
| private final OozieClient wfClient; |
| private final String jobId; |
| private final WorkflowJob.Status expectedWorkflowJobStatus; |
| private final WorkflowAction.Status expectedWorkflowActionStatus; |
| |
| WorkflowActionStatusPredicate(final OozieClient wfClient, |
| final String jobId, |
| final WorkflowJob.Status expectedWorkflowJobStatus, |
| final WorkflowAction.Status expectedWorkflowActionStatus) { |
| this.wfClient = wfClient; |
| this.jobId = jobId; |
| this.expectedWorkflowJobStatus = expectedWorkflowJobStatus; |
| this.expectedWorkflowActionStatus = expectedWorkflowActionStatus; |
| } |
| |
| @Override |
| public boolean evaluate() throws Exception { |
| final WorkflowJob.Status actualWorkflowJobStatus = wfClient.getJobInfo(jobId).getStatus(); |
| final boolean isWorkflowInState = actualWorkflowJobStatus.equals(expectedWorkflowJobStatus); |
| |
| final WorkflowAction.Status actualWorkflowActionStatus = wfClient.getJobInfo(jobId).getActions().get(1).getStatus(); |
| final boolean isWorkflowActionInState = actualWorkflowActionStatus.equals(expectedWorkflowActionStatus); |
| |
| return isWorkflowInState && isWorkflowActionInState; |
| } |
| } |
| |
| protected void killWorkflow(final String jobId) throws CommandException { |
| new KillXCommand(jobId).call(); |
| } |
| |
| protected void waitForWorkflowToStart(final OozieClient wfClient, final String jobId) { |
| waitFor(JOB_TIMEOUT, new WorkflowActionRunningPredicate(wfClient,jobId)); |
| } |
| |
| protected void waitForWorkflowToKill(final OozieClient wfClient, final String jobId) { |
| waitFor(JOB_TIMEOUT, new WorkflowActionKilledPredicate(wfClient,jobId)); |
| } |
| |
| protected String getJavaAction(final boolean launchMRAction) { |
| final Path inputDir = new Path(getFsTestCaseDir(), "input"); |
| final Path outputDir = new Path(getFsTestCaseDir(), "output"); |
| final String javaActionXml = "<java>" + |
| "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + |
| "<name-node>" + getNameNodeUri() + "</name-node>" + |
| "<main-class>" + JavaSleepAction.class.getName()+ "</main-class>" + |
| "</java>"; |
| final String javaWithMRActionXml = "<java>" + |
| "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + |
| "<name-node>" + getNameNodeUri() + "</name-node>" + |
| "<main-class>" + LauncherMainTester.class.getName()+ "</main-class>" + |
| "<arg>javamapreduce</arg>" + |
| "<arg>"+inputDir.toString()+"</arg>" + |
| "<arg>"+outputDir.toString()+"</arg>" + |
| "</java>"; |
| |
| return launchMRAction ? javaWithMRActionXml : javaActionXml; |
| } |
| |
| void killYarnApplication(final Configuration configuration, final ApplicationId yarnApplicationId) |
| throws HadoopAccessorException, IOException, YarnException { |
| getHadoopAccessorService().createYarnClient(getTestUser(), configuration).killApplication(yarnApplicationId); |
| } |
| |
| HadoopAccessorService getHadoopAccessorService() { |
| return Services.get().get(HadoopAccessorService.class); |
| } |
| |
| void createFiles(Collection<Path> paths) throws Exception{ |
| for(Path p : paths){ |
| getFileSystem().create(p); |
| } |
| } |
| |
| void makeDirs(Path... dirs) throws Exception{ |
| for(Path p : dirs){ |
| getFileSystem().mkdirs(p); |
| } |
| } |
| |
| private void assertContainsJarsOrNot(boolean contains, String cacheFilesStr, Collection<Path> jars) { |
| for (Path jar : jars) { |
| assertEquals("Unexpected distributed cache file content", contains, cacheFilesStr.contains(jar.toString())); |
| } |
| } |
| |
| void assertContainsJars(String cacheFilesStr, Collection<Path> jars) { |
| assertContainsJarsOrNot(true, cacheFilesStr, jars); |
| } |
| |
| void assertNotContainsJars(String cacheFilesStr, Collection<Path> jars) { |
| assertContainsJarsOrNot(false, cacheFilesStr, jars); |
| } |
| |
| protected Context createContext(String actionXml, String group) throws Exception { |
| JavaActionExecutor ae = new JavaActionExecutor(); |
| |
| Path appJarPath = new Path("lib/test.jar"); |
| File jarFile = IOUtils.createJar(new File(getTestCaseDir()), "test.jar", LauncherMainTester.class); |
| InputStream is = new FileInputStream(jarFile); |
| OutputStream os = getFileSystem().create(new Path(getAppPath(), "lib/test.jar")); |
| IOUtils.copyStream(is, os); |
| |
| Path appSoPath = new Path("lib/test.so"); |
| getFileSystem().create(new Path(getAppPath(), appSoPath)).close(); |
| |
| XConfiguration protoConf = new XConfiguration(); |
| protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser()); |
| protoConf.setStrings(WorkflowAppService.APP_LIB_PATH_LIST, appJarPath.toString(), appSoPath.toString()); |
| |
| WorkflowJobBean wf = createBaseWorkflow(protoConf, "action"); |
| if(group != null) { |
| wf.setGroup(group); |
| } |
| WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0); |
| action.setType(ae.getType()); |
| action.setConf(actionXml); |
| |
| return new Context(wf, action); |
| } |
| |
| Path getNewSystemLibPath() { |
| WorkflowAppService wps = Services.get().get(WorkflowAppService.class); |
| return new Path(wps.getSystemLibPath(), ShareLibService.SHARE_LIB_PREFIX |
| + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())); |
| } |
| } |