| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapreduce.v2.app; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.lang.reflect.Constructor; |
| import java.lang.reflect.InvocationTargetException; |
| import java.nio.file.Paths; |
| import java.security.NoSuchAlgorithmException; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| |
| import javax.crypto.KeyGenerator; |
| |
| import org.apache.commons.io.IOUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.ipc.CallerContext; |
| import org.apache.hadoop.mapred.FileOutputCommitter; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.LocalContainerLauncher; |
| import org.apache.hadoop.mapred.TaskAttemptListenerImpl; |
| import org.apache.hadoop.mapred.TaskLog; |
| import org.apache.hadoop.mapred.TaskUmbilicalProtocol; |
| import org.apache.hadoop.mapreduce.CryptoUtils; |
| import org.apache.hadoop.mapreduce.JobContext; |
| import org.apache.hadoop.mapreduce.JobStatus.State; |
| import org.apache.hadoop.mapreduce.MRJobConfig; |
| import org.apache.hadoop.mapreduce.OutputCommitter; |
| import org.apache.hadoop.mapreduce.OutputFormat; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.mapreduce.TaskAttemptID; |
| import org.apache.hadoop.mapreduce.TypeConverter; |
| import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent; |
| import org.apache.hadoop.mapreduce.jobhistory.EventReader; |
| import org.apache.hadoop.mapreduce.jobhistory.EventType; |
| import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent; |
| import org.apache.hadoop.mapreduce.jobhistory.JobHistoryCopyService; |
| import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; |
| import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; |
| import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; |
| import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; |
| import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; |
| import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; |
| import org.apache.hadoop.mapreduce.security.TokenCache; |
| import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; |
| import org.apache.hadoop.mapreduce.task.JobContextImpl; |
| import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; |
| import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; |
| import org.apache.hadoop.mapreduce.v2.api.records.JobId; |
| import org.apache.hadoop.mapreduce.v2.api.records.JobReport; |
| import org.apache.hadoop.mapreduce.v2.api.records.JobState; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskId; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskState; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskType; |
| import org.apache.hadoop.mapreduce.v2.app.client.ClientService; |
| import org.apache.hadoop.mapreduce.v2.app.client.MRClientService; |
| import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent; |
| import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler; |
| import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType; |
| import org.apache.hadoop.mapreduce.v2.app.job.Job; |
| import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; |
| import org.apache.hadoop.mapreduce.v2.app.job.Task; |
| import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; |
| import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; |
| import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; |
| import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; |
| import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl; |
| import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator; |
| import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; |
| import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; |
| import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; |
| import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator; |
| import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; |
| import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor; |
| import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; |
| import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; |
| import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy; |
| import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator; |
| import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator; |
| import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent; |
| import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; |
| import org.apache.hadoop.mapreduce.v2.util.MRApps; |
| import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; |
| import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil; |
| import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.SecurityUtil; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.service.AbstractService; |
| import org.apache.hadoop.service.CompositeService; |
| import org.apache.hadoop.service.Service; |
| import org.apache.hadoop.service.ServiceOperations; |
| import org.apache.hadoop.util.ExitUtil; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hadoop.util.ShutdownHookManager; |
| import org.apache.hadoop.util.StringInterner; |
| import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; |
| import org.apache.hadoop.yarn.api.ApplicationConstants; |
| import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.client.api.TimelineClient; |
| import org.apache.hadoop.yarn.client.api.TimelineV2Client; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.event.AsyncDispatcher; |
| import org.apache.hadoop.yarn.event.Dispatcher; |
| import org.apache.hadoop.yarn.event.Event; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; |
| import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; |
| import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; |
| import org.apache.hadoop.yarn.util.Clock; |
| import org.apache.hadoop.yarn.util.SystemClock; |
| |
| import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
/**
 * The Map-Reduce Application Master.
 * The state machine is encapsulated in the implementation of the Job interface.
 * All state changes happen via the Job interface. Each event
 * results in a Finite State Transition in Job.
 *
 * MR AppMaster is a composition of loosely coupled services. The services
 * interact with each other via events. The components resemble the
 * Actor model: each component acts on the events it receives and sends
 * events out to other components.
 * This keeps it highly concurrent with no or minimal synchronization needs.
 *
 * The events are dispatched by a central Dispatch mechanism. All components
 * register with the Dispatcher.
 *
 * The information is shared across different components using AppContext.
 */
| |
| @SuppressWarnings("rawtypes") |
| public class MRAppMaster extends CompositeService { |
| |
private static final Logger LOG = LoggerFactory.getLogger(MRAppMaster.class);

/**
 * Priority of the MRAppMaster shutdown hook.
 */
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
/**
 * Algorithm name passed to {@code KeyGenerator.getInstance} when the
 * intermediate-data (spill) key is generated in initJobCredentialsAndUGI.
 */
public static final String INTERMEDIATE_DATA_ENCRYPTION_ALGO = "HmacSHA1";

// Clock supplied to the constructor.
private Clock clock;
// Reading of the clock taken while the constructor runs.
private final long startTime;
private final long appSubmitTime;
// Read from MRJobConfig.JOB_NAME during serviceInit.
private String appName;
private final ApplicationAttemptId appAttemptID;
private final ContainerId containerID;
private final String nmHost;
private final int nmPort;
private final int nmHttpPort;
// Created with MRAppMetrics.create() in the constructor.
protected final MRAppMetrics metrics;
// Handed to JobImpl in createJob.
private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
// Handed to JobImpl in createJob.
private List<AMInfo> amInfos;
private AppContext context;
private Dispatcher dispatcher;
// Only created on the normal (non-shutdown) path of serviceInit.
private ClientService clientService;
private ContainerAllocator containerAllocator;
private ContainerLauncher containerLauncher;
private EventHandler<CommitterEvent> committerEventHandler;
// Only created when map or reduce speculation is enabled in the conf.
private Speculator speculator;
protected TaskAttemptListener taskAttemptListener;
protected JobTokenSecretManager jobTokenSecretManager =
    new JobTokenSecretManager();
// Built in serviceInit from the application id of appAttemptID.
private JobId jobId;
// Set in serviceInit from the mapred.{mapper,reducer}.new-api flags.
private boolean newApiCommitter;
private ClassLoader jobClassLoader;
// Created in serviceInit by createOutputCommitter.
private OutputCommitter committer;
private JobEventDispatcher jobEventDispatcher;
private JobHistoryEventHandler jobHistoryEventHandler;
private SpeculatorEventDispatcher speculatorEventDispatcher;
private AMPreemptionPolicy preemptionPolicy;
// Generated key when encrypted spill is enabled, otherwise a single zero
// byte; see initJobCredentialsAndUGI.
private byte[] encryptedSpillKey;

// After a task attempt completes from TaskUmbilicalProtocol's point of view,
// it will be transitioned to finishing state.
// taskAttemptFinishingMonitor is just a timer for attempts in finishing
// state. If the attempt stays in finishing state for too long,
// taskAttemptFinishingMonitor will notify the attempt via TA_TIMED_OUT
// event.
private TaskAttemptFinishingMonitor taskAttemptFinishingMonitor;

private Job job;
private Credentials jobCredentials = new Credentials(); // Filled during init
protected UserGroupInformation currentUser; // Will be setup during init

@VisibleForTesting
protected volatile boolean isLastAMRetry = false;
//Something happened and we should shut down right after we start up.
boolean errorHappenedShutDown = false;
// Diagnostic text set in serviceInit alongside errorHappenedShutDown.
private String shutDownMessage = null;
// Job state chosen in serviceInit when a previous attempt's commit markers
// (or a missing staging dir) dictate the outcome; null otherwise.
JobStateInternal forcedState = null;
// Created with TaskLog.createLogSyncer() in the constructor.
private final ScheduledExecutorService logSyncer;

private long recoveredJobStartTime = -1L;
// While false, exitMRAppMaster disables the real system exit.
private static boolean mainStarted = false;

@VisibleForTesting
protected AtomicBoolean successfullyUnregistered =
    new AtomicBoolean(false);
| |
/**
 * Creates an application master that uses the system clock.
 * Delegates to the seven-argument constructor, supplying
 * {@code SystemClock.getInstance()} as the clock.
 *
 * @param applicationAttemptId forwarded to the delegate constructor
 * @param containerId forwarded to the delegate constructor
 * @param nmHost forwarded to the delegate constructor
 * @param nmPort forwarded to the delegate constructor
 * @param nmHttpPort forwarded to the delegate constructor
 * @param appSubmitTime forwarded to the delegate constructor
 */
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
    ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
    long appSubmitTime) {
  this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
      SystemClock.getInstance(), appSubmitTime);
}
| |
/**
 * Creates an application master with an explicit clock.
 * Stores the supplied identifiers, records the clock's current reading as
 * the start time, and creates the metrics object and the log syncer.
 *
 * @param applicationAttemptId the application attempt this master serves
 * @param containerId stored as this master's container id
 * @param nmHost stored in the {@code nmHost} field
 * @param nmPort stored in the {@code nmPort} field
 * @param nmHttpPort stored in the {@code nmHttpPort} field
 * @param clock clock used for the start time and kept for later use
 * @param appSubmitTime stored in the {@code appSubmitTime} field
 */
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
    ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
    Clock clock, long appSubmitTime) {
  super(MRAppMaster.class.getName());

  // Timing information.
  this.clock = clock;
  this.startTime = clock.getTime();
  this.appSubmitTime = appSubmitTime;

  // Identity of this attempt and the values describing where it runs.
  this.appAttemptID = applicationAttemptId;
  this.containerID = containerId;
  this.nmHost = nmHost;
  this.nmPort = nmPort;
  this.nmHttpPort = nmHttpPort;

  this.metrics = MRAppMetrics.create();
  this.logSyncer = TaskLog.createLogSyncer();
  LOG.info("Created MRAppMaster for application {}", applicationAttemptId);
}
| protected TaskAttemptFinishingMonitor createTaskAttemptFinishingMonitor( |
| EventHandler eventHandler) { |
| TaskAttemptFinishingMonitor monitor = |
| new TaskAttemptFinishingMonitor(eventHandler); |
| return monitor; |
| } |
| |
| @Override |
| protected void serviceInit(final Configuration conf) throws Exception { |
| // create the job classloader if enabled |
| createJobClassLoader(conf); |
| |
| initJobCredentialsAndUGI(conf); |
| |
| dispatcher = createDispatcher(); |
| addIfService(dispatcher); |
| taskAttemptFinishingMonitor = createTaskAttemptFinishingMonitor(dispatcher.getEventHandler()); |
| addIfService(taskAttemptFinishingMonitor); |
| context = new RunningAppContext(conf, taskAttemptFinishingMonitor); |
| |
| // Job name is the same as the app name util we support DAG of jobs |
| // for an app later |
| appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>"); |
| |
| conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptID.getAttemptId()); |
| |
| newApiCommitter = false; |
| jobId = MRBuilderUtils.newJobId(appAttemptID.getApplicationId(), |
| appAttemptID.getApplicationId().getId()); |
| int numReduceTasks = conf.getInt(MRJobConfig.NUM_REDUCES, 0); |
| if ((numReduceTasks > 0 && |
| conf.getBoolean("mapred.reducer.new-api", false)) || |
| (numReduceTasks == 0 && |
| conf.getBoolean("mapred.mapper.new-api", false))) { |
| newApiCommitter = true; |
| LOG.info("Using mapred newApiCommitter."); |
| } |
| |
| boolean copyHistory = false; |
| committer = createOutputCommitter(conf); |
| try { |
| String user = UserGroupInformation.getCurrentUser().getShortUserName(); |
| Path stagingDir = MRApps.getStagingAreaDir(conf, user); |
| FileSystem fs = getFileSystem(conf); |
| |
| boolean stagingExists = fs.exists(stagingDir); |
| Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId); |
| boolean commitStarted = fs.exists(startCommitFile); |
| Path endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user, jobId); |
| boolean commitSuccess = fs.exists(endCommitSuccessFile); |
| Path endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user, jobId); |
| boolean commitFailure = fs.exists(endCommitFailureFile); |
| if(!stagingExists) { |
| isLastAMRetry = true; |
| LOG.info("Attempt num: " + appAttemptID.getAttemptId() + |
| " is last retry: " + isLastAMRetry + |
| " because the staging dir doesn't exist."); |
| errorHappenedShutDown = true; |
| forcedState = JobStateInternal.ERROR; |
| shutDownMessage = "Staging dir does not exist " + stagingDir; |
| LOG.error(shutDownMessage); |
| } else if (commitStarted) { |
| //A commit was started so this is the last time, we just need to know |
| // what result we will use to notify, and how we will unregister |
| errorHappenedShutDown = true; |
| isLastAMRetry = true; |
| LOG.info("Attempt num: " + appAttemptID.getAttemptId() + |
| " is last retry: " + isLastAMRetry + |
| " because a commit was started."); |
| copyHistory = true; |
| if (commitSuccess) { |
| shutDownMessage = |
| "Job commit succeeded in a prior MRAppMaster attempt " + |
| "before it crashed. Recovering."; |
| forcedState = JobStateInternal.SUCCEEDED; |
| } else if (commitFailure) { |
| shutDownMessage = |
| "Job commit failed in a prior MRAppMaster attempt " + |
| "before it crashed. Not retrying."; |
| forcedState = JobStateInternal.FAILED; |
| } else { |
| if (isCommitJobRepeatable()) { |
| // cleanup previous half done commits if committer supports |
| // repeatable job commit. |
| errorHappenedShutDown = false; |
| cleanupInterruptedCommit(conf, fs, startCommitFile); |
| } else { |
| //The commit is still pending, commit error |
| shutDownMessage = |
| "Job commit from a prior MRAppMaster attempt is " + |
| "potentially in progress. Preventing multiple commit executions"; |
| forcedState = JobStateInternal.ERROR; |
| } |
| } |
| } |
| } catch (IOException e) { |
| throw new YarnRuntimeException("Error while initializing", e); |
| } |
| |
| if (errorHappenedShutDown) { |
| NoopEventHandler eater = new NoopEventHandler(); |
| //We do not have a JobEventDispatcher in this path |
| dispatcher.register(JobEventType.class, eater); |
| |
| EventHandler<JobHistoryEvent> historyService = null; |
| if (copyHistory) { |
| historyService = |
| createJobHistoryHandler(context); |
| dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class, |
| historyService); |
| } else { |
| dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class, |
| eater); |
| } |
| |
| if (copyHistory) { |
| // Now that there's a FINISHING state for application on RM to give AMs |
| // plenty of time to clean up after unregister it's safe to clean staging |
| // directory after unregistering with RM. So, we start the staging-dir |
| // cleaner BEFORE the ContainerAllocator so that on shut-down, |
| // ContainerAllocator unregisters first and then the staging-dir cleaner |
| // deletes staging directory. |
| addService(createStagingDirCleaningService()); |
| } |
| |
| // service to allocate containers from RM (if non-uber) or to fake it (uber) |
| containerAllocator = createContainerAllocator(null, context); |
| addIfService(containerAllocator); |
| dispatcher.register(ContainerAllocator.EventType.class, containerAllocator); |
| |
| if (copyHistory) { |
| // Add the JobHistoryEventHandler last so that it is properly stopped first. |
| // This will guarantee that all history-events are flushed before AM goes |
| // ahead with shutdown. |
| // Note: Even though JobHistoryEventHandler is started last, if any |
| // component creates a JobHistoryEvent in the meanwhile, it will be just be |
| // queued inside the JobHistoryEventHandler |
| addIfService(historyService); |
| |
| JobHistoryCopyService cpHist = new JobHistoryCopyService(appAttemptID, |
| dispatcher.getEventHandler()); |
| addIfService(cpHist); |
| } |
| } else { |
| |
| //service to handle requests from JobClient |
| clientService = createClientService(context); |
| // Init ClientService separately so that we stop it separately, since this |
| // service needs to wait some time before it stops so clients can know the |
| // final states |
| clientService.init(conf); |
| |
| containerAllocator = createContainerAllocator(clientService, context); |
| |
| //service to handle the output committer |
| committerEventHandler = createCommitterEventHandler(context, committer); |
| addIfService(committerEventHandler); |
| |
| //policy handling preemption requests from RM |
| callWithJobClassLoader(conf, new Action<Void>() { |
| public Void call(Configuration conf) { |
| preemptionPolicy = createPreemptionPolicy(conf); |
| preemptionPolicy.init(context); |
| return null; |
| } |
| }); |
| |
| //service to handle requests to TaskUmbilicalProtocol |
| taskAttemptListener = createTaskAttemptListener(context, preemptionPolicy); |
| addIfService(taskAttemptListener); |
| |
| //service to log job history events |
| EventHandler<JobHistoryEvent> historyService = |
| createJobHistoryHandler(context); |
| dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class, |
| historyService); |
| |
| this.jobEventDispatcher = new JobEventDispatcher(); |
| |
| //register the event dispatchers |
| dispatcher.register(JobEventType.class, jobEventDispatcher); |
| dispatcher.register(TaskEventType.class, new TaskEventDispatcher()); |
| dispatcher.register(TaskAttemptEventType.class, |
| new TaskAttemptEventDispatcher()); |
| dispatcher.register(CommitterEventType.class, committerEventHandler); |
| |
| if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false) |
| || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) { |
| //optional service to speculate on task attempts' progress |
| speculator = createSpeculator(conf, context); |
| addIfService(speculator); |
| } |
| |
| speculatorEventDispatcher = new SpeculatorEventDispatcher(conf); |
| dispatcher.register(Speculator.EventType.class, |
| speculatorEventDispatcher); |
| |
| // Now that there's a FINISHING state for application on RM to give AMs |
| // plenty of time to clean up after unregister it's safe to clean staging |
| // directory after unregistering with RM. So, we start the staging-dir |
| // cleaner BEFORE the ContainerAllocator so that on shut-down, |
| // ContainerAllocator unregisters first and then the staging-dir cleaner |
| // deletes staging directory. |
| addService(createStagingDirCleaningService()); |
| |
| // service to allocate containers from RM (if non-uber) or to fake it (uber) |
| addIfService(containerAllocator); |
| dispatcher.register(ContainerAllocator.EventType.class, containerAllocator); |
| |
| // corresponding service to launch allocated containers via NodeManager |
| containerLauncher = createContainerLauncher(context); |
| addIfService(containerLauncher); |
| dispatcher.register(ContainerLauncher.EventType.class, containerLauncher); |
| |
| // Add the JobHistoryEventHandler last so that it is properly stopped first. |
| // This will guarantee that all history-events are flushed before AM goes |
| // ahead with shutdown. |
| // Note: Even though JobHistoryEventHandler is started last, if any |
| // component creates a JobHistoryEvent in the meanwhile, it will be just be |
| // queued inside the JobHistoryEventHandler |
| addIfService(historyService); |
| } |
| super.serviceInit(conf); |
| } // end of init() |
| |
/**
 * Creates the event dispatcher used by this application master.
 *
 * @return a new {@link AsyncDispatcher}
 */
protected Dispatcher createDispatcher() {
  return new AsyncDispatcher();
}
| |
| private boolean isCommitJobRepeatable() throws IOException { |
| boolean isRepeatable = false; |
| Configuration conf = getConfig(); |
| if (committer != null) { |
| final JobContext jobContext = getJobContextFromConf(conf); |
| |
| isRepeatable = callWithJobClassLoader(conf, |
| new ExceptionAction<Boolean>() { |
| public Boolean call(Configuration conf) throws IOException { |
| return committer.isCommitJobRepeatable(jobContext); |
| } |
| }); |
| } |
| return isRepeatable; |
| } |
| |
| private JobContext getJobContextFromConf(Configuration conf) { |
| if (newApiCommitter) { |
| return new JobContextImpl(conf, TypeConverter.fromYarn(getJobId())); |
| } else { |
| return new org.apache.hadoop.mapred.JobContextImpl( |
| new JobConf(conf), TypeConverter.fromYarn(getJobId())); |
| } |
| } |
| |
| private void cleanupInterruptedCommit(Configuration conf, |
| FileSystem fs, Path startCommitFile) throws IOException { |
| LOG.info("Delete startJobCommitFile in case commit is not finished as " + |
| "successful or failed."); |
| fs.delete(startCommitFile, false); |
| } |
| |
| private OutputCommitter createOutputCommitter(Configuration conf) { |
| return callWithJobClassLoader(conf, new Action<OutputCommitter>() { |
| public OutputCommitter call(Configuration conf) { |
| OutputCommitter committer = null; |
| |
| LOG.info("OutputCommitter set in config " |
| + conf.get("mapred.output.committer.class")); |
| |
| if (newApiCommitter) { |
| org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = |
| MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); |
| org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = |
| MRBuilderUtils.newTaskAttemptId(taskID, 0); |
| TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf, |
| TypeConverter.fromYarn(attemptID)); |
| OutputFormat outputFormat; |
| try { |
| outputFormat = ReflectionUtils.newInstance(taskContext |
| .getOutputFormatClass(), conf); |
| committer = outputFormat.getOutputCommitter(taskContext); |
| } catch (Exception e) { |
| throw new YarnRuntimeException(e); |
| } |
| } else { |
| committer = ReflectionUtils.newInstance(conf.getClass( |
| "mapred.output.committer.class", FileOutputCommitter.class, |
| org.apache.hadoop.mapred.OutputCommitter.class), conf); |
| } |
| LOG.info("OutputCommitter is " + committer.getClass().getName()); |
| return committer; |
| } |
| }); |
| } |
| |
| protected AMPreemptionPolicy createPreemptionPolicy(Configuration conf) { |
| return ReflectionUtils.newInstance(conf.getClass( |
| MRJobConfig.MR_AM_PREEMPTION_POLICY, |
| NoopAMPreemptionPolicy.class, AMPreemptionPolicy.class), conf); |
| } |
| |
| private boolean isJobNamePatternMatch(JobConf conf, String jobTempDir) { |
| // Matched staging files should be preserved after job is finished. |
| if (conf.getKeepTaskFilesPattern() != null && jobTempDir != null) { |
| java.nio.file.Path pathName = Paths.get(jobTempDir).getFileName(); |
| if (pathName != null) { |
| String jobFileName = pathName.toString(); |
| Pattern pattern = Pattern.compile(conf.getKeepTaskFilesPattern()); |
| Matcher matcher = pattern.matcher(jobFileName); |
| return matcher.find(); |
| } |
| } |
| return false; |
| } |
| |
/**
 * Reports the configuration's keep-failed-task-files setting.
 *
 * @param conf job configuration to query
 * @return the value of {@code conf.getKeepFailedTaskFiles()}
 */
private boolean isKeepFailedTaskFiles(JobConf conf) {
  // TODO: Decide which failed task files that should
  // be kept are in application log directory.
  return conf.getKeepFailedTaskFiles();
}
| |
| protected boolean keepJobFiles(JobConf conf, String jobTempDir) { |
| return isJobNamePatternMatch(conf, jobTempDir) |
| || isKeepFailedTaskFiles(conf); |
| } |
| |
/**
 * Create the default file system for this job.
 * @param conf the conf object
 * @return the file system returned by {@code FileSystem.get(conf)}
 * @throws IOException if the file system cannot be obtained
 */
protected FileSystem getFileSystem(Configuration conf) throws IOException {
  return FileSystem.get(conf);
}
| |
/**
 * Returns the job credentials held by this application master.
 *
 * @return the current value of the {@code jobCredentials} field
 */
protected Credentials getCredentials() {
  return jobCredentials;
}
| |
| /** |
| * clean up staging directories for the job. |
| * @throws IOException |
| */ |
| public void cleanupStagingDir() throws IOException { |
| /* make sure we clean the staging files */ |
| String jobTempDir = getConfig().get(MRJobConfig.MAPREDUCE_JOB_DIR); |
| FileSystem fs = getFileSystem(getConfig()); |
| try { |
| if (!keepJobFiles(new JobConf(getConfig()), jobTempDir)) { |
| if (jobTempDir == null) { |
| LOG.warn("Job Staging directory is null"); |
| return; |
| } |
| Path jobTempDirPath = new Path(jobTempDir); |
| LOG.info("Deleting staging directory " + FileSystem.getDefaultUri(getConfig()) + |
| " " + jobTempDir); |
| fs.delete(jobTempDirPath, true); |
| } |
| } catch(IOException io) { |
| LOG.error("Failed to cleanup staging dir " + jobTempDir, io); |
| } |
| } |
| |
/**
 * Exits the JVM with status 0. Kept in its own method so that tests can
 * override it.
 */
protected void sysexit() {
  System.exit(0);
}
| |
| @VisibleForTesting |
| public void shutDownJob() { |
| // job has finished |
| // this is the only job, so shut down the Appmaster |
| // note in a workflow scenario, this may lead to creation of a new |
| // job (FIXME?) |
| |
| JobEndNotifier notifier = null; |
| if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) { |
| notifier = new JobEndNotifier(); |
| notifier.setConf(getConfig()); |
| } |
| |
| try { |
| //if isLastAMRetry comes as true, should never set it to false |
| if ( !isLastAMRetry){ |
| if (((JobImpl)job).getInternalState() != JobStateInternal.REBOOT) { |
| LOG.info("Job finished cleanly, recording last MRAppMaster retry"); |
| isLastAMRetry = true; |
| } |
| } |
| notifyIsLastAMRetry(isLastAMRetry); |
| // Stop all services |
| // This will also send the final report to the ResourceManager |
| LOG.info("Calling stop for all the services"); |
| MRAppMaster.this.stop(); |
| |
| if (isLastAMRetry && notifier != null) { |
| // Send job-end notification when it is safe to report termination to |
| // users and it is the last AM retry |
| sendJobEndNotify(notifier); |
| notifier = null; |
| } |
| |
| try { |
| Thread.sleep(5000); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| clientService.stop(); |
| } catch (Throwable t) { |
| LOG.warn("Graceful stop failed. Exiting.. ", t); |
| exitMRAppMaster(1, t); |
| } finally { |
| if (isLastAMRetry && notifier != null) { |
| sendJobEndNotify(notifier); |
| } |
| } |
| exitMRAppMaster(0, null); |
| } |
| |
| private void sendJobEndNotify(JobEndNotifier notifier) { |
| try { |
| LOG.info("Job end notification started for jobID : " |
| + job.getReport().getJobId()); |
| // If unregistration fails, the final state is unavailable. However, |
| // at the last AM Retry, the client will finally be notified FAILED |
| // from RM, so we should let users know FAILED via notifier as well |
| JobReport report = job.getReport(); |
| if (!context.hasSuccessfullyUnregistered()) { |
| report.setJobState(JobState.FAILED); |
| } |
| notifier.notify(report); |
| } catch (InterruptedException ie) { |
| LOG.warn("Job end notification interrupted for jobID : " |
| + job.getReport().getJobId(), ie); |
| } |
| } |
| |
| /** MRAppMaster exit method which has been instrumented for both runtime and |
| * unit testing. |
| * If the main thread has not been started, this method was called from a |
| * test. In that case, configure the ExitUtil object to not exit the JVM. |
| * |
| * @param status integer indicating exit status |
| * @param t throwable exception that could be null |
| */ |
| private void exitMRAppMaster(int status, Throwable t) { |
| if (!mainStarted) { |
| ExitUtil.disableSystemExit(); |
| } |
| try { |
| if (t != null) { |
| ExitUtil.terminate(status, t); |
| } else { |
| ExitUtil.terminate(status); |
| } |
| } catch (ExitUtil.ExitException ee) { |
| // ExitUtil.ExitException is only thrown from the ExitUtil test code when |
| // SystemExit has been disabled. It is always thrown in in the test code, |
| // even when no error occurs. Ignore the exception so that tests don't |
| // need to handle it. |
| } |
| } |
| |
| private class JobFinishEventHandler implements EventHandler<JobFinishEvent> { |
| @Override |
| public void handle(JobFinishEvent event) { |
| // Create a new thread to shutdown the AM. We should not do it in-line |
| // to avoid blocking the dispatcher itself. |
| new Thread() { |
| |
| @Override |
| public void run() { |
| shutDownJob(); |
| } |
| }.start(); |
| } |
| } |
| |
/**
 * Creates the event handler that handles the job finish event.
 * @return a new {@code JobFinishEventHandler}
 */
protected EventHandler<JobFinishEvent> createJobFinishEventHandler() {
  return new JobFinishEventHandler();
}
| |
| /** Create and initialize (but don't start) a single job. |
| * @param forcedState a state to force the job into or null for normal operation. |
| * @param diagnostic a diagnostic message to include with the job. |
| */ |
| protected Job createJob(Configuration conf, JobStateInternal forcedState, |
| String diagnostic) { |
| |
| // create single job |
| Job newJob = |
| new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(), |
| taskAttemptListener, jobTokenSecretManager, jobCredentials, clock, |
| completedTasksFromPreviousRun, metrics, |
| committer, newApiCommitter, |
| currentUser.getUserName(), appSubmitTime, amInfos, context, |
| forcedState, diagnostic); |
| ((RunningAppContext) context).jobs.put(newJob.getID(), newJob); |
| |
| dispatcher.register(JobFinishEvent.Type.class, |
| createJobFinishEventHandler()); |
| return newJob; |
| } // end createJob() |
| |
| |
| /** |
| * Obtain the tokens needed by the job and put them in the UGI |
| * @param conf |
| */ |
| protected void initJobCredentialsAndUGI(Configuration conf) { |
| |
| try { |
| this.currentUser = UserGroupInformation.getCurrentUser(); |
| this.jobCredentials = ((JobConf)conf).getCredentials(); |
| if (CryptoUtils.isEncryptedSpillEnabled(conf)) { |
| int keyLen = conf.getInt( |
| MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, |
| MRJobConfig |
| .DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS); |
| KeyGenerator keyGen = |
| KeyGenerator.getInstance(INTERMEDIATE_DATA_ENCRYPTION_ALGO); |
| keyGen.init(keyLen); |
| encryptedSpillKey = keyGen.generateKey().getEncoded(); |
| } else { |
| encryptedSpillKey = new byte[] {0}; |
| } |
| } catch (IOException e) { |
| throw new YarnRuntimeException(e); |
| } catch (NoSuchAlgorithmException e) { |
| throw new YarnRuntimeException(e); |
| } |
| } |
| |
| protected EventHandler<JobHistoryEvent> createJobHistoryHandler( |
| AppContext context) { |
| this.jobHistoryEventHandler = new JobHistoryEventHandler(context, |
| getStartCount()); |
| return this.jobHistoryEventHandler; |
| } |
| |
| protected AbstractService createStagingDirCleaningService() { |
| return new StagingDirCleaningService(); |
| } |
| |
| protected Speculator createSpeculator(Configuration conf, |
| final AppContext context) { |
| return callWithJobClassLoader(conf, new Action<Speculator>() { |
| public Speculator call(Configuration conf) { |
| Class<? extends Speculator> speculatorClass; |
| try { |
| speculatorClass |
| // "yarn.mapreduce.job.speculator.class" |
| = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR, |
| DefaultSpeculator.class, |
| Speculator.class); |
| Constructor<? extends Speculator> speculatorConstructor |
| = speculatorClass.getConstructor |
| (Configuration.class, AppContext.class); |
| Speculator result = speculatorConstructor.newInstance(conf, context); |
| |
| return result; |
| } catch (InstantiationException ex) { |
| LOG.error("Can't make a speculator -- check " |
| + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); |
| throw new YarnRuntimeException(ex); |
| } catch (IllegalAccessException ex) { |
| LOG.error("Can't make a speculator -- check " |
| + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); |
| throw new YarnRuntimeException(ex); |
| } catch (InvocationTargetException ex) { |
| LOG.error("Can't make a speculator -- check " |
| + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); |
| throw new YarnRuntimeException(ex); |
| } catch (NoSuchMethodException ex) { |
| LOG.error("Can't make a speculator -- check " |
| + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); |
| throw new YarnRuntimeException(ex); |
| } |
| } |
| }); |
| } |
| |
| protected TaskAttemptListener createTaskAttemptListener(AppContext context, |
| AMPreemptionPolicy preemptionPolicy) { |
| TaskAttemptListener lis = |
| new TaskAttemptListenerImpl(context, jobTokenSecretManager, |
| getRMHeartbeatHandler(), preemptionPolicy, encryptedSpillKey); |
| return lis; |
| } |
| |
| protected EventHandler<CommitterEvent> createCommitterEventHandler( |
| AppContext context, OutputCommitter committer) { |
| return new CommitterEventHandler(context, committer, |
| getRMHeartbeatHandler(), jobClassLoader); |
| } |
| |
| protected ContainerAllocator createContainerAllocator( |
| final ClientService clientService, final AppContext context) { |
| return new ContainerAllocatorRouter(clientService, context); |
| } |
| |
| protected RMHeartbeatHandler getRMHeartbeatHandler() { |
| return (RMHeartbeatHandler) containerAllocator; |
| } |
| |
| protected ContainerLauncher |
| createContainerLauncher(final AppContext context) { |
| return new ContainerLauncherRouter(context); |
| } |
| |
| //TODO:should have an interface for MRClientService |
| protected ClientService createClientService(AppContext context) { |
| return new MRClientService(context); |
| } |
| |
| public ApplicationId getAppID() { |
| return appAttemptID.getApplicationId(); |
| } |
| |
| public ApplicationAttemptId getAttemptID() { |
| return appAttemptID; |
| } |
| |
| public JobId getJobId() { |
| return jobId; |
| } |
| |
| public OutputCommitter getCommitter() { |
| return committer; |
| } |
| |
| public boolean isNewApiCommitter() { |
| return newApiCommitter; |
| } |
| |
| public int getStartCount() { |
| return appAttemptID.getAttemptId(); |
| } |
| |
| public AppContext getContext() { |
| return context; |
| } |
| |
| public Dispatcher getDispatcher() { |
| return dispatcher; |
| } |
| |
| public Map<TaskId, TaskInfo> getCompletedTaskFromPreviousRun() { |
| return completedTasksFromPreviousRun; |
| } |
| |
| public List<AMInfo> getAllAMInfos() { |
| return amInfos; |
| } |
| |
| public ContainerAllocator getContainerAllocator() { |
| return containerAllocator; |
| } |
| |
| public ContainerLauncher getContainerLauncher() { |
| return containerLauncher; |
| } |
| |
| public TaskAttemptListener getTaskAttemptListener() { |
| return taskAttemptListener; |
| } |
| |
| public Boolean isLastAMRetry() { |
| return isLastAMRetry; |
| } |
| |
| /** |
| * By the time life-cycle of this router starts, job-init would have already |
| * happened. |
| */ |
| private final class ContainerAllocatorRouter extends AbstractService |
| implements ContainerAllocator, RMHeartbeatHandler { |
| private final ClientService clientService; |
| private final AppContext context; |
| private ContainerAllocator containerAllocator; |
| |
| ContainerAllocatorRouter(ClientService clientService, |
| AppContext context) { |
| super(ContainerAllocatorRouter.class.getName()); |
| this.clientService = clientService; |
| this.context = context; |
| } |
| |
| @Override |
| protected void serviceStart() throws Exception { |
| if (job.isUber()) { |
| MRApps.setupDistributedCacheLocal(getConfig()); |
| this.containerAllocator = new LocalContainerAllocator( |
| this.clientService, this.context, nmHost, nmPort, nmHttpPort |
| , containerID); |
| } else { |
| this.containerAllocator = new RMContainerAllocator( |
| this.clientService, this.context, preemptionPolicy); |
| } |
| ((Service)this.containerAllocator).init(getConfig()); |
| ((Service)this.containerAllocator).start(); |
| super.serviceStart(); |
| } |
| |
| @Override |
| protected void serviceStop() throws Exception { |
| ServiceOperations.stop((Service) this.containerAllocator); |
| super.serviceStop(); |
| } |
| |
| @Override |
| public void handle(ContainerAllocatorEvent event) { |
| this.containerAllocator.handle(event); |
| } |
| |
| public void setSignalled(boolean isSignalled) { |
| ((RMCommunicator) containerAllocator).setSignalled(isSignalled); |
| } |
| |
| public void setShouldUnregister(boolean shouldUnregister) { |
| ((RMCommunicator) containerAllocator).setShouldUnregister(shouldUnregister); |
| } |
| |
| @Override |
| public long getLastHeartbeatTime() { |
| return ((RMCommunicator) containerAllocator).getLastHeartbeatTime(); |
| } |
| |
| @Override |
| public void runOnNextHeartbeat(Runnable callback) { |
| ((RMCommunicator) containerAllocator).runOnNextHeartbeat(callback); |
| } |
| } |
| |
| /** |
| * By the time life-cycle of this router starts, job-init would have already |
| * happened. |
| */ |
| private final class ContainerLauncherRouter extends AbstractService |
| implements ContainerLauncher { |
| private final AppContext context; |
| private ContainerLauncher containerLauncher; |
| |
| ContainerLauncherRouter(AppContext context) { |
| super(ContainerLauncherRouter.class.getName()); |
| this.context = context; |
| } |
| |
| @Override |
| protected void serviceStart() throws Exception { |
| if (job.isUber()) { |
| this.containerLauncher = new LocalContainerLauncher(context, |
| (TaskUmbilicalProtocol) taskAttemptListener, jobClassLoader); |
| ((LocalContainerLauncher) this.containerLauncher) |
| .setEncryptedSpillKey(encryptedSpillKey); |
| } else { |
| this.containerLauncher = new ContainerLauncherImpl(context); |
| } |
| ((Service)this.containerLauncher).init(getConfig()); |
| ((Service)this.containerLauncher).start(); |
| super.serviceStart(); |
| } |
| |
| @Override |
| public void handle(ContainerLauncherEvent event) { |
| this.containerLauncher.handle(event); |
| } |
| |
| @Override |
| protected void serviceStop() throws Exception { |
| ServiceOperations.stop((Service) this.containerLauncher); |
| super.serviceStop(); |
| } |
| } |
| |
| private final class StagingDirCleaningService extends AbstractService { |
| StagingDirCleaningService() { |
| super(StagingDirCleaningService.class.getName()); |
| } |
| |
| @Override |
| protected void serviceStop() throws Exception { |
| try { |
| if(isLastAMRetry) { |
| cleanupStagingDir(); |
| } else { |
| LOG.info("Skipping cleaning up the staging dir. " |
| + "assuming AM will be retried."); |
| } |
| } catch (IOException io) { |
| LOG.error("Failed to cleanup staging dir: ", io); |
| } |
| super.serviceStop(); |
| } |
| } |
| |
| public class RunningAppContext implements AppContext { |
| |
| private final Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>(); |
| private final Configuration conf; |
| private final ClusterInfo clusterInfo = new ClusterInfo(); |
| private final ClientToAMTokenSecretManager clientToAMTokenSecretManager; |
| private TimelineClient timelineClient = null; |
| private TimelineV2Client timelineV2Client = null; |
| private String historyUrl = null; |
| |
| private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor; |
| |
| public RunningAppContext(Configuration config, |
| TaskAttemptFinishingMonitor taskAttemptFinishingMonitor) { |
| this.conf = config; |
| this.clientToAMTokenSecretManager = |
| new ClientToAMTokenSecretManager(appAttemptID, null); |
| this.taskAttemptFinishingMonitor = taskAttemptFinishingMonitor; |
| if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, |
| MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA) |
| && YarnConfiguration.timelineServiceEnabled(conf)) { |
| |
| if (YarnConfiguration.timelineServiceV2Enabled(conf)) { |
| // create new version TimelineClient |
| timelineV2Client = TimelineV2Client.createTimelineClient( |
| appAttemptID.getApplicationId()); |
| } else { |
| timelineClient = TimelineClient.createTimelineClient(); |
| } |
| } |
| } |
| |
| @Override |
| public ApplicationAttemptId getApplicationAttemptId() { |
| return appAttemptID; |
| } |
| |
| @Override |
| public ApplicationId getApplicationID() { |
| return appAttemptID.getApplicationId(); |
| } |
| |
| @Override |
| public String getApplicationName() { |
| return appName; |
| } |
| |
| @Override |
| public long getStartTime() { |
| return startTime; |
| } |
| |
| @Override |
| public Job getJob(JobId jobID) { |
| return jobs.get(jobID); |
| } |
| |
| @Override |
| public Map<JobId, Job> getAllJobs() { |
| return jobs; |
| } |
| |
| @Override |
| public EventHandler<Event> getEventHandler() { |
| return dispatcher.getEventHandler(); |
| } |
| |
| @Override |
| public CharSequence getUser() { |
| return this.conf.get(MRJobConfig.USER_NAME); |
| } |
| |
| @Override |
| public Clock getClock() { |
| return clock; |
| } |
| |
| @Override |
| public ClusterInfo getClusterInfo() { |
| return this.clusterInfo; |
| } |
| |
| @Override |
| public Set<String> getBlacklistedNodes() { |
| return ((RMContainerRequestor) containerAllocator).getBlacklistedNodes(); |
| } |
| |
| @Override |
| public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() { |
| return clientToAMTokenSecretManager; |
| } |
| |
| @Override |
| public boolean isLastAMRetry(){ |
| return isLastAMRetry; |
| } |
| |
| @Override |
| public boolean hasSuccessfullyUnregistered() { |
| return successfullyUnregistered.get(); |
| } |
| |
| public void markSuccessfulUnregistration() { |
| successfullyUnregistered.set(true); |
| } |
| |
| public void resetIsLastAMRetry() { |
| isLastAMRetry = false; |
| } |
| |
| @Override |
| public String getNMHostname() { |
| return nmHost; |
| } |
| |
| @Override |
| public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() { |
| return taskAttemptFinishingMonitor; |
| } |
| |
| public TimelineClient getTimelineClient() { |
| return timelineClient; |
| } |
| |
| // Get Timeline Collector's address (get sync from RM) |
| public TimelineV2Client getTimelineV2Client() { |
| return timelineV2Client; |
| } |
| |
| @Override |
| public String getHistoryUrl() { |
| return historyUrl; |
| } |
| |
| @Override |
| public void setHistoryUrl(String historyUrl) { |
| this.historyUrl = historyUrl; |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| protected void serviceStart() throws Exception { |
| |
| amInfos = new LinkedList<AMInfo>(); |
| completedTasksFromPreviousRun = new HashMap<TaskId, TaskInfo>(); |
| processRecovery(); |
| cleanUpPreviousJobOutput(); |
| |
| // Current an AMInfo for the current AM generation. |
| AMInfo amInfo = |
| MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost, |
| nmPort, nmHttpPort); |
| |
| // /////////////////// Create the job itself. |
| job = createJob(getConfig(), forcedState, shutDownMessage); |
| |
| // End of creating the job. |
| |
| // Send out an MR AM inited event for all previous AMs. |
| for (AMInfo info : amInfos) { |
| dispatcher.getEventHandler().handle( |
| new JobHistoryEvent(job.getID(), new AMStartedEvent(info |
| .getAppAttemptId(), info.getStartTime(), info.getContainerId(), |
| info.getNodeManagerHost(), info.getNodeManagerPort(), info |
| .getNodeManagerHttpPort(), appSubmitTime))); |
| } |
| |
| // Send out an MR AM inited event for this AM. |
| dispatcher.getEventHandler().handle( |
| new JobHistoryEvent(job.getID(), new AMStartedEvent(amInfo |
| .getAppAttemptId(), amInfo.getStartTime(), amInfo.getContainerId(), |
| amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort(), amInfo |
| .getNodeManagerHttpPort(), this.forcedState == null ? null |
| : this.forcedState.toString(), appSubmitTime))); |
| amInfos.add(amInfo); |
| |
| // metrics system init is really init & start. |
| // It's more test friendly to put it here. |
| DefaultMetricsSystem.initialize("MRAppMaster"); |
| |
| boolean initFailed = false; |
| if (!errorHappenedShutDown) { |
| // create a job event for job initialization |
| JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT); |
| // Send init to the job (this does NOT trigger job execution) |
| // This is a synchronous call, not an event through dispatcher. We want |
| // job-init to be done completely here. |
| jobEventDispatcher.handle(initJobEvent); |
| |
| // If job is still not initialized, an error happened during |
| // initialization. Must complete starting all of the services so failure |
| // events can be processed. |
| initFailed = (((JobImpl)job).getInternalState() != JobStateInternal.INITED); |
| |
| // JobImpl's InitTransition is done (call above is synchronous), so the |
| // "uber-decision" (MR-1220) has been made. Query job and switch to |
| // ubermode if appropriate (by registering different container-allocator |
| // and container-launcher services/event-handlers). |
| |
| if (job.isUber()) { |
| speculatorEventDispatcher.disableSpeculation(); |
| LOG.info("MRAppMaster uberizing job " + job.getID() |
| + " in local container (\"uber-AM\") on node " |
| + nmHost + ":" + nmPort + "."); |
| } else { |
| // send init to speculator only for non-uber jobs. |
| // This won't yet start as dispatcher isn't started yet. |
| dispatcher.getEventHandler().handle( |
| new SpeculatorEvent(job.getID(), clock.getTime())); |
| LOG.info("MRAppMaster launching normal, non-uberized, multi-container " |
| + "job " + job.getID() + "."); |
| } |
| // Start ClientService here, since it's not initialized if |
| // errorHappenedShutDown is true |
| clientService.start(); |
| } |
| //start all the components |
| super.serviceStart(); |
| |
| // finally set the job classloader |
| MRApps.setClassLoader(jobClassLoader, getConfig()); |
| |
| if (initFailed) { |
| JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED); |
| jobEventDispatcher.handle(initFailedEvent); |
| } else { |
| // All components have started, start the job. |
| startJobs(); |
| } |
| } |
| |
| @Override |
| public void stop() { |
| super.stop(); |
| } |
| |
| private boolean isRecoverySupported() throws IOException { |
| boolean isSupported = false; |
| Configuration conf = getConfig(); |
| if (committer != null) { |
| final JobContext _jobContext = getJobContextFromConf(conf); |
| isSupported = callWithJobClassLoader(conf, |
| new ExceptionAction<Boolean>() { |
| public Boolean call(Configuration conf) throws IOException { |
| return committer.isRecoverySupported(_jobContext); |
| } |
| }); |
| } |
| return isSupported; |
| } |
| |
| private void processRecovery() throws IOException{ |
| boolean attemptRecovery = shouldAttemptRecovery(); |
| boolean recoverySucceeded = true; |
| if (attemptRecovery) { |
| LOG.info("Attempting to recover."); |
| try { |
| parsePreviousJobHistory(); |
| } catch (IOException e) { |
| LOG.warn("Unable to parse prior job history, aborting recovery", e); |
| recoverySucceeded = false; |
| } |
| } |
| |
| if (!isFirstAttempt() && (!attemptRecovery || !recoverySucceeded)) { |
| amInfos.addAll(readJustAMInfos()); |
| } |
| } |
| |
| private boolean isFirstAttempt() { |
| return appAttemptID.getAttemptId() == 1; |
| } |
| |
| /** |
| * Check if the current job attempt should try to recover from previous |
| * job attempts if any. |
| */ |
| private boolean shouldAttemptRecovery() throws IOException { |
| if (isFirstAttempt()) { |
| return false; // no need to recover on the first attempt |
| } |
| |
| boolean recoveryEnabled = getConfig().getBoolean( |
| MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, |
| MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT); |
| if (!recoveryEnabled) { |
| LOG.info("Not attempting to recover. Recovery disabled. To enable " + |
| "recovery, set " + MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE); |
| return false; |
| } |
| |
| boolean recoverySupportedByCommitter = isRecoverySupported(); |
| if (!recoverySupportedByCommitter) { |
| LOG.info("Not attempting to recover. Recovery is not supported by " + |
| committer.getClass() + ". Use an OutputCommitter that supports" + |
| " recovery."); |
| return false; |
| } |
| |
| int reducerCount = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0); |
| |
| // If a shuffle secret was not provided by the job client, one will be |
| // generated in this job attempt. However, that disables recovery if |
| // there are reducers as the shuffle secret would be job attempt specific. |
| boolean shuffleKeyValidForRecovery = |
| TokenCache.getShuffleSecretKey(jobCredentials) != null; |
| if (reducerCount > 0 && !shuffleKeyValidForRecovery) { |
| LOG.info("Not attempting to recover. The shuffle key is invalid for " + |
| "recovery."); |
| return false; |
| } |
| |
| // If the intermediate data is encrypted, recovering the job requires the |
| // access to the key. Until the encryption key is persisted, we should |
| // avoid attempts to recover. |
| boolean spillEncrypted = CryptoUtils.isEncryptedSpillEnabled(getConfig()); |
| if (reducerCount > 0 && spillEncrypted) { |
| LOG.info("Not attempting to recover. Intermediate spill encryption" + |
| " is enabled."); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| private void cleanUpPreviousJobOutput() { |
| // recovered application masters should not remove data from previous job |
| if (!isFirstAttempt() && !recovered()) { |
| JobContext jobContext = getJobContextFromConf(getConfig()); |
| try { |
| LOG.info("Starting to clean up previous job's temporary files"); |
| this.committer.abortJob(jobContext, State.FAILED); |
| LOG.info("Finished cleaning up previous job temporary files"); |
| } catch (FileNotFoundException e) { |
| LOG.info("Previous job temporary files do not exist, " + |
| "no clean up was necessary."); |
| } catch (Exception e) { |
| // the clean up of a previous attempt is not critical to the success |
| // of this job - only logging the error |
| LOG.error("Error while trying to clean up previous job's temporary " + |
| "files", e); |
| } |
| } |
| } |
| |
| private static FSDataInputStream getPreviousJobHistoryStream( |
| Configuration conf, ApplicationAttemptId appAttemptId) |
| throws IOException { |
| Path historyFile = JobHistoryUtils.getPreviousJobHistoryPath(conf, |
| appAttemptId); |
| LOG.info("Previous history file is at " + historyFile); |
| return historyFile.getFileSystem(conf).open(historyFile); |
| } |
| |
| private void parsePreviousJobHistory() throws IOException { |
| FSDataInputStream in = getPreviousJobHistoryStream(getConfig(), |
| appAttemptID); |
| JobHistoryParser parser = new JobHistoryParser(in); |
| JobInfo jobInfo = parser.parse(); |
| Exception parseException = parser.getParseException(); |
| if (parseException != null) { |
| LOG.info("Got an error parsing job-history file" + |
| ", ignoring incomplete events.", parseException); |
| } |
| Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo |
| .getAllTasks(); |
| for (TaskInfo taskInfo : taskInfos.values()) { |
| if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) { |
| Iterator<Entry<TaskAttemptID, TaskAttemptInfo>> taskAttemptIterator = |
| taskInfo.getAllTaskAttempts().entrySet().iterator(); |
| while (taskAttemptIterator.hasNext()) { |
| Map.Entry<TaskAttemptID, TaskAttemptInfo> currentEntry = taskAttemptIterator.next(); |
| if (!jobInfo.getAllCompletedTaskAttempts().containsKey(currentEntry.getKey())) { |
| taskAttemptIterator.remove(); |
| } |
| } |
| completedTasksFromPreviousRun |
| .put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo); |
| LOG.info("Read from history task " |
| + TypeConverter.toYarn(taskInfo.getTaskId())); |
| } |
| } |
| LOG.info("Read completed tasks from history " |
| + completedTasksFromPreviousRun.size()); |
| recoveredJobStartTime = jobInfo.getLaunchTime(); |
| |
| // recover AMInfos |
| List<JobHistoryParser.AMInfo> jhAmInfoList = jobInfo.getAMInfos(); |
| if (jhAmInfoList != null) { |
| for (JobHistoryParser.AMInfo jhAmInfo : jhAmInfoList) { |
| AMInfo amInfo = MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(), |
| jhAmInfo.getStartTime(), jhAmInfo.getContainerId(), |
| jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(), |
| jhAmInfo.getNodeManagerHttpPort()); |
| amInfos.add(amInfo); |
| } |
| } |
| } |
| |
| private List<AMInfo> readJustAMInfos() { |
| List<AMInfo> amInfos = new ArrayList<AMInfo>(); |
| FSDataInputStream inputStream = null; |
| try { |
| inputStream = getPreviousJobHistoryStream(getConfig(), appAttemptID); |
| EventReader jobHistoryEventReader = new EventReader(inputStream); |
| |
| // All AMInfos are contiguous. Track when the first AMStartedEvent |
| // appears. |
| boolean amStartedEventsBegan = false; |
| |
| HistoryEvent event; |
| while ((event = jobHistoryEventReader.getNextEvent()) != null) { |
| if (event.getEventType() == EventType.AM_STARTED) { |
| if (!amStartedEventsBegan) { |
| // First AMStartedEvent. |
| amStartedEventsBegan = true; |
| } |
| AMStartedEvent amStartedEvent = (AMStartedEvent) event; |
| amInfos.add(MRBuilderUtils.newAMInfo( |
| amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(), |
| amStartedEvent.getContainerId(), |
| StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()), |
| amStartedEvent.getNodeManagerPort(), |
| amStartedEvent.getNodeManagerHttpPort())); |
| } else if (amStartedEventsBegan) { |
| // This means AMStartedEvents began and this event is a |
| // non-AMStarted event. |
| // No need to continue reading all the other events. |
| break; |
| } |
| } |
| } catch (IOException e) { |
| LOG.warn("Could not parse the old history file. " |
| + "Will not have old AMinfos ", e); |
| } finally { |
| if (inputStream != null) { |
| IOUtils.closeQuietly(inputStream); |
| } |
| } |
| return amInfos; |
| } |
| |
| public boolean recovered() { |
| return recoveredJobStartTime > 0; |
| } |
| |
| /** |
| * This can be overridden to instantiate multiple jobs and create a |
| * workflow. |
| * |
| * TODO: Rework the design to actually support this. Currently much of the |
| * job stuff has been moved to init() above to support uberization (MR-1220). |
| * In a typical workflow, one presumably would want to uberize only a subset |
| * of the jobs (the "small" ones), which is awkward with the current design. |
| */ |
| @SuppressWarnings("unchecked") |
| protected void startJobs() { |
| /** create a job-start event to get this ball rolling */ |
| JobEvent startJobEvent = new JobStartEvent(job.getID(), |
| recoveredJobStartTime); |
| /** send the job-start event. this triggers the job execution. */ |
| dispatcher.getEventHandler().handle(startJobEvent); |
| } |
| |
| private class JobEventDispatcher implements EventHandler<JobEvent> { |
| @SuppressWarnings("unchecked") |
| @Override |
| public void handle(JobEvent event) { |
| ((EventHandler<JobEvent>)context.getJob(event.getJobId())).handle(event); |
| } |
| } |
| |
| private class TaskEventDispatcher implements EventHandler<TaskEvent> { |
| @SuppressWarnings("unchecked") |
| @Override |
| public void handle(TaskEvent event) { |
| Task task = context.getJob(event.getTaskID().getJobId()).getTask( |
| event.getTaskID()); |
| ((EventHandler<TaskEvent>)task).handle(event); |
| } |
| } |
| |
| private class TaskAttemptEventDispatcher |
| implements EventHandler<TaskAttemptEvent> { |
| @SuppressWarnings("unchecked") |
| @Override |
| public void handle(TaskAttemptEvent event) { |
| Job job = context.getJob(event.getTaskAttemptID().getTaskId().getJobId()); |
| Task task = job.getTask(event.getTaskAttemptID().getTaskId()); |
| TaskAttempt attempt = task.getAttempt(event.getTaskAttemptID()); |
| ((EventHandler<TaskAttemptEvent>) attempt).handle(event); |
| } |
| } |
| |
| private class SpeculatorEventDispatcher implements |
| EventHandler<SpeculatorEvent> { |
| private final Configuration conf; |
| private volatile boolean disabled; |
| public SpeculatorEventDispatcher(Configuration config) { |
| this.conf = config; |
| } |
| @Override |
| public void handle(final SpeculatorEvent event) { |
| if (disabled) { |
| return; |
| } |
| |
| TaskId tId = event.getTaskID(); |
| TaskType tType = null; |
| /* event's TaskId will be null if the event type is JOB_CREATE or |
| * ATTEMPT_STATUS_UPDATE |
| */ |
| if (tId != null) { |
| tType = tId.getTaskType(); |
| } |
| boolean shouldMapSpec = |
| conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false); |
| boolean shouldReduceSpec = |
| conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false); |
| |
| /* The point of the following is to allow the MAP and REDUCE speculative |
| * config values to be independent: |
| * IF spec-exec is turned on for maps AND the task is a map task |
| * OR IF spec-exec is turned on for reduces AND the task is a reduce task |
| * THEN call the speculator to handle the event. |
| */ |
| if ( (shouldMapSpec && (tType == null || tType == TaskType.MAP)) |
| || (shouldReduceSpec && (tType == null || tType == TaskType.REDUCE))) { |
| // Speculator IS enabled, direct the event to there. |
| callWithJobClassLoader(conf, new Action<Void>() { |
| public Void call(Configuration conf) { |
| speculator.handle(event); |
| return null; |
| } |
| }); |
| } |
| } |
| |
| public void disableSpeculation() { |
| disabled = true; |
| } |
| |
| } |
| |
| /** |
| * Eats events that are not needed in some error cases. |
| */ |
| private static class NoopEventHandler implements EventHandler<Event> { |
| |
| @Override |
| public void handle(Event event) { |
| //Empty |
| } |
| } |
| |
| private static void validateInputParam(String value, String param) |
| throws IOException { |
| if (value == null) { |
| String msg = param + " is null"; |
| LOG.error(msg); |
| throw new IOException(msg); |
| } |
| } |
| |
| public static void main(String[] args) { |
| try { |
| mainStarted = true; |
| Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); |
| String containerIdStr = |
| System.getenv(Environment.CONTAINER_ID.name()); |
| String nodeHostString = System.getenv(Environment.NM_HOST.name()); |
| String nodePortString = System.getenv(Environment.NM_PORT.name()); |
| String nodeHttpPortString = |
| System.getenv(Environment.NM_HTTP_PORT.name()); |
| String appSubmitTimeStr = |
| System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV); |
| |
| validateInputParam(containerIdStr, |
| Environment.CONTAINER_ID.name()); |
| validateInputParam(nodeHostString, Environment.NM_HOST.name()); |
| validateInputParam(nodePortString, Environment.NM_PORT.name()); |
| validateInputParam(nodeHttpPortString, |
| Environment.NM_HTTP_PORT.name()); |
| validateInputParam(appSubmitTimeStr, |
| ApplicationConstants.APP_SUBMIT_TIME_ENV); |
| |
| ContainerId containerId = ContainerId.fromString(containerIdStr); |
| ApplicationAttemptId applicationAttemptId = |
| containerId.getApplicationAttemptId(); |
| if (applicationAttemptId != null) { |
| CallerContext.setCurrent(new CallerContext.Builder( |
| "mr_appmaster_" + applicationAttemptId.toString()).build()); |
| } |
| long appSubmitTime = Long.parseLong(appSubmitTimeStr); |
| |
| |
| MRAppMaster appMaster = |
| new MRAppMaster(applicationAttemptId, containerId, nodeHostString, |
| Integer.parseInt(nodePortString), |
| Integer.parseInt(nodeHttpPortString), appSubmitTime); |
| ShutdownHookManager.get().addShutdownHook( |
| new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY); |
| JobConf conf = new JobConf(new YarnConfiguration()); |
| conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE)); |
| |
| MRWebAppUtil.initialize(conf); |
| // log the system properties |
| String systemPropsToLog = MRApps.getSystemPropertiesToLog(conf); |
| if (systemPropsToLog != null) { |
| LOG.info(systemPropsToLog); |
| } |
| |
| String jobUserName = System |
| .getenv(ApplicationConstants.Environment.USER.name()); |
| conf.set(MRJobConfig.USER_NAME, jobUserName); |
| initAndStartAppMaster(appMaster, conf, jobUserName); |
| } catch (Throwable t) { |
| LOG.error("Error starting MRAppMaster", t); |
| ExitUtil.terminate(1, t); |
| } |
| } |
| |
| // The shutdown hook that runs when a signal is received AND during normal |
| // close of the JVM. |
| static class MRAppMasterShutdownHook implements Runnable { |
| MRAppMaster appMaster; |
| MRAppMasterShutdownHook(MRAppMaster appMaster) { |
| this.appMaster = appMaster; |
| } |
| public void run() { |
| LOG.info("MRAppMaster received a signal. Signaling RMCommunicator and " |
| + "JobHistoryEventHandler."); |
| |
| // Notify the JHEH and RMCommunicator that a SIGTERM has been received so |
| // that they don't take too long in shutting down |
| if(appMaster.containerAllocator instanceof ContainerAllocatorRouter) { |
| ((ContainerAllocatorRouter) appMaster.containerAllocator) |
| .setSignalled(true); |
| } |
| appMaster.notifyIsLastAMRetry(appMaster.isLastAMRetry); |
| appMaster.stop(); |
| } |
| } |
| |
| public void notifyIsLastAMRetry(boolean isLastAMRetry){ |
| if(containerAllocator instanceof ContainerAllocatorRouter) { |
| LOG.info("Notify RMCommunicator isAMLastRetry: " + isLastAMRetry); |
| ((ContainerAllocatorRouter) containerAllocator) |
| .setShouldUnregister(isLastAMRetry); |
| } |
| if(jobHistoryEventHandler != null) { |
| LOG.info("Notify JHEH isAMLastRetry: " + isLastAMRetry); |
| jobHistoryEventHandler.setForcejobCompletion(isLastAMRetry); |
| } |
| } |
| |
| protected static void initAndStartAppMaster(final MRAppMaster appMaster, |
| final JobConf conf, String jobUserName) throws IOException, |
| InterruptedException { |
| UserGroupInformation.setConfiguration(conf); |
| // MAPREDUCE-6565: need to set configuration for SecurityUtil. |
| SecurityUtil.setConfiguration(conf); |
| // Security framework already loaded the tokens into current UGI, just use |
| // them |
| Credentials credentials = |
| UserGroupInformation.getCurrentUser().getCredentials(); |
| LOG.info("Executing with tokens: {}", credentials.getAllTokens()); |
| |
| UserGroupInformation appMasterUgi = UserGroupInformation |
| .createRemoteUser(jobUserName); |
| appMasterUgi.addCredentials(credentials); |
| |
| // Now remove the AM->RM token so tasks don't have it |
| Iterator<Token<?>> iter = credentials.getAllTokens().iterator(); |
| while (iter.hasNext()) { |
| Token<?> token = iter.next(); |
| if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { |
| iter.remove(); |
| } |
| } |
| conf.getCredentials().addAll(credentials); |
| appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() { |
| @Override |
| public Object run() throws Exception { |
| appMaster.init(conf); |
| appMaster.start(); |
| if(appMaster.errorHappenedShutDown) { |
| throw new IOException("Was asked to shut down."); |
| } |
| return null; |
| } |
| }); |
| } |
| |
| /** |
| * Creates a job classloader based on the configuration if the job classloader |
| * is enabled. It is a no-op if the job classloader is not enabled. |
| */ |
| private void createJobClassLoader(Configuration conf) throws IOException { |
| jobClassLoader = MRApps.createJobClassLoader(conf); |
| } |
| |
| /** |
| * Executes the given action with the job classloader set as the configuration |
| * classloader as well as the thread context class loader if the job |
| * classloader is enabled. After the call, the original classloader is |
| * restored. |
| * |
| * If the job classloader is enabled and the code needs to load user-supplied |
| * classes via configuration or thread context classloader, this method should |
| * be used in order to load them. |
| * |
| * @param conf the configuration on which the classloader will be set |
| * @param action the callable action to be executed |
| */ |
| <T> T callWithJobClassLoader(Configuration conf, Action<T> action) { |
| // if the job classloader is enabled, we may need it to load the (custom) |
| // classes; we make the job classloader available and unset it once it is |
| // done |
| ClassLoader currentClassLoader = conf.getClassLoader(); |
| boolean setJobClassLoader = |
| jobClassLoader != null && currentClassLoader != jobClassLoader; |
| if (setJobClassLoader) { |
| MRApps.setClassLoader(jobClassLoader, conf); |
| } |
| try { |
| return action.call(conf); |
| } finally { |
| if (setJobClassLoader) { |
| // restore the original classloader |
| MRApps.setClassLoader(currentClassLoader, conf); |
| } |
| } |
| } |
| |
| /** |
| * Executes the given action that can throw a checked exception with the job |
| * classloader set as the configuration classloader as well as the thread |
| * context class loader if the job classloader is enabled. After the call, the |
| * original classloader is restored. |
| * |
| * If the job classloader is enabled and the code needs to load user-supplied |
| * classes via configuration or thread context classloader, this method should |
| * be used in order to load them. |
| * |
| * @param conf the configuration on which the classloader will be set |
| * @param action the callable action to be executed |
| * @throws IOException if the underlying action throws an IOException |
| * @throws YarnRuntimeException if the underlying action throws an exception |
| * other than an IOException |
| */ |
| <T> T callWithJobClassLoader(Configuration conf, ExceptionAction<T> action) |
| throws IOException { |
| // if the job classloader is enabled, we may need it to load the (custom) |
| // classes; we make the job classloader available and unset it once it is |
| // done |
| ClassLoader currentClassLoader = conf.getClassLoader(); |
| boolean setJobClassLoader = |
| jobClassLoader != null && currentClassLoader != jobClassLoader; |
| if (setJobClassLoader) { |
| MRApps.setClassLoader(jobClassLoader, conf); |
| } |
| try { |
| return action.call(conf); |
| } catch (IOException e) { |
| throw e; |
| } catch (YarnRuntimeException e) { |
| throw e; |
| } catch (Exception e) { |
| // wrap it with a YarnRuntimeException |
| throw new YarnRuntimeException(e); |
| } finally { |
| if (setJobClassLoader) { |
| // restore the original classloader |
| MRApps.setClassLoader(currentClassLoader, conf); |
| } |
| } |
| } |
| |
| /** |
| * Action to be wrapped with setting and unsetting the job classloader |
| */ |
| private static interface Action<T> { |
| T call(Configuration conf); |
| } |
| |
| private static interface ExceptionAction<T> { |
| T call(Configuration conf) throws Exception; |
| } |
| |
| @Override |
| protected void serviceStop() throws Exception { |
| super.serviceStop(); |
| } |
| |
| public ClientService getClientService() { |
| return clientService; |
| } |
| } |