/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.action.hadoop;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.StringReader;
import java.net.ConnectException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AccessControlException;
import org.apache.oozie.hadoop.utils.HadoopShims;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DiskChecker;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.command.coord.CoordActionStartXCommand;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.ShareLibService;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.service.UserGroupInformationService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.util.ELEvaluationException;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.Namespace;
public class JavaActionExecutor extends ActionExecutor {
protected static final String HADOOP_USER = "user.name";
public static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
public static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
public static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
public static final String HADOOP_NAME_NODE = "fs.default.name";
private static final String HADOOP_JOB_NAME = "mapred.job.name";
public static final String OOZIE_COMMON_LIBDIR = "oozie";
private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();
public final static String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size";
public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
public static final String HADOOP_YARN_TIMELINE_SERVICE_ENABLED = "yarn.timeline-service.enabled";
public static final String HADOOP_YARN_UBER_MODE = "mapreduce.job.ubertask.enable";
public static final String HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART = "oozie.action.launcher.am.restart.kill.childjobs";
public static final String HADOOP_MAP_MEMORY_MB = "mapreduce.map.memory.mb";
public static final String HADOOP_CHILD_JAVA_OPTS = "mapred.child.java.opts";
public static final String HADOOP_MAP_JAVA_OPTS = "mapreduce.map.java.opts";
public static final String HADOOP_REDUCE_JAVA_OPTS = "mapreduce.reduce.java.opts";
public static final String HADOOP_CHILD_JAVA_ENV = "mapred.child.env";
public static final String HADOOP_MAP_JAVA_ENV = "mapreduce.map.env";
public static final String YARN_AM_RESOURCE_MB = "yarn.app.mapreduce.am.resource.mb";
public static final String YARN_AM_COMMAND_OPTS = "yarn.app.mapreduce.am.command-opts";
public static final String YARN_AM_ENV = "yarn.app.mapreduce.am.env";
private static final String JAVA_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.JavaMain";
public static final int YARN_MEMORY_MB_MIN = 512;
private static int maxActionOutputLen;
private static int maxExternalStatsSize;
private static int maxFSGlobMax;
private static final String SUCCEEDED = "SUCCEEDED";
private static final String KILLED = "KILLED";
private static final String FAILED = "FAILED";
private static final String FAILED_KILLED = "FAILED/KILLED";
protected XLog LOG = XLog.getLog(getClass());
private static final Pattern heapPattern = Pattern.compile("-Xmx(([0-9]+)[mMgG])");
private static final String JAVA_TMP_DIR_SETTINGS = "-Djava.io.tmpdir=";
public static final String CONF_HADOOP_YARN_UBER_MODE = "oozie.action.launcher." + HADOOP_YARN_UBER_MODE;
public static final String HADOOP_JOB_CLASSLOADER = "mapreduce.job.classloader";
public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first";
public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip";
public XConfiguration workflowConf = null;
static {
DISALLOWED_PROPERTIES.add(HADOOP_USER);
DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER);
DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE);
DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER_2);
DISALLOWED_PROPERTIES.add(HADOOP_YARN_RM);
}
public JavaActionExecutor() {
this("java");
}
protected JavaActionExecutor(String type) {
super(type);
}
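/**
* Return the classes every launcher job needs on its classpath: the launcher mapper, the no-op
* input/output formats and output committer, the launcher Hadoop utilities, the Hadoop shims, and
* any classes the registered URI handlers require.
*
* @return list of classes to ship with every launcher.
*/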
public static List<Class> getCommonLauncherClasses() {
List<Class> classes = new ArrayList<Class>();
classes.add(LauncherMapper.class);
classes.add(OozieLauncherInputFormat.class);
classes.add(OozieLauncherOutputFormat.class);
classes.add(OozieLauncherOutputCommitter.class);
classes.add(LauncherMainHadoopUtils.class);
classes.add(HadoopShims.class);
classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
return classes;
}
public List<Class> getLauncherClasses() {
List<Class> classes = new ArrayList<Class>();
try {
classes.add(Class.forName(JAVA_MAIN_CLASS_NAME));
}
catch (ClassNotFoundException e) {
throw new RuntimeException("Class not found", e);
}
return classes;
}
@Override
public void initActionType() {
super.initActionType();
maxActionOutputLen = ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA);
// Get the limit for the maximum allowed size of action stats
maxExternalStatsSize = ConfigurationService.getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE);
maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize;
// Get the limit for the maximum number of globbed files/dirs for FS operation
maxFSGlobMax = ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX);
registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001");
registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT,
"JA002");
registerError(DiskChecker.DiskOutOfSpaceException.class.getName(),
ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003");
registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(),
ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004");
registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(),
ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005");
registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA006");
registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007");
registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008");
registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009");
}
/**
* Get the maximum allowed size of stats
*
* @return maximum size of stats
*/
public static int getMaxExternalStatsSize() {
return maxExternalStatsSize;
}
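/**
* Verify that none of the disallowed properties (user name, job tracker, resource manager and
* name node addresses) are present in the given configuration.
*
* @param conf configuration to check.
* @param confName name of the configuration, used in the error message.
* @throws ActionExecutorException with error code JA010 if a disallowed property is found.
*/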
static void checkForDisallowedProps(Configuration conf, String confName) throws ActionExecutorException {
for (String prop : DISALLOWED_PROPERTIES) {
if (conf.get(prop) != null) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA010",
"Property [{0}] not allowed in action [{1}] configuration", prop, confName);
}
}
}
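/**
* Create the base Hadoop configuration for the action, reading the job-tracker and name-node
* elements from the action XML and copying the user name from the proto action configuration.
*
* @param context executor context.
* @param actionXml the action XML element.
* @return the base JobConf for the action.
*/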
public JobConf createBaseHadoopConf(Context context, Element actionXml) {
return createBaseHadoopConf(context, actionXml, true);
}
protected JobConf createBaseHadoopConf(Context context, Element actionXml, boolean loadResources) {
Namespace ns = actionXml.getNamespace();
String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim();
String nameNode = actionXml.getChild("name-node", ns).getTextTrim();
JobConf conf = null;
if (loadResources) {
conf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
}
else {
conf = new JobConf(false);
}
conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER));
conf.set(HADOOP_JOB_TRACKER, jobTracker);
conf.set(HADOOP_JOB_TRACKER_2, jobTracker);
conf.set(HADOOP_YARN_RM, jobTracker);
conf.set(HADOOP_NAME_NODE, nameNode);
conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true");
return conf;
}
protected JobConf loadHadoopDefaultResources(Context context, Element actionXml) {
return createBaseHadoopConf(context, actionXml);
}
private static void injectLauncherProperties(Configuration srcConf, Configuration launcherConf) {
for (Map.Entry<String, String> entry : srcConf) {
if (entry.getKey().startsWith("oozie.launcher.")) {
String name = entry.getKey().substring("oozie.launcher.".length());
String value = entry.getValue();
// setting original KEY
launcherConf.set(entry.getKey(), value);
// setting un-prefixed key (to allow Hadoop job config for the launcher job)
launcherConf.set(name, value);
}
}
}
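/**
* Build the launcher configuration: inject the action default properties, the job-xml and inline
* configuration entries prefixed with oozie.launcher., and the uber mode setting, then copy the
* result into the given configuration and set the optional config-class for the action.
*
* @param conf configuration to update.
* @param actionXml the action XML element.
* @param appPath the application path.
* @param context executor context.
* @return the updated configuration.
* @throws ActionExecutorException if the launcher configuration cannot be created.
*/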
Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
throws ActionExecutorException {
try {
Namespace ns = actionXml.getNamespace();
XConfiguration launcherConf = new XConfiguration();
// Inject action defaults for launcher
HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_JOB_TRACKER), getType());
injectLauncherProperties(actionDefaultConf, launcherConf);
// Inject <job-xml> and <configuration> for launcher
try {
parseJobXmlAndConfiguration(context, actionXml, appPath, launcherConf, true);
} catch (HadoopAccessorException ex) {
throw convertException(ex);
} catch (URISyntaxException ex) {
throw convertException(ex);
}
// Inject the uber mode setting for the launcher
injectLauncherUseUberMode(launcherConf);
XConfiguration.copy(launcherConf, conf);
checkForDisallowedProps(launcherConf, "launcher configuration");
// Inject config-class for launcher to use for action
Element e = actionXml.getChild("config-class", ns);
if (e != null) {
conf.set(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS, e.getTextTrim());
}
return conf;
}
catch (IOException ex) {
throw convertException(ex);
}
}
void injectLauncherUseUberMode(Configuration launcherConf) {
// Set Uber Mode for the launcher (YARN only, ignored by MR1)
// Priority:
// 1. action's <configuration>
// 2. oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable
// 3. oozie.action.launcher.mapreduce.job.ubertask.enable
if (launcherConf.get(HADOOP_YARN_UBER_MODE) == null) {
if (ConfigurationService.get("oozie.action." + getType() + ".launcher." + HADOOP_YARN_UBER_MODE).length() > 0) {
if (ConfigurationService.getBoolean("oozie.action." + getType() + ".launcher." + HADOOP_YARN_UBER_MODE)) {
launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
}
} else {
if (ConfigurationService.getBoolean("oozie.action.launcher." + HADOOP_YARN_UBER_MODE)) {
launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
}
}
}
}
void injectLauncherTimelineServiceEnabled(Configuration launcherConf, Configuration actionConf) {
// Getting delegation token for ATS. If tez-site.xml is present in distributed cache, turn on timeline service.
if (actionConf.get("oozie.launcher." + HADOOP_YARN_TIMELINE_SERVICE_ENABLED) == null
&& ConfigurationService.getBoolean("oozie.action.launcher." + HADOOP_YARN_TIMELINE_SERVICE_ENABLED)) {
String cacheFiles = launcherConf.get("mapred.cache.files");
if (cacheFiles != null && cacheFiles.contains("tez-site.xml")) {
launcherConf.setBoolean(HADOOP_YARN_TIMELINE_SERVICE_ENABLED, true);
}
}
}
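/**
* Reconcile the launcher map task settings with the MR ApplicationMaster settings so the launcher
* can run inside the AM in uber mode: merge the child environment into the AM environment, size the
* AM memory to the larger of the two plus a buffer, and combine the Java options. If the map and AM
* environments define conflicting values for the same variable, uber mode is disabled instead.
*/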
void updateConfForUberMode(Configuration launcherConf) {
// child.env
boolean hasConflictEnv = false;
String launcherMapEnv = launcherConf.get(HADOOP_MAP_JAVA_ENV);
if (launcherMapEnv == null) {
launcherMapEnv = launcherConf.get(HADOOP_CHILD_JAVA_ENV);
}
String amEnv = launcherConf.get(YARN_AM_ENV);
StringBuilder envStr = new StringBuilder();
HashMap<String, List<String>> amEnvMap = null;
HashMap<String, List<String>> launcherMapEnvMap = null;
if (amEnv != null) {
envStr.append(amEnv);
amEnvMap = populateEnvMap(amEnv);
}
if (launcherMapEnv != null) {
launcherMapEnvMap = populateEnvMap(launcherMapEnv);
if (amEnvMap != null) {
Iterator<String> envKeyItr = launcherMapEnvMap.keySet().iterator();
while (envKeyItr.hasNext()) {
String envKey = envKeyItr.next();
if (amEnvMap.containsKey(envKey)) {
List<String> amValList = amEnvMap.get(envKey);
List<String> launcherValList = launcherMapEnvMap.get(envKey);
Iterator<String> valItr = launcherValList.iterator();
while (valItr.hasNext()) {
String val = valItr.next();
if (!amValList.contains(val)) {
hasConflictEnv = true;
break;
}
else {
valItr.remove();
}
}
if (launcherValList.isEmpty()) {
envKeyItr.remove();
}
}
}
}
}
if (hasConflictEnv) {
launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, false);
}
else {
if (launcherMapEnvMap != null) {
for (String key : launcherMapEnvMap.keySet()) {
List<String> launcherValList = launcherMapEnvMap.get(key);
for (String val : launcherValList) {
if (envStr.length() > 0) {
envStr.append(",");
}
envStr.append(key).append("=").append(val);
}
}
}
launcherConf.set(YARN_AM_ENV, envStr.toString());
// memory.mb
int launcherMapMemoryMB = launcherConf.getInt(HADOOP_MAP_MEMORY_MB, 1536);
int amMemoryMB = launcherConf.getInt(YARN_AM_RESOURCE_MB, 1536);
// Add YARN_MEMORY_MB_MIN as a buffer: if the launcher map task
// aggressively uses high memory, the AM still needs some headroom
int memoryMB = Math.max(launcherMapMemoryMB, amMemoryMB) + YARN_MEMORY_MB_MIN;
// limit to 4096 in case of 32 bit
if (launcherMapMemoryMB < 4096 && amMemoryMB < 4096 && memoryMB > 4096) {
memoryMB = 4096;
}
launcherConf.setInt(YARN_AM_RESOURCE_MB, memoryMB);
// We already made mapred.child.java.opts and
// mapreduce.map.java.opts equal, so just start with one of them
String launcherMapOpts = launcherConf.get(HADOOP_MAP_JAVA_OPTS, "");
String amChildOpts = launcherConf.get(YARN_AM_COMMAND_OPTS);
StringBuilder optsStr = new StringBuilder();
int heapSizeForMap = extractHeapSizeMB(launcherMapOpts);
int heapSizeForAm = extractHeapSizeMB(amChildOpts);
int heapSize = Math.max(heapSizeForMap, heapSizeForAm) + YARN_MEMORY_MB_MIN;
// limit to 3584 in case of 32 bit
if (heapSizeForMap < 4096 && heapSizeForAm < 4096 && heapSize > 3584) {
heapSize = 3584;
}
if (amChildOpts != null) {
optsStr.append(amChildOpts);
}
optsStr.append(" ").append(launcherMapOpts.trim());
if (heapSize > 0) {
// append calculated total heap size to the end
optsStr.append(" ").append("-Xmx").append(heapSize).append("m");
}
launcherConf.set(YARN_AM_COMMAND_OPTS, optsStr.toString().trim());
}
}
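/**
* Append -Djava.io.tmpdir=./tmp to the AM command opts if no java.io.tmpdir setting is present yet.
*/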
void updateConfForJavaTmpDir(Configuration conf) {
String amChildOpts = conf.get(YARN_AM_COMMAND_OPTS);
String oozieJavaTmpDirSetting = "-Djava.io.tmpdir=./tmp";
if (amChildOpts != null && !amChildOpts.contains(JAVA_TMP_DIR_SETTINGS)) {
conf.set(YARN_AM_COMMAND_OPTS, amChildOpts + " " + oozieJavaTmpDirSetting);
}
}
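/**
* Parse a comma-separated list of NAME=VALUE environment entries into a map from name to the list
* of values seen for that name; for example, "A=1,B=2,A=3" becomes {A=[1, 3], B=[2]}.
*
* @param input comma-separated environment entries.
* @return map of variable name to its values.
*/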
private HashMap<String, List<String>> populateEnvMap(String input) {
HashMap<String, List<String>> envMaps = new HashMap<String, List<String>>();
String[] envEntries = input.split(",");
for (String envEntry : envEntries) {
String[] envKeyVal = envEntry.split("=");
String envKey = envKeyVal[0].trim();
List<String> valList = envMaps.get(envKey);
if (valList == null) {
valList = new ArrayList<String>();
}
valList.add(envKeyVal[1].trim());
envMaps.put(envKey, valList);
}
return envMaps;
}
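/**
* Extract the heap size in MB from the last -Xmx option in the given Java options string; values in
* gigabytes are converted to megabytes. For example, "-Xmx512m -Xmx2g" yields 2048, and a string
* without an -Xmx option yields 0.
*
* @param input Java options string, may be null or empty.
* @return heap size in MB, or 0 if none is specified.
*/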
public int extractHeapSizeMB(String input) {
int ret = 0;
if (input == null || input.isEmpty()) {
return ret;
}
Matcher m = heapPattern.matcher(input);
String heapStr = null;
String heapNum = null;
// Grabs the last match, which is the one that takes effect (in case multiple -Xmx options are specified)
while (m.find()) {
heapStr = m.group(1);
heapNum = m.group(2);
}
if (heapStr != null) {
// when -Xmx is specified in gigabytes
if (heapStr.endsWith("g") || heapStr.endsWith("G")) {
ret = Integer.parseInt(heapNum) * 1024;
} else {
ret = Integer.parseInt(heapNum);
}
}
return ret;
}
public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf)
throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException {
parseJobXmlAndConfiguration(context, element, appPath, conf, false);
}
public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf,
boolean isLauncher) throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException {
Namespace ns = element.getNamespace();
Iterator<Element> it = element.getChildren("job-xml", ns).iterator();
HashMap<String, FileSystem> filesystemsMap = new HashMap<String, FileSystem>();
HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
while (it.hasNext()) {
Element e = it.next();
String jobXml = e.getTextTrim();
Path pathSpecified = new Path(jobXml);
Path path = pathSpecified.isAbsolute() ? pathSpecified : new Path(appPath, jobXml);
FileSystem fs;
if (filesystemsMap.containsKey(path.toUri().getAuthority())) {
fs = filesystemsMap.get(path.toUri().getAuthority());
}
else {
if (path.toUri().getAuthority() != null) {
fs = has.createFileSystem(context.getWorkflow().getUser(), path.toUri(),
has.createJobConf(path.toUri().getAuthority()));
}
else {
fs = context.getAppFileSystem();
}
filesystemsMap.put(path.toUri().getAuthority(), fs);
}
Configuration jobXmlConf = new XConfiguration(fs.open(path));
try {
String jobXmlConfString = XmlUtils.prettyPrint(jobXmlConf).toString();
jobXmlConfString = XmlUtils.removeComments(jobXmlConfString);
jobXmlConfString = context.getELEvaluator().evaluate(jobXmlConfString, String.class);
jobXmlConf = new XConfiguration(new StringReader(jobXmlConfString));
}
catch (ELEvaluationException ex) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, "EL_EVAL_ERROR", ex
.getMessage(), ex);
}
catch (Exception ex) {
context.setErrorInfo("EL_ERROR", ex.getMessage());
}
checkForDisallowedProps(jobXmlConf, "job-xml");
if (isLauncher) {
injectLauncherProperties(jobXmlConf, conf);
} else {
XConfiguration.copy(jobXmlConf, conf);
}
}
Element e = element.getChild("configuration", ns);
if (e != null) {
String strConf = XmlUtils.prettyPrint(e).toString();
XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
checkForDisallowedProps(inlineConf, "inline configuration");
if (isLauncher) {
injectLauncherProperties(inlineConf, conf);
} else {
XConfiguration.copy(inlineConf, conf);
}
}
}
Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
throws ActionExecutorException {
try {
HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_JOB_TRACKER), getType());
XConfiguration.injectDefaults(actionDefaults, actionConf);
has.checkSupportedFilesystem(appPath.toUri());
// Set the Java Main Class for the Java action to give to the Java launcher
setJavaMain(actionConf, actionXml);
parseJobXmlAndConfiguration(context, actionXml, appPath, actionConf);
// set mapreduce.job.complete.cancel.delegation.tokens in actionConf so the child job doesn't cancel the delegation token
actionConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
updateConfForJavaTmpDir(actionConf);
setRootLoggerLevel(actionConf);
return actionConf;
}
catch (IOException ex) {
throw convertException(ex);
}
catch (HadoopAccessorException ex) {
throw convertException(ex);
}
catch (URISyntaxException ex) {
throw convertException(ex);
}
}
/**
* Set the root log level property in actionConf
* @param actionConf the action configuration to update
*/
void setRootLoggerLevel(Configuration actionConf) {
String oozieActionTypeRootLogger = "oozie.action." + getType() + LauncherMapper.ROOT_LOGGER_LEVEL;
String oozieActionRootLogger = "oozie.action." + LauncherMapper.ROOT_LOGGER_LEVEL;
// check if the root log level has already been mentioned in the action configuration
String rootLogLevel = actionConf.get(oozieActionTypeRootLogger, actionConf.get(oozieActionRootLogger));
if (rootLogLevel != null) {
// root log level is mentioned in action configuration
return;
}
// otherwise set the root log level from the Oozie default configuration
rootLogLevel = ConfigurationService.get(oozieActionTypeRootLogger);
if (rootLogLevel != null && rootLogLevel.length() > 0) {
actionConf.set(oozieActionRootLogger, rootLogLevel);
}
else {
rootLogLevel = ConfigurationService.get(oozieActionRootLogger);
if (rootLogLevel != null && rootLogLevel.length() > 0) {
actionConf.set(oozieActionRootLogger, rootLogLevel);
}
}
}
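/**
* Add a file or archive to the DistributedCache, resolving relative paths against the application
* path. Archives are added as cache archives. For files, .so files are added as cache files
* symlinked to their base name, plain .jar files go on the classpath unless they carry a fragment
* (in which case they are cached as files), and any other file is cached with a symlink to its own
* name.
*
* @param conf configuration to update.
* @param appPath the application path used to resolve relative paths.
* @param filePath path of the file or archive.
* @param archive true if the path is an archive.
* @return the updated configuration.
* @throws ActionExecutorException if the path cannot be added to the cache.
*/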
Configuration addToCache(Configuration conf, Path appPath, String filePath, boolean archive)
throws ActionExecutorException {
URI uri = null;
try {
uri = new URI(filePath);
URI baseUri = appPath.toUri();
if (uri.getScheme() == null) {
String resolvedPath = uri.getPath();
if (!resolvedPath.startsWith("/")) {
resolvedPath = baseUri.getPath() + "/" + resolvedPath;
}
uri = new URI(baseUri.getScheme(), baseUri.getAuthority(), resolvedPath, uri.getQuery(), uri.getFragment());
}
if (archive) {
DistributedCache.addCacheArchive(uri.normalize(), conf);
}
else {
String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
if (fileName.endsWith(".so") || fileName.contains(".so.")) { // .so files
uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), uri.getQuery(), fileName);
DistributedCache.addCacheFile(uri.normalize(), conf);
}
else if (fileName.endsWith(".jar")) { // .jar files
if (!fileName.contains("#")) {
String user = conf.get("user.name");
Path pathToAdd = new Path(uri.normalize());
Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, pathToAdd, conf);
}
else {
DistributedCache.addCacheFile(uri.normalize(), conf);
}
}
else { // regular files
if (!fileName.contains("#")) {
uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), uri.getQuery(), fileName);
}
DistributedCache.addCacheFile(uri.normalize(), conf);
}
}
DistributedCache.createSymlink(conf);
return conf;
}
catch (Exception ex) {
LOG.debug(
"Error when adding to DistributedCache. Path=" + uri + ", archive=" + archive + ", conf="
+ XmlUtils.prettyPrint(conf).toString());
throw convertException(ex);
}
}
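/**
* Create the action directory if it does not exist yet, using a temporary directory plus rename so
* a partially created directory is not left behind on failure.
*
* @param actionFs the file system of the action directory.
* @param context executor context.
* @throws ActionExecutorException if the directory cannot be created.
*/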
public void prepareActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
try {
Path actionDir = context.getActionDir();
Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp");
if (!actionFs.exists(actionDir)) {
try {
actionFs.mkdirs(tempActionDir);
actionFs.rename(tempActionDir, actionDir);
}
catch (IOException ex) {
actionFs.delete(tempActionDir, true);
actionFs.delete(actionDir, true);
throw ex;
}
}
}
catch (Exception ex) {
throw convertException(ex);
}
}
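/**
* Delete the action directory, unless oozie.action.keep.action.dir is set in the proto action
* configuration.
*
* @param actionFs the file system of the action directory.
* @param context executor context.
* @throws ActionExecutorException if the directory cannot be deleted.
*/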
void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
try {
Path actionDir = context.getActionDir();
if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)
&& actionFs.exists(actionDir)) {
actionFs.delete(actionDir, true);
}
}
catch (Exception ex) {
throw convertException(ex);
}
}
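/**
* Add the jars of the given sharelib names to the action's DistributedCache, de-duplicating jars by
* file name. Sharelib files whose names match getShareLibFilesForActionConf() are not shipped;
* their properties are instead injected into the job configuration as defaults.
*
* @param conf the action configuration.
* @param actionShareLibNames sharelib names to add, may be null.
* @throws ActionExecutorException if the sharelib cannot be configured.
*/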
protected void addShareLib(Configuration conf, String[] actionShareLibNames)
throws ActionExecutorException {
Set<String> confSet = new HashSet<String>(Arrays.asList(getShareLibFilesForActionConf() == null ? new String[0]
: getShareLibFilesForActionConf()));
Set<Path> sharelibList = new HashSet<Path>();
if (actionShareLibNames != null) {
try {
ShareLibService shareLibService = Services.get().get(ShareLibService.class);
FileSystem fs = shareLibService.getFileSystem();
if (fs != null) {
for (String actionShareLibName : actionShareLibNames) {
List<Path> listOfPaths = shareLibService.getShareLibJars(actionShareLibName);
if (listOfPaths != null && !listOfPaths.isEmpty()) {
for (Path actionLibPath : listOfPaths) {
String fragmentName = new URI(actionLibPath.toString()).getFragment();
String fileName = fragmentName == null ? actionLibPath.getName() : fragmentName;
if (confSet.contains(fileName)) {
Configuration jobXmlConf = shareLibService.getShareLibConf(actionShareLibName,
actionLibPath);
if (jobXmlConf != null) {
checkForDisallowedProps(jobXmlConf, actionLibPath.getName());
XConfiguration.injectDefaults(jobXmlConf, conf);
LOG.trace("Adding properties of " + actionLibPath + " to job conf");
}
}
else {
// Filtering out duplicate jars or files
sharelibList.add(new Path(actionLibPath.toUri()) {
@Override
public int hashCode() {
return getName().hashCode();
}
@Override
public String getName() {
try {
return (new URI(toString())).getFragment() == null ? new Path(toUri()).getName()
: (new URI(toString())).getFragment();
}
catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
@Override
public boolean equals(Object input) {
if (input == null) {
return false;
}
if (input == this) {
return true;
}
if (!(input instanceof Path)) {
return false;
}
return getName().equals(((Path) input).getName());
}
});
}
}
}
}
}
for (Path libPath : sharelibList) {
addToCache(conf, libPath, libPath.toUri().getPath(), false);
}
}
catch (URISyntaxException ex) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "Error configuring sharelib",
ex.getMessage());
}
catch (IOException ex) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
ex.getMessage());
}
}
}
protected void addSystemShareLibForAction(Configuration conf) throws ActionExecutorException {
ShareLibService shareLibService = Services.get().get(ShareLibService.class);
// ShareLibService is null for test cases
if (shareLibService != null) {
try {
List<Path> listOfPaths = shareLibService.getSystemLibJars(JavaActionExecutor.OOZIE_COMMON_LIBDIR);
if (listOfPaths == null || listOfPaths.isEmpty()) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "EJ001",
"Could not locate Oozie sharelib");
}
FileSystem fs = listOfPaths.get(0).getFileSystem(conf);
for (Path actionLibPath : listOfPaths) {
JobUtils.addFileToClassPath(actionLibPath, conf, fs);
DistributedCache.createSymlink(conf);
}
listOfPaths = shareLibService.getSystemLibJars(getType());
if (listOfPaths != null) {
for (Path actionLibPath : listOfPaths) {
JobUtils.addFileToClassPath(actionLibPath, conf, fs);
DistributedCache.createSymlink(conf);
}
}
}
catch (IOException ex) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
ex.getMessage());
}
}
}
protected void addActionLibs(Path appPath, Configuration conf) throws ActionExecutorException {
String[] actionLibsStrArr = conf.getStrings("oozie.launcher.oozie.libpath");
if (actionLibsStrArr != null) {
try {
for (String actionLibsStr : actionLibsStrArr) {
actionLibsStr = actionLibsStr.trim();
if (actionLibsStr.length() > 0)
{
Path actionLibsPath = new Path(actionLibsStr);
String user = conf.get("user.name");
FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf);
if (fs.exists(actionLibsPath)) {
FileStatus[] files = fs.listStatus(actionLibsPath);
for (FileStatus file : files) {
addToCache(conf, appPath, file.getPath().toUri().getPath(), false);
}
}
}
}
}
catch (HadoopAccessorException ex){
throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
ex.getErrorCode().toString(), ex.getMessage());
}
catch (IOException ex){
throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
"It should never happen", ex.getMessage());
}
}
}
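/**
* Add the workflow lib/ jars, the action libs from oozie.launcher.oozie.libpath, the file and
* archive elements of the action XML, and all sharelibs to the DistributedCache of the given
* configuration.
*
* @param context executor context.
* @param actionXml the action XML element.
* @param appPath the application path.
* @param conf configuration to update.
* @throws ActionExecutorException if a library cannot be added.
*/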
@SuppressWarnings("unchecked")
public void setLibFilesArchives(Context context, Element actionXml, Path appPath, Configuration conf)
throws ActionExecutorException {
Configuration proto = context.getProtoActionConf();
// Workflow lib/
String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST);
if (paths != null) {
for (String path : paths) {
addToCache(conf, appPath, path, false);
}
}
// Action libs
addActionLibs(appPath, conf);
// files and archives defined in the action
for (Element eProp : (List<Element>) actionXml.getChildren()) {
if (eProp.getName().equals("file")) {
String[] filePaths = eProp.getTextTrim().split(",");
for (String path : filePaths) {
addToCache(conf, appPath, path.trim(), false);
}
}
else if (eProp.getName().equals("archive")) {
String[] archivePaths = eProp.getTextTrim().split(",");
for (String path : archivePaths){
addToCache(conf, appPath, path.trim(), true);
}
}
}
addAllShareLibs(appPath, conf, context, actionXml);
}
// Adds action specific share libs and common share libs
private void addAllShareLibs(Path appPath, Configuration conf, Context context, Element actionXml)
throws ActionExecutorException {
// Add action specific share libs
addActionShareLib(appPath, conf, context, actionXml);
// Add common sharelibs for Oozie and launcher jars
addSystemShareLibForAction(conf);
}
private void addActionShareLib(Path appPath, Configuration conf, Context context, Element actionXml)
throws ActionExecutorException {
XConfiguration wfJobConf = null;
try {
wfJobConf = getWorkflowConf(context);
}
catch (IOException ioe) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
ioe.getMessage());
}
// Action sharelibs are only added if user has specified to use system libpath
if (conf.get(OozieClient.USE_SYSTEM_LIBPATH) == null) {
if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH,
ConfigurationService.getBoolean(OozieClient.USE_SYSTEM_LIBPATH))) {
// add action specific sharelibs
addShareLib(conf, getShareLibNames(context, actionXml, conf));
}
}
else {
if (conf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) {
// add action specific sharelibs
addShareLib(conf, getShareLibNames(context, actionXml, conf));
}
}
}
protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, JavaMain.class.getName());
}
private void setJavaMain(Configuration actionConf, Element actionXml) {
Namespace ns = actionXml.getNamespace();
Element e = actionXml.getChild("main-class", ns);
if (e != null) {
actionConf.set(JavaMain.JAVA_MAIN_CLASS, e.getTextTrim());
}
}
private static final String QUEUE_NAME = "mapred.job.queue.name";
private static final Set<String> SPECIAL_PROPERTIES = new HashSet<String>();
static {
SPECIAL_PROPERTIES.add(QUEUE_NAME);
SPECIAL_PROPERTIES.add(ACL_VIEW_JOB);
SPECIAL_PROPERTIES.add(ACL_MODIFY_JOB);
}
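/**
* Create the launcher job configuration for the action: set up the launcher properties, YARN
* restart handling, lib files and archives, job name, callback, launcher info (main class, prepare
* XML, URI handlers, output limits, main arguments), Java options, uber mode settings, and the
* special properties copied over from the action configuration.
*
* @param actionFs the file system of the application.
* @param context executor context.
* @param action the workflow action.
* @param actionXml the action XML element.
* @param actionConf the action configuration.
* @return the launcher JobConf.
* @throws ActionExecutorException if the launcher configuration cannot be created.
*/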
@SuppressWarnings("unchecked")
JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf)
throws ActionExecutorException {
try {
// app path could be a file
Path appPathRoot = new Path(context.getWorkflow().getAppPath());
if (actionFs.isFile(appPathRoot)) {
appPathRoot = appPathRoot.getParent();
}
// launcher job configuration
JobConf launcherJobConf = createBaseHadoopConf(context, actionXml);
// cancel the delegation token on the launcher job, which stays alive until the child job(s) finish;
// otherwise (as in the mapred action), don't cancel it so running child jobs are not disturbed
launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context);
// Properties for when a launcher job's AM gets restarted
if (ConfigurationService.getBoolean(HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART)) {
// a launcher time filter is required to prune the search for the launcher tag.
// Set the coordinator action nominal time as the launcher time, since its child job cannot launch
// before the nominal time. The workflow created time is good enough when the workflow runs
// independently or is rerun from a failed node.
long launcherTime = System.currentTimeMillis();
String coordActionNominalTime = context.getProtoActionConf().get(
CoordActionStartXCommand.OOZIE_COORD_ACTION_NOMINAL_TIME);
if (coordActionNominalTime != null) {
launcherTime = Long.parseLong(coordActionNominalTime);
}
else if (context.getWorkflow().getCreatedTime() != null) {
launcherTime = context.getWorkflow().getCreatedTime().getTime();
}
String actionYarnTag = getActionYarnTag(getWorkflowConf(context), context.getWorkflow(), action);
LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, actionYarnTag, launcherTime);
}
else {
LOG.info(MessageFormat.format("{0} is set to false, not setting YARN restart properties",
HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART));
}
String actionShareLibProperty = actionConf.get(ACTION_SHARELIB_FOR + getType());
if (actionShareLibProperty != null) {
launcherJobConf.set(ACTION_SHARELIB_FOR + getType(), actionShareLibProperty);
}
setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf);
String jobName = launcherJobConf.get(HADOOP_JOB_NAME);
if (jobName == null || jobName.isEmpty()) {
jobName = XLog.format(
"oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
context.getWorkflow().getAppName(), action.getName(),
context.getWorkflow().getId());
launcherJobConf.setJobName(jobName);
}
// Inject Oozie job information if enabled.
injectJobInfo(launcherJobConf, actionConf, context, action);
injectLauncherCallback(context, launcherJobConf);
String jobId = context.getWorkflow().getId();
String actionId = action.getId();
Path actionDir = context.getActionDir();
String recoveryId = context.getRecoveryId();
// Getting the prepare XML from the action XML
Namespace ns = actionXml.getNamespace();
Element prepareElement = actionXml.getChild("prepare", ns);
String prepareXML = "";
if (prepareElement != null) {
if (prepareElement.getChildren().size() > 0) {
prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim();
}
}
LauncherMapperHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf,
prepareXML);
// Set the launcher Main Class
LauncherMapperHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
LauncherMapperHelper.setupLauncherURIHandlerConf(launcherJobConf);
LauncherMapperHelper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);
LauncherMapperHelper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
LauncherMapperHelper.setupMaxFSGlob(launcherJobConf, maxFSGlobMax);
List<Element> list = actionXml.getChildren("arg", ns);
String[] args = new String[list.size()];
for (int i = 0; i < list.size(); i++) {
args[i] = list.get(i).getTextTrim();
}
LauncherMapperHelper.setupMainArguments(launcherJobConf, args);
// backward compatibility flag - see OOZIE-2872
if (ConfigurationService.getBoolean(LauncherMapper.CONF_OOZIE_NULL_ARGS_ALLOWED)) {
launcherJobConf.setBoolean(LauncherMapper.CONF_OOZIE_NULL_ARGS_ALLOWED, true);
} else {
launcherJobConf.setBoolean(LauncherMapper.CONF_OOZIE_NULL_ARGS_ALLOWED, false);
}
// Make mapred.child.java.opts and mapreduce.map.java.opts equal, but give values from the latter priority; also append
// <java-opt> and <java-opts> and give those highest priority
StringBuilder opts = new StringBuilder(launcherJobConf.get(HADOOP_CHILD_JAVA_OPTS, ""));
if (launcherJobConf.get(HADOOP_MAP_JAVA_OPTS) != null) {
opts.append(" ").append(launcherJobConf.get(HADOOP_MAP_JAVA_OPTS));
}
List<Element> javaopts = actionXml.getChildren("java-opt", ns);
for (Element opt: javaopts) {
opts.append(" ").append(opt.getTextTrim());
}
Element opt = actionXml.getChild("java-opts", ns);
if (opt != null) {
opts.append(" ").append(opt.getTextTrim());
}
launcherJobConf.set(HADOOP_CHILD_JAVA_OPTS, opts.toString().trim());
launcherJobConf.set(HADOOP_MAP_JAVA_OPTS, opts.toString().trim());
// setting for uber mode
if (launcherJobConf.getBoolean(HADOOP_YARN_UBER_MODE, false)) {
if (checkPropertiesToDisableUber(launcherJobConf)) {
launcherJobConf.setBoolean(HADOOP_YARN_UBER_MODE, false);
}
else {
updateConfForUberMode(launcherJobConf);
}
}
updateConfForJavaTmpDir(launcherJobConf);
injectLauncherTimelineServiceEnabled(launcherJobConf, actionConf);
// properties from action that are needed by the launcher (e.g. QUEUE NAME, ACLs)
// maybe we should add queue to the WF schema, below job-tracker
actionConfToLauncherConf(actionConf, launcherJobConf);
return launcherJobConf;
}
catch (Exception ex) {
throw convertException(ex);
}
}
private boolean checkPropertiesToDisableUber(Configuration launcherConf) {
boolean disable = false;
if (launcherConf.getBoolean(HADOOP_JOB_CLASSLOADER, false)) {
disable = true;
}
else if (launcherConf.getBoolean(HADOOP_USER_CLASSPATH_FIRST, false)) {
disable = true;
}
return disable;
}
protected void injectCallback(Context context, Configuration conf) {
String callback = context.getCallbackUrl("$jobStatus");
if (conf.get("job.end.notification.url") != null) {
LOG.warn("Overriding the action job end notification URI");
}
conf.set("job.end.notification.url", callback);
}
void injectActionCallback(Context context, Configuration actionConf) {
// action callback needs to be injected only for mapreduce actions.
}
void injectLauncherCallback(Context context, Configuration launcherConf) {
injectCallback(context, launcherConf);
}
private void actionConfToLauncherConf(Configuration actionConf, JobConf launcherConf) {
for (String name : SPECIAL_PROPERTIES) {
if (actionConf.get(name) != null && launcherConf.get("oozie.launcher." + name) == null) {
launcherConf.set(name, actionConf.get(name));
}
}
}
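/**
* Submit the launcher job for the action: build the action and launcher configurations, set up
* credentials and delegation tokens, and submit through the job client, unless a recovered launcher
* is already running and this is not a user retry.
*
* @param actionFs the file system of the application.
* @param context executor context.
* @param action the workflow action.
* @throws ActionExecutorException if the launcher cannot be submitted.
*/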
public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
JobClient jobClient = null;
boolean exception = false;
try {
Path appPathRoot = new Path(context.getWorkflow().getAppPath());
// app path could be a file
if (actionFs.isFile(appPathRoot)) {
appPathRoot = appPathRoot.getParent();
}
Element actionXml = XmlUtils.parseXml(action.getConf());
// action job configuration
Configuration actionConf = loadHadoopDefaultResources(context, actionXml);
setupActionConf(actionConf, context, actionXml, appPathRoot);
LOG.debug("Setting LibFilesArchives ");
setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
String jobName = actionConf.get(HADOOP_JOB_NAME);
if (jobName == null || jobName.isEmpty()) {
jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}",
getType(), context.getWorkflow().getAppName(),
action.getName(), context.getWorkflow().getId());
actionConf.set(HADOOP_JOB_NAME, jobName);
}
injectActionCallback(context, actionConf);
if (actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) {
// ONLY in the case where user has not given the
// modify-job ACL specifically
if (context.getWorkflow().getAcl() != null) {
// setting the group owning the Oozie job to allow anybody in that
// group to modify the jobs.
actionConf.set(ACL_MODIFY_JOB, context.getWorkflow().getAcl());
}
}
// Setting the credential properties in launcher conf
JobConf credentialsConf = null;
HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context,
action, actionConf);
if (credentialsProperties != null) {
// add more credential tokens if the action needs them
credentialsConf = new JobConf(false);
XConfiguration.copy(actionConf, credentialsConf);
setCredentialTokens(credentialsConf, context, action, credentialsProperties);
// copy properties from credentialsConf into the action conf
for (Entry<String, String> entry : credentialsConf) {
if (actionConf.get(entry.getKey()) == null) {
actionConf.set(entry.getKey(), entry.getValue());
}
}
}
JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
LOG.debug("Creating Job Client for action " + action.getId());
jobClient = createJobClient(context, launcherJobConf);
String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
.getRecoveryId());
boolean alreadyRunning = launcherId != null;
RunningJob runningJob;
// if user-retry is on, always submit a new launcher
boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry();
if (alreadyRunning && !isUserRetry) {
runningJob = jobClient.getJob(JobID.forName(launcherId));
if (runningJob == null) {
String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
"unknown job [{0}@{1}], cannot recover", launcherId, jobTracker);
}
}
else {
LOG.debug("Submitting the job through Job Client for action " + action.getId());
// setting up propagation of the delegation token.
Services.get().get(HadoopAccessorService.class).addRMDelegationToken(jobClient, launcherJobConf);
// insert credentials tokens to launcher job conf if needed
if (needInjectCredentials() && credentialsConf != null) {
for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) {
Text fauxAlias = new Text(tk.getKind() + "_" + tk.getService());
LOG.debug("ADDING TOKEN: " + fauxAlias);
launcherJobConf.getCredentials().addToken(fauxAlias, tk);
}
if (credentialsConf.getCredentials().numberOfSecretKeys() > 0) {
for (Entry<String, CredentialsProperties> entry : credentialsProperties.entrySet()) {
CredentialsProperties credProps = entry.getValue();
if (credProps != null) {
Text credName = new Text(credProps.getName());
byte[] secKey = credentialsConf.getCredentials().getSecretKey(credName);
if (secKey != null) {
LOG.debug("ADDING CREDENTIAL: " + credProps.getName());
launcherJobConf.getCredentials().addSecretKey(credName, secKey);
}
}
}
}
}
else {
LOG.info("No need to inject credentials.");
}
runningJob = jobClient.submitJob(launcherJobConf);
if (runningJob == null) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
"Error submitting launcher for action [{0}]", action.getId());
}
launcherId = runningJob.getID().toString();
LOG.debug("After submission get the launcherId " + launcherId);
}
String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
String consoleUrl = runningJob.getTrackingURL();
context.setStartData(launcherId, jobTracker, consoleUrl);
}
catch (Exception ex) {
exception = true;
throw convertException(ex);
}
finally {
if (jobClient != null) {
try {
jobClient.close();
}
catch (Exception e) {
if (exception) {
LOG.error("JobClient error: ", e);
}
else {
throw convertException(e);
}
}
}
}
}
private boolean needInjectCredentials() {
boolean methodExists = true;
Class klass;
try {
klass = Class.forName("org.apache.hadoop.mapred.JobConf");
klass.getMethod("getCredentials");
}
catch (ClassNotFoundException ex) {
methodExists = false;
}
catch (NoSuchMethodException ex) {
methodExists = false;
}
return methodExists;
}
protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context,
WorkflowAction action, Configuration actionConf) throws Exception {
HashMap<String, CredentialsProperties> credPropertiesMap = null;
if (context != null && action != null) {
if (!"true".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP))) {
XConfiguration wfJobConf = getWorkflowConf(context);
if ("false".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) ||
!wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP))) {
credPropertiesMap = getActionCredentialsProperties(context, action);
if (credPropertiesMap != null) {
for (String key : credPropertiesMap.keySet()) {
CredentialsProperties prop = credPropertiesMap.get(key);
if (prop != null) {
LOG.debug("Credential Properties set for action : " + action.getId());
for (String property : prop.getProperties().keySet()) {
actionConf.set(property, prop.getProperties().get(property));
LOG.debug("property : '" + property + "', value : '" + prop.getProperties().get(property)
+ "'");
}
}
}
} else {
LOG.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred());
}
} else {
LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)");
}
} else {
LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)");
}
} else {
LOG.warn("context or action is null");
}
return credPropertiesMap;
}
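/**
* Obtain the credential tokens for the action: ensure the Kerberos login is current, then let each
* configured credentials provider add its tokens to the given job configuration.
*
* @param jobconf job configuration to receive the tokens.
* @param context executor context.
* @param action the workflow action.
* @param credPropertiesMap credential properties keyed by credential name.
* @throws Exception if a credential cannot be loaded.
*/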
protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action,
HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception {
if (context != null && action != null && credPropertiesMap != null) {
// Make sure we're logged into Kerberos; if not, or near expiration, it will relogin
CredentialsProvider.ensureKerberosLogin();
for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
String credName = entry.getKey();
CredentialsProperties credProps = entry.getValue();
if (credProps != null) {
CredentialsProvider credProvider = new CredentialsProvider(credProps.getType());
Credentials credentialObject = credProvider.createCredentialObject();
if (credentialObject != null) {
credentialObject.addtoJobConf(jobconf, credProps, context);
LOG.debug("Retrieved Credential '" + credName + "' for action " + action.getId());
}
else {
LOG.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType());
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA020",
"Could not load credentials of type [{0}] with name [{1}]]; perhaps it was not defined"
+ " in oozie-site.xml?", credProps.getType(), credName);
}
}
}
}
}
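/**
* Collect the credentials properties for the action from its comma-separated cred attribute.
*
* @param context executor context.
* @param action the workflow action.
* @return map of credential name to its properties; empty if the action declares no credentials.
* @throws Exception if the workflow definition cannot be parsed.
*/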
protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context,
WorkflowAction action) throws Exception {
HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>();
if (context != null && action != null) {
String credsInAction = action.getCred();
if (credsInAction != null) {
LOG.debug("Get credential '" + credsInAction + "' properties for action : " + action.getId());
String[] credNames = credsInAction.split(",");
for (String credName : credNames) {
CredentialsProperties credProps = getCredProperties(context, credName);
props.put(credName, credProps);
}
}
}
else {
LOG.warn("context or action is null");
}
return props;
}
@SuppressWarnings("unchecked")
protected CredentialsProperties getCredProperties(Context context, String credName)
throws Exception {
CredentialsProperties credProp = null;
String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition();
XConfiguration wfjobConf = getWorkflowConf(context);
Element elementJob = XmlUtils.parseXml(workflowXml);
Element credentials = elementJob.getChild("credentials", elementJob.getNamespace());
if (credentials != null) {
for (Element credential : (List<Element>) credentials.getChildren("credential", credentials.getNamespace())) {
String name = credential.getAttributeValue("name");
String type = credential.getAttributeValue("type");
LOG.debug("getCredProperties: Name: " + name + ", Type: " + type);
if (name.equalsIgnoreCase(credName)) {
credProp = new CredentialsProperties(name, type);
for (Element property : (List<Element>) credential.getChildren("property",
credential.getNamespace())) {
String propertyName = property.getChildText("name", property.getNamespace());
String propertyValue = property.getChildText("value", property.getNamespace());
ELEvaluator eval = new ELEvaluator();
for (Map.Entry<String, String> entry : wfjobConf) {
eval.setVariable(entry.getKey(), entry.getValue().trim());
}
propertyName = eval.evaluate(propertyName, String.class);
propertyValue = eval.evaluate(propertyValue, String.class);
credProp.getProperties().put(propertyName, propertyValue);
LOG.debug("getCredProperties: Properties name :'" + propertyName + "', Value : '"
+ propertyValue + "'");
}
}
}
if (credProp == null && credName != null) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA021",
"Could not load credentials with name [{0}]].", credName);
}
} else {
LOG.debug("credentials is null for the action");
}
return credProp;
}
@Override
public void start(Context context, WorkflowAction action) throws ActionExecutorException {
LogUtils.setLogInfo(action);
try {
LOG.debug("Starting action " + action.getId() + " getting Action File System");
FileSystem actionFs = context.getAppFileSystem();
LOG.debug("Preparing action Dir through copying " + context.getActionDir());
prepareActionDir(actionFs, context);
LOG.debug("Action Dir is ready. Submitting the action ");
submitLauncher(actionFs, context, action);
LOG.debug("Action submit completed. Performing check ");
check(context, action);
LOG.debug("Action check is done after submission");
}
catch (Exception ex) {
throw convertException(ex);
}
}
@Override
public void end(Context context, WorkflowAction action) throws ActionExecutorException {
try {
String externalStatus = action.getExternalStatus();
WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ? WorkflowAction.Status.OK
: WorkflowAction.Status.ERROR;
context.setEndData(status, getActionSignal(status));
}
catch (Exception ex) {
throw convertException(ex);
}
finally {
try {
FileSystem actionFs = context.getAppFileSystem();
cleanUpActionDir(actionFs, context);
}
catch (Exception ex) {
throw convertException(ex);
}
}
}
/**
* Create a job client object
*
* @param context executor context
* @param jobConf job configuration used to create the client
* @return JobClient for the user running the workflow
* @throws HadoopAccessorException if the job client cannot be created
*/
protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException {
String user = context.getWorkflow().getUser();
String group = context.getWorkflow().getGroup();
return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
}
protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{
String externalId = action.getExternalId();
RunningJob runningJob = null;
if (externalId != null) {
runningJob = jobClient.getJob(JobID.forName(externalId));
}
return runningJob;
}
/**
* Useful for overriding in actions that do subsequent job runs
* such as the MapReduce Action, where the launcher job is not the
* actual job that then gets monitored.
*/
protected String getActualExternalId(WorkflowAction action) {
return action.getExternalId();
}
@Override
public void check(Context context, WorkflowAction action) throws ActionExecutorException {
JobClient jobClient = null;
boolean exception = false;
LogUtils.setLogInfo(action);
try {
Element actionXml = XmlUtils.parseXml(action.getConf());
FileSystem actionFs = context.getAppFileSystem();
JobConf jobConf = createBaseHadoopConf(context, actionXml);
jobClient = createJobClient(context, jobConf);
RunningJob runningJob = getRunningJob(context, action, jobClient);
if (runningJob == null) {
context.setExecutionData(FAILED, null);
throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
"Could not lookup launched hadoop Job ID [{0}] which was associated with " +
"action [{1}]. Failing this action!", getActualExternalId(action), action.getId());
}
if (runningJob.isComplete()) {
Path actionDir = context.getActionDir();
String newId = null;
// load sequence file into object
Map<String, String> actionData = LauncherMapperHelper.getActionData(actionFs, actionDir, jobConf);
if (actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID)) {
newId = actionData.get(LauncherMapper.ACTION_DATA_NEW_ID);
String launcherId = action.getExternalId();
runningJob = jobClient.getJob(JobID.forName(newId));
if (runningJob == null) {
context.setExternalStatus(FAILED);
throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
"Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", newId,
action.getId());
}
context.setExternalChildIDs(newId);
LOG.info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId,
newId);
}
else {
String externalIDs = actionData.get(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS);
if (externalIDs != null) {
context.setExternalChildIDs(externalIDs);
LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
}
else if (LauncherMapperHelper.hasOutputData(actionData)) {
// Load stored Hadoop jobs ids and promote them as external child ids
// This is for jobs launched with older release during upgrade to Oozie 4.3
Properties props = PropertiesUtils.stringToProperties(actionData
.get(LauncherMapper.ACTION_DATA_OUTPUT_PROPS));
if (props.get(LauncherMain.HADOOP_JOBS) != null) {
externalIDs = (String) props.get(LauncherMain.HADOOP_JOBS);
context.setExternalChildIDs(externalIDs);
LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
}
}
}
if (runningJob.isComplete()) {
// fetching action output and stats for the Map-Reduce action.
if (newId != null) {
actionData = LauncherMapperHelper.getActionData(actionFs, context.getActionDir(), jobConf);
}
LOG.info(XLog.STD, "action completed, external ID [{0}]",
action.getExternalId());
if (LauncherMapperHelper.isMainSuccessful(runningJob)) {
if (getCaptureOutput(action) && LauncherMapperHelper.hasOutputData(actionData)) {
context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData
.get(LauncherMapper.ACTION_DATA_OUTPUT_PROPS)));
LOG.info(XLog.STD, "action produced output");
}
else {
context.setExecutionData(SUCCEEDED, null);
}
if (LauncherMapperHelper.hasStatsData(actionData)) {
context.setExecutionStats(actionData.get(LauncherMapper.ACTION_DATA_STATS));
LOG.info(XLog.STD, "action produced stats");
}
getActionData(actionFs, runningJob, action, context);
}
else {
String errorReason;
if (actionData.containsKey(LauncherMapper.ACTION_DATA_ERROR_PROPS)) {
Properties props = PropertiesUtils.stringToProperties(actionData
.get(LauncherMapper.ACTION_DATA_ERROR_PROPS));
String errorCode = props.getProperty("error.code");
if ("0".equals(errorCode)) {
errorCode = "JA018";
}
if ("-1".equals(errorCode)) {
errorCode = "JA019";
}
errorReason = props.getProperty("error.reason");
LOG.warn("Launcher ERROR, reason: {0}", errorReason);
String exMsg = props.getProperty("exception.message");
String errorInfo = (exMsg != null) ? exMsg : errorReason;
context.setErrorInfo(errorCode, errorInfo);
String exStackTrace = props.getProperty("exception.stacktrace");
if (exMsg != null) {
LOG.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
}
}
else {
errorReason = XLog.format("LauncherMapper died, check Hadoop LOG for job [{0}:{1}]", action
.getTrackerUri(), action.getExternalId());
LOG.warn(errorReason);
}
context.setExecutionData(FAILED_KILLED, null);
}
}
else {
context.setExternalStatus("RUNNING");
LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]",
runningJob.getID());
}
}
else {
context.setExternalStatus("RUNNING");
LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]",
runningJob.getID());
}
}
catch (Exception ex) {
LOG.warn("Exception in check(). Message[{0}]", ex.getMessage(), ex);
exception = true;
throw convertException(ex);
}
finally {
if (jobClient != null) {
try {
jobClient.close();
}
catch (Exception e) {
if (exception) {
LOG.error("JobClient error: ", e);
}
else {
throw convertException(e);
}
}
}
}
}
/**
* Get the output data of an action. Subclasses should override this method
* to get action specific output data.
*
* @param actionFs the FileSystem object
* @param runningJob the runningJob
* @param action the Workflow action
* @param context executor context
*
*/
protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
}
protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
Element eConf = XmlUtils.parseXml(action.getConf());
Namespace ns = eConf.getNamespace();
Element captureOutput = eConf.getChild("capture-output", ns);
return captureOutput != null;
}
@Override
public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
JobClient jobClient = null;
boolean exception = false;
try {
Element actionXml = XmlUtils.parseXml(action.getConf());
final JobConf jobConf = createBaseHadoopConf(context, actionXml);
WorkflowJob wfJob = context.getWorkflow();
Configuration conf = null;
if ( wfJob.getConf() != null ) {
conf = new XConfiguration(new StringReader(wfJob.getConf()));
}
String launcherTag = LauncherMapperHelper.getActionYarnTag(conf, wfJob.getParentId(), action);
jobConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, LauncherMapperHelper.getTag(launcherTag));
jobConf.set(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME, Long.toString(action.getStartTime().getTime()));
UserGroupInformation ugi = Services.get().get(UserGroupInformationService.class)
.getProxyUser(context.getWorkflow().getUser());
ugi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
LauncherMainHadoopUtils.killChildYarnJobs(jobConf);
return null;
}
});
jobClient = createJobClient(context, jobConf);
RunningJob runningJob = getRunningJob(context, action, jobClient);
if (runningJob != null) {
runningJob.killJob();
}
context.setExternalStatus(KILLED);
context.setExecutionData(KILLED, null);
}
catch (Exception ex) {
exception = true;
throw convertException(ex);
}
finally {
try {
FileSystem actionFs = context.getAppFileSystem();
cleanUpActionDir(actionFs, context);
if (jobClient != null) {
jobClient.close();
}
}
catch (Exception ex) {
if (exception) {
LOG.error("Error: ", ex);
}
else {
throw convertException(ex);
}
}
}
}
private static Set<String> FINAL_STATUS = new HashSet<String>();
static {
FINAL_STATUS.add(SUCCEEDED);
FINAL_STATUS.add(KILLED);
FINAL_STATUS.add(FAILED);
FINAL_STATUS.add(FAILED_KILLED);
}
@Override
public boolean isCompleted(String externalStatus) {
return FINAL_STATUS.contains(externalStatus);
}
/**
* Return the sharelib names for the action.
* <p>
* If <code>NULL</code> or empty, it means that the action does not use the action
* sharelib.
* <p>
* If a non-empty string, e.g. <code>foo</code>, it means the action uses the
* action sharelib sub-directory <code>foo</code> and all JARs in the sharelib
* <code>foo</code> directory will be in the action classpath. Multiple sharelib
* sub-directories can be specified as a comma separated list.
* <p>
* The resolution is done using the following precedence order:
* <ul>
* <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the action configuration</li>
* <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the job configuration</li>
* <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the oozie configuration</li>
* <li>Action Executor <code>getDefaultShareLibName()</code> method</li>
* </ul>
*
* @param context executor context.
* @param actionXml
* @param conf action configuration.
* @return the action sharelib names.
*/
protected String[] getShareLibNames(Context context, Element actionXml, Configuration conf) {
String[] names = conf.getStrings(ACTION_SHARELIB_FOR + getType());
if (names == null || names.length == 0) {
try {
XConfiguration jobConf = getWorkflowConf(context);
names = jobConf.getStrings(ACTION_SHARELIB_FOR + getType());
if (names == null || names.length == 0) {
names = Services.get().getConf().getStrings(ACTION_SHARELIB_FOR + getType());
if (names == null || names.length == 0) {
String name = getDefaultShareLibName(actionXml);
if (name != null) {
names = new String[] { name };
}
}
}
}
catch (IOException ex) {
throw new RuntimeException("It cannot happen, " + ex.toString(), ex);
}
}
return names;
}
private final static String ACTION_SHARELIB_FOR = "oozie.action.sharelib.for.";
/**
* Returns the default sharelib name for the action if any.
*
* @param actionXml the action XML fragment.
* @return the sharelib name for the action, <code>NULL</code> if none.
*/
protected String getDefaultShareLibName(Element actionXml) {
return null;
}
public String[] getShareLibFilesForActionConf() {
return null;
}
/**
* Sets some data for the action on completion
*
* @param context executor context
* @param actionFs the FileSystem object
*/
protected void setActionCompletionData(Context context, FileSystem actionFs) throws IOException,
HadoopAccessorException, URISyntaxException {
}
private void injectJobInfo(JobConf launcherJobConf, Configuration actionConf, Context context, WorkflowAction action) {
if (OozieJobInfo.isJobInfoEnabled()) {
try {
OozieJobInfo jobInfo = new OozieJobInfo(actionConf, context, action);
String jobInfoStr = jobInfo.getJobInfo();
launcherJobConf.set(OozieJobInfo.JOB_INFO_KEY, jobInfoStr + "launcher=true");
actionConf.set(OozieJobInfo.JOB_INFO_KEY, jobInfoStr + "launcher=false");
}
catch (Exception e) {
// Just job info, should not impact the execution.
LOG.error("Error while populating job info", e);
}
}
}
@Override
public boolean requiresNameNodeJobTracker() {
return true;
}
@Override
public boolean supportsConfigurationJobXML() {
return true;
}
private XConfiguration getWorkflowConf(Context context) throws IOException {
if (workflowConf == null) {
workflowConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
}
return workflowConf;
}
}