blob: d1e86de5dac06a7dc2f8edcab87c34fd433d473b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.uam;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.HashMap;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.client.AMRMClientUtils;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.util.AsyncCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.VisibleForTesting;
/**
* A service that manages a pool of UAM managers in
* {@link UnmanagedApplicationManager}.
*/
@Public
@Unstable
public class UnmanagedAMPoolManager extends AbstractService {
public static final Logger LOG =
LoggerFactory.getLogger(UnmanagedAMPoolManager.class);
// Map from uamId to UAM instances
private Map<String, UnmanagedApplicationManager> unmanagedAppMasterMap;
private Map<String, ApplicationId> appIdMap;
private ExecutorService threadpool;
public UnmanagedAMPoolManager(ExecutorService threadpool) {
super(UnmanagedAMPoolManager.class.getName());
this.threadpool = threadpool;
}
@Override
protected void serviceStart() throws Exception {
if (this.threadpool == null) {
this.threadpool = Executors.newCachedThreadPool();
}
this.unmanagedAppMasterMap = new ConcurrentHashMap<>();
this.appIdMap = new ConcurrentHashMap<>();
super.serviceStart();
}
/**
* Normally we should finish all applications before stop. If there are still
* UAMs running, force kill all of them. Do parallel kill because of
* performance reasons.
*
* TODO: move waiting for the kill to finish into a separate thread, without
* blocking the serviceStop.
*/
@Override
protected void serviceStop() throws Exception {
ExecutorCompletionService<KillApplicationResponse> completionService =
new ExecutorCompletionService<>(this.threadpool);
if (this.unmanagedAppMasterMap.isEmpty()) {
return;
}
// Save a local copy of the key set so that it won't change with the map
Set<String> addressList =
new HashSet<>(this.unmanagedAppMasterMap.keySet());
LOG.warn("Abnormal shutdown of UAMPoolManager, still {} UAMs in map",
addressList.size());
for (final String uamId : addressList) {
completionService.submit(new Callable<KillApplicationResponse>() {
@Override
public KillApplicationResponse call() throws Exception {
try {
LOG.info("Force-killing UAM id " + uamId + " for application "
+ appIdMap.get(uamId));
return unmanagedAppMasterMap.remove(uamId).forceKillApplication();
} catch (Exception e) {
LOG.error("Failed to kill unmanaged application master", e);
return null;
}
}
});
}
for (int i = 0; i < addressList.size(); ++i) {
try {
Future<KillApplicationResponse> future = completionService.take();
future.get();
} catch (Exception e) {
LOG.error("Failed to kill unmanaged application master", e);
}
}
this.appIdMap.clear();
super.serviceStop();
}
/**
* Create a new UAM and register the application, without specifying uamId and
* appId. We will ask for an appId from RM and use it as the uamId.
*
* @param registerRequest RegisterApplicationMasterRequest
* @param conf configuration for this UAM
* @param queueName queue of the application
* @param submitter submitter name of the UAM
* @param appNameSuffix application name suffix for the UAM
* @param keepContainersAcrossApplicationAttempts keep container flag for UAM
* recovery.
* @param rmName name of the YarnRM
* @see ApplicationSubmissionContext
* #setKeepContainersAcrossApplicationAttempts(boolean)
* @return uamId for the UAM
* @throws YarnException if registerApplicationMaster fails
* @throws IOException if registerApplicationMaster fails
*/
public String createAndRegisterNewUAM(
RegisterApplicationMasterRequest registerRequest, Configuration conf,
String queueName, String submitter, String appNameSuffix,
boolean keepContainersAcrossApplicationAttempts, String rmName)
throws YarnException, IOException {
ApplicationId appId = null;
ApplicationClientProtocol rmClient;
try {
UserGroupInformation appSubmitter =
UserGroupInformation.createRemoteUser(submitter);
rmClient = AMRMClientUtils.createRMProxy(conf,
ApplicationClientProtocol.class, appSubmitter, null);
// Get a new appId from RM
GetNewApplicationResponse response =
rmClient.getNewApplication(GetNewApplicationRequest.newInstance());
if (response == null) {
throw new YarnException("getNewApplication got null response");
}
appId = response.getApplicationId();
LOG.info("Received new application ID {} from RM", appId);
} finally {
rmClient = null;
}
// Launch the UAM in RM
launchUAM(appId.toString(), conf, appId, queueName, submitter,
appNameSuffix, keepContainersAcrossApplicationAttempts, rmName);
// Register the UAM application
registerApplicationMaster(appId.toString(), registerRequest);
// Returns the appId as uamId
return appId.toString();
}
/**
* Launch a new UAM, using the provided uamId and appId.
*
* @param uamId uam Id
* @param conf configuration for this UAM
* @param appId application id for the UAM
* @param queueName queue of the application
* @param submitter submitter name of the UAM
* @param appNameSuffix application name suffix for the UAM
* @param keepContainersAcrossApplicationAttempts keep container flag for UAM
* recovery.
* @param rmName name of the YarnRM
* @see ApplicationSubmissionContext
* #setKeepContainersAcrossApplicationAttempts(boolean)
* @return UAM token
* @throws YarnException if fails
* @throws IOException if fails
*/
public Token<AMRMTokenIdentifier> launchUAM(String uamId, Configuration conf,
ApplicationId appId, String queueName, String submitter,
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
String rmName) throws YarnException, IOException {
if (this.unmanagedAppMasterMap.containsKey(uamId)) {
throw new YarnException("UAM " + uamId + " already exists");
}
UnmanagedApplicationManager uam = createUAM(conf, appId, queueName,
submitter, appNameSuffix, keepContainersAcrossApplicationAttempts,
rmName);
// Put the UAM into map first before initializing it to avoid additional UAM
// for the same uamId being created concurrently
this.unmanagedAppMasterMap.put(uamId, uam);
Token<AMRMTokenIdentifier> amrmToken = null;
try {
LOG.info("Launching UAM id {} for application {}", uamId, appId);
amrmToken = uam.launchUAM();
} catch (Exception e) {
// Add the map earlier and remove here if register failed because we want
// to make sure there is only one uam instance per uamId at any given time
this.unmanagedAppMasterMap.remove(uamId);
throw e;
}
this.appIdMap.put(uamId, uam.getAppId());
return amrmToken;
}
/**
* Re-attach to an existing UAM, using the provided uamIdentifier.
*
* @param uamId uam Id
* @param conf configuration for this UAM
* @param appId application id for the UAM
* @param queueName queue of the application
* @param submitter submitter name of the UAM
* @param appNameSuffix application name suffix for the UAM
* @param uamToken UAM token
* @param rmName name of the YarnRM
* @throws YarnException if fails
* @throws IOException if fails
*/
public void reAttachUAM(String uamId, Configuration conf, ApplicationId appId,
String queueName, String submitter, String appNameSuffix,
Token<AMRMTokenIdentifier> uamToken, String rmName)
throws YarnException, IOException {
if (this.unmanagedAppMasterMap.containsKey(uamId)) {
throw new YarnException("UAM " + uamId + " already exists");
}
UnmanagedApplicationManager uam = createUAM(conf, appId, queueName,
submitter, appNameSuffix, true, rmName);
// Put the UAM into map first before initializing it to avoid additional UAM
// for the same uamId being created concurrently
this.unmanagedAppMasterMap.put(uamId, uam);
try {
LOG.info("Reattaching UAM id {} for application {}", uamId, appId);
uam.reAttachUAM(uamToken);
} catch (Exception e) {
// Add the map earlier and remove here if register failed because we want
// to make sure there is only one uam instance per uamId at any given time
this.unmanagedAppMasterMap.remove(uamId);
throw e;
}
this.appIdMap.put(uamId, uam.getAppId());
}
/**
* Creates the UAM instance. Pull out to make unit test easy.
*
* @param conf Configuration
* @param appId application id
* @param queueName queue of the application
* @param submitter submitter name of the application
* @param appNameSuffix application name suffix
* @param keepContainersAcrossApplicationAttempts keep container flag for UAM
* @param rmName name of the YarnRM
* @return the UAM instance
*/
@VisibleForTesting
protected UnmanagedApplicationManager createUAM(Configuration conf,
ApplicationId appId, String queueName, String submitter,
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
String rmName) {
return new UnmanagedApplicationManager(conf, appId, queueName, submitter,
appNameSuffix, keepContainersAcrossApplicationAttempts, rmName);
}
/**
* Register application master for the UAM.
*
* @param uamId uam Id
* @param registerRequest RegisterApplicationMasterRequest
* @return register response
* @throws YarnException if register fails
* @throws IOException if register fails
*/
public RegisterApplicationMasterResponse registerApplicationMaster(
String uamId, RegisterApplicationMasterRequest registerRequest)
throws YarnException, IOException {
if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
throw new YarnException("UAM " + uamId + " does not exist");
}
LOG.info("Registering UAM id {} for application {}", uamId,
this.appIdMap.get(uamId));
return this.unmanagedAppMasterMap.get(uamId)
.registerApplicationMaster(registerRequest);
}
/**
* AllocateAsync to an UAM.
*
* @param uamId uam Id
* @param request AllocateRequest
* @param callback callback for response
* @throws YarnException if allocate fails
* @throws IOException if allocate fails
*/
public void allocateAsync(String uamId, AllocateRequest request,
AsyncCallback<AllocateResponse> callback)
throws YarnException, IOException {
if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
throw new YarnException("UAM " + uamId + " does not exist");
}
this.unmanagedAppMasterMap.get(uamId).allocateAsync(request, callback);
}
/**
* Finish an UAM/application.
*
* @param uamId uam Id
* @param request FinishApplicationMasterRequest
* @return FinishApplicationMasterResponse
* @throws YarnException if finishApplicationMaster call fails
* @throws IOException if finishApplicationMaster call fails
*/
public FinishApplicationMasterResponse finishApplicationMaster(String uamId,
FinishApplicationMasterRequest request)
throws YarnException, IOException {
if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
throw new YarnException("UAM " + uamId + " does not exist");
}
LOG.info("Finishing UAM id {} for application {}", uamId,
this.appIdMap.get(uamId));
FinishApplicationMasterResponse response =
this.unmanagedAppMasterMap.get(uamId).finishApplicationMaster(request);
if (response.getIsUnregistered()) {
// Only remove the UAM when the unregister finished
this.unmanagedAppMasterMap.remove(uamId);
this.appIdMap.remove(uamId);
LOG.info("UAM id {} is unregistered", uamId);
}
return response;
}
/**
* Shutdown an UAM client without killing it in YarnRM.
*
* @param uamId uam Id
* @throws YarnException if fails
*/
public void shutDownConnections(String uamId)
throws YarnException {
if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
throw new YarnException("UAM " + uamId + " does not exist");
}
LOG.info(
"Shutting down UAM id {} for application {} without killing the UAM",
uamId, this.appIdMap.get(uamId));
this.unmanagedAppMasterMap.remove(uamId).shutDownConnections();
}
/**
* Shutdown all UAM clients without killing them in YarnRM.
*
* @throws YarnException if fails
*/
public void shutDownConnections() throws YarnException {
for (String uamId : this.unmanagedAppMasterMap.keySet()) {
shutDownConnections(uamId);
}
}
/**
* Get the id of all running UAMs.
*
* @return uamId set
*/
public Set<String> getAllUAMIds() {
// Return a clone of the current id set for concurrency reasons, so that the
// returned map won't change with the actual map
return new HashSet<String>(this.unmanagedAppMasterMap.keySet());
}
/**
* Return whether an UAM exists.
*
* @param uamId uam Id
* @return UAM exists or not
*/
public boolean hasUAMId(String uamId) {
return this.unmanagedAppMasterMap.containsKey(uamId);
}
/**
* Return the rmProxy relayer of an UAM.
*
* @param uamId uam Id
* @return the rmProxy relayer
* @throws YarnException if fails
*/
public AMRMClientRelayer getAMRMClientRelayer(String uamId)
throws YarnException {
if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
throw new YarnException("UAM " + uamId + " does not exist");
}
return this.unmanagedAppMasterMap.get(uamId).getAMRMClientRelayer();
}
@VisibleForTesting
public int getRequestQueueSize(String uamId) throws YarnException {
if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
throw new YarnException("UAM " + uamId + " does not exist");
}
return this.unmanagedAppMasterMap.get(uamId).getRequestQueueSize();
}
@VisibleForTesting
public void drainUAMHeartbeats() {
for (UnmanagedApplicationManager uam : this.unmanagedAppMasterMap
.values()) {
uam.drainHeartbeatThread();
}
}
/**
* Complete FinishApplicationMaster interface calls in batches.
*
* @param request FinishApplicationMasterRequest
* @param appId application Id
* @return Returns the Map map,
* the key is subClusterId, the value is FinishApplicationMasterResponse
*/
public Map<String, FinishApplicationMasterResponse> batchFinishApplicationMaster(
FinishApplicationMasterRequest request, String appId) {
Map<String, FinishApplicationMasterResponse> responseMap = new HashMap<>();
Set<String> subClusterIds = this.unmanagedAppMasterMap.keySet();
if (subClusterIds != null && !subClusterIds.isEmpty()) {
ExecutorCompletionService<Map<String, FinishApplicationMasterResponse>> finishAppService =
new ExecutorCompletionService<>(this.threadpool);
LOG.info("Sending finish application request to {} sub-cluster RMs", subClusterIds.size());
for (final String subClusterId : subClusterIds) {
finishAppService.submit(() -> {
LOG.info("Sending finish application request to RM {}", subClusterId);
try {
FinishApplicationMasterResponse uamResponse =
finishApplicationMaster(subClusterId, request);
return Collections.singletonMap(subClusterId, uamResponse);
} catch (Throwable e) {
LOG.warn("Failed to finish unmanaged application master: " +
" RM address: {} ApplicationId: {}", subClusterId, appId, e);
return Collections.singletonMap(subClusterId, null);
}
});
}
for (int i = 0; i < subClusterIds.size(); ++i) {
try {
Future<Map<String, FinishApplicationMasterResponse>> future = finishAppService.take();
Map<String, FinishApplicationMasterResponse> uamResponse = future.get();
LOG.debug("Received finish application response from RM: {}", uamResponse.keySet());
responseMap.putAll(uamResponse);
} catch (Throwable e) {
LOG.warn("Failed to finish unmanaged application master: ApplicationId: {}", appId, e);
}
}
}
return responseMap;
}
}