blob: 707cf1b352d46cb28c132d0ef9f32cd53d5be556 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionContainer;
import org.apache.hadoop.yarn.api.records.PreemptionContract;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import com.google.common.annotations.VisibleForTesting;
@SuppressWarnings("unchecked")
@Private
public class ApplicationMasterService extends AbstractService implements
    ApplicationMasterProtocol {
  private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);

  /** Pinged on every register / finish / allocate call from an AM. */
  private final AMLivelinessMonitor amLivelinessMonitor;
  private YarnScheduler rScheduler;
  private InetSocketAddress bindAddress;
  private Server server;
  private final RecordFactory recordFactory =
      RecordFactoryProvider.getRecordFactory(null);

  /**
   * Per-attempt lock object holding the last allocate response sent to that
   * attempt. Entries are added by {@link #registerAppAttempt} (with response
   * id -1) and removed by {@link #unregisterAttempt}.
   */
  private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap =
      new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();

  /** Shared response carrying {@link AMCommand#AM_RESYNC}. */
  private final AllocateResponse resync =
      recordFactory.newRecordInstance(AllocateResponse.class);
  /** Shared response carrying {@link AMCommand#AM_SHUTDOWN}. */
  private final AllocateResponse shutdown =
      recordFactory.newRecordInstance(AllocateResponse.class);
  private final RMContext rmContext;

  /**
   * @param rmContext RM context supplying the liveliness monitor, secret
   *     managers, dispatcher and application map
   * @param scheduler scheduler that serves allocate calls
   */
  public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
    super(ApplicationMasterService.class.getName());
    this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
    this.rScheduler = scheduler;
    this.shutdown.setAMCommand(AMCommand.AM_SHUTDOWN);
    this.resync.setAMCommand(AMCommand.AM_RESYNC);
    this.rmContext = rmContext;
  }

  /**
   * Creates and starts the ApplicationMasterProtocol RPC server on the
   * configured scheduler address, optionally loads service ACLs, and records
   * the server's actual listener address as the bind address.
   */
  @Override
  protected void serviceStart() throws Exception {
    Configuration conf = getConfig();
    YarnRPC rpc = YarnRPC.create(conf);

    InetSocketAddress masterServiceAddress = conf.getSocketAddr(
        YarnConfiguration.RM_BIND_HOST,
        YarnConfiguration.RM_SCHEDULER_ADDRESS,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);

    // The AM-facing server always uses TOKEN authentication (AMs present an
    // AMRMToken). The override is applied to a copy so that 'conf' itself is
    // left as configured.
    Configuration serverConf = new Configuration(conf);
    serverConf.set(
        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
        SaslRpcServer.AuthMethod.TOKEN.toString());
    this.server =
        rpc.getServer(ApplicationMasterProtocol.class, this, masterServiceAddress,
            serverConf, this.rmContext.getAMRMTokenSecretManager(),
            serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
                YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));

    // Enable service authorization?
    if (conf.getBoolean(
        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
        false)) {
      InputStream inputStream =
          this.rmContext.getConfigurationProvider()
              .getConfigurationInputStream(conf,
                  YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
      if (inputStream != null) {
        conf.addResource(inputStream);
      }
      refreshServiceAcls(conf, RMPolicyProvider.getInstance());
    }

    this.server.start();
    this.bindAddress =
        conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
            YarnConfiguration.RM_SCHEDULER_ADDRESS,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
            server.getListenerAddress());
    super.serviceStart();
  }

  /** @return the address the RPC server is listening on, set in serviceStart */
  @Private
  public InetSocketAddress getBindAddress() {
    return this.bindAddress;
  }

  // Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer
  // currently sets only the required id, but iterate through anyways just to be
  // sure.
  private AMRMTokenIdentifier selectAMRMTokenIdentifier(
      UserGroupInformation remoteUgi) throws IOException {
    AMRMTokenIdentifier result = null;
    Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers();
    for (TokenIdentifier tokenId : tokenIds) {
      if (tokenId instanceof AMRMTokenIdentifier) {
        result = (AMRMTokenIdentifier) tokenId;
        break;
      }
    }
    return result;
  }

  /**
   * Resolves the AMRMTokenIdentifier presented by the current RPC caller.
   *
   * @return the caller's AMRMTokenIdentifier, never null
   * @throws YarnException if the caller's UGI cannot be obtained, if looking
   *     up its tokens fails, or if it carries no AMRMToken
   */
  private AMRMTokenIdentifier authorizeRequest()
      throws YarnException {

    UserGroupInformation remoteUgi;
    try {
      remoteUgi = UserGroupInformation.getCurrentUser();
    } catch (IOException e) {
      String msg =
          "Cannot obtain the user-name for authorizing ApplicationMaster. "
              + "Got exception: " + StringUtils.stringifyException(e);
      LOG.warn(msg);
      throw RPCUtil.getRemoteException(msg);
    }

    AMRMTokenIdentifier appTokenIdentifier;
    try {
      appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi);
    } catch (IOException e) {
      String message =
          "Got exception while looking for AMRMToken for user "
              + remoteUgi.getUserName();
      // The remote exception carries only the message; log the cause here so
      // the failure can be diagnosed on the RM side.
      LOG.warn(message, e);
      throw RPCUtil.getRemoteException(message);
    }

    if (appTokenIdentifier == null) {
      String message = "No AMRMToken found for user " + remoteUgi.getUserName();
      LOG.warn(message);
      throw RPCUtil.getRemoteException(message);
    }
    return appTokenIdentifier;
  }

  /**
   * Registers the calling ApplicationMaster, identified by its AMRMToken.
   *
   * <p>Marks the attempt as registered (last response id 0) and dispatches an
   * {@link RMAppAttemptRegistrationEvent}. The response carries:
   * <ul>
   *   <li>the scheduler's maximum resource capability;</li>
   *   <li>the application ACLs and queue;</li>
   *   <li>when security is enabled, the client-to-AM token master key;</li>
   *   <li>any containers, with their NM tokens, transferred from previous
   *       attempts.</li>
   * </ul>
   *
   * @param request host, RPC port and tracking URL reported by the AM
   * @return the registration response
   * @throws InvalidApplicationMasterRequestException if the attempt is not in
   *     the cache or is already registered
   * @throws UnknownHostException if creating an NM token for a transferred
   *     container failed because of a DNS problem
   * @throws YarnException if the caller cannot be authorized
   */
  @Override
  public RegisterApplicationMasterResponse registerApplicationMaster(
      RegisterApplicationMasterRequest request) throws YarnException,
      IOException {

    AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
    ApplicationAttemptId applicationAttemptId =
        amrmTokenIdentifier.getApplicationAttemptId();

    ApplicationId appID = applicationAttemptId.getApplicationId();
    AllocateResponseLock lock = responseMap.get(applicationAttemptId);
    if (lock == null) {
      RMAuditLogger.logFailure(this.rmContext.getRMApps().get(appID).getUser(),
          AuditConstants.REGISTER_AM, "Application doesn't exist in cache "
              + applicationAttemptId, "ApplicationMasterService",
          "Error in registering application master", appID,
          applicationAttemptId);
      throwApplicationDoesNotExistInCacheException(applicationAttemptId);
    }

    // Allow only one thread in AM to do registerApp at a time.
    synchronized (lock) {
      AllocateResponse lastResponse = lock.getAllocateResponse();
      if (hasApplicationMasterRegistered(applicationAttemptId)) {
        String message =
            "Application Master is already registered : "
                + applicationAttemptId.getApplicationId();
        LOG.warn(message);
        RMAuditLogger.logFailure(
            this.rmContext.getRMApps()
                .get(applicationAttemptId.getApplicationId()).getUser(),
            AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message,
            applicationAttemptId.getApplicationId(), applicationAttemptId);
        throw new InvalidApplicationMasterRequestException(message);
      }

      this.amLivelinessMonitor.receivedPing(applicationAttemptId);
      RMApp app = this.rmContext.getRMApps().get(appID);

      // A response id of 0 (>= 0) is what hasApplicationMasterRegistered uses
      // to recognize that the AM of this attempt has registered.
      lastResponse.setResponseId(0);
      lock.setAllocateResponse(lastResponse);
      LOG.info("AM registration " + applicationAttemptId);
      this.rmContext
          .getDispatcher()
          .getEventHandler()
          .handle(
              new RMAppAttemptRegistrationEvent(applicationAttemptId, request
                  .getHost(), request.getRpcPort(), request.getTrackingUrl()));
      RMAuditLogger.logSuccess(app.getUser(), AuditConstants.REGISTER_AM,
          "ApplicationMasterService", appID, applicationAttemptId);

      // Pick up the maximum resource capability from the scheduler.
      RegisterApplicationMasterResponse response = recordFactory
          .newRecordInstance(RegisterApplicationMasterResponse.class);
      response.setMaximumResourceCapability(rScheduler
          .getMaximumResourceCapability());
      response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId)
          .getSubmissionContext().getAMContainerSpec().getApplicationACLs());
      response.setQueue(app.getQueue());
      if (UserGroupInformation.isSecurityEnabled()) {
        LOG.info("Setting client token master key");
        response.setClientToAMTokenMasterKey(java.nio.ByteBuffer.wrap(rmContext
            .getClientToAMTokenSecretManager()
            .getMasterKey(applicationAttemptId).getEncoded()));
      }

      // For work-preserving AM restart, retrieve previous attempts' containers
      // and corresponding NM tokens.
      List<Container> transferredContainers =
          ((AbstractYarnScheduler) rScheduler)
              .getTransferredContainers(applicationAttemptId);
      if (!transferredContainers.isEmpty()) {
        response.setContainersFromPreviousAttempts(transferredContainers);
        List<NMToken> nmTokens = createNMTokensForTransferredContainers(
            app.getUser(), applicationAttemptId, transferredContainers);
        response.setNMTokensFromPreviousAttempts(nmTokens);
        LOG.info("Application " + appID + " retrieved "
            + transferredContainers.size() + " containers from previous"
            + " attempts and " + nmTokens.size() + " NM tokens.");
      }

      response.setSchedulerResourceTypes(rScheduler
          .getSchedulingResourceTypes());

      return response;
    }
  }

  /**
   * Creates NM tokens for containers transferred from previous attempts.
   * Containers for which the secret manager returns null are skipped, as are
   * containers whose token creation fails with a non-DNS
   * IllegalArgumentException.
   *
   * @throws UnknownHostException when token creation fails due to a DNS issue
   */
  private List<NMToken> createNMTokensForTransferredContainers(String user,
      ApplicationAttemptId attemptId, List<Container> containers)
      throws UnknownHostException {
    List<NMToken> nmTokens = new ArrayList<NMToken>();
    for (Container container : containers) {
      try {
        NMToken token = rmContext.getNMTokenSecretManager()
            .createAndGetNMToken(user, attemptId, container);
        if (null != token) {
          nmTokens.add(token);
        }
      } catch (IllegalArgumentException e) {
        // If it's a DNS issue, throw UnknownHostException directly; that is
        // automatically retried by RMProxy in the RPC layer.
        if (e.getCause() instanceof UnknownHostException) {
          throw (UnknownHostException) e.getCause();
        }
        // Other failures only drop this container's token (best effort), but
        // they are logged rather than silently ignored.
        LOG.warn("Failed to create NMToken for container " + container.getId()
            + " of " + attemptId, e);
      }
    }
    return nmTokens;
  }

  /**
   * Unregisters the calling ApplicationMaster.
   *
   * @param request final status, tracking URL and diagnostics from the AM
   * @return {@code true} if the application's final state is already stored.
   *     Otherwise, an {@link RMAppAttemptUnregistrationEvent} is dispatched and
   *     the result is whether the application is unmanaged, so that unmanaged
   *     AMs do not retry.
   * @throws InvalidApplicationMasterRequestException if the attempt is not in
   *     the cache
   * @throws ApplicationMasterNotRegisteredException if the AM never registered
   * @throws YarnException if the caller cannot be authorized
   */
  @Override
  public FinishApplicationMasterResponse finishApplicationMaster(
      FinishApplicationMasterRequest request) throws YarnException,
      IOException {

    ApplicationAttemptId applicationAttemptId =
        authorizeRequest().getApplicationAttemptId();

    AllocateResponseLock lock = responseMap.get(applicationAttemptId);
    if (lock == null) {
      throwApplicationDoesNotExistInCacheException(applicationAttemptId);
    }

    // Allow only one thread in AM to do finishApp at a time.
    synchronized (lock) {
      if (!hasApplicationMasterRegistered(applicationAttemptId)) {
        String message =
            "Application Master is trying to unregister before registering for: "
                + applicationAttemptId.getApplicationId();
        LOG.error(message);
        RMAuditLogger.logFailure(
            this.rmContext.getRMApps()
                .get(applicationAttemptId.getApplicationId()).getUser(),
            AuditConstants.UNREGISTER_AM, "", "ApplicationMasterService",
            message, applicationAttemptId.getApplicationId(),
            applicationAttemptId);
        throw new ApplicationMasterNotRegisteredException(message);
      }

      this.amLivelinessMonitor.receivedPing(applicationAttemptId);

      RMApp rmApp =
          rmContext.getRMApps().get(applicationAttemptId.getApplicationId());

      if (rmApp.isAppFinalStateStored()) {
        return FinishApplicationMasterResponse.newInstance(true);
      }

      rmContext.getDispatcher().getEventHandler().handle(
          new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
              .getTrackingUrl(), request.getFinalApplicationStatus(), request
              .getDiagnostics()));

      // For UnmanagedAMs, return true so they don't retry
      return FinishApplicationMasterResponse.newInstance(
          rmApp.getApplicationSubmissionContext().getUnmanagedAM());
    }
  }

  /** Logs and throws the "not in cache" error for an unknown attempt. */
  private void throwApplicationDoesNotExistInCacheException(
      ApplicationAttemptId appAttemptId)
      throws InvalidApplicationMasterRequestException {
    String message = "Application doesn't exist in cache "
        + appAttemptId;
    LOG.error(message);
    throw new InvalidApplicationMasterRequestException(message);
  }

  /**
   * @param appAttemptId the attempt to check
   * @return true if the attempt is cached and its last response id is
   *     non-negative, i.e. its AM has registered
   */
  public boolean hasApplicationMasterRegistered(
      ApplicationAttemptId appAttemptId) {
    AllocateResponseLock lock = responseMap.get(appAttemptId);
    if (lock == null) {
      return false;
    }
    synchronized (lock) {
      AllocateResponse lastResponse = lock.getAllocateResponse();
      return lastResponse != null && lastResponse.getResponseId() >= 0;
    }
  }

  /**
   * Handles an AM heartbeat.
   *
   * <p>The request is handled as follows:
   * <ul>
   *   <li>an attempt missing from the cache gets the shared
   *       {@code AM_SHUTDOWN} response;</li>
   *   <li>an unregistered or out-of-sync AM gets the shared {@code AM_RESYNC}
   *       response;</li>
   *   <li>a repeat of the previous heartbeat (request id + 1 equals the last
   *       response id) gets the previous response again;</li>
   *   <li>any other request is validated and passed to the scheduler, and
   *       answered with a new response whose id is the last id + 1.</li>
   * </ul>
   *
   * @throws InvalidResourceRequestException if an ask is invalid
   * @throws InvalidResourceBlacklistRequestException if the blacklist request
   *     is invalid
   * @throws InvalidContainerReleaseException if a release is invalid and the
   *     app does not keep containers across attempts
   * @throws YarnException if the caller cannot be authorized
   */
  @Override
  public AllocateResponse allocate(AllocateRequest request)
      throws YarnException, IOException {

    AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();

    ApplicationAttemptId appAttemptId =
        amrmTokenIdentifier.getApplicationAttemptId();

    this.amLivelinessMonitor.receivedPing(appAttemptId);

    /* check if its in cache */
    AllocateResponseLock lock = responseMap.get(appAttemptId);
    if (lock == null) {
      LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
      return shutdown;
    }
    synchronized (lock) {
      AllocateResponse lastResponse = lock.getAllocateResponse();
      if (!hasApplicationMasterRegistered(appAttemptId)) {
        String message =
            "Application Master is not registered for known application: "
                + appAttemptId.getApplicationId()
                + ". Let AM resync.";
        LOG.info(message);
        RMAuditLogger.logFailure(
            this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
                .getUser(), AuditConstants.REGISTER_AM, "",
            "ApplicationMasterService", message,
            appAttemptId.getApplicationId(),
            appAttemptId);
        return resync;
      }

      if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
        /* old heartbeat */
        return lastResponse;
      } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
        LOG.error("Invalid responseid from appAttemptId " + appAttemptId);
        // RM state for this AM is inconsistent. Rebooting the AM does not help
        // since it would re-register and get an exception, so ask it to resync.
        return resync;
      }

      clampProgress(request);

      // Send the status update to the appAttempt.
      this.rmContext.getDispatcher().getEventHandler().handle(
          new RMAppAttemptStatusupdateEvent(appAttemptId, request
              .getProgress()));

      List<ResourceRequest> ask = request.getAskList();
      List<ContainerId> release = request.getReleaseList();

      ResourceBlacklistRequest blacklistRequest =
          request.getResourceBlacklistRequest();
      List<String> blacklistAdditions =
          (blacklistRequest != null) ?
              blacklistRequest.getBlacklistAdditions()
              : Collections.<String>emptyList();
      List<String> blacklistRemovals =
          (blacklistRequest != null) ?
              blacklistRequest.getBlacklistRemovals()
              : Collections.<String>emptyList();

      // sanity check
      try {
        RMServerUtils.validateResourceRequests(ask,
            rScheduler.getMaximumResourceCapability());
      } catch (InvalidResourceRequestException e) {
        LOG.warn("Invalid resource ask by application " + appAttemptId, e);
        throw e;
      }

      try {
        RMServerUtils.validateBlacklistRequest(blacklistRequest);
      } catch (InvalidResourceBlacklistRequestException e) {
        LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
        throw e;
      }

      RMApp app =
          this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
      // In the case of work-preserving AM restart, it's possible for the
      // AM to release containers from the earlier attempt.
      if (!app.getApplicationSubmissionContext()
          .getKeepContainersAcrossApplicationAttempts()) {
        try {
          RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
        } catch (InvalidContainerReleaseException e) {
          LOG.warn("Invalid container release by application " + appAttemptId, e);
          throw e;
        }
      }

      // Send new requests to appAttempt.
      Allocation allocation =
          this.rScheduler.allocate(appAttemptId, ask, release,
              blacklistAdditions, blacklistRemovals);

      if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
        LOG.info("blacklist are updated in Scheduler." +
            "blacklistAdditions: " + blacklistAdditions + ", " +
            "blacklistRemovals: " + blacklistRemovals);
      }
      RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
      AllocateResponse allocateResponse =
          recordFactory.newRecordInstance(AllocateResponse.class);
      if (!allocation.getContainers().isEmpty()) {
        allocateResponse.setNMTokens(allocation.getNMTokens());
      }

      // update the response with the deltas of node status changes
      List<RMNode> updatedNodes = new ArrayList<RMNode>();
      if (app.pullRMNodeUpdates(updatedNodes) > 0) {
        allocateResponse.setUpdatedNodes(buildNodeReports(updatedNodes));
      }

      allocateResponse.setAllocatedContainers(allocation.getContainers());
      allocateResponse.setCompletedContainersStatuses(appAttempt
          .pullJustFinishedContainers());
      allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
      allocateResponse.setAvailableResources(allocation.getResourceLimit());

      allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());

      // add preemption to the allocateResponse message (if any)
      allocateResponse
          .setPreemptionMessage(generatePreemptionMessage(allocation));

      // Send a new AMRMToken if a next master key exists whose id differs
      // from the key id of the token the AM used for this call.
      MasterKeyData nextMasterKey =
          this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData();
      if (nextMasterKey != null
          && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
              .getKeyId()) {
        Token<AMRMTokenIdentifier> amrmToken =
            rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
                appAttemptId);
        ((RMAppAttemptImpl) appAttempt).setAMRMToken(amrmToken);
        allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
            .newInstance(amrmToken.getIdentifier(), amrmToken.getKind()
                .toString(), amrmToken.getPassword(), amrmToken.getService()
                .toString()));
        LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back"
            + " to application: " + appAttemptId.getApplicationId());
      }

      /*
       * As we are updating the response inside the lock object so we don't
       * need to worry about unregister call occurring in between (which
       * removes the lock object).
       */
      lock.setAllocateResponse(allocateResponse);
      return allocateResponse;
    }
  }

  /**
   * Clamps the AM-reported progress into [0, 1] in place. NaN and negative
   * values (including negative infinity) become 0. Values above 1 (including
   * positive infinity) become 1. In-range values are left untouched.
   */
  private static void clampProgress(AllocateRequest request) {
    float progress = request.getProgress();
    if (Float.isNaN(progress) || progress < 0) {
      request.setProgress(0);
    } else if (progress > 1) {
      request.setProgress(1);
    }
  }

  /**
   * Builds a NodeReport per node. Used resources and container counts come
   * from the scheduler's node report, defaulting to zero when it has none.
   */
  private List<NodeReport> buildNodeReports(List<RMNode> nodes) {
    List<NodeReport> reports = new ArrayList<NodeReport>(nodes.size());
    for (RMNode rmNode : nodes) {
      SchedulerNodeReport schedulerNodeReport =
          rScheduler.getNodeReport(rmNode.getNodeID());
      Resource used = BuilderUtils.newResource(0, 0);
      int numContainers = 0;
      if (schedulerNodeReport != null) {
        used = schedulerNodeReport.getUsedResource();
        numContainers = schedulerNodeReport.getNumContainers();
      }
      reports.add(BuilderUtils.newNodeReport(rmNode.getNodeID(),
          rmNode.getState(),
          rmNode.getHttpAddress(), rmNode.getRackName(), used,
          rmNode.getTotalCapability(), numContainers,
          rmNode.getHealthReport(),
          rmNode.getLastHealthReportTime()));
    }
    return reports;
  }

  /**
   * Translates the scheduler's preemption decisions into a PreemptionMessage.
   *
   * @return null when there are neither strict preemptions nor a non-empty
   *     negotiable contract; otherwise a message with the strict contract
   *     and/or the negotiable contract filled in
   */
  private PreemptionMessage generatePreemptionMessage(Allocation allocation) {
    PreemptionMessage pMsg = null;
    // assemble strict preemption request
    if (allocation.getStrictContainerPreemptions() != null) {
      pMsg =
          recordFactory.newRecordInstance(PreemptionMessage.class);
      StrictPreemptionContract pStrict =
          recordFactory.newRecordInstance(StrictPreemptionContract.class);
      pStrict.setContainers(
          toPreemptionContainers(allocation.getStrictContainerPreemptions()));
      pMsg.setStrictContract(pStrict);
    }

    // assemble negotiable preemption request
    if (allocation.getResourcePreemptions() != null &&
        !allocation.getResourcePreemptions().isEmpty() &&
        allocation.getContainerPreemptions() != null &&
        !allocation.getContainerPreemptions().isEmpty()) {
      if (pMsg == null) {
        pMsg =
            recordFactory.newRecordInstance(PreemptionMessage.class);
      }
      PreemptionContract contract =
          recordFactory.newRecordInstance(PreemptionContract.class);
      Set<PreemptionContainer> pCont =
          toPreemptionContainers(allocation.getContainerPreemptions());
      List<PreemptionResourceRequest> pRes = new ArrayList<PreemptionResourceRequest>();
      for (ResourceRequest crr : allocation.getResourcePreemptions()) {
        PreemptionResourceRequest prr =
            recordFactory.newRecordInstance(PreemptionResourceRequest.class);
        prr.setResourceRequest(crr);
        pRes.add(prr);
      }
      contract.setContainers(pCont);
      contract.setResourceRequest(pRes);
      pMsg.setContract(contract);
    }

    return pMsg;
  }

  /** Wraps each container id in a new PreemptionContainer record. */
  private Set<PreemptionContainer> toPreemptionContainers(
      Iterable<ContainerId> containerIds) {
    Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
    for (ContainerId cId : containerIds) {
      PreemptionContainer pc =
          recordFactory.newRecordInstance(PreemptionContainer.class);
      pc.setId(cId);
      pCont.add(pc);
    }
    return pCont;
  }

  /**
   * Adds an attempt to the response cache with response id -1 (not yet
   * registered) and registers it with the NM token secret manager.
   */
  public void registerAppAttempt(ApplicationAttemptId attemptId) {
    AllocateResponse response =
        recordFactory.newRecordInstance(AllocateResponse.class);
    // set response id to -1 before application master for the following
    // attemptID get registered
    response.setResponseId(-1);
    LOG.info("Registering app attempt : " + attemptId);
    responseMap.put(attemptId, new AllocateResponseLock(response));
    rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId);
  }

  /**
   * Removes an attempt from the response cache and unregisters it from the NM
   * token secret manager.
   */
  public void unregisterAttempt(ApplicationAttemptId attemptId) {
    LOG.info("Unregistering app attempt : " + attemptId);
    responseMap.remove(attemptId);
    rmContext.getNMTokenSecretManager().unregisterApplicationAttempt(attemptId);
  }

  /** Reloads the RPC server's service ACLs from the given configuration. */
  public void refreshServiceAcls(Configuration configuration,
      PolicyProvider policyProvider) {
    this.server.refreshServiceAclWithLoadedConfiguration(configuration,
        policyProvider);
  }

  @Override
  protected void serviceStop() throws Exception {
    if (this.server != null) {
      this.server.stop();
    }
    super.serviceStop();
  }

  /**
   * Holder for an attempt's last AllocateResponse. The instance also serves
   * as the monitor that serializes register/finish/allocate calls per
   * attempt. Accessors are synchronized on the holder.
   */
  public static class AllocateResponseLock {
    private AllocateResponse response;

    public AllocateResponseLock(AllocateResponse response) {
      this.response = response;
    }

    public synchronized AllocateResponse getAllocateResponse() {
      return response;
    }

    public synchronized void setAllocateResponse(AllocateResponse response) {
      this.response = response;
    }
  }

  @VisibleForTesting
  public Server getServer() {
    return this.server;
  }
}