| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.uam; |
| |
| import java.io.IOException; |
| import java.lang.Thread.UncaughtExceptionHandler; |
| import java.util.EnumSet; |
| import java.util.Set; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.LinkedBlockingQueue; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.hadoop.classification.InterfaceAudience.Public; |
| import org.apache.hadoop.classification.InterfaceStability.Unstable; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.yarn.api.ApplicationClientProtocol; |
| import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationReport; |
| import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; |
| import org.apache.hadoop.yarn.api.records.NMToken; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; |
| import org.apache.hadoop.yarn.api.records.YarnApplicationState; |
| import org.apache.hadoop.yarn.client.AMRMClientUtils; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; |
| import org.apache.hadoop.yarn.server.AMRMClientRelayer; |
| import org.apache.hadoop.yarn.server.utils.BuilderUtils; |
| import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; |
| import org.apache.hadoop.yarn.util.AsyncCallback; |
| import org.apache.hadoop.yarn.util.ConverterUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| |
| /** |
| * UnmanagedApplicationManager is used to register unmanaged application and |
| * negotiate for resources from resource managers. An unmanagedAM is an AM that |
| * is not launched and managed by the RM. Allocate calls are handled |
| * asynchronously using {@link AsyncCallback}. |
| */ |
| @Public |
| @Unstable |
| public class UnmanagedApplicationManager { |
| private static final Logger LOG = |
| LoggerFactory.getLogger(UnmanagedApplicationManager.class); |
| private static final long AM_STATE_WAIT_TIMEOUT_MS = 10000; |
| public static final String APP_NAME = "UnmanagedAM"; |
| private static final String DEFAULT_QUEUE_CONFIG = "uam.default.queue.name"; |
| |
| private BlockingQueue<AsyncAllocateRequestInfo> requestQueue; |
| private AMRequestHandlerThread handlerThread; |
| private AMRMClientRelayer rmProxyRelayer; |
| private ApplicationId applicationId; |
| private String submitter; |
| private String appNameSuffix; |
| private Configuration conf; |
| private String queueName; |
| private UserGroupInformation userUgi; |
| private RegisterApplicationMasterRequest registerRequest; |
| private int lastResponseId; |
| private ApplicationClientProtocol rmClient; |
| private long asyncApiPollIntervalMillis; |
| private RecordFactory recordFactory; |
| private boolean keepContainersAcrossApplicationAttempts; |
| |
| /* |
| * This flag is used as an indication that this method launchUAM/reAttachUAM |
| * is called (and perhaps blocked in initializeUnmanagedAM below due to RM |
| * connection/failover issue and not finished yet). Set the flag before |
| * calling the blocking call to RM. |
| */ |
| private boolean connectionInitiated; |
| |
| /** |
| * Constructor. |
| * |
| * @param conf configuration |
| * @param appId application Id to use for this UAM |
| * @param queueName the queue of the UAM |
| * @param submitter user name of the app |
| * @param appNameSuffix the app name suffix to use |
| * @param keepContainersAcrossApplicationAttempts keep container flag for UAM |
| * recovery. See {@link ApplicationSubmissionContext |
| * #setKeepContainersAcrossApplicationAttempts(boolean)} |
| */ |
| public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, |
| String queueName, String submitter, String appNameSuffix, |
| boolean keepContainersAcrossApplicationAttempts) { |
| Preconditions.checkNotNull(conf, "Configuration cannot be null"); |
| Preconditions.checkNotNull(appId, "ApplicationId cannot be null"); |
| Preconditions.checkNotNull(submitter, "App submitter cannot be null"); |
| |
| this.conf = conf; |
| this.applicationId = appId; |
| this.queueName = queueName; |
| this.submitter = submitter; |
| this.appNameSuffix = appNameSuffix; |
| this.handlerThread = new AMRequestHandlerThread(); |
| this.requestQueue = new LinkedBlockingQueue<>(); |
| this.rmProxyRelayer = null; |
| this.connectionInitiated = false; |
| this.registerRequest = null; |
| this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); |
| this.asyncApiPollIntervalMillis = conf.getLong( |
| YarnConfiguration. |
| YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS, |
| YarnConfiguration. |
| DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); |
| this.keepContainersAcrossApplicationAttempts = |
| keepContainersAcrossApplicationAttempts; |
| } |
| |
| /** |
| * Launch a new UAM in the resource manager. |
| * |
| * @return identifier uam identifier |
| * @throws YarnException if fails |
| * @throws IOException if fails |
| */ |
| public Token<AMRMTokenIdentifier> launchUAM() |
| throws YarnException, IOException { |
| this.connectionInitiated = true; |
| |
| // Blocking call to RM |
| Token<AMRMTokenIdentifier> amrmToken = |
| initializeUnmanagedAM(this.applicationId); |
| |
| // Creates the UAM connection |
| createUAMProxy(amrmToken); |
| return amrmToken; |
| } |
| |
| /** |
| * Re-attach to an existing UAM in the resource manager. |
| * |
| * @param amrmToken the UAM token |
| * @throws IOException if re-attach fails |
| * @throws YarnException if re-attach fails |
| */ |
| public void reAttachUAM(Token<AMRMTokenIdentifier> amrmToken) |
| throws IOException, YarnException { |
| this.connectionInitiated = true; |
| |
| // Creates the UAM connection |
| createUAMProxy(amrmToken); |
| } |
| |
| protected void createUAMProxy(Token<AMRMTokenIdentifier> amrmToken) |
| throws IOException { |
| this.userUgi = UserGroupInformation.createProxyUser( |
| this.applicationId.toString(), UserGroupInformation.getCurrentUser()); |
| this.rmProxyRelayer = |
| new AMRMClientRelayer(createRMProxy(ApplicationMasterProtocol.class, |
| this.conf, this.userUgi, amrmToken)); |
| } |
| |
| /** |
| * Registers this {@link UnmanagedApplicationManager} with the resource |
| * manager. |
| * |
| * @param request RegisterApplicationMasterRequest |
| * @return register response |
| * @throws YarnException if register fails |
| * @throws IOException if register fails |
| */ |
| public RegisterApplicationMasterResponse registerApplicationMaster( |
| RegisterApplicationMasterRequest request) |
| throws YarnException, IOException { |
| // Save the register request for re-register later |
| this.registerRequest = request; |
| |
| LOG.info("Registering the Unmanaged application master {}", |
| this.applicationId); |
| RegisterApplicationMasterResponse response = |
| this.rmProxyRelayer.registerApplicationMaster(this.registerRequest); |
| this.lastResponseId = 0; |
| |
| for (Container container : response.getContainersFromPreviousAttempts()) { |
| LOG.debug("RegisterUAM returned existing running container " |
| + container.getId()); |
| } |
| for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) { |
| LOG.debug("RegisterUAM returned existing NM token for node " |
| + nmToken.getNodeId()); |
| } |
| |
| // Only when register succeed that we start the heartbeat thread |
| this.handlerThread.setUncaughtExceptionHandler( |
| new HeartBeatThreadUncaughtExceptionHandler()); |
| this.handlerThread.setDaemon(true); |
| this.handlerThread.start(); |
| |
| this.lastResponseId = 0; |
| return response; |
| } |
| |
| /** |
| * Unregisters from the resource manager and stops the request handler thread. |
| * |
| * @param request the finishApplicationMaster request |
| * @return the response |
| * @throws YarnException if finishAM call fails |
| * @throws IOException if finishAM call fails |
| */ |
| public FinishApplicationMasterResponse finishApplicationMaster( |
| FinishApplicationMasterRequest request) |
| throws YarnException, IOException { |
| |
| this.handlerThread.shutdown(); |
| |
| if (this.rmProxyRelayer == null) { |
| if (this.connectionInitiated) { |
| // This is possible if the async launchUAM is still |
| // blocked and retrying. Return a dummy response in this case. |
| LOG.warn("Unmanaged AM still not successfully launched/registered yet." |
| + " Stopping the UAM heartbeat thread anyways."); |
| return FinishApplicationMasterResponse.newInstance(false); |
| } else { |
| throw new YarnException("finishApplicationMaster should not " |
| + "be called before createAndRegister"); |
| } |
| } |
| return this.rmProxyRelayer.finishApplicationMaster(request); |
| } |
| |
| /** |
| * Force kill the UAM. |
| * |
| * @return kill response |
| * @throws IOException if fails to create rmProxy |
| * @throws YarnException if force kill fails |
| */ |
| public KillApplicationResponse forceKillApplication() |
| throws IOException, YarnException { |
| KillApplicationRequest request = |
| KillApplicationRequest.newInstance(this.applicationId); |
| |
| this.handlerThread.shutdown(); |
| |
| if (this.rmClient == null) { |
| this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf, |
| UserGroupInformation.createRemoteUser(this.submitter), null); |
| } |
| return this.rmClient.forceKillApplication(request); |
| } |
| |
| /** |
| * Sends the specified heart beat request to the resource manager and invokes |
| * the callback asynchronously with the response. |
| * |
| * @param request the allocate request |
| * @param callback the callback method for the request |
| * @throws YarnException if registerAM is not called yet |
| */ |
| public void allocateAsync(AllocateRequest request, |
| AsyncCallback<AllocateResponse> callback) throws YarnException { |
| try { |
| this.requestQueue.put(new AsyncAllocateRequestInfo(request, callback)); |
| } catch (InterruptedException ex) { |
| // Should not happen as we have MAX_INT queue length |
| LOG.debug("Interrupted while waiting to put on response queue", ex); |
| } |
| // Two possible cases why the UAM is not successfully registered yet: |
| // 1. launchUAM is not called at all. Should throw here. |
| // 2. launchUAM is called but hasn't successfully returned. |
| // |
| // In case 2, we have already save the allocate request above, so if the |
| // registration succeed later, no request is lost. |
| if (this.rmProxyRelayer == null) { |
| if (this.connectionInitiated) { |
| LOG.info("Unmanaged AM still not successfully launched/registered yet." |
| + " Saving the allocate request and send later."); |
| } else { |
| throw new YarnException( |
| "AllocateAsync should not be called before launchUAM"); |
| } |
| } |
| } |
| |
| /** |
| * Returns the application id of the UAM. |
| * |
| * @return application id of the UAM |
| */ |
| public ApplicationId getAppId() { |
| return this.applicationId; |
| } |
| |
| /** |
| * Returns the rmProxy relayer of this UAM. |
| * |
| * @return rmProxy relayer of the UAM |
| */ |
| public AMRMClientRelayer getAMRMClientRelayer() { |
| return this.rmProxyRelayer; |
| } |
| |
| /** |
| * Returns RM proxy for the specified protocol type. Unit test cases can |
| * override this method and return mock proxy instances. |
| * |
| * @param protocol protocal of the proxy |
| * @param config configuration |
| * @param user ugi for the proxy connection |
| * @param token token for the connection |
| * @param <T> type of the proxy |
| * @return the proxy instance |
| * @throws IOException if fails to create the proxy |
| */ |
| protected <T> T createRMProxy(Class<T> protocol, Configuration config, |
| UserGroupInformation user, Token<AMRMTokenIdentifier> token) |
| throws IOException { |
| return AMRMClientUtils.createRMProxy(config, protocol, user, token); |
| } |
| |
| /** |
| * Launch and initialize an unmanaged AM. First, it creates a new application |
| * on the RM and negotiates a new attempt id. Then it waits for the RM |
| * application attempt state to reach YarnApplicationAttemptState.LAUNCHED |
| * after which it returns the AM-RM token. |
| * |
| * @param appId application id |
| * @return the UAM token |
| * @throws IOException if initialize fails |
| * @throws YarnException if initialize fails |
| */ |
| protected Token<AMRMTokenIdentifier> initializeUnmanagedAM( |
| ApplicationId appId) throws IOException, YarnException { |
| try { |
| UserGroupInformation appSubmitter = |
| UserGroupInformation.createRemoteUser(this.submitter); |
| this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf, |
| appSubmitter, null); |
| |
| // Submit the application |
| submitUnmanagedApp(appId); |
| |
| // Monitor the application attempt to wait for launch state |
| monitorCurrentAppAttempt(appId, |
| EnumSet.of(YarnApplicationState.ACCEPTED, |
| YarnApplicationState.RUNNING, YarnApplicationState.KILLED, |
| YarnApplicationState.FAILED, YarnApplicationState.FINISHED), |
| YarnApplicationAttemptState.LAUNCHED); |
| return getUAMToken(); |
| } finally { |
| this.rmClient = null; |
| } |
| } |
| |
| private void submitUnmanagedApp(ApplicationId appId) |
| throws YarnException, IOException { |
| SubmitApplicationRequest submitRequest = |
| this.recordFactory.newRecordInstance(SubmitApplicationRequest.class); |
| |
| ApplicationSubmissionContext context = this.recordFactory |
| .newRecordInstance(ApplicationSubmissionContext.class); |
| |
| context.setApplicationId(appId); |
| context.setApplicationName(APP_NAME + "-" + appNameSuffix); |
| if (StringUtils.isBlank(this.queueName)) { |
| context.setQueue(this.conf.get(DEFAULT_QUEUE_CONFIG, |
| YarnConfiguration.DEFAULT_QUEUE_NAME)); |
| } else { |
| context.setQueue(this.queueName); |
| } |
| |
| ContainerLaunchContext amContainer = |
| this.recordFactory.newRecordInstance(ContainerLaunchContext.class); |
| Resource resource = BuilderUtils.newResource(1024, 1); |
| context.setResource(resource); |
| context.setAMContainerSpec(amContainer); |
| submitRequest.setApplicationSubmissionContext(context); |
| |
| context.setUnmanagedAM(true); |
| context.setKeepContainersAcrossApplicationAttempts( |
| this.keepContainersAcrossApplicationAttempts); |
| |
| LOG.info("Submitting unmanaged application {}", appId); |
| this.rmClient.submitApplication(submitRequest); |
| } |
| |
| /** |
| * Monitor the submitted application and attempt until it reaches certain |
| * states. |
| * |
| * @param appId Application Id of application to be monitored |
| * @param appStates acceptable application state |
| * @param attemptState acceptable application attempt state |
| * @return the application report |
| * @throws YarnException if getApplicationReport fails |
| * @throws IOException if getApplicationReport fails |
| */ |
| private ApplicationAttemptReport monitorCurrentAppAttempt(ApplicationId appId, |
| Set<YarnApplicationState> appStates, |
| YarnApplicationAttemptState attemptState) |
| throws YarnException, IOException { |
| |
| long startTime = System.currentTimeMillis(); |
| ApplicationAttemptId appAttemptId = null; |
| while (true) { |
| if (appAttemptId == null) { |
| // Get application report for the appId we are interested in |
| ApplicationReport report = getApplicationReport(appId); |
| YarnApplicationState state = report.getYarnApplicationState(); |
| if (appStates.contains(state)) { |
| if (state != YarnApplicationState.ACCEPTED) { |
| throw new YarnRuntimeException( |
| "Received non-accepted application state: " + state + " for " |
| + appId + ". This is likely because this is not the first " |
| + "app attempt in home sub-cluster, and AMRMProxy HA " |
| + "(yarn.nodemanager.amrmproxy.ha.enable) is not enabled."); |
| } |
| appAttemptId = |
| getApplicationReport(appId).getCurrentApplicationAttemptId(); |
| } else { |
| LOG.info("Current application state of {} is {}, will retry later.", |
| appId, state); |
| } |
| } |
| |
| if (appAttemptId != null) { |
| GetApplicationAttemptReportRequest req = this.recordFactory |
| .newRecordInstance(GetApplicationAttemptReportRequest.class); |
| req.setApplicationAttemptId(appAttemptId); |
| ApplicationAttemptReport attemptReport = this.rmClient |
| .getApplicationAttemptReport(req).getApplicationAttemptReport(); |
| if (attemptState |
| .equals(attemptReport.getYarnApplicationAttemptState())) { |
| return attemptReport; |
| } |
| LOG.info("Current attempt state of " + appAttemptId + " is " |
| + attemptReport.getYarnApplicationAttemptState() |
| + ", waiting for current attempt to reach " + attemptState); |
| } |
| |
| try { |
| Thread.sleep(this.asyncApiPollIntervalMillis); |
| } catch (InterruptedException e) { |
| LOG.warn("Interrupted while waiting for current attempt of " + appId |
| + " to reach " + attemptState); |
| } |
| |
| if (System.currentTimeMillis() - startTime > AM_STATE_WAIT_TIMEOUT_MS) { |
| throw new RuntimeException("Timeout for waiting current attempt of " |
| + appId + " to reach " + attemptState); |
| } |
| } |
| } |
| |
| /** |
| * Gets the amrmToken of the unmanaged AM. |
| * |
| * @return the amrmToken of the unmanaged AM. |
| * @throws IOException if getApplicationReport fails |
| * @throws YarnException if getApplicationReport fails |
| */ |
| protected Token<AMRMTokenIdentifier> getUAMToken() |
| throws IOException, YarnException { |
| Token<AMRMTokenIdentifier> token = null; |
| org.apache.hadoop.yarn.api.records.Token amrmToken = |
| getApplicationReport(this.applicationId).getAMRMToken(); |
| if (amrmToken != null) { |
| token = ConverterUtils.convertFromYarn(amrmToken, (Text) null); |
| } else { |
| LOG.warn( |
| "AMRMToken not found in the application report for application: {}", |
| this.applicationId); |
| } |
| return token; |
| } |
| |
| private ApplicationReport getApplicationReport(ApplicationId appId) |
| throws YarnException, IOException { |
| GetApplicationReportRequest request = |
| this.recordFactory.newRecordInstance(GetApplicationReportRequest.class); |
| request.setApplicationId(appId); |
| return this.rmClient.getApplicationReport(request).getApplicationReport(); |
| } |
| |
| /** |
| * Data structure that encapsulates AllocateRequest and AsyncCallback |
| * instance. |
| */ |
| public static class AsyncAllocateRequestInfo { |
| private AllocateRequest request; |
| private AsyncCallback<AllocateResponse> callback; |
| |
| public AsyncAllocateRequestInfo(AllocateRequest request, |
| AsyncCallback<AllocateResponse> callback) { |
| Preconditions.checkArgument(request != null, |
| "AllocateRequest cannot be null"); |
| Preconditions.checkArgument(callback != null, "Callback cannot be null"); |
| |
| this.request = request; |
| this.callback = callback; |
| } |
| |
| public AsyncCallback<AllocateResponse> getCallback() { |
| return this.callback; |
| } |
| |
| public AllocateRequest getRequest() { |
| return this.request; |
| } |
| } |
| |
| @VisibleForTesting |
| public int getRequestQueueSize() { |
| return this.requestQueue.size(); |
| } |
| |
| /** |
| * Extends Thread and provides an implementation that is used for processing |
| * the AM heart beat request asynchronously and sending back the response |
| * using the callback method registered with the system. |
| */ |
| public class AMRequestHandlerThread extends Thread { |
| |
| // Indication flag for the thread to keep running |
| private volatile boolean keepRunning; |
| |
| public AMRequestHandlerThread() { |
| super("UnmanagedApplicationManager Heartbeat Handler Thread"); |
| this.keepRunning = true; |
| } |
| |
| /** |
| * Shutdown the thread. |
| */ |
| public void shutdown() { |
| this.keepRunning = false; |
| this.interrupt(); |
| } |
| |
| @Override |
| public void run() { |
| while (keepRunning) { |
| AsyncAllocateRequestInfo requestInfo; |
| try { |
| requestInfo = requestQueue.take(); |
| if (requestInfo == null) { |
| throw new YarnException( |
| "Null requestInfo taken from request queue"); |
| } |
| if (!keepRunning) { |
| break; |
| } |
| |
| // change the response id before forwarding the allocate request as we |
| // could have different values for each UAM |
| AllocateRequest request = requestInfo.getRequest(); |
| if (request == null) { |
| throw new YarnException("Null allocateRequest from requestInfo"); |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Sending Heartbeat to Unmanaged AM. AskList:" |
| + ((request.getAskList() == null) ? " empty" |
| : request.getAskList().size())); |
| } |
| |
| request.setResponseId(lastResponseId); |
| AllocateResponse response = rmProxyRelayer.allocate(request); |
| if (response == null) { |
| throw new YarnException("Null allocateResponse from allocate"); |
| } |
| |
| lastResponseId = response.getResponseId(); |
| // update token if RM has reissued/renewed |
| if (response.getAMRMToken() != null) { |
| LOG.debug("Received new AMRMToken"); |
| YarnServerSecurityUtils.updateAMRMToken(response.getAMRMToken(), |
| userUgi, conf); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Received Heartbeat reply from RM. Allocated Containers:" |
| + ((response.getAllocatedContainers() == null) ? " empty" |
| : response.getAllocatedContainers().size())); |
| } |
| |
| if (requestInfo.getCallback() == null) { |
| throw new YarnException("Null callback from requestInfo"); |
| } |
| requestInfo.getCallback().callback(response); |
| } catch (InterruptedException ex) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Interrupted while waiting for queue", ex); |
| } |
| } catch (IOException ex) { |
| LOG.warn("IO Error occurred while processing heart beat for " |
| + applicationId, ex); |
| } catch (Throwable ex) { |
| LOG.warn( |
| "Error occurred while processing heart beat for " + applicationId, |
| ex); |
| } |
| } |
| |
| LOG.info("UnmanagedApplicationManager has been stopped for {}. " |
| + "AMRequestHandlerThread thread is exiting", applicationId); |
| } |
| } |
| |
| /** |
| * Uncaught exception handler for the background heartbeat thread. |
| */ |
| protected class HeartBeatThreadUncaughtExceptionHandler |
| implements UncaughtExceptionHandler { |
| @Override |
| public void uncaughtException(Thread t, Throwable e) { |
| LOG.error("Heartbeat thread {} for application {} crashed!", |
| t.getName(), applicationId, e); |
| } |
| } |
| } |