/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.log4j.LogManager;

/**
 * The main() for MapReduce task processes. The MRAppMaster launches this
 * class in its own JVM, passing the application master's host and port, the
 * first task attempt id, and an integer JVM id as command-line arguments.
 */
class YarnChild {

  private static final Log LOG = LogFactory.getLog(YarnChild.class);

  static volatile TaskAttemptID taskid = null;

  public static void main(String[] args) throws Throwable {
    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
    LOG.debug("Child starting");

    final JobConf defaultConf = new JobConf();
    defaultConf.addResource(MRJobConfig.JOB_CONF_FILE);
    UserGroupInformation.setConfiguration(defaultConf);

    String host = args[0];
    int port = Integer.parseInt(args[1]);
    final InetSocketAddress address =
        NetUtils.createSocketAddrForHost(host, port);
    final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
    int jvmIdInt = Integer.parseInt(args[3]);
    JVMId jvmId = new JVMId(firstTaskid.getJobID(),
        firstTaskid.getTaskType() == TaskType.MAP, jvmIdInt);

    // initialize metrics
    DefaultMetricsSystem.initialize(
        StringUtils.camelize(firstTaskid.getTaskType().name()) + "Task");

    // Security framework already loaded the tokens into current ugi
    Credentials credentials =
        UserGroupInformation.getCurrentUser().getCredentials();
    LOG.info("Executing with tokens:");
    for (Token<?> token : credentials.getAllTokens()) {
      LOG.info(token);
    }

    // Create TaskUmbilicalProtocol as actual task owner.
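    // The umbilical is the task's RPC channel back to the MRAppMaster; it is
    // used below to poll for work (getTask) and, on failure, to report
    // filesystem and fatal errors (fsError/fatalError).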
    UserGroupInformation taskOwner =
        UserGroupInformation.createRemoteUser(firstTaskid.getJobID().toString());
    Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);
    SecurityUtil.setTokenService(jt, address);
    taskOwner.addToken(jt);
    final TaskUmbilicalProtocol umbilical =
        taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
          @Override
          public TaskUmbilicalProtocol run() throws Exception {
            return (TaskUmbilicalProtocol) RPC.getProxy(TaskUmbilicalProtocol.class,
                TaskUmbilicalProtocol.versionID, address, defaultConf);
          }
        });

    // Report a placeholder pid ("-1000") to the application master; the real
    // pid is only logged here for debugging.
    JvmContext context = new JvmContext(jvmId, "-1000");
    LOG.debug("PID: " + System.getenv().get("JVM_PID"));
    Task task = null;
    UserGroupInformation childUGI = null;

    try {
      JvmTask myTask = null;
      // poll for new task
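      // Backoff schedule (derived from the Math.min below): the first poll is
      // immediate (idle=0 sleeps 0ms), then 500ms, 1000ms, and a 1500ms cap
      // for every later retry.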
      for (int idle = 0; null == myTask; ++idle) {
        long sleepTimeMilliSecs = Math.min(idle * 500, 1500);
        LOG.info("Sleeping for " + sleepTimeMilliSecs
            + "ms before polling for a task.");
        MILLISECONDS.sleep(sleepTimeMilliSecs);
        myTask = umbilical.getTask(context);
      }
      if (myTask.shouldDie()) {
        return;
      }

      task = myTask.getTask();
      YarnChild.taskid = task.getTaskID();

      // Create the job-conf and set credentials
      final JobConf job = configureTask(task, credentials, jt);

      // Initiate Java VM metrics
      JvmMetrics.initSingleton(jvmId.toString(), job.getSessionId());
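      // Run the task as the job-submitting user (taken from the container's
      // USER environment variable), not as the user running the node's daemons.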
      childUGI = UserGroupInformation.createRemoteUser(System
          .getenv(ApplicationConstants.Environment.USER.toString()));
      // Add tokens to new user so that it may execute its task correctly.
      childUGI.addCredentials(credentials);

      // Create a final reference to the task for the doAs block
      final Task taskFinal = task;
      childUGI.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          // use job-specified working directory
          FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
          taskFinal.run(job, umbilical); // run the task
          return null;
        }
      });
    } catch (FSError e) {
      LOG.fatal("FSError from child", e);
      umbilical.fsError(taskid, e.getMessage());
    } catch (Exception exception) {
      LOG.warn("Exception running child : "
          + StringUtils.stringifyException(exception));
      try {
        if (task != null) {
          // do cleanup for the task
          if (childUGI == null) { // no need to go into the doAs block
            task.taskCleanup(umbilical);
          } else {
            final Task taskFinal = task;
            childUGI.doAs(new PrivilegedExceptionAction<Object>() {
              @Override
              public Object run() throws Exception {
                taskFinal.taskCleanup(umbilical);
                return null;
              }
            });
          }
        }
      } catch (Exception e) {
        LOG.info("Exception cleaning up: " + StringUtils.stringifyException(e));
      }
      // Report back any failures, for diagnostic purposes
      if (taskid != null) {
        umbilical.fatalError(taskid, StringUtils.stringifyException(exception));
      }
    } catch (Throwable throwable) {
      LOG.fatal("Error running child : "
          + StringUtils.stringifyException(throwable));
      if (taskid != null) {
        Throwable tCause = throwable.getCause();
        String cause = tCause == null
            ? throwable.getMessage()
            : StringUtils.stringifyException(tCause);
        umbilical.fatalError(taskid, cause);
      }
    } finally {
      RPC.stopProxy(umbilical);
      DefaultMetricsSystem.shutdown();
      // Shutting down log4j of the child-vm...
      // This assumes that on return from Task.run()
      // there is no more logging done.
      LogManager.shutdown();
    }
  }

  /**
   * Configure the mapred-local dirs. This config is used by the task to
   * locate its local work and output directories.
   * @throws IOException if the local work directory cannot be created
   */
  private static void configureLocalDirs(Task task, JobConf job) throws IOException {
    String[] localSysDirs = StringUtils.getTrimmedStrings(
        System.getenv(ApplicationConstants.LOCAL_DIR_ENV));
    job.setStrings(MRConfig.LOCAL_DIR, localSysDirs);
    LOG.info(MRConfig.LOCAL_DIR + " for child: " + job.get(MRConfig.LOCAL_DIR));
    LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
    Path workDir = null;
    // First, try to find the JOB_LOCAL_DIR on this host.
    try {
      workDir = lDirAlloc.getLocalPathToRead("work", job);
    } catch (DiskErrorException e) {
      // DiskErrorException means dir not found. If not found, it will
      // be created below.
    }
    if (workDir == null) {
      // JOB_LOCAL_DIR doesn't exist on this host -- create it.
      workDir = lDirAlloc.getLocalPathForWrite("work", job);
      FileSystem lfs = FileSystem.getLocal(job).getRaw();
      boolean madeDir = false;
      try {
        madeDir = lfs.mkdirs(workDir);
      } catch (FileAlreadyExistsException e) {
        // Since all tasks will be running in their own JVM, the race condition
        // exists where multiple tasks could be trying to create this directory
        // at the same time. If this task loses the race, it's okay because
        // the directory already exists.
        madeDir = true;
        workDir = lDirAlloc.getLocalPathToRead("work", job);
      }
      if (!madeDir) {
        throw new IOException("Mkdirs failed to create "
            + workDir.toString());
      }
    }
    job.set(MRJobConfig.JOB_LOCAL_DIR, workDir.toString());
  }

  private static JobConf configureTask(Task task, Credentials credentials,
      Token<JobTokenIdentifier> jt) throws IOException {
    final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE);
    job.setCredentials(credentials);

    // set job classloader if configured
    MRApps.setJobClassLoader(job);

    String appAttemptIdEnv = System
        .getenv(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV);
    LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptIdEnv);
    // Set it in the conf so that it can be used by the OutputCommitter.
    job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, Integer
        .parseInt(appAttemptIdEnv));

    // Set TCP_NODELAY so the small umbilical RPCs are not delayed by
    // Nagle's algorithm.
    job.setBoolean("ipc.client.tcpnodelay", true);
    job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
        YarnOutputFiles.class, MapOutputFile.class);
    // set the jobTokenFile into task
    task.setJobTokenSecret(
        JobTokenSecretManager.createSecretKey(jt.getPassword()));
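    // The secret derived from the job token is used, among other things, to
    // authenticate this job's shuffle requests for map outputs.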

    // setup the child's MRConfig.LOCAL_DIR.
    configureLocalDirs(task, job);

    // setup the child's attempt directories
    // Do the task-type specific localization
    task.localizeConfiguration(job);

    // Set up the DistributedCache related configs
    setupDistributedCacheConfig(job);

    // Overwrite the localized task jobconf which is linked to in the current
    // work-dir.
    Path localTaskFile = new Path(MRJobConfig.JOB_CONF_FILE);
    writeLocalJobFile(localTaskFile, job);
    task.setJobFile(localTaskFile.toString());
    task.setConf(job);
    return job;
  }

  /**
   * Set up the DistributedCache related configs so that
   * {@link DistributedCache#getLocalCacheFiles(Configuration)} and
   * {@link DistributedCache#getLocalCacheArchives(Configuration)} work.
   * @param job the job configuration to update
   * @throws IOException
   */
  private static void setupDistributedCacheConfig(final JobConf job)
      throws IOException {

    // All symlinks to localized resources are created in the current work-dir.
    String localWorkDir = System.getenv("PWD");
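    // Illustrative example (hypothetical URI): an archive registered as
    // hdfs://nn:8020/libs/dep.jar#renamed.jar is symlinked into the work dir,
    // so its localized path becomes $PWD/renamed.jar; without a URI fragment
    // the last path component ("dep.jar") is used as the link name.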

    // Update the configuration object with localized archives.
    URI[] cacheArchives = DistributedCache.getCacheArchives(job);
    if (cacheArchives != null) {
      List<String> localArchives = new ArrayList<String>();
      for (URI u : cacheArchives) {
        Path p = new Path(u);
        Path name =
            new Path((null == u.getFragment()) ? p.getName()
                : u.getFragment());
        String linkName = name.toUri().getPath();
        localArchives.add(new Path(localWorkDir, linkName).toUri().getPath());
      }
      if (!localArchives.isEmpty()) {
        job.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
            .arrayToString(localArchives.toArray(new String[localArchives
                .size()])));
      }
    }

    // Update the configuration object with localized files.
    URI[] cacheFiles = DistributedCache.getCacheFiles(job);
    if (cacheFiles != null) {
      List<String> localFiles = new ArrayList<String>();
      for (URI u : cacheFiles) {
        Path p = new Path(u);
        Path name =
            new Path((null == u.getFragment()) ? p.getName()
                : u.getFragment());
        String linkName = name.toUri().getPath();
        localFiles.add(new Path(localWorkDir, linkName).toUri().getPath());
      }
      if (!localFiles.isEmpty()) {
        job.set(MRJobConfig.CACHE_LOCALFILES,
            StringUtils.arrayToString(localFiles
                .toArray(new String[localFiles.size()])));
      }
    }
  }

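  /** Permissions for the local job file: owner read/write, group read (0640). */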
  private static final FsPermission urw_gr =
      FsPermission.createImmutable((short) 0640);

  /**
   * Write the task-specific job-configuration file.
   * @throws IOException
   */
  private static void writeLocalJobFile(Path jobFile, JobConf conf)
      throws IOException {
    FileSystem localFs = FileSystem.getLocal(conf);
    localFs.delete(jobFile, true);
    OutputStream out = null;
    try {
      out = FileSystem.create(localFs, jobFile, urw_gr);
      conf.writeXml(out);
    } finally {
      IOUtils.cleanup(LOG, out);
    }
  }

}