blob: 1352db91bf0f53ebeecf2b2565cf9924601497a6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.command.wf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.oozie.AppType;
import org.apache.oozie.SLAEventBean;
import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.util.ELUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.sla.SLAOperations;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.ELService;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.WorkflowLib;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.db.SLADbOperations;
import org.apache.oozie.service.SchemaService.SchemaName;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.rest.JsonBean;
import org.jdom.Element;
import org.jdom.filter.ElementFilter;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.HashSet;
import java.io.IOException;
import java.net.URI;
@SuppressWarnings("deprecation")
public class SubmitXCommand extends WorkflowXCommand<String> {
public static final String CONFIG_DEFAULT = "config-default.xml";
private Configuration conf;
private List<JsonBean> insertList = new ArrayList<JsonBean>();
private String parentId;
/**
* Constructor to create the workflow Submit Command.
*
* @param conf : Configuration for workflow job
*/
public SubmitXCommand(Configuration conf) {
super("submit", "submit", 1);
this.conf = Objects.requireNonNull(conf, "conf cannot be null");
}
/**
* Constructor for submitting wf through coordinator
*
* @param conf : Configuration for workflow job
* @param parentId the coord action id
*/
public SubmitXCommand(Configuration conf, String parentId) {
this(conf);
this.parentId = parentId;
}
/**
* Constructor to create the workflow Submit Command.
*
* @param dryrun : if dryrun
* @param conf : Configuration for workflow job
*/
public SubmitXCommand(boolean dryrun, Configuration conf) {
this(conf);
this.dryrun = dryrun;
}
private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
static {
String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
}
@Override
protected String execute() throws CommandException {
InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
try {
XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
String user = conf.get(OozieClient.USER_NAME);
URI uri = new URI(conf.get(OozieClient.APP_PATH));
HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
Configuration fsConf = has.createConfiguration(uri.getAuthority());
FileSystem fs = has.createFileSystem(user, uri, fsConf);
Path configDefault = null;
Configuration defaultConf = null;
// app path could be a directory
Path path = new Path(uri.getPath());
if (!fs.isFile(path)) {
configDefault = new Path(path, CONFIG_DEFAULT);
} else {
configDefault = new Path(path.getParent(), CONFIG_DEFAULT);
}
if (fs.exists(configDefault)) {
try {
defaultConf = new XConfiguration(fs.open(configDefault));
PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_USER_PROPERTIES);
PropertiesUtils.checkDefaultDisallowedProperties(defaultConf);
XConfiguration.injectDefaults(defaultConf, conf);
}
catch (IOException ex) {
throw new IOException("default configuration file, " + ex.getMessage(), ex);
}
}
if (defaultConf != null) {
defaultConf = resolveDefaultConfVariables(defaultConf);
}
WorkflowApp app = wps.parseDef(conf, defaultConf);
XConfiguration protoActionConf = wps.createProtoActionConf(conf, true);
WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
// Resolving all variables in the job properties.
// This ensures the Hadoop Configuration semantics is preserved.
XConfiguration resolvedVarsConf = new XConfiguration();
for (Map.Entry<String, String> entry : conf) {
resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
}
conf = resolvedVarsConf;
WorkflowInstance wfInstance;
try {
wfInstance = workflowLib.createInstance(app, conf);
}
catch (WorkflowException e) {
throw new StoreException(e);
}
Configuration conf = wfInstance.getConf();
// System.out.println("WF INSTANCE CONF:");
// System.out.println(XmlUtils.prettyPrint(conf).toString());
WorkflowJobBean workflow = new WorkflowJobBean();
workflow.setId(wfInstance.getId());
workflow.setAppName(ELUtils.resolveAppName(app.getName(), conf));
workflow.setAppPath(conf.get(OozieClient.APP_PATH));
workflow.setConf(XmlUtils.prettyPrint(conf).toString());
workflow.setProtoActionConf(protoActionConf.toXmlString());
workflow.setCreatedTime(new Date());
workflow.setLastModifiedTime(new Date());
workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
workflow.setStatus(WorkflowJob.Status.PREP);
workflow.setRun(0);
workflow.setUser(conf.get(OozieClient.USER_NAME));
workflow.setGroup(conf.get(OozieClient.GROUP_NAME));
workflow.setWorkflowInstance(wfInstance);
workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
// Set parent id if it doesn't already have one (for subworkflows)
if (workflow.getParentId() == null) {
workflow.setParentId(conf.get(SubWorkflowActionExecutor.PARENT_ID));
}
// Set to coord action Id if workflow submitted through coordinator
if (workflow.getParentId() == null) {
workflow.setParentId(parentId);
}
LogUtils.setLogInfo(workflow);
LOG.debug("Workflow record created, Status [{0}]", workflow.getStatus());
Element wfElem = XmlUtils.parseXml(app.getDefinition());
ELEvaluator evalSla = createELEvaluatorForGroup(conf, "wf-sla-submit");
String jobSlaXml = verifySlaElements(wfElem, evalSla);
if (!dryrun) {
writeSLARegistration(wfElem, jobSlaXml, workflow.getId(), workflow.getParentId(), workflow.getUser(),
workflow.getGroup(), workflow.getAppName(), LOG, evalSla);
workflow.setSlaXml(jobSlaXml);
// System.out.println("SlaXml :"+ slaXml);
//store.insertWorkflow(workflow);
insertList.add(workflow);
JPAService jpaService = Services.get().get(JPAService.class);
if (jpaService != null) {
try {
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
}
catch (JPAExecutorException je) {
throw new CommandException(je);
}
}
else {
LOG.error(ErrorCode.E0610);
return null;
}
return workflow.getId();
}
else {
// Checking variable substitution for dryrun
ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(workflow, null, false, false);
Element workflowXml = XmlUtils.parseXml(app.getDefinition());
removeSlaElements(workflowXml);
String workflowXmlString = XmlUtils.removeComments(XmlUtils.prettyPrint(workflowXml).toString());
workflowXmlString = context.getELEvaluator().evaluate(workflowXmlString, String.class);
workflowXml = XmlUtils.parseXml(workflowXmlString);
Iterator<Element> it = workflowXml.getDescendants(new ElementFilter("job-xml"));
// Checking all variable substitutions in job-xml files
while (it.hasNext()) {
Element e = it.next();
String jobXml = e.getTextTrim();
Path xmlPath = new Path(workflow.getAppPath(), jobXml);
Configuration jobXmlConf = new XConfiguration(fs.open(xmlPath));
String jobXmlConfString = XmlUtils.prettyPrint(jobXmlConf).toString();
jobXmlConfString = XmlUtils.removeComments(jobXmlConfString);
context.getELEvaluator().evaluate(jobXmlConfString, String.class);
}
return "OK";
}
}
catch (WorkflowException ex) {
throw new CommandException(ex);
}
catch (HadoopAccessorException ex) {
throw new CommandException(ex);
}
catch (Exception ex) {
throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
}
}
/**
* Resolving variables from config-default, which might be referencing into conf/defaultConf
* @param defaultConf config-default.xml
* @return resolved config-default configuration.
*/
private Configuration resolveDefaultConfVariables(Configuration defaultConf) {
XConfiguration resolveDefaultConf = new XConfiguration();
for (Map.Entry<String, String> entry : defaultConf) {
String defaultConfKey = entry.getKey();
String defaultConfValue = entry.getValue();
// if value is referencing some other key, first check within the default config to resolve,
// then job.properties (conf)
if (defaultConfValue.contains("$") && defaultConf.get(defaultConfKey).contains("$")) {
resolveDefaultConf.set(defaultConfKey, conf.get(defaultConfKey));
} else {
resolveDefaultConf.set(defaultConfKey, defaultConf.get(defaultConfKey));
}
}
return resolveDefaultConf;
}
private void removeSlaElements(Element eWfJob) {
Element sla = XmlUtils.getSLAElement(eWfJob);
if (sla != null) {
eWfJob.removeChildren(sla.getName(), sla.getNamespace());
}
for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
sla = XmlUtils.getSLAElement(action);
if (sla != null) {
action.removeChildren(sla.getName(), sla.getNamespace());
}
}
}
private String verifySlaElements(Element eWfJob, ELEvaluator evalSla) throws CommandException {
String jobSlaXml = "";
// Validate WF job
Element eSla = XmlUtils.getSLAElement(eWfJob);
if (eSla != null) {
jobSlaXml = resolveSla(eSla, evalSla);
}
// Validate all actions
for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
eSla = XmlUtils.getSLAElement(action);
if (eSla != null) {
resolveSla(eSla, evalSla);
}
}
return jobSlaXml;
}
private void writeSLARegistration(Element eWfJob, String slaXml, String jobId, String parentId, String user,
String group, String appName, XLog log, ELEvaluator evalSla) throws CommandException {
try {
if (slaXml != null && slaXml.length() > 0) {
Element eSla = XmlUtils.parseXml(slaXml);
SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, jobId,
SlaAppType.WORKFLOW_JOB, user, group, log);
if(slaEvent != null) {
insertList.add(slaEvent);
}
// insert into new table
SLAOperations.createSlaRegistrationEvent(eSla, jobId, parentId, AppType.WORKFLOW_JOB, user, appName,
log, false);
}
// Add sla for wf actions
for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
Element actionSla = XmlUtils.getSLAElement(action);
if (actionSla != null) {
String actionSlaXml = SubmitXCommand.resolveSla(actionSla, evalSla);
actionSla = XmlUtils.parseXml(actionSlaXml);
String actionId = Services.get().get(UUIDService.class)
.generateChildId(jobId, action.getAttributeValue("name") + "");
SLAOperations.createSlaRegistrationEvent(actionSla, actionId, jobId, AppType.WORKFLOW_ACTION,
user, appName, log, false);
}
}
}
catch (Exception e) {
e.printStackTrace();
throw new CommandException(ErrorCode.E1007, "workflow " + jobId, e.getMessage(), e);
}
}
/**
* Resolve variables in sla xml element.
*
* @param eSla sla xml element
* @param evalSla sla evaluator
* @return sla xml string after evaluation
* @throws CommandException if command cannot be executed
*/
public static String resolveSla(Element eSla, ELEvaluator evalSla) throws CommandException {
// EL evaluation
String slaXml = XmlUtils.prettyPrint(eSla).toString();
try {
slaXml = XmlUtils.removeComments(slaXml);
slaXml = evalSla.evaluate(slaXml, String.class);
XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
return slaXml;
}
catch (Exception e) {
throw new CommandException(ErrorCode.E1004, "Validation error :" + e.getMessage(), e);
}
}
/**
* Create an EL evaluator for a given group.
*
* @param conf configuration variable
* @param group group variable
* @return the evaluator created for the group
*/
public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
for (Map.Entry<String, String> entry : conf) {
eval.setVariable(entry.getKey(), entry.getValue());
}
return eval;
}
@Override
public String getEntityKey() {
return null;
}
@Override
protected boolean isLockRequired() {
return false;
}
@Override
protected void loadState() {
}
@Override
protected void verifyPrecondition() throws CommandException {
}
}