| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.oozie.action.oozie; |
| |
| import java.io.IOException; |
| import java.io.StringReader; |
| import java.util.HashSet; |
| import java.util.Set; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.oozie.DagEngine; |
| import org.apache.oozie.LocalOozieClient; |
| import org.apache.oozie.WorkflowJobBean; |
| import org.apache.oozie.action.ActionExecutor; |
| import org.apache.oozie.action.ActionExecutorException; |
| import org.apache.oozie.action.hadoop.OozieJobInfo; |
| import org.apache.oozie.client.OozieClient; |
| import org.apache.oozie.client.OozieClientException; |
| import org.apache.oozie.client.WorkflowAction; |
| import org.apache.oozie.client.WorkflowJob; |
| import org.apache.oozie.command.CommandException; |
| import org.apache.oozie.command.wf.ActionStartXCommand; |
| import org.apache.oozie.service.ConfigurationService; |
| import org.apache.oozie.service.DagEngineService; |
| import org.apache.oozie.service.Services; |
| import org.apache.oozie.util.ConfigUtils; |
| import org.apache.oozie.util.JobUtils; |
| import org.apache.oozie.util.PropertiesUtils; |
| import org.apache.oozie.util.XConfiguration; |
| import org.apache.oozie.util.XLog; |
| import org.apache.oozie.util.XmlUtils; |
| import org.jdom.Element; |
| import org.jdom.Namespace; |
| |
| public class SubWorkflowActionExecutor extends ActionExecutor { |
| public static final String ACTION_TYPE = "sub-workflow"; |
| public static final String LOCAL = "local"; |
| public static final String PARENT_ID = "oozie.wf.parent.id"; |
| public static final String SUPER_PARENT_ID = "oozie.wf.superparent.id"; |
| public static final String SUBWORKFLOW_MAX_DEPTH = "oozie.action.subworkflow.max.depth"; |
| public static final String SUBWORKFLOW_DEPTH = "oozie.action.subworkflow.depth"; |
| public static final String SUBWORKFLOW_RERUN = "oozie.action.subworkflow.rerun"; |
| |
| private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); |
| public XLog LOG = XLog.getLog(getClass()); |
| |
| |
| static { |
| String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES, |
| PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB, |
| PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, |
| PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS}; |
| |
| String[] badDefaultProps = {PropertiesUtils.HADOOP_USER}; |
| PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); |
| PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); |
| } |
| |
| protected SubWorkflowActionExecutor() { |
| super(ACTION_TYPE); |
| } |
| |
| public void initActionType() { |
| super.initActionType(); |
| } |
| |
| protected OozieClient getWorkflowClient(Context context, String oozieUri) { |
| OozieClient oozieClient; |
| if (oozieUri.equals(LOCAL)) { |
| WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow(); |
| String user = workflow.getUser(); |
| String group = workflow.getGroup(); |
| DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user); |
| oozieClient = new LocalOozieClient(dagEngine); |
| } |
| else { |
| // TODO we need to add authToken to the WC for the remote case |
| oozieClient = new OozieClient(oozieUri); |
| } |
| return oozieClient; |
| } |
| |
| protected void injectInline(Element eConf, Configuration subWorkflowConf) throws IOException, |
| ActionExecutorException { |
| if (eConf != null) { |
| String strConf = XmlUtils.prettyPrint(eConf).toString(); |
| Configuration conf = new XConfiguration(new StringReader(strConf)); |
| try { |
| PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_DEFAULT_PROPERTIES); |
| } |
| catch (CommandException ex) { |
| throw convertException(ex); |
| } |
| XConfiguration.copy(conf, subWorkflowConf); |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| protected void injectCallback(Context context, Configuration conf) { |
| String callback = context.getCallbackUrl("$status"); |
| if (conf.get(OozieClient.WORKFLOW_NOTIFICATION_URL) != null) { |
| XLog.getLog(getClass()) |
| .warn("Sub-Workflow configuration has a custom job end notification URI, overriding"); |
| } |
| conf.set(OozieClient.WORKFLOW_NOTIFICATION_URL, callback); |
| } |
| |
| protected void injectRecovery(String externalId, Configuration conf) { |
| conf.set(OozieClient.EXTERNAL_ID, externalId); |
| } |
| |
| protected void injectParent(String parentId, Configuration conf) { |
| conf.set(PARENT_ID, parentId); |
| } |
| |
| protected void injectSuperParent(WorkflowJob parentWorkflow, Configuration parentConf, Configuration conf) { |
| String superParentId = parentConf.get(SUPER_PARENT_ID); |
| if (superParentId == null) { |
| // This is a sub-workflow at depth 1 |
| superParentId = parentWorkflow.getParentId(); |
| |
| // If the parent workflow is not submitted through a coordinator then the parentId will be the super parent id. |
| if (superParentId == null) { |
| superParentId = parentWorkflow.getId(); |
| } |
| conf.set(SUPER_PARENT_ID, superParentId); |
| } else { |
| // Sub-workflow at depth 2 or more. |
| conf.set(SUPER_PARENT_ID, superParentId); |
| } |
| } |
| |
| protected void verifyAndInjectSubworkflowDepth(Configuration parentConf, Configuration conf) throws ActionExecutorException { |
| int depth = parentConf.getInt(SUBWORKFLOW_DEPTH, 0); |
| int maxDepth = ConfigurationService.getInt(SUBWORKFLOW_MAX_DEPTH); |
| if (depth >= maxDepth) { |
| throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "SUBWF001", |
| "Depth [{0}] cannot exceed maximum subworkflow depth [{1}]", (depth + 1), maxDepth); |
| } |
| conf.setInt(SUBWORKFLOW_DEPTH, depth + 1); |
| } |
| |
| protected String checkIfRunning(OozieClient oozieClient, String extId) throws OozieClientException { |
| String jobId = oozieClient.getJobId(extId); |
| if (jobId.equals("")) { |
| return null; |
| } |
| return jobId; |
| } |
| |
| public void start(Context context, WorkflowAction action) throws ActionExecutorException { |
| try { |
| Element eConf = XmlUtils.parseXml(action.getConf()); |
| Namespace ns = eConf.getNamespace(); |
| Element e = eConf.getChild("oozie", ns); |
| String oozieUri = (e == null) ? LOCAL : e.getTextTrim(); |
| OozieClient oozieClient = getWorkflowClient(context, oozieUri); |
| String subWorkflowId = null; |
| String extId = context.getRecoveryId(); |
| String runningJobId = null; |
| if (extId != null) { |
| runningJobId = checkIfRunning(oozieClient, extId); |
| } |
| if (runningJobId == null) { |
| String appPath = eConf.getChild("app-path", ns).getTextTrim(); |
| |
| XConfiguration subWorkflowConf = new XConfiguration(); |
| |
| injectInline(eConf.getChild("configuration", ns), subWorkflowConf); |
| |
| Configuration parentConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); |
| |
| if (eConf.getChild(("propagate-configuration"), ns) != null) { |
| XConfiguration.copy(parentConf, subWorkflowConf); |
| } |
| |
| // Propagate coordinator and bundle info to subworkflow |
| if (OozieJobInfo.isJobInfoEnabled()) { |
| if (parentConf.get(OozieJobInfo.COORD_ID) != null) { |
| subWorkflowConf.set(OozieJobInfo.COORD_ID, parentConf.get(OozieJobInfo.COORD_ID)); |
| subWorkflowConf.set(OozieJobInfo.COORD_NAME, parentConf.get(OozieJobInfo.COORD_NAME)); |
| subWorkflowConf.set(OozieJobInfo.COORD_NOMINAL_TIME, parentConf.get(OozieJobInfo.COORD_NOMINAL_TIME)); |
| } |
| if (parentConf.get(OozieJobInfo.BUNDLE_ID) != null) { |
| subWorkflowConf.set(OozieJobInfo.BUNDLE_ID, parentConf.get(OozieJobInfo.BUNDLE_ID)); |
| subWorkflowConf.set(OozieJobInfo.BUNDLE_NAME, parentConf.get(OozieJobInfo.BUNDLE_NAME)); |
| } |
| } |
| |
| // the proto has the necessary credentials |
| Configuration protoActionConf = context.getProtoActionConf(); |
| XConfiguration.copy(protoActionConf, subWorkflowConf); |
| subWorkflowConf.set(OozieClient.APP_PATH, appPath); |
| String group = ConfigUtils.getWithDeprecatedCheck(parentConf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null); |
| if(group != null) { |
| subWorkflowConf.set(OozieClient.GROUP_NAME, group); |
| } |
| |
| injectCallback(context, subWorkflowConf); |
| injectRecovery(extId, subWorkflowConf); |
| injectParent(context.getWorkflow().getId(), subWorkflowConf); |
| injectSuperParent(context.getWorkflow(), parentConf, subWorkflowConf); |
| verifyAndInjectSubworkflowDepth(parentConf, subWorkflowConf); |
| |
| //TODO: this has to be refactored later to be done in a single place for REST calls and this |
| JobUtils.normalizeAppPath(context.getWorkflow().getUser(), context.getWorkflow().getGroup(), |
| subWorkflowConf); |
| |
| subWorkflowConf.set(OOZIE_ACTION_YARN_TAG, getActionYarnTag(parentConf, context.getWorkflow(), action)); |
| |
| // if the rerun failed node option is provided during the time of rerun command, old subworkflow will |
| // rerun again. |
| if(action.getExternalId() != null && parentConf.getBoolean(OozieClient.RERUN_FAIL_NODES, false)) { |
| subWorkflowConf.setBoolean(SUBWORKFLOW_RERUN, true); |
| oozieClient.reRun(action.getExternalId(), subWorkflowConf.toProperties()); |
| subWorkflowId = action.getExternalId(); |
| } else { |
| subWorkflowId = oozieClient.run(subWorkflowConf.toProperties()); |
| } |
| } |
| else { |
| subWorkflowId = runningJobId; |
| } |
| WorkflowJob workflow = oozieClient.getJobInfo(subWorkflowId); |
| String consoleUrl = workflow.getConsoleUrl(); |
| context.setStartData(subWorkflowId, oozieUri, consoleUrl); |
| if (runningJobId != null) { |
| check(context, action); |
| } |
| } |
| catch (Exception ex) { |
| LOG.error(ex); |
| throw convertException(ex); |
| } |
| } |
| |
| public void end(Context context, WorkflowAction action) throws ActionExecutorException { |
| try { |
| String externalStatus = action.getExternalStatus(); |
| WorkflowAction.Status status = externalStatus.equals("SUCCEEDED") ? WorkflowAction.Status.OK |
| : WorkflowAction.Status.ERROR; |
| context.setEndData(status, getActionSignal(status)); |
| } |
| catch (Exception ex) { |
| throw convertException(ex); |
| } |
| } |
| |
| public void check(Context context, WorkflowAction action) throws ActionExecutorException { |
| try { |
| String subWorkflowId = action.getExternalId(); |
| String oozieUri = action.getTrackerUri(); |
| OozieClient oozieClient = getWorkflowClient(context, oozieUri); |
| WorkflowJob subWorkflow = oozieClient.getJobInfo(subWorkflowId); |
| WorkflowJob.Status status = subWorkflow.getStatus(); |
| switch (status) { |
| case FAILED: |
| case KILLED: |
| case SUCCEEDED: |
| context.setExecutionData(status.toString(), null); |
| break; |
| default: |
| context.setExternalStatus(status.toString()); |
| break; |
| } |
| } |
| catch (Exception ex) { |
| throw convertException(ex); |
| } |
| } |
| |
| public void kill(Context context, WorkflowAction action) throws ActionExecutorException { |
| try { |
| String subWorkflowId = action.getExternalId(); |
| String oozieUri = action.getTrackerUri(); |
| if (subWorkflowId != null && oozieUri != null) { |
| OozieClient oozieClient = getWorkflowClient(context, oozieUri); |
| oozieClient.kill(subWorkflowId); |
| } |
| context.setEndData(WorkflowAction.Status.KILLED, getActionSignal(WorkflowAction.Status.KILLED)); |
| } |
| catch (Exception ex) { |
| throw convertException(ex); |
| } |
| } |
| |
| private static Set<String> FINAL_STATUS = new HashSet<String>(); |
| |
| static { |
| FINAL_STATUS.add("SUCCEEDED"); |
| FINAL_STATUS.add("KILLED"); |
| FINAL_STATUS.add("FAILED"); |
| } |
| |
| public boolean isCompleted(String externalStatus) { |
| return FINAL_STATUS.contains(externalStatus); |
| } |
| |
| public boolean supportsConfigurationJobXML() { |
| return true; |
| } |
| } |