| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapreduce.v2.util; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.MalformedURLException; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.security.AccessController; |
| import java.security.PrivilegedActionException; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceAudience.Public; |
| import org.apache.hadoop.classification.InterfaceStability.Unstable; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
| import org.apache.hadoop.mapreduce.MRConfig; |
| import org.apache.hadoop.mapreduce.MRJobConfig; |
| import org.apache.hadoop.mapreduce.TaskAttemptID; |
| import org.apache.hadoop.mapreduce.TaskID; |
| import org.apache.hadoop.mapreduce.TypeConverter; |
| import org.apache.hadoop.mapreduce.filecache.DistributedCache; |
| import org.apache.hadoop.mapreduce.v2.api.records.JobId; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskId; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskState; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskType; |
| import org.apache.hadoop.util.ApplicationClassLoader; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.yarn.ContainerLogAppender; |
| import org.apache.hadoop.yarn.ContainerRollingLogAppender; |
| import org.apache.hadoop.yarn.api.ApplicationConstants; |
| import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.api.records.LocalResourceType; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; |
| import org.apache.hadoop.yarn.util.Apps; |
| |
| /** |
| * Helper class for MR applications |
| */ |
| @Private |
| @Unstable |
| public class MRApps extends Apps { |
| public static final Log LOG = LogFactory.getLog(MRApps.class); |
| |
| public static String toString(JobId jid) { |
| return jid.toString(); |
| } |
| |
| public static JobId toJobID(String jid) { |
| return TypeConverter.toYarn(JobID.forName(jid)); |
| } |
| |
| public static String toString(TaskId tid) { |
| return tid.toString(); |
| } |
| |
| public static TaskId toTaskID(String tid) { |
| return TypeConverter.toYarn(TaskID.forName(tid)); |
| } |
| |
| public static String toString(TaskAttemptId taid) { |
| return taid.toString(); |
| } |
| |
| public static TaskAttemptId toTaskAttemptID(String taid) { |
| return TypeConverter.toYarn(TaskAttemptID.forName(taid)); |
| } |
| |
| public static String taskSymbol(TaskType type) { |
| switch (type) { |
| case MAP: return "m"; |
| case REDUCE: return "r"; |
| } |
| throw new YarnRuntimeException("Unknown task type: "+ type.toString()); |
| } |
| |
  public enum TaskAttemptStateUI {
| NEW( |
| new TaskAttemptState[] { TaskAttemptState.NEW, |
| TaskAttemptState.STARTING }), |
| RUNNING( |
| new TaskAttemptState[] { TaskAttemptState.RUNNING, |
| TaskAttemptState.COMMIT_PENDING }), |
| SUCCESSFUL(new TaskAttemptState[] { TaskAttemptState.SUCCEEDED}), |
| FAILED(new TaskAttemptState[] { TaskAttemptState.FAILED}), |
| KILLED(new TaskAttemptState[] { TaskAttemptState.KILLED}); |
| |
| private final List<TaskAttemptState> correspondingStates; |
| |
| private TaskAttemptStateUI(TaskAttemptState[] correspondingStates) { |
| this.correspondingStates = Arrays.asList(correspondingStates); |
| } |
| |
| public boolean correspondsTo(TaskAttemptState state) { |
| return this.correspondingStates.contains(state); |
| } |
| } |
| |
  public enum TaskStateUI {
| RUNNING( |
| new TaskState[]{TaskState.RUNNING}), |
| PENDING(new TaskState[]{TaskState.SCHEDULED}), |
| COMPLETED(new TaskState[]{TaskState.SUCCEEDED, TaskState.FAILED, TaskState.KILLED}); |
| |
| private final List<TaskState> correspondingStates; |
| |
| private TaskStateUI(TaskState[] correspondingStates) { |
| this.correspondingStates = Arrays.asList(correspondingStates); |
| } |
| |
| public boolean correspondsTo(TaskState state) { |
| return this.correspondingStates.contains(state); |
| } |
| } |
| |
  public static TaskType taskType(String symbol) {
    switch (symbol) {
      case "m": return TaskType.MAP;
      case "r": return TaskType.REDUCE;
      default:
        throw new YarnRuntimeException("Unknown task symbol: " + symbol);
    }
  }
| |
| public static TaskAttemptStateUI taskAttemptState(String attemptStateStr) { |
| return TaskAttemptStateUI.valueOf(attemptStateStr); |
| } |
| |
| public static TaskStateUI taskState(String taskStateStr) { |
| return TaskStateUI.valueOf(taskStateStr); |
| } |
| |
  /**
   * Gets the base name of the MapReduce framework, or null if no
   * framework was configured.
   */
  private static String getMRFrameworkName(Configuration conf) {
| String frameworkName = null; |
| String framework = |
| conf.get(MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, ""); |
| if (!framework.isEmpty()) { |
| URI uri; |
| try { |
| uri = new URI(framework); |
| } catch (URISyntaxException e) { |
| throw new IllegalArgumentException("Unable to parse '" + framework |
| + "' as a URI, check the setting for " |
| + MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, e); |
| } |
| |
| frameworkName = uri.getFragment(); |
| if (frameworkName == null) { |
| frameworkName = new Path(uri).getName(); |
| } |
| } |
| return frameworkName; |
| } |
| |
| private static void setMRFrameworkClasspath( |
| Map<String, String> environment, Configuration conf) throws IOException { |
| // Propagate the system classpath when using the mini cluster |
| if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) { |
| MRApps.addToEnvironment(environment, Environment.CLASSPATH.name(), |
| System.getProperty("java.class.path"), conf); |
| } |
| boolean crossPlatform = |
| conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, |
| MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM); |
| |
| // if the framework is specified then only use the MR classpath |
| String frameworkName = getMRFrameworkName(conf); |
| if (frameworkName == null) { |
| // Add standard Hadoop classes |
| for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, |
| crossPlatform |
| ? YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH |
| : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { |
| MRApps.addToEnvironment(environment, Environment.CLASSPATH.name(), |
| c.trim(), conf); |
| } |
| } |
| |
| boolean foundFrameworkInClasspath = (frameworkName == null); |
| for (String c : conf.getStrings(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH, |
| crossPlatform ? |
| StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_CROSS_PLATFORM_APPLICATION_CLASSPATH) |
| : StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH))) { |
| MRApps.addToEnvironment(environment, Environment.CLASSPATH.name(), |
| c.trim(), conf); |
| if (!foundFrameworkInClasspath) { |
| foundFrameworkInClasspath = c.contains(frameworkName); |
| } |
| } |
| |
| if (!foundFrameworkInClasspath) { |
| throw new IllegalArgumentException( |
| "Could not locate MapReduce framework name '" + frameworkName |
| + "' in " + MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH); |
| } |
| // TODO: Remove duplicates. |
| } |
| |
| @SuppressWarnings("deprecation") |
| public static void setClasspath(Map<String, String> environment, |
| Configuration conf) throws IOException { |
| boolean userClassesTakesPrecedence = |
| conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false); |
| |
| String classpathEnvVar = |
| conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false) |
| ? Environment.APP_CLASSPATH.name() : Environment.CLASSPATH.name(); |
| |
| String hadoopClasspathEnvVar = Environment.HADOOP_CLASSPATH.name(); |
| |
| MRApps.addToEnvironment(environment, |
| classpathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD), conf); |
| |
| MRApps.addToEnvironment(environment, |
| hadoopClasspathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD), |
| conf); |
| |
| if (!userClassesTakesPrecedence) { |
| MRApps.setMRFrameworkClasspath(environment, conf); |
| } |
| |
| addClasspathToEnv(environment, classpathEnvVar, conf); |
| addClasspathToEnv(environment, hadoopClasspathEnvVar, conf); |
| |
| // MAPREDUCE-6619, retain $HADOOP_CLASSPATH |
| MRApps.addToEnvironment(environment, hadoopClasspathEnvVar, |
| System.getenv(hadoopClasspathEnvVar), conf); |
| |
| if (userClassesTakesPrecedence) { |
| MRApps.setMRFrameworkClasspath(environment, conf); |
| } |
| } |
| |
| @SuppressWarnings("deprecation") |
| public static void addClasspathToEnv(Map<String, String> environment, |
| String classpathEnvVar, Configuration conf) throws IOException { |
| |
| /* |
| * We use "*" for the name of the JOB_JAR instead of MRJobConfig.JOB_JAR for |
| * the case where the job jar is not necessarily named "job.jar". This can |
| * happen, for example, when the job is leveraging a resource from the YARN |
| * shared cache. |
| */ |
| MRApps.addToEnvironment( |
| environment, |
| classpathEnvVar, |
| MRJobConfig.JOB_JAR + Path.SEPARATOR + "*", conf); |
| MRApps.addToEnvironment( |
| environment, |
| classpathEnvVar, |
| MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR, |
| conf); |
| |
| MRApps.addToEnvironment( |
| environment, |
| classpathEnvVar, |
| MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*", |
| conf); |
| |
| MRApps.addToEnvironment( |
| environment, |
| classpathEnvVar, |
| crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*", |
| conf); |
| |
| // a * in the classpath will only find a .jar, so we need to filter out |
| // all .jars and add everything else |
| addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf), |
| DistributedCache.getCacheFiles(conf), |
| conf, |
| environment, classpathEnvVar); |
| addToClasspathIfNotJar(DistributedCache.getArchiveClassPaths(conf), |
| DistributedCache.getCacheArchives(conf), |
| conf, |
| environment, classpathEnvVar); |
| } |
| |
| /** |
| * Add the paths to the classpath if they are not jars |
| * @param paths the paths to add to the classpath |
| * @param withLinks the corresponding paths that may have a link name in them |
| * @param conf used to resolve the paths |
| * @param environment the environment to update CLASSPATH in |
| * @throws IOException if there is an error resolving any of the paths. |
| */ |
| private static void addToClasspathIfNotJar(Path[] paths, |
| URI[] withLinks, Configuration conf, |
| Map<String, String> environment, |
| String classpathEnvVar) throws IOException { |
| if (paths != null) { |
| HashMap<Path, String> linkLookup = new HashMap<Path, String>(); |
| if (withLinks != null) { |
| for (URI u: withLinks) { |
| Path p = new Path(u); |
| FileSystem remoteFS = p.getFileSystem(conf); |
| String name = p.getName(); |
| String wildcard = null; |
| |
| // If the path is wildcarded, resolve its parent directory instead |
| if (name.equals(DistributedCache.WILDCARD)) { |
| wildcard = name; |
| p = p.getParent(); |
| } |
| |
| p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), |
| remoteFS.getWorkingDirectory())); |
| |
| if ((wildcard != null) && (u.getFragment() != null)) { |
| throw new IOException("Invalid path URI: " + p + " - cannot " |
| + "contain both a URI fragment and a wildcard"); |
| } else if (wildcard != null) { |
| name = p.getName() + Path.SEPARATOR + wildcard; |
| } else if (u.getFragment() != null) { |
| name = u.getFragment(); |
| } |
| |
| // If it's not a JAR, add it to the link lookup. |
| if (!StringUtils.toLowerCase(name).endsWith(".jar")) { |
| String old = linkLookup.put(p, name); |
| |
| if ((old != null) && !name.equals(old)) { |
| LOG.warn("The same path is included more than once " |
| + "with different links or wildcards: " + p + " [" + |
| name + ", " + old + "]"); |
| } |
| } |
| } |
| } |
| |
| for (Path p : paths) { |
| FileSystem remoteFS = p.getFileSystem(conf); |
| p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), |
| remoteFS.getWorkingDirectory())); |
| String name = linkLookup.get(p); |
| if (name == null) { |
| name = p.getName(); |
| } |
| if(!StringUtils.toLowerCase(name).endsWith(".jar")) { |
| MRApps.addToEnvironment( |
| environment, |
| classpathEnvVar, |
| crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + name, conf); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Creates and sets a {@link ApplicationClassLoader} on the given |
| * configuration and as the thread context classloader, if |
| * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and |
| * the APP_CLASSPATH environment variable is set. |
| * @param conf |
| * @throws IOException |
| */ |
| public static void setJobClassLoader(Configuration conf) |
| throws IOException { |
| setClassLoader(createJobClassLoader(conf), conf); |
| } |
| |
| /** |
| * Creates a {@link ApplicationClassLoader} if |
| * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and |
| * the APP_CLASSPATH environment variable is set. |
| * @param conf |
| * @return the created job classloader, or null if the job classloader is not |
| * enabled or the APP_CLASSPATH environment variable is not set |
| * @throws IOException |
| */ |
| public static ClassLoader createJobClassLoader(Configuration conf) |
| throws IOException { |
| ClassLoader jobClassLoader = null; |
| if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)) { |
| String appClasspath = System.getenv(Environment.APP_CLASSPATH.key()); |
| if (appClasspath == null) { |
| LOG.warn("Not creating job classloader since APP_CLASSPATH is not set."); |
| } else { |
| LOG.info("Creating job classloader"); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("APP_CLASSPATH=" + appClasspath); |
| } |
| String[] systemClasses = getSystemClasses(conf); |
| jobClassLoader = createJobClassLoader(appClasspath, |
| systemClasses); |
| } |
| } |
| return jobClassLoader; |
| } |
| |
| /** |
| * Sets the provided classloader on the given configuration and as the thread |
| * context classloader if the classloader is not null. |
| * @param classLoader |
| * @param conf |
| */ |
| public static void setClassLoader(ClassLoader classLoader, |
| Configuration conf) { |
| if (classLoader != null) { |
| LOG.info("Setting classloader " + classLoader + |
| " on the configuration and as the thread context classloader"); |
| conf.setClassLoader(classLoader); |
| Thread.currentThread().setContextClassLoader(classLoader); |
| } |
| } |
| |
| @VisibleForTesting |
| static String[] getSystemClasses(Configuration conf) { |
| return conf.getTrimmedStrings( |
| MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES); |
| } |
| |
| private static ClassLoader createJobClassLoader(final String appClasspath, |
| final String[] systemClasses) throws IOException { |
| try { |
| return AccessController.doPrivileged( |
| new PrivilegedExceptionAction<ClassLoader>() { |
| @Override |
| public ClassLoader run() throws MalformedURLException { |
| return new ApplicationClassLoader(appClasspath, |
| MRApps.class.getClassLoader(), Arrays.asList(systemClasses)); |
| } |
| }); |
| } catch (PrivilegedActionException e) { |
| Throwable t = e.getCause(); |
| if (t instanceof MalformedURLException) { |
| throw (MalformedURLException) t; |
| } |
| throw new IOException(e); |
| } |
| } |
| |
| private static final String STAGING_CONSTANT = ".staging"; |
| public static Path getStagingAreaDir(Configuration conf, String user) { |
| return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR, |
| MRJobConfig.DEFAULT_MR_AM_STAGING_DIR) |
| + Path.SEPARATOR + user + Path.SEPARATOR + STAGING_CONSTANT); |
| } |
| |
| public static String getJobFile(Configuration conf, String user, |
| org.apache.hadoop.mapreduce.JobID jobId) { |
| Path jobFile = new Path(MRApps.getStagingAreaDir(conf, user), |
| jobId.toString() + Path.SEPARATOR + MRJobConfig.JOB_CONF_FILE); |
| return jobFile.toString(); |
| } |
| |
| public static Path getEndJobCommitSuccessFile(Configuration conf, String user, |
| JobId jobId) { |
| Path endCommitFile = new Path(MRApps.getStagingAreaDir(conf, user), |
| jobId.toString() + Path.SEPARATOR + "COMMIT_SUCCESS"); |
| return endCommitFile; |
| } |
| |
| public static Path getEndJobCommitFailureFile(Configuration conf, String user, |
| JobId jobId) { |
| Path endCommitFile = new Path(MRApps.getStagingAreaDir(conf, user), |
| jobId.toString() + Path.SEPARATOR + "COMMIT_FAIL"); |
| return endCommitFile; |
| } |
| |
| public static Path getStartJobCommitFile(Configuration conf, String user, |
| JobId jobId) { |
| Path startCommitFile = new Path(MRApps.getStagingAreaDir(conf, user), |
| jobId.toString() + Path.SEPARATOR + "COMMIT_STARTED"); |
| return startCommitFile; |
| } |
| |
| @SuppressWarnings("deprecation") |
| public static void setupDistributedCache( |
| Configuration conf, |
| Map<String, LocalResource> localResources) |
| throws IOException { |
| |
| LocalResourceBuilder lrb = new LocalResourceBuilder(); |
| lrb.setConf(conf); |
| |
| // Cache archives |
| lrb.setType(LocalResourceType.ARCHIVE); |
| lrb.setUris(DistributedCache.getCacheArchives(conf)); |
| lrb.setTimestamps(DistributedCache.getArchiveTimestamps(conf)); |
| lrb.setSizes(getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES)); |
| lrb.setVisibilities(DistributedCache.getArchiveVisibilities(conf)); |
| lrb.setSharedCacheUploadPolicies( |
| Job.getArchiveSharedCacheUploadPolicies(conf)); |
| lrb.createLocalResources(localResources); |
| |
| // Cache files |
| lrb.setType(LocalResourceType.FILE); |
| lrb.setUris(DistributedCache.getCacheFiles(conf)); |
| lrb.setTimestamps(DistributedCache.getFileTimestamps(conf)); |
| lrb.setSizes(getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES)); |
| lrb.setVisibilities(DistributedCache.getFileVisibilities(conf)); |
| lrb.setSharedCacheUploadPolicies( |
| Job.getFileSharedCacheUploadPolicies(conf)); |
| lrb.createLocalResources(localResources); |
| } |
| |
| /** |
| * Set up the DistributedCache related configs to make |
| * {@link DistributedCache#getLocalCacheFiles(Configuration)} |
| * and |
| * {@link DistributedCache#getLocalCacheArchives(Configuration)} |
| * working. |
| * @param conf |
| * @throws java.io.IOException |
| */ |
| @SuppressWarnings("deprecation") |
| public static void setupDistributedCacheLocal(Configuration conf) |
| throws IOException { |
| |
| String localWorkDir = System.getenv("PWD"); |
| // ^ ^ all symlinks are created in the current work-dir |
| |
| // Update the configuration object with localized archives. |
| URI[] cacheArchives = DistributedCache.getCacheArchives(conf); |
| if (cacheArchives != null) { |
| List<String> localArchives = new ArrayList<String>(); |
| for (int i = 0; i < cacheArchives.length; ++i) { |
| URI u = cacheArchives[i]; |
| Path p = new Path(u); |
| Path name = |
| new Path((null == u.getFragment()) ? p.getName() |
| : u.getFragment()); |
| String linkName = name.toUri().getPath(); |
| localArchives.add(new Path(localWorkDir, linkName).toUri().getPath()); |
| } |
| if (!localArchives.isEmpty()) { |
| conf.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils |
| .arrayToString(localArchives.toArray(new String[localArchives |
| .size()]))); |
| } |
| } |
| |
| // Update the configuration object with localized files. |
| URI[] cacheFiles = DistributedCache.getCacheFiles(conf); |
| if (cacheFiles != null) { |
| List<String> localFiles = new ArrayList<String>(); |
| for (int i = 0; i < cacheFiles.length; ++i) { |
| URI u = cacheFiles[i]; |
| Path p = new Path(u); |
| Path name = |
| new Path((null == u.getFragment()) ? p.getName() |
| : u.getFragment()); |
| String linkName = name.toUri().getPath(); |
| localFiles.add(new Path(localWorkDir, linkName).toUri().getPath()); |
| } |
| if (!localFiles.isEmpty()) { |
| conf.set(MRJobConfig.CACHE_LOCALFILES, |
| StringUtils.arrayToString(localFiles |
| .toArray(new String[localFiles.size()]))); |
| } |
| } |
| } |
| |
| // TODO - Move this to MR! |
| private static long[] getFileSizes(Configuration conf, String key) { |
| String[] strs = conf.getStrings(key); |
| if (strs == null) { |
| return null; |
| } |
| long[] result = new long[strs.length]; |
| for(int i=0; i < strs.length; ++i) { |
| result[i] = Long.parseLong(strs[i]); |
| } |
| return result; |
| } |
| |
| public static String getChildLogLevel(Configuration conf, boolean isMap) { |
| if (isMap) { |
| return conf.get( |
| MRJobConfig.MAP_LOG_LEVEL, |
| JobConf.DEFAULT_LOG_LEVEL.toString() |
| ); |
| } else { |
| return conf.get( |
| MRJobConfig.REDUCE_LOG_LEVEL, |
| JobConf.DEFAULT_LOG_LEVEL.toString() |
| ); |
| } |
| } |
| |
| /** |
| * Add the JVM system properties necessary to configure |
| * {@link ContainerLogAppender} or |
| * {@link ContainerRollingLogAppender}. |
| * |
| * @param task for map/reduce, or null for app master |
| * @param vargs the argument list to append to |
| * @param conf configuration of MR job |
| */ |
| public static void addLog4jSystemProperties(Task task, |
| List<String> vargs, Configuration conf) { |
| String log4jPropertyFile = |
| conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, ""); |
| if (log4jPropertyFile.isEmpty()) { |
| vargs.add("-Dlog4j.configuration=container-log4j.properties"); |
| } else { |
| URI log4jURI = null; |
| try { |
| log4jURI = new URI(log4jPropertyFile); |
| } catch (URISyntaxException e) { |
| throw new IllegalArgumentException(e); |
| } |
| Path log4jPath = new Path(log4jURI); |
| vargs.add("-Dlog4j.configuration="+log4jPath.getName()); |
| } |
| |
| long logSize; |
| String logLevel; |
| int numBackups; |
| |
| if (task == null) { |
| logSize = conf.getLong(MRJobConfig.MR_AM_LOG_KB, |
| MRJobConfig.DEFAULT_MR_AM_LOG_KB) << 10; |
| logLevel = conf.get( |
| MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL); |
| numBackups = conf.getInt(MRJobConfig.MR_AM_LOG_BACKUPS, |
| MRJobConfig.DEFAULT_MR_AM_LOG_BACKUPS); |
| } else { |
| logSize = TaskLog.getTaskLogLimitBytes(conf); |
| logLevel = getChildLogLevel(conf, task.isMapTask()); |
| numBackups = conf.getInt(MRJobConfig.TASK_LOG_BACKUPS, |
| MRJobConfig.DEFAULT_TASK_LOG_BACKUPS); |
| } |
| |
| vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" + |
| ApplicationConstants.LOG_DIR_EXPANSION_VAR); |
| vargs.add( |
| "-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + logSize); |
| |
| if (logSize > 0L && numBackups > 0) { |
| // log should be rolled |
| vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_BACKUPS + "=" |
| + numBackups); |
| vargs.add("-Dhadoop.root.logger=" + logLevel + ",CRLA"); |
| } else { |
| vargs.add("-Dhadoop.root.logger=" + logLevel + ",CLA"); |
| } |
| vargs.add("-Dhadoop.root.logfile=" + TaskLog.LogName.SYSLOG); |
| |
| if ( task != null |
| && !task.isMapTask() |
| && conf.getBoolean(MRJobConfig.REDUCE_SEPARATE_SHUFFLE_LOG, |
| MRJobConfig.DEFAULT_REDUCE_SEPARATE_SHUFFLE_LOG)) { |
| final int numShuffleBackups = conf.getInt(MRJobConfig.SHUFFLE_LOG_BACKUPS, |
| MRJobConfig.DEFAULT_SHUFFLE_LOG_BACKUPS); |
| final long shuffleLogSize = conf.getLong(MRJobConfig.SHUFFLE_LOG_KB, |
| MRJobConfig.DEFAULT_SHUFFLE_LOG_KB) << 10; |
| final String shuffleLogger = logLevel |
| + (shuffleLogSize > 0L && numShuffleBackups > 0 |
| ? ",shuffleCRLA" |
| : ",shuffleCLA"); |
| |
| vargs.add("-D" + MRJobConfig.MR_PREFIX |
| + "shuffle.logger=" + shuffleLogger); |
| vargs.add("-D" + MRJobConfig.MR_PREFIX |
| + "shuffle.logfile=" + TaskLog.LogName.SYSLOG + ".shuffle"); |
| vargs.add("-D" + MRJobConfig.MR_PREFIX |
| + "shuffle.log.filesize=" + shuffleLogSize); |
| vargs.add("-D" + MRJobConfig.MR_PREFIX |
| + "shuffle.log.backups=" + numShuffleBackups); |
| } |
| } |
| |
| public static void setEnvFromInputString(Map<String, String> env, |
| String envString, Configuration conf) { |
| String classPathSeparator = |
| conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, |
| MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM) |
| ? ApplicationConstants.CLASS_PATH_SEPARATOR : File.pathSeparator; |
| Apps.setEnvFromInputString(env, envString, classPathSeparator); |
| } |
| |
| @Public |
| @Unstable |
| public static void addToEnvironment(Map<String, String> environment, |
| String variable, String value, Configuration conf) { |
| String classPathSeparator = |
| conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, |
| MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM) |
| ? ApplicationConstants.CLASS_PATH_SEPARATOR : File.pathSeparator; |
| Apps.addToEnvironment(environment, variable, value, classPathSeparator); |
| } |
| |
| public static String crossPlatformifyMREnv(Configuration conf, Environment env) { |
| boolean crossPlatform = |
| conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, |
| MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM); |
| return crossPlatform ? env.$$() : env.$(); |
| } |
| |
| /** |
| * Return lines for system property keys and values per configuration. |
| * |
| * @return the formatted string for the system property lines or null if no |
| * properties are specified. |
| */ |
| public static String getSystemPropertiesToLog(Configuration conf) { |
| String key = conf.get(MRJobConfig.MAPREDUCE_JVM_SYSTEM_PROPERTIES_TO_LOG, |
| MRJobConfig.DEFAULT_MAPREDUCE_JVM_SYSTEM_PROPERTIES_TO_LOG); |
| if (key != null) { |
| key = key.trim(); // trim leading and trailing whitespace from the config |
| if (!key.isEmpty()) { |
| String[] props = key.split(","); |
| if (props.length > 0) { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("\n/************************************************************\n"); |
| sb.append("[system properties]\n"); |
| for (String prop: props) { |
| prop = prop.trim(); // trim leading and trailing whitespace |
| if (!prop.isEmpty()) { |
| sb.append(prop).append(": ").append(System.getProperty(prop)).append('\n'); |
| } |
| } |
| sb.append("************************************************************/"); |
| return sb.toString(); |
| } |
| } |
| } |
| return null; |
| } |
| } |