blob: 2de40440246fe367604a9a2e1f413c64a1b92c28 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.service.AbstractService;
@Private
public class ApplicationMasterService extends AbstractService implements
AMRMProtocol {
private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
private final AMLivelinessMonitor amLivelinessMonitor;
private YarnScheduler rScheduler;
private ApplicationTokenSecretManager appTokenManager;
private InetSocketAddress masterServiceAddress;
private Server server;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private final ConcurrentMap<ApplicationAttemptId, AMResponse> responseMap =
new ConcurrentHashMap<ApplicationAttemptId, AMResponse>();
private final AMResponse reboot = recordFactory.newRecordInstance(AMResponse.class);
private final RMContext rmContext;
public ApplicationMasterService(RMContext rmContext,
ApplicationTokenSecretManager appTokenManager, YarnScheduler scheduler) {
super(ApplicationMasterService.class.getName());
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
this.appTokenManager = appTokenManager;
this.rScheduler = scheduler;
this.reboot.setReboot(true);
// this.reboot.containers = new ArrayList<Container>();
this.rmContext = rmContext;
}
@Override
public void init(Configuration conf) {
String bindAddress =
conf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
masterServiceAddress = NetUtils.createSocketAddr(bindAddress,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT,
YarnConfiguration.RM_SCHEDULER_ADDRESS);
super.init(conf);
}
@Override
public void start() {
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
this.server =
rpc.getServer(AMRMProtocol.class, this, masterServiceAddress,
conf, this.appTokenManager,
conf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
// Enable service authorization?
if (conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) {
refreshServiceAcls(conf, new RMPolicyProvider());
}
this.server.start();
super.start();
}
private void authorizeRequest(ApplicationAttemptId appAttemptID)
throws YarnRemoteException {
if (!UserGroupInformation.isSecurityEnabled()) {
return;
}
String appAttemptIDStr = appAttemptID.toString();
UserGroupInformation remoteUgi;
try {
remoteUgi = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
String msg = "Cannot obtain the user-name for ApplicationAttemptID: "
+ appAttemptIDStr + ". Got exception: "
+ StringUtils.stringifyException(e);
LOG.warn(msg);
throw RPCUtil.getRemoteException(msg);
}
if (!remoteUgi.getUserName().equals(appAttemptIDStr)) {
String msg = "Unauthorized request from ApplicationMaster. "
+ "Expected ApplicationAttemptID: " + remoteUgi.getUserName()
+ " Found: " + appAttemptIDStr;
LOG.warn(msg);
throw RPCUtil.getRemoteException(msg);
}
}
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) throws YarnRemoteException {
ApplicationAttemptId applicationAttemptId = request
.getApplicationAttemptId();
authorizeRequest(applicationAttemptId);
ApplicationId appID = applicationAttemptId.getApplicationId();
AMResponse lastResponse = responseMap.get(applicationAttemptId);
if (lastResponse == null) {
String message = "Application doesn't exist in cache "
+ applicationAttemptId;
LOG.error(message);
RMAuditLogger.logFailure(this.rmContext.getRMApps().get(appID).getUser(),
AuditConstants.REGISTER_AM, message, "ApplicationMasterService",
"Error in registering application master", appID,
applicationAttemptId);
throw RPCUtil.getRemoteException(message);
}
// Allow only one thread in AM to do registerApp at a time.
synchronized (lastResponse) {
LOG.info("AM registration " + applicationAttemptId);
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptRegistrationEvent(applicationAttemptId, request
.getHost(), request.getRpcPort(), request.getTrackingUrl()));
RMAuditLogger.logSuccess(this.rmContext.getRMApps().get(appID).getUser(),
AuditConstants.REGISTER_AM, "ApplicationMasterService", appID,
applicationAttemptId);
// Pick up min/max resource from scheduler...
RegisterApplicationMasterResponse response = recordFactory
.newRecordInstance(RegisterApplicationMasterResponse.class);
response.setMinimumResourceCapability(rScheduler
.getMinimumResourceCapability());
response.setMaximumResourceCapability(rScheduler
.getMaximumResourceCapability());
return response;
}
}
@Override
public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request) throws YarnRemoteException {
ApplicationAttemptId applicationAttemptId = request
.getApplicationAttemptId();
authorizeRequest(applicationAttemptId);
AMResponse lastResponse = responseMap.get(applicationAttemptId);
if (lastResponse == null) {
String message = "Application doesn't exist in cache "
+ applicationAttemptId;
LOG.error(message);
throw RPCUtil.getRemoteException(message);
}
// Allow only one thread in AM to do finishApp at a time.
synchronized (lastResponse) {
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
.getTrackingUrl(), request.getFinalApplicationStatus(), request
.getDiagnostics()));
FinishApplicationMasterResponse response = recordFactory
.newRecordInstance(FinishApplicationMasterResponse.class);
return response;
}
}
@Override
public AllocateResponse allocate(AllocateRequest request)
throws YarnRemoteException {
ApplicationAttemptId appAttemptId = request.getApplicationAttemptId();
authorizeRequest(appAttemptId);
this.amLivelinessMonitor.receivedPing(appAttemptId);
/* check if its in cache */
AllocateResponse allocateResponse = recordFactory
.newRecordInstance(AllocateResponse.class);
AMResponse lastResponse = responseMap.get(appAttemptId);
if (lastResponse == null) {
LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
allocateResponse.setAMResponse(reboot);
return allocateResponse;
}
if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
/* old heartbeat */
allocateResponse.setAMResponse(lastResponse);
return allocateResponse;
} else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
LOG.error("Invalid responseid from appAttemptId " + appAttemptId);
// Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
allocateResponse.setAMResponse(reboot);
return allocateResponse;
}
// Allow only one thread in AM to do heartbeat at a time.
synchronized (lastResponse) { // BUG TODO: Locking order is screwed.
// Send the status update to the appAttempt.
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptStatusupdateEvent(appAttemptId, request
.getProgress()));
List<ResourceRequest> ask = request.getAskList();
List<ContainerId> release = request.getReleaseList();
// Send new requests to appAttempt.
Allocation allocation =
this.rScheduler.allocate(appAttemptId, ask, release);
RMApp app = this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
response.setAllocatedContainers(allocation.getContainers());
response.setCompletedContainersStatuses(appAttempt
.pullJustFinishedContainers());
response.setResponseId(lastResponse.getResponseId() + 1);
response.setAvailableResources(allocation.getResourceLimit());
responseMap.put(appAttemptId, response);
allocateResponse.setAMResponse(response);
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
return allocateResponse;
}
}
public void registerAppAttempt(ApplicationAttemptId attemptId) {
AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
response.setResponseId(0);
LOG.info("Registering " + attemptId);
responseMap.put(attemptId, response);
}
public void unregisterAttempt(ApplicationAttemptId attemptId) {
AMResponse lastResponse = responseMap.get(attemptId);
if (lastResponse != null) {
synchronized (lastResponse) {
responseMap.remove(attemptId);
}
}
}
public void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
}
@Override
public void stop() {
if (this.server != null) {
this.server.stop();
}
super.stop();
}
}