blob: b281d1ca2ead08efdecbd3a5756a169c3b02d8bd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.action.hadoop;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Shell;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
public abstract class LauncherMain {
public static final String ACTION_PREFIX = "oozie.action.";
public static final String EXTERNAL_CHILD_IDS = ACTION_PREFIX + "externalChildIDs";
public static final String EXTERNAL_ACTION_STATS = ACTION_PREFIX + "stats.properties";
public static final String EXTERNAL_STATS_WRITE = ACTION_PREFIX + "external.stats.write";
public static final String OUTPUT_PROPERTIES = ACTION_PREFIX + "output.properties";
public static final String HADOOP_JOBS = "hadoopJobs";
public static final String MAPREDUCE_JOB_TAGS = "mapreduce.job.tags";
public static final String CHILD_MAPREDUCE_JOB_TAGS = "oozie.child.mapreduce.job.tags";
public static final String OOZIE_JOB_LAUNCH_TIME = "oozie.job.launch.time";
public static final String TEZ_APPLICATION_TAGS = "tez.application.tags";
public static final String SPARK_YARN_TAGS = "spark.yarn.tags";
public static final String PROPAGATION_CONF_XML = "propagation-conf.xml";
protected static String[] HADOOP_SITE_FILES = new String[]
{"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml"};
/**
* Hadoop's {@code log4j.properties} found on the classpath, if readable and present.
*/
private static final String HADOOP_LOG4J_LOCATION = "log4j.properties";
/**
* Default {@code log4j.properties}, if Hadoop's one is not present or readable.
* <p>
* Its contents are mostly from Hadoop's {@code default-log4j.properties}.
*/
private static final String DEFAULT_LOG4J_LOCATION = "default-log4j.properties";
@VisibleForTesting
static SystemEnvironment sysenv = new SystemEnvironment();
protected Properties log4jProperties = new Properties();
protected static void run(Class<? extends LauncherMain> klass, String[] args) throws Exception {
LauncherMain main = klass.newInstance();
main.setupLog4jProperties();
main.propagateToHadoopConf();
main.run(args);
}
@VisibleForTesting
protected void setupLog4jProperties() {
if (tryLoadLog4jPropertiesFromResource(HADOOP_LOG4J_LOCATION)) {
return;
}
tryLoadLog4jPropertiesFromResource(DEFAULT_LOG4J_LOCATION);
}
private boolean tryLoadLog4jPropertiesFromResource(final String log4jLocation) {
System.out.println(String.format("INFO: loading log4j config file %s.", log4jLocation));
final URL log4jUrl = Thread.currentThread().getContextClassLoader().getResource(log4jLocation);
if (log4jUrl != null) {
try (final InputStream log4jStream = log4jUrl.openStream()) {
log4jProperties.load(log4jStream);
System.out.println(String.format("INFO: log4j config file %s loaded successfully.", log4jLocation));
return true;
} catch (final IOException e) {
System.out.println(
String.format("WARN: log4j config file %s is not readable. Exception message is: %s",
log4jLocation,
e.getMessage()));
e.printStackTrace(System.out);
}
}
else {
System.out.println(String.format("WARN: log4j config file %s is not present.", log4jLocation));
}
System.out.println(String.format("INFO: log4j config file %s could not be loaded.", log4jLocation));
return false;
}
protected static String getHadoopJobIds(String logFile, Pattern[] patterns) {
Set<String> jobIds = new LinkedHashSet<String>();
if (!new File(logFile).exists()) {
System.err.println("Log file: " + logFile + " not present. Therefore no Hadoop job IDs found.");
}
else {
try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(logFile),
StandardCharsets.UTF_8))) {
String line = br.readLine();
while (line != null) {
extractJobIDs(line, patterns, jobIds);
line = br.readLine();
}
} catch (IOException e) {
System.out.println("WARN: Error getting Hadoop Job IDs. logFile: " + logFile);
e.printStackTrace(System.out);
}
}
return jobIds.isEmpty() ? null : StringUtils.join(jobIds, ",");
}
@VisibleForTesting
protected static void extractJobIDs(String line, Pattern[] patterns, Set<String> jobIds) {
Objects.requireNonNull(line);
for (Pattern pattern : patterns) {
Matcher matcher = pattern.matcher(line);
if (matcher.find()) {
String jobId = matcher.group(1);
if (StringUtils.isEmpty(jobId) || jobId.equalsIgnoreCase("NULL")) {
continue;
}
jobId = jobId.replaceAll("application", "job");
jobIds.add(jobId);
}
}
}
protected static void writeExternalChildIDs(String logFile, Pattern[] patterns, String name) {
// Harvesting and recording Hadoop Job IDs
String jobIds = getHadoopJobIds(logFile, patterns);
if (jobIds != null) {
File externalChildIdsFile = new File(System.getProperty(EXTERNAL_CHILD_IDS));
try (OutputStream externalChildIdsStream = new FileOutputStream(externalChildIdsFile)) {
externalChildIdsStream.write(jobIds.getBytes(StandardCharsets.UTF_8));
System.out.println("Hadoop Job IDs executed by " + name + ": " + jobIds);
System.out.println();
} catch (IOException e) {
System.out.println("WARN: Error while writing to external child ids file: " +
System.getProperty(EXTERNAL_CHILD_IDS));
e.printStackTrace(System.out);
}
} else {
System.out.println("No child hadoop job is executed.");
}
}
public static Set<ApplicationId> getChildYarnJobs(Configuration actionConf) {
return getChildYarnJobs(actionConf, ApplicationsRequestScope.OWN);
}
public static Set<ApplicationId> getChildYarnJobs(Configuration actionConf, ApplicationsRequestScope scope,
long startTime) {
final Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
for (final ApplicationReport applicationReport : getChildYarnApplications(actionConf, scope, startTime)) {
childYarnJobs.add(applicationReport.getApplicationId());
}
if (childYarnJobs.isEmpty()) {
System.out.println("No child applications found");
} else {
System.out.println("Found child YARN applications: " + StringUtils.join(childYarnJobs, ","));
}
return childYarnJobs;
}
public static Set<ApplicationId> getChildYarnJobs(Configuration actionConf, ApplicationsRequestScope scope) {
System.out.println("Fetching child yarn jobs");
long startTime = actionConf.getLong(OOZIE_JOB_LAUNCH_TIME, 0L);
if(startTime == 0) {
try {
startTime = Long.parseLong(System.getProperty(OOZIE_JOB_LAUNCH_TIME));
} catch (NumberFormatException nfe) {
throw new RuntimeException("Could not find Oozie job launch time", nfe);
}
}
return getChildYarnJobs(actionConf, scope, startTime);
}
public static List<ApplicationReport> getChildYarnApplications(final Configuration actionConf,
final ApplicationsRequestScope scope,
long startTime) {
final String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS);
if (tag == null) {
System.out.print("Could not find YARN tags property " + CHILD_MAPREDUCE_JOB_TAGS);
return Collections.emptyList();
}
System.out.println("tag id : " + tag);
final GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
gar.setScope(scope);
gar.setApplicationTags(Collections.singleton(tag));
long endTime = System.currentTimeMillis();
if (startTime > endTime) {
System.out.println("WARNING: Clock skew between the Oozie server host and this host detected. Please fix this. " +
"Attempting to work around...");
// We don't know which one is wrong (relative to the RM), so to be safe, let's assume they're both wrong and add an
// offset in both directions
final long diff = 2 * (startTime - endTime);
startTime = startTime - diff;
endTime = endTime + diff;
}
gar.setStartRange(startTime, endTime);
try {
final ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(actionConf, ApplicationClientProtocol.class);
final GetApplicationsResponse apps = proxy.getApplications(gar);
return apps.getApplicationList();
} catch (final YarnException | IOException e) {
throw new RuntimeException("Exception occurred while finding child jobs", e);
}
}
public static void killChildYarnJobs(Configuration actionConf) {
try {
Set<ApplicationId> childYarnJobs = getChildYarnJobs(actionConf);
if (!childYarnJobs.isEmpty()) {
checkAndKillChildYarnJobs(YarnClient.createYarnClient(), actionConf, childYarnJobs);
}
} catch (IOException | YarnException ye) {
throw new RuntimeException("Exception occurred while killing child job(s)", ye);
}
}
@VisibleForTesting
protected static Collection<ApplicationId> checkAndKillChildYarnJobs(YarnClient yarnClient,
Configuration actionConf,
Collection<ApplicationId> childYarnJobs)
throws YarnException, IOException {
System.out.println();
System.out.println("Found [" + childYarnJobs.size() + "] YARN application(s) from this launcher");
System.out.println("Killing existing applications and starting over:");
yarnClient.init(actionConf);
yarnClient.start();
Collection<ApplicationId> killedapps = new ArrayList<>();
for (ApplicationId app : childYarnJobs) {
if (finalAppStatusUndefined(yarnClient.getApplicationReport(app))) {
System.out.print("Killing [" + app + "] ... ");
yarnClient.killApplication(app);
System.out.println("Done");
killedapps.add(app);
}
}
System.out.println();
return killedapps;
}
private static boolean finalAppStatusUndefined(ApplicationReport appReport) {
FinalApplicationStatus status = appReport.getFinalApplicationStatus();
return !FinalApplicationStatus.SUCCEEDED.equals(status) &&
!FinalApplicationStatus.FAILED.equals(status) &&
!FinalApplicationStatus.KILLED.equals(status);
}
protected abstract void run(String[] args) throws Exception;
/**
* Write to STDOUT (the task log) the Configuration/Properties values. All properties that contain
* any of the strings in the maskSet will be masked when writting it to STDOUT.
*
* @param header message for the beginning of the Configuration/Properties dump.
* @param conf Configuration/Properties object to dump to STDOUT
* @throws IOException thrown if an IO error ocurred.
*/
protected static void logMasking(String header, Iterable<Map.Entry<String,String>> conf)
throws IOException {
StringWriter writer = new StringWriter();
writer.write(header + "\n");
writer.write("--------------------\n");
PasswordMasker masker = new PasswordMasker();
for (Map.Entry<String, String> entry : conf) {
writer.write(" " + entry.getKey() + " : " + masker.mask(entry) + "\n");
}
writer.write("--------------------\n");
writer.close();
System.out.println(writer.toString());
System.out.flush();
}
/**
* Get file path from the given environment
* @param env environment
* @return path given environment returns file path
*/
protected static String getFilePathFromEnv(String env) {
String path = sysenv.getenv(env);
if (path != null && Shell.WINDOWS) {
// In Windows, file paths are enclosed in \" so remove them here
// to avoid path errors
if (path.charAt(0) == '"') {
path = path.substring(1);
}
if (path.charAt(path.length() - 1) == '"') {
path = path.substring(0, path.length() - 1);
}
}
return path;
}
/**
* Read action configuration passes through action xml file.
*
* @return action Configuration
* @throws IOException in the configuration could not be read
*/
public static Configuration loadActionConf() throws IOException {
// loading action conf prepared by Oozie
Configuration actionConf = new Configuration(false);
String actionXml = System.getProperty("oozie.action.conf.xml");
if (actionXml == null) {
throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]");
}
if (!new File(actionXml).exists()) {
throw new RuntimeException("Action Configuration XML file [" + actionXml + "] does not exist");
}
actionConf.addResource(new Path("file:///", actionXml));
return actionConf;
}
protected static void setYarnTag(Configuration actionConf) {
if(actionConf.get(CHILD_MAPREDUCE_JOB_TAGS) != null) {
// in case the user set their own tags, appending the launcher tag.
if(actionConf.get(MAPREDUCE_JOB_TAGS) != null) {
actionConf.set(MAPREDUCE_JOB_TAGS, actionConf.get(MAPREDUCE_JOB_TAGS) + ","
+ actionConf.get(CHILD_MAPREDUCE_JOB_TAGS));
} else {
actionConf.set(MAPREDUCE_JOB_TAGS, actionConf.get(CHILD_MAPREDUCE_JOB_TAGS));
}
}
}
protected static void setApplicationTags(Configuration configName, String tagConfigName) {
if (configName.get(MAPREDUCE_JOB_TAGS) != null) {
System.out.println("Setting [" + tagConfigName + "] tag: " + configName.get(MAPREDUCE_JOB_TAGS));
configName.set(tagConfigName, configName.get(MAPREDUCE_JOB_TAGS));
}
}
/**
* Utility method that copies the contents of the src file into all of the dst file(s).
* It only requires reading the src file once.
*
* @param src The source file
* @param dst The destination file(s)
* @throws IOException if the files could not be copied
*/
protected static void copyFileMultiplex(File src, File... dst) throws IOException {
InputStream is = null;
OutputStream[] osa = new OutputStream[dst.length];
try {
is = new FileInputStream(src);
for (int i = 0; i < osa.length; i++) {
osa[i] = new FileOutputStream(dst[i]);
}
byte[] buffer = new byte[4096];
int read;
while ((read = is.read(buffer)) > -1) {
for (OutputStream os : osa) {
os.write(buffer, 0, read);
}
}
} finally {
if (is != null) {
is.close();
}
for (OutputStream os : osa) {
if (os != null) {
os.close();
}
}
}
}
protected void writeHadoopConfig(String actionXml, File basrDir) throws IOException {
File actionXmlFile = new File(actionXml);
System.out.println("Copying " + actionXml + " to " + basrDir + "/" + Arrays.toString(HADOOP_SITE_FILES));
basrDir.mkdirs();
File[] dstFiles = new File[HADOOP_SITE_FILES.length];
for (int i = 0; i < dstFiles.length; i++) {
dstFiles[i] = new File(basrDir, HADOOP_SITE_FILES[i]);
}
copyFileMultiplex(actionXmlFile, dstFiles);
}
/**
* Print arguments to standard output stream. Mask out argument values to option with name 'password' in them.
* @param banner source banner
* @param args arguments to be printed
*/
void printArgs(String banner, String[] args) {
System.out.println(banner);
boolean maskNextArg = false;
for (String arg : args) {
if (maskNextArg) {
System.out.println(" " + "********");
maskNextArg = false;
}
else {
System.out.println(" " + arg);
if (arg.toLowerCase().contains("password")) {
maskNextArg = true;
}
}
}
}
/*
* Pushing all important conf to hadoop conf for the action. This is also useful in a situation when a MapReduce job is
* submitted from a Java action, because the MR job tags must be set. If it's not set, then it's not possible to kill the
* MR job because child jobs are looked up based on tags.
*/
public void propagateToHadoopConf() throws IOException {
Configuration propagationConf = new Configuration(false);
if (System.getProperty(LauncherAM.OOZIE_ACTION_ID) != null) {
propagationConf.set(LauncherAM.OOZIE_ACTION_ID, System.getProperty(LauncherAM.OOZIE_ACTION_ID));
}
if (System.getProperty(LauncherAM.OOZIE_JOB_ID) != null) {
propagationConf.set(LauncherAM.OOZIE_JOB_ID, System.getProperty(LauncherAM.OOZIE_JOB_ID));
}
if(System.getProperty(LauncherAM.OOZIE_LAUNCHER_JOB_ID) != null) {
propagationConf.set(LauncherAM.OOZIE_LAUNCHER_JOB_ID, System.getProperty(LauncherAM.OOZIE_LAUNCHER_JOB_ID));
}
// loading action conf prepared by Oozie
Configuration actionConf = LauncherMain.loadActionConf();
if (actionConf.get(CHILD_MAPREDUCE_JOB_TAGS) != null) {
propagationConf.set(MAPREDUCE_JOB_TAGS, actionConf.get(CHILD_MAPREDUCE_JOB_TAGS));
}
try (Writer writer = new OutputStreamWriter(new FileOutputStream(
new File(PROPAGATION_CONF_XML)), StandardCharsets.UTF_8)) {
propagationConf.writeXml(writer);
}
Configuration.dumpConfiguration(propagationConf, new OutputStreamWriter(System.out, StandardCharsets.UTF_8));
Configuration.addDefaultResource(PROPAGATION_CONF_XML);
}
protected static URL createFileWithContentIfNotExists(String filename, Configuration content) throws IOException {
File output = new File(filename);
try (OutputStream os = createStreamIfFileNotExists(output)) {
if(os != null) {
content.writeXml(os);
}
}
return output.toURI().toURL();
}
protected static URL createFileWithContentIfNotExists(String filename, Properties content) throws IOException {
File output = new File(filename);
try (OutputStream os = createStreamIfFileNotExists(output)) {
if(os != null) {
content.store(os, "");
}
}
return output.toURI().toURL();
}
static FileOutputStream createStreamIfFileNotExists(File output) throws IOException {
if (output.exists()) {
System.out.println(output + " exists, skipping. The action will use the "
+ output.getName() + " defined in the workflow.");
return null;
} else {
System.out.println("Creating " + output.getAbsolutePath());
return new FileOutputStream(output);
}
}
}
class LauncherMainException extends Exception {
private int errorCode;
public LauncherMainException(int code) {
errorCode = code;
}
int getErrorCode() {
return errorCode;
}
}