blob: 7993bd8a5ece4bdd8c3b961ee8508e0564d29de4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.utils;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Utility class for AMRMClient.
*/
@Private
public final class AMRMClientUtils {
private static final Logger LOG =
LoggerFactory.getLogger(AMRMClientUtils.class);
public static final String APP_ALREADY_REGISTERED_MESSAGE =
"Application Master is already registered : ";
private AMRMClientUtils() {
}
/**
* Handle ApplicationNotRegistered exception and re-register.
*
* @param attemptId app attemptId
* @param rmProxy RM proxy instance
* @param registerRequest the AM re-register request
* @throws YarnException if re-register fails
*/
public static void handleNotRegisteredExceptionAndReRegister(
ApplicationAttemptId attemptId, ApplicationMasterProtocol rmProxy,
RegisterApplicationMasterRequest registerRequest) throws YarnException {
LOG.info("App attempt {} not registered, most likely due to RM failover. "
+ " Trying to re-register.", attemptId);
try {
rmProxy.registerApplicationMaster(registerRequest);
} catch (Exception e) {
if (e instanceof InvalidApplicationMasterRequestException
&& e.getMessage().contains(APP_ALREADY_REGISTERED_MESSAGE)) {
LOG.info("Concurrent thread successfully registered, moving on.");
} else {
LOG.error("Error trying to re-register AM", e);
throw new YarnException(e);
}
}
}
/**
* Helper method for client calling ApplicationMasterProtocol.allocate that
* handles re-register if RM fails over.
*
* @param request allocate request
* @param rmProxy RM proxy
* @param registerRequest the register request for re-register
* @param attemptId application attempt id
* @return allocate response
* @throws YarnException if RM call fails
* @throws IOException if RM call fails
*/
public static AllocateResponse allocateWithReRegister(AllocateRequest request,
ApplicationMasterProtocol rmProxy,
RegisterApplicationMasterRequest registerRequest,
ApplicationAttemptId attemptId) throws YarnException, IOException {
try {
return rmProxy.allocate(request);
} catch (ApplicationMasterNotRegisteredException e) {
handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy,
registerRequest);
// reset responseId after re-register
request.setResponseId(0);
// retry allocate
return allocateWithReRegister(request, rmProxy, registerRequest,
attemptId);
}
}
/**
* Helper method for client calling
* ApplicationMasterProtocol.finishApplicationMaster that handles re-register
* if RM fails over.
*
* @param request finishApplicationMaster request
* @param rmProxy RM proxy
* @param registerRequest the register request for re-register
* @param attemptId application attempt id
* @return finishApplicationMaster response
* @throws YarnException if RM call fails
* @throws IOException if RM call fails
*/
public static FinishApplicationMasterResponse finishAMWithReRegister(
FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy,
RegisterApplicationMasterRequest registerRequest,
ApplicationAttemptId attemptId) throws YarnException, IOException {
try {
return rmProxy.finishApplicationMaster(request);
} catch (ApplicationMasterNotRegisteredException ex) {
handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy,
registerRequest);
// retry finishAM after re-register
return finishAMWithReRegister(request, rmProxy, registerRequest,
attemptId);
}
}
/**
* Create a proxy for the specified protocol.
*
* @param configuration Configuration to generate {@link ClientRMProxy}
* @param protocol Protocol for the proxy
* @param user the user on whose behalf the proxy is being created
* @param token the auth token to use for connection
* @param <T> Type information of the proxy
* @return Proxy to the RM
* @throws IOException on failure
*/
@Public
@Unstable
public static <T> T createRMProxy(final Configuration configuration,
final Class<T> protocol, UserGroupInformation user,
final Token<? extends TokenIdentifier> token) throws IOException {
try {
String rmClusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID,
YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
LOG.info("Creating RMProxy to RM {} for protocol {} for user {}",
rmClusterId, protocol.getSimpleName(), user);
if (token != null) {
token.setService(ClientRMProxy.getAMRMTokenService(configuration));
user.addToken(token);
setAuthModeInConf(configuration);
}
final T proxyConnection = user.doAs(new PrivilegedExceptionAction<T>() {
@Override
public T run() throws Exception {
return ClientRMProxy.createRMProxy(configuration, protocol);
}
});
return proxyConnection;
} catch (InterruptedException e) {
throw new YarnRuntimeException(e);
}
}
private static void setAuthModeInConf(Configuration conf) {
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
SaslRpcServer.AuthMethod.TOKEN.toString());
}
}