blob: 4c74b73ceaf3c6c813d56c2bed36d78a5aca0d9f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.action.hadoop;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.service.UserGroupInformationService;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.PropertiesUtils;
public class LauncherHelper {

    public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag";

    /** Server/launcher configuration flag enabling the Hadoop 2.0.2-alpha distributed cache workaround. */
    private static final String DISTRIBUTED_CACHE_WORKAROUND_KEY =
            "oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache";

    /** Action configuration key holding the distributed cache file entries. */
    private static final String MAPREDUCE_JOB_CACHE_FILES = "mapreduce.job.cache.files";

    /** Legacy (pre sequence file) per-file action data: external child ids. */
    private static final String LEGACY_EXTERNAL_CHILD_IDS_FILE = "externalChildIds.properties";

    /** Legacy (pre sequence file) per-file action data: swapped (new) id. */
    private static final String LEGACY_NEW_ID_FILE = "newId.properties";

    /** Output-properties size limit used when CONF_OOZIE_ACTION_MAX_OUTPUT_DATA is not set. */
    private static final int DEFAULT_MAX_OUTPUT_DATA = 2 * 1024;

    /**
     * Read the id stored in the recovery file {@code actionDir/recoveryId}.
     *
     * @param launcherConf launcher configuration; its {@code user.name} is used to obtain the file system
     * @param actionDir the action directory
     * @param recoveryId name of the recovery file inside {@code actionDir}
     * @return the first line of the recovery file, or {@code null} if the file does not exist
     *         (or is empty)
     * @throws HadoopAccessorException if the file system cannot be created
     * @throws IOException if the file cannot be read
     */
    public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId)
            throws HadoopAccessorException, IOException {
        Path recoveryFile = new Path(actionDir, recoveryId);
        FileSystem fs = Services.get().get(HadoopAccessorService.class)
                .createFileSystem(launcherConf.get("user.name"), recoveryFile.toUri(), launcherConf);
        if (!fs.exists(recoveryFile)) {
            return null;
        }
        // try-with-resources: the stream must be released even if readLine() fails.
        try (BufferedReader reader = openUtf8Reader(fs, recoveryFile)) {
            return reader.readLine();
        }
    }

    /**
     * Set the action's main class in the launcher configuration.
     *
     * @param launcherConf the oozie launcher configuration
     * @param javaMainClass main class name; ignored when {@code null} or empty
     */
    public static void setupMainClass(Configuration launcherConf, String javaMainClass) {
        // Only set the javaMainClass if its not null or empty string, this way the user can override the action's main class via
        // <configuration> property
        if (javaMainClass != null && !javaMainClass.isEmpty()) {
            launcherConf.set(LauncherAMUtils.CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
        }
    }

    /**
     * Copy every entry of the URIHandlerService launcher configuration into the launcher configuration.
     *
     * @param launcherConf the oozie launcher configuration
     */
    public static void setupLauncherURIHandlerConf(Configuration launcherConf) {
        for (Map.Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) {
            launcherConf.set(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Store the main arguments in the launcher configuration as a count plus one indexed key per argument.
     *
     * @param launcherConf the oozie launcher configuration
     * @param args the arguments for the action's main class
     */
    public static void setupMainArguments(Configuration launcherConf, String[] args) {
        launcherConf.setInt(LauncherAMUtils.CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length);
        for (int i = 0; i < args.length; i++) {
            launcherConf.set(LauncherAMUtils.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]);
        }
    }

    /**
     * Set the maximum size of action output data.
     *
     * @param launcherConf the oozie launcher configuration
     * @param maxOutputData the maximum allowed size of output data
     */
    public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) {
        launcherConf.setInt(LauncherAMUtils.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData);
    }

    /**
     * Set the maximum value of stats data
     *
     * @param launcherConf the oozie launcher configuration
     * @param maxStatsData the maximum allowed size of stats data
     */
    public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData) {
        launcherConf.setInt(LauncherAMUtils.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData);
    }

    /**
     * Set the maximum number of globbed files/dirs
     *
     * @param launcherConf the oozie launcher configuration
     * @param fsGlobMax the maximum number of files/dirs for FS operation
     */
    public static void setupMaxFSGlob(Configuration launcherConf, int fsGlobMax) {
        launcherConf.setInt(LauncherAMUtils.CONF_OOZIE_ACTION_FS_GLOB_MAX, fsGlobMax);
    }

    /**
     * Record job/action identity and bookkeeping values in the launcher and action configurations.
     * <p>
     * When the server configuration enables the Hadoop 2.0.2-alpha distributed cache workaround, only the
     * {@code mapreduce.job.cache.files} entries that contain {@code '#'} are kept in the action
     * configuration, and the flag is propagated to the launcher configuration.
     *
     * @param launcherConf the oozie launcher configuration
     * @param jobId the workflow job id
     * @param actionId the action id
     * @param actionDir the action directory
     * @param recoveryId the recovery id
     * @param actionConf the action configuration
     * @param prepareXML the prepare XML
     * @throws IOException declared for callers; not thrown by the visible code
     * @throws HadoopAccessorException declared for callers; not thrown by the visible code
     */
    public static void setupLauncherInfo(Configuration launcherConf, String jobId, String actionId, Path actionDir,
            String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
        launcherConf.set(LauncherAMUtils.OOZIE_JOB_ID, jobId);
        launcherConf.set(LauncherAMUtils.OOZIE_ACTION_ID, actionId);
        launcherConf.set(LauncherAMUtils.OOZIE_ACTION_DIR_PATH, actionDir.toString());
        launcherConf.set(LauncherAMUtils.OOZIE_ACTION_RECOVERY_ID, recoveryId);
        launcherConf.set(LauncherAMUtils.ACTION_PREPARE_XML, prepareXML);
        actionConf.set(LauncherAMUtils.OOZIE_JOB_ID, jobId);
        actionConf.set(LauncherAMUtils.OOZIE_ACTION_ID, actionId);
        if (Services.get().getConf().getBoolean(DISTRIBUTED_CACHE_WORKAROUND_KEY, false)) {
            List<String> purgedEntries = new ArrayList<>();
            Collection<String> entries = actionConf.getStringCollection(MAPREDUCE_JOB_CACHE_FILES);
            for (String entry : entries) {
                if (entry.contains("#")) {
                    purgedEntries.add(entry);
                }
            }
            actionConf.setStrings(MAPREDUCE_JOB_CACHE_FILES, purgedEntries.toArray(new String[purgedEntries.size()]));
            launcherConf.setBoolean(DISTRIBUTED_CACHE_WORKAROUND_KEY, true);
        }
    }

    /**
     * Configure the values used to handle YARN restarts of the launcher.
     * <p>
     * Stores the launch time in the launcher configuration and the hashed launcher tag (see
     * {@link #getTag(String)}) under {@code LauncherMain.CHILD_MAPREDUCE_JOB_TAGS} in the action configuration.
     *
     * @param launcherJobConf the launcher job configuration
     * @param actionConf the action configuration
     * @param launcherTag the unhashed launcher tag
     * @param launcherTime the launch time
     * @throws NoSuchAlgorithmException if MD5 is not available
     */
    public static void setupYarnRestartHandling(Configuration launcherJobConf, Configuration actionConf, String launcherTag,
            long launcherTime)
            throws NoSuchAlgorithmException {
        launcherJobConf.setLong(LauncherMain.OOZIE_JOB_LAUNCH_TIME, launcherTime);
        // Tags are limited to 100 chars so we need to hash them to make sure (the actionId otherwise doesn't have a max length)
        String tag = getTag(launcherTag);
        // keeping the oozie.child.mapreduce.job.tags instead of mapreduce.job.tags to avoid killing launcher itself.
        // mapreduce.job.tags should only go to child job launch by launcher.
        actionConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, tag);
    }

    /**
     * Hash a launcher tag into a short, fixed-size YARN tag.
     *
     * @param launcherTag the unhashed tag
     * @return {@code "oozie-"} followed by the hex MD5 of the tag's UTF-8 bytes (without zero padding)
     * @throws NoSuchAlgorithmException if MD5 is not available
     */
    public static String getTag(String launcherTag) throws NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("MD5");
        // Hash every UTF-8 byte. The previous code passed launcherTag.length() (a char count) as the byte
        // length, which silently dropped trailing bytes for non-ASCII tags; ASCII tags hash identically.
        digest.update(launcherTag.getBytes(StandardCharsets.UTF_8));
        return "oozie-" + new BigInteger(1, digest.digest()).toString(16);
    }

    /**
     * @param runningJob the launcher job
     * @return whether the job has completed
     * @throws IOException if the job status cannot be retrieved
     */
    public static boolean isMainDone(RunningJob runningJob) throws IOException {
        return runningJob.isComplete();
    }

    /**
     * Determine whether the launcher job succeeded.
     *
     * @param runningJob the launcher job
     * @return {@code true} if the job succeeded and, when the launcher counter group is present, its
     *         launcher-error counter is zero
     * @throws IOException if the job status or counters cannot be retrieved
     */
    public static boolean isMainSuccessful(RunningJob runningJob) throws IOException {
        if (!runningJob.isSuccessful()) {
            return false;
        }
        Counters counters = runningJob.getCounters();
        if (counters == null) {
            return true;
        }
        Counters.Group group = counters.getGroup(LauncherAMUtils.COUNTER_GROUP);
        return group == null || group.getCounter(LauncherAMUtils.COUNTER_LAUNCHER_ERROR) == 0;
    }

    /**
     * Determine whether action has output data or not
     * @param actionData action data map (e.g. as returned by {@link #getActionData})
     * @return true/false
     */
    public static boolean hasOutputData(Map<String, String> actionData) {
        return actionData.containsKey(LauncherAMUtils.ACTION_DATA_OUTPUT_PROPS);
    }

    /**
     * Determine whether action has external stats or not
     * @param actionData action data map (e.g. as returned by {@link #getActionData})
     * @return true/false
     */
    public static boolean hasStatsData(Map<String, String> actionData) {
        return actionData.containsKey(LauncherAMUtils.ACTION_DATA_STATS);
    }

    /**
     * Determine whether action has new id (id swap) or not
     * @param actionData action data map (e.g. as returned by {@link #getActionData})
     * @return true/false
     */
    public static boolean hasIdSwap(Map<String, String> actionData) {
        return actionData.containsKey(LauncherAMUtils.ACTION_DATA_NEW_ID);
    }

    /**
     * Get the sequence file path storing all action data
     * @param actionDir directory of action conf
     * @return Path returns sequence file path storing all action data
     */
    public static Path getActionDataSequenceFilePath(Path actionDir) {
        return new Path(actionDir, LauncherAMUtils.ACTION_DATA_SEQUENCE_FILE);
    }

    /**
     * Utility function to load the contents of action data sequence file into
     * memory object.
     * <p>
     * Runs as a proxy user for {@code conf.get(OozieClient.USER_NAME)}. If the sequence file does not exist,
     * falls back to reading the legacy per-file action data from {@code actionDir}.
     *
     * @param fs Action Filesystem
     * @param actionDir Path
     * @param conf Configuration
     * @return Map action data
     * @throws IOException if an IO error occurred
     * @throws InterruptedException if UGI action is interrupted
     */
    public static Map<String, String> getActionData(final FileSystem fs, final Path actionDir, final Configuration conf)
            throws IOException, InterruptedException {
        UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class);
        UserGroupInformation ugi = ugiService.getProxyUser(conf.get(OozieClient.USER_NAME));
        return ugi.doAs(new PrivilegedExceptionAction<Map<String, String>>() {
            @Override
            public Map<String, String> run() throws IOException {
                Map<String, String> ret = new HashMap<>();
                Path seqFilePath = getActionDataSequenceFilePath(actionDir);
                if (fs.exists(seqFilePath)) {
                    readSequenceFileActionData(fs, seqFilePath, conf, ret);
                } else { // maintain backward-compatibility. to be deprecated
                    readLegacyActionData(fs, actionDir, conf, ret);
                }
                return ret;
            }
        });
    }

    /** Copy every key/value pair of the action data sequence file into {@code ret}. */
    private static void readSequenceFileActionData(FileSystem fs, Path seqFilePath, Configuration conf,
            Map<String, String> ret) throws IOException {
        // try-with-resources: previously the reader leaked if next() threw.
        try (SequenceFile.Reader seqFile = new SequenceFile.Reader(fs, seqFilePath, conf)) {
            Text key = new Text();
            Text value = new Text();
            while (seqFile.next(key, value)) {
                ret.put(key.toString(), value.toString());
            }
        }
    }

    /** Read the legacy one-file-per-item action data found directly under {@code actionDir} into {@code ret}. */
    private static void readLegacyActionData(FileSystem fs, Path actionDir, Configuration conf,
            Map<String, String> ret) throws IOException {
        FileStatus[] files = fs.listStatus(actionDir);
        if (files == null || files.length == 0) {
            return;
        }
        // Loop-invariant paths, compared against each listed file.
        Path externalChildIdsPath = new Path(actionDir, LEGACY_EXTERNAL_CHILD_IDS_FILE);
        Path newIdPath = new Path(actionDir, LEGACY_NEW_ID_FILE);
        Path outputPropsPath = new Path(actionDir, LauncherAMUtils.ACTION_DATA_OUTPUT_PROPS);
        Path statsPath = new Path(actionDir, LauncherAMUtils.ACTION_DATA_STATS);
        Path errorPropsPath = new Path(actionDir, LauncherAMUtils.ACTION_DATA_ERROR_PROPS);
        for (FileStatus fileStatus : files) {
            Path path = fileStatus.getPath();
            // Each reader is closed by try-with-resources; previously a read failure (e.g. size limit exceeded)
            // leaked the open stream.
            if (path.equals(externalChildIdsPath)) {
                try (BufferedReader reader = openUtf8Reader(fs, path)) {
                    ret.put(LauncherAMUtils.ACTION_DATA_EXTERNAL_CHILD_IDS, IOUtils.getReaderAsString(reader, -1));
                }
            } else if (path.equals(newIdPath)) {
                try (BufferedReader reader = openUtf8Reader(fs, path)) {
                    Properties props = PropertiesUtils.readProperties(reader, -1);
                    ret.put(LauncherAMUtils.ACTION_DATA_NEW_ID, props.getProperty("id"));
                }
            } else if (path.equals(outputPropsPath)) {
                int maxOutputData = conf.getInt(LauncherAMUtils.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
                        DEFAULT_MAX_OUTPUT_DATA);
                try (BufferedReader reader = openUtf8Reader(fs, path)) {
                    ret.put(LauncherAMUtils.ACTION_DATA_OUTPUT_PROPS, PropertiesUtils
                            .propertiesToString(PropertiesUtils.readProperties(reader, maxOutputData)));
                }
            } else if (path.equals(statsPath)) {
                int statsMaxOutputData = conf.getInt(LauncherAMUtils.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
                        Integer.MAX_VALUE);
                try (BufferedReader reader = openUtf8Reader(fs, path)) {
                    ret.put(LauncherAMUtils.ACTION_DATA_STATS, PropertiesUtils
                            .propertiesToString(PropertiesUtils.readProperties(reader, statsMaxOutputData)));
                }
            } else if (path.equals(errorPropsPath)) {
                try (BufferedReader reader = openUtf8Reader(fs, path)) {
                    ret.put(LauncherAMUtils.ACTION_DATA_ERROR_PROPS, IOUtils.getReaderAsString(reader, -1));
                }
            }
        }
    }

    /** Open {@code path} on {@code fs} as a buffered UTF-8 reader. */
    private static BufferedReader openUtf8Reader(FileSystem fs, Path path) throws IOException {
        return new BufferedReader(new InputStreamReader(fs.open(path), StandardCharsets.UTF_8));
    }

    /**
     * Compute the YARN tag identifying an action.
     *
     * @param conf configuration possibly holding {@link #OOZIE_ACTION_YARN_TAG}; may be {@code null}
     * @param parentId parent id, used when no tag is configured; may be {@code null}
     * @param wfAction the workflow action
     * @return {@code <configured tag>@<action name>} if the tag is configured, else
     *         {@code <parentId>@<action name>} if {@code parentId} is set, else the action id
     */
    public static String getActionYarnTag(Configuration conf, String parentId, WorkflowAction wfAction) {
        String configuredTag = (conf != null) ? conf.get(OOZIE_ACTION_YARN_TAG) : null;
        if (configuredTag != null) {
            return configuredTag + "@" + wfAction.getName();
        }
        if (parentId != null) {
            return parentId + "@" + wfAction.getName();
        }
        return wfAction.getId();
    }
}