blob: fee4fa380233b6b7422532b0afce43d37acb1e22 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.command.bundle;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Validator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.SubmitTransitionXCommand;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
import org.apache.oozie.service.ELService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.SchemaService.SchemaName;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.UUIDService.ApplicationType;
import org.apache.oozie.util.ConfigUtils;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.ELUtils;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.ParameterVerifier;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Attribute;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.xml.sax.SAXException;
/**
* This Command will submit the bundle.
*/
public class BundleSubmitXCommand extends SubmitTransitionXCommand {
private Configuration conf;
public static final String CONFIG_DEFAULT = "bundle-config-default.xml";
public static final String BUNDLE_XML_FILE = "bundle.xml";
private final BundleJobBean bundleBean = new BundleJobBean();
private String jobId;
private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<>();
private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<>();
static {
String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
}
/**
* Constructor to create the bundle submit command.
*
* @param conf configuration for bundle job
*/
public BundleSubmitXCommand(Configuration conf) {
super("bundle_submit", "bundle_submit", 1);
this.conf = Objects.requireNonNull(conf, "conf cannot be null");
}
/**
* Constructor to create the bundle submit command.
*
* @param dryrun true if dryrun is enable
* @param conf configuration for bundle job
*/
public BundleSubmitXCommand(boolean dryrun, Configuration conf) {
this(conf);
this.dryrun = dryrun;
}
@Override
protected String submit() throws CommandException {
LOG.info("STARTED Bundle Submit");
try {
InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
ParameterVerifier.verifyParameters(conf, XmlUtils.parseXml(bundleBean.getOrigJobXml()));
String jobXmlWithNoComment = XmlUtils.removeComments(this.bundleBean.getOrigJobXml().toString());
// Resolving all variables in the job properties.
// This ensures the Hadoop Configuration semantics is preserved.
XConfiguration resolvedVarsConf = new XConfiguration();
for (Map.Entry<String, String> entry : conf) {
resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
}
conf = resolvedVarsConf;
String resolvedJobXml = resolvedVarsandFunctions(jobXmlWithNoComment, conf);
//verify the uniqueness of coord names
verifyCoordNameUnique(resolvedJobXml);
this.jobId = storeToDB(bundleBean, resolvedJobXml);
LogUtils.setLogInfo(bundleBean);
if (dryrun) {
jobId = bundleBean.getId();
LOG.info("[" + jobId + "]: Update status to PREP");
bundleBean.setStatus(Job.Status.PREP);
try {
new XConfiguration(new StringReader(bundleBean.getConf()));
}
catch (IOException e1) {
LOG.warn("Configuration parse error. read from DB :" + bundleBean.getConf(), e1);
}
return bundleBean.getJobXml() + System.getProperty("line.separator");
}
else {
if (bundleBean.getKickoffTime() == null) {
// If there is no KickOffTime, default kickoff is NOW.
LOG.debug("Since kickoff time is not defined for job id " + jobId
+ ". Queuing and BundleStartXCommand immediately after submission");
queue(new BundleStartXCommand(jobId));
}
}
}
catch (Exception ex) {
throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex);
}
LOG.info("ENDED Bundle Submit");
return this.jobId;
}
@Override
public void notifyParent() throws CommandException {
}
@Override
public String getEntityKey() {
return null;
}
@Override
protected boolean isLockRequired() {
return false;
}
@Override
protected void loadState() throws CommandException {
}
@Override
protected void verifyPrecondition() throws CommandException, PreconditionException {
}
@Override
protected void eagerLoadState() throws CommandException {
}
@Override
protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
try {
mergeDefaultConfig();
String appXml = readAndValidateXml();
bundleBean.setOrigJobXml(appXml);
LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
}
catch (BundleJobException ex) {
LOG.warn("BundleJobException: ", ex);
throw new CommandException(ex);
}
catch (IllegalArgumentException iex) {
LOG.warn("IllegalArgumentException: ", iex);
throw new CommandException(ErrorCode.E1310, iex.getMessage(), iex);
}
catch (Exception ex) {
LOG.warn("Exception: ", ex);
throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex);
}
}
/**
* Merge default configuration with user-defined configuration.
*
* @throws CommandException thrown if failed to merge configuration
*/
protected void mergeDefaultConfig() throws CommandException {
Path configDefault = null;
try {
String bundleAppPathStr = conf.get(OozieClient.BUNDLE_APP_PATH);
Path bundleAppPath = new Path(bundleAppPathStr);
String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
Configuration fsConf = has.createConfiguration(bundleAppPath.toUri().getAuthority());
FileSystem fs = has.createFileSystem(user, bundleAppPath.toUri(), fsConf);
// app path could be a directory
if (!fs.isFile(bundleAppPath)) {
configDefault = new Path(bundleAppPath, CONFIG_DEFAULT);
} else {
configDefault = new Path(bundleAppPath.getParent(), CONFIG_DEFAULT);
}
if (fs.exists(configDefault)) {
Configuration defaultConf = new XConfiguration(fs.open(configDefault));
PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_USER_PROPERTIES);
PropertiesUtils.checkDefaultDisallowedProperties(defaultConf);
XConfiguration.injectDefaults(defaultConf, conf);
}
else {
LOG.info("configDefault Doesn't exist " + configDefault);
}
PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
}
catch (IOException e) {
throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
+ configDefault, e);
}
catch (HadoopAccessorException e) {
throw new CommandException(e);
}
LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
}
/**
* Read the application XML and validate against bundle Schema
*
* @return validated bundle XML
* @throws BundleJobException thrown if failed to read or validate xml
*/
private String readAndValidateXml() throws BundleJobException {
String appPath = ParamChecker.notEmpty(conf.get(OozieClient.BUNDLE_APP_PATH), OozieClient.BUNDLE_APP_PATH);
String bundleXml = readDefinition(appPath);
validateXml(bundleXml);
return bundleXml;
}
/**
* Read bundle definition.
*
* @param appPath application path.
* @return bundle definition.
* @throws BundleJobException thrown if the definition could not be read.
*/
protected String readDefinition(String appPath) throws BundleJobException {
String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
//Configuration confHadoop = CoordUtils.getHadoopConf(conf);
try {
URI uri = new URI(appPath);
LOG.debug("user =" + user);
HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
Configuration fsConf = has.createConfiguration(uri.getAuthority());
FileSystem fs = has.createFileSystem(user, uri, fsConf);
Path appDefPath;
// app path could be a directory
Path path = new Path(uri.getPath());
if (!fs.isFile(path)) {
appDefPath = new Path(path, BUNDLE_XML_FILE);
} else {
appDefPath = path;
}
Reader reader = new InputStreamReader(fs.open(appDefPath), StandardCharsets.UTF_8);
StringWriter writer = new StringWriter();
IOUtils.copyCharStream(reader, writer);
return writer.toString();
}
catch (IOException ex) {
LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
}
catch (URISyntaxException ex) {
LOG.warn("URISyException :" + ex.getMessage());
throw new BundleJobException(ErrorCode.E1302, appPath, ex.getMessage(), ex);
}
catch (HadoopAccessorException ex) {
throw new BundleJobException(ex);
}
catch (Exception ex) {
LOG.warn("Exception :", ex);
throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
}
}
/**
* Validate against Bundle XSD file
*
* @param xmlContent input bundle xml
* @throws BundleJobException thrown if failed to validate xml
*/
private void validateXml(String xmlContent) throws BundleJobException {
try {
Validator validator = Services.get().get(SchemaService.class).getValidator(SchemaName.BUNDLE);
validator.validate(new StreamSource(new StringReader(xmlContent)));
}
catch (SAXException ex) {
LOG.warn("SAXException :", ex);
throw new BundleJobException(ErrorCode.E0701, ex.getMessage(), ex);
}
catch (IOException ex) {
LOG.warn("IOException :", ex);
throw new BundleJobException(ErrorCode.E0702, ex.getMessage(), ex);
}
}
/**
* Write a Bundle Job into database
*
* @param bundleJob job bean
* @return job id
* @throws CommandException thrown if failed to store bundle job bean to db
*/
private String storeToDB(BundleJobBean bundleJob, String resolvedJobXml) throws CommandException {
try {
jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE);
bundleJob.setId(jobId);
String name = XmlUtils.parseXml(bundleBean.getOrigJobXml()).getAttributeValue("name");
name = ELUtils.resolveAppName(name, conf);
bundleJob.setAppName(name);
bundleJob.setAppPath(conf.get(OozieClient.BUNDLE_APP_PATH));
// bundleJob.setStatus(BundleJob.Status.PREP); //This should be set in parent class.
bundleJob.setCreatedTime(new Date());
bundleJob.setUser(conf.get(OozieClient.USER_NAME));
String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
bundleJob.setGroup(group);
bundleJob.setConf(XmlUtils.prettyPrint(conf).toString());
bundleJob.setJobXml(resolvedJobXml);
Element jobElement = XmlUtils.parseXml(resolvedJobXml);
Element controlsElement = jobElement.getChild("controls", jobElement.getNamespace());
if (controlsElement != null) {
Element kickoffTimeElement = controlsElement.getChild("kick-off-time", jobElement.getNamespace());
if (kickoffTimeElement != null && !kickoffTimeElement.getValue().isEmpty()) {
Date kickoffTime = DateUtils.parseDateOozieTZ(kickoffTimeElement.getValue());
bundleJob.setKickoffTime(kickoffTime);
}
}
bundleJob.setLastModifiedTime(new Date());
if (!dryrun) {
BundleJobQueryExecutor.getInstance().insert(bundleJob);
}
}
catch (Exception ex) {
throw new CommandException(ErrorCode.E1301, ex.getMessage(), ex);
}
return jobId;
}
@Override
public Job getJob() {
return bundleBean;
}
public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
setConfigToEval(eval, conf);
return eval;
}
private static void setConfigToEval(ELEvaluator eval, Configuration conf) {
for (Map.Entry<String, String> entry : conf) {
eval.setVariable(entry.getKey(), entry.getValue().trim());
}
}
/**
* Resolve job xml with conf
*
* @param bundleXml bundle job xml
* @param conf job configuration
* @return resolved job xml
* @throws BundleJobException thrown if failed to resolve variables
*/
private String resolvedVarsandFunctions(String bundleXml, Configuration conf) throws BundleJobException {
ELEvaluator eval;
try {
eval = createELEvaluatorForGroup(conf, "bundle-submit");
return eval.evaluate(bundleXml, String.class);
}
catch (Exception e) {
throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
}
}
/**
* Create ELEvaluator
*
* @param conf job configuration
* @return ELEvaluator the evaluator for el function
* @throws BundleJobException thrown if failed to create evaluator
*/
public ELEvaluator createEvaluator(Configuration conf) throws BundleJobException {
ELEvaluator eval;
ELEvaluator.Context context;
try {
context = new ELEvaluator.Context();
eval = new ELEvaluator(context);
for (Map.Entry<String, String> entry : conf) {
eval.setVariable(entry.getKey(), entry.getValue());
}
}
catch (Exception e) {
throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
}
return eval;
}
/**
* Verify the uniqueness of coordinator names
*
* @param resolvedJobXml job xml
* @throws CommandException thrown if failed to verify the uniqueness of coordinator names
*/
@SuppressWarnings("unchecked")
private Void verifyCoordNameUnique(String resolvedJobXml) throws CommandException {
Set<String> set = new HashSet<>();
try {
Element bAppXml = XmlUtils.parseXml(resolvedJobXml);
List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
for (Element elem : coordElems) {
Attribute name = elem.getAttribute("name");
if (name != null) {
String coordName;
try {
coordName = ELUtils.resolveAppName(name.getValue(), conf);
}
catch (Exception e) {
throw new CommandException(ErrorCode.E1321, e.getMessage(), e);
}
if (set.contains(coordName)) {
throw new CommandException(ErrorCode.E1304, name);
}
set.add(coordName);
}
else {
throw new CommandException(ErrorCode.E1305);
}
}
}
catch (JDOMException jex) {
throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex);
}
return null;
}
@Override
public void updateJob() throws CommandException {
}
@Override
public void performWrites() throws CommandException {
}
}