blob: 1534354b3bfa76c6e2d058e38bc736d7a22c3043 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.AsyncCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
* Extends Thread and provides an implementation that is used for processing the
* AM heart beat request asynchronously and sending back the response using the
* callback method registered with the system.
*/
public class AMHeartbeatRequestHandler extends Thread {
public static final Logger LOG =
LoggerFactory.getLogger(AMHeartbeatRequestHandler.class);
// Indication flag for the thread to keep running
private volatile boolean keepRunning;
// For unit test draining
private volatile boolean isThreadWaiting;
private Configuration conf;
private ApplicationId applicationId;
private BlockingQueue<AsyncAllocateRequestInfo> requestQueue;
private AMRMClientRelayer rmProxyRelayer;
private UserGroupInformation userUgi;
private int lastResponseId;
public AMHeartbeatRequestHandler(Configuration conf,
ApplicationId applicationId, AMRMClientRelayer rmProxyRelayer) {
super("AMHeartbeatRequestHandler Heartbeat Handler Thread");
this.setUncaughtExceptionHandler(
new HeartBeatThreadUncaughtExceptionHandler());
this.keepRunning = true;
this.isThreadWaiting = false;
this.conf = conf;
this.applicationId = applicationId;
this.requestQueue = new LinkedBlockingQueue<>();
this.rmProxyRelayer = rmProxyRelayer;
resetLastResponseId();
}
/**
* Shutdown the thread.
*/
public void shutdown() {
this.keepRunning = false;
this.interrupt();
}
@Override
public void run() {
while (keepRunning) {
AsyncAllocateRequestInfo requestInfo;
try {
this.isThreadWaiting = true;
requestInfo = this.requestQueue.take();
this.isThreadWaiting = false;
if (requestInfo == null) {
throw new YarnException(
"Null requestInfo taken from request queue");
}
if (!this.keepRunning) {
break;
}
// change the response id before forwarding the allocate request as we
// could have different values for each UAM
AllocateRequest request = requestInfo.getRequest();
if (request == null) {
throw new YarnException("Null allocateRequest from requestInfo");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Sending Heartbeat to RM. AskList:"
+ ((request.getAskList() == null) ? " empty"
: request.getAskList().size()));
}
request.setResponseId(lastResponseId);
AllocateResponse response = rmProxyRelayer.allocate(request);
if (response == null) {
throw new YarnException("Null allocateResponse from allocate");
}
lastResponseId = response.getResponseId();
// update token if RM has reissued/renewed
if (response.getAMRMToken() != null) {
LOG.debug("Received new AMRMToken");
YarnServerSecurityUtils.updateAMRMToken(response.getAMRMToken(),
userUgi, conf);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Received Heartbeat reply from RM. Allocated Containers:"
+ ((response.getAllocatedContainers() == null) ? " empty"
: response.getAllocatedContainers().size()));
}
if (requestInfo.getCallback() == null) {
throw new YarnException("Null callback from requestInfo");
}
requestInfo.getCallback().callback(response);
} catch (InterruptedException ex) {
if (LOG.isDebugEnabled()) {
LOG.debug("Interrupted while waiting for queue", ex);
}
} catch (Throwable ex) {
LOG.warn(
"Error occurred while processing heart beat for " + applicationId,
ex);
}
}
LOG.info("AMHeartbeatRequestHandler thread for {} is exiting",
applicationId);
}
/**
* Reset the lastResponseId to zero.
*/
public void resetLastResponseId() {
this.lastResponseId = 0;
}
/**
* Set the UGI for RM connection.
*/
public void setUGI(UserGroupInformation ugi) {
this.userUgi = ugi;
}
/**
* Sends the specified heart beat request to the resource manager and invokes
* the callback asynchronously with the response.
*
* @param request the allocate request
* @param callback the callback method for the request
* @throws YarnException if registerAM is not called yet
*/
public void allocateAsync(AllocateRequest request,
AsyncCallback<AllocateResponse> callback) throws YarnException {
try {
this.requestQueue.put(new AsyncAllocateRequestInfo(request, callback));
} catch (InterruptedException ex) {
// Should not happen as we have MAX_INT queue length
LOG.debug("Interrupted while waiting to put on response queue", ex);
}
}
@VisibleForTesting
public void drainHeartbeatThread() {
while (!this.isThreadWaiting || this.requestQueue.size() > 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
}
}
@VisibleForTesting
public int getRequestQueueSize() {
return this.requestQueue.size();
}
/**
* Data structure that encapsulates AllocateRequest and AsyncCallback
* instance.
*/
public static class AsyncAllocateRequestInfo {
private AllocateRequest request;
private AsyncCallback<AllocateResponse> callback;
public AsyncAllocateRequestInfo(AllocateRequest request,
AsyncCallback<AllocateResponse> callback) {
Preconditions.checkArgument(request != null,
"AllocateRequest cannot be null");
Preconditions.checkArgument(callback != null, "Callback cannot be null");
this.request = request;
this.callback = callback;
}
public AsyncCallback<AllocateResponse> getCallback() {
return this.callback;
}
public AllocateRequest getRequest() {
return this.request;
}
}
/**
* Uncaught exception handler for the background heartbeat thread.
*/
public class HeartBeatThreadUncaughtExceptionHandler
implements UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
LOG.error("Heartbeat thread {} for application {} crashed!", t.getName(),
applicationId, e);
}
}
}