blob: 5d4ab4a6814db46e30a9e8633ab2aaf21637aeb9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Utility class for AMRMClient.
*/
@Private
public final class AMRMClientUtils {
private static final Logger LOG =
LoggerFactory.getLogger(AMRMClientUtils.class);
public static final String APP_ALREADY_REGISTERED_MESSAGE =
"Application Master is already registered : ";
private AMRMClientUtils() {
}
/**
* Create a proxy for the specified protocol.
*
* @param configuration Configuration to generate {@link ClientRMProxy}
* @param protocol Protocol for the proxy
* @param user the user on whose behalf the proxy is being created
* @param token the auth token to use for connection
* @param <T> Type information of the proxy
* @return Proxy to the RM
* @throws IOException on failure
*/
@Public
@Unstable
public static <T> T createRMProxy(final Configuration configuration,
final Class<T> protocol, UserGroupInformation user,
final Token<? extends TokenIdentifier> token) throws IOException {
try {
String rmClusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID,
YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
LOG.info("Creating RMProxy to RM {} for protocol {} for user {}",
rmClusterId, protocol.getSimpleName(), user);
if (token != null) {
// preserve the token service sent by the RM when adding the token
// to ensure we replace the previous token setup by the RM.
// Afterwards we can update the service address for the RPC layer.
// Same as YarnServerSecurityUtils.updateAMRMToken()
user.addToken(token);
token.setService(ClientRMProxy.getAMRMTokenService(configuration));
setAuthModeInConf(configuration);
}
final T proxyConnection = user.doAs(new PrivilegedExceptionAction<T>() {
@Override
public T run() throws Exception {
return ClientRMProxy.createRMProxy(configuration, protocol);
}
});
return proxyConnection;
} catch (InterruptedException e) {
throw new YarnRuntimeException(e);
}
}
private static void setAuthModeInConf(Configuration conf) {
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
SaslRpcServer.AuthMethod.TOKEN.toString());
}
public static void addToOutstandingSchedulingRequests(
Collection<SchedulingRequest> requests,
Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests) {
for (SchedulingRequest req : requests) {
List<SchedulingRequest> schedulingRequests = outstandingSchedRequests
.computeIfAbsent(req.getAllocationTags(), x -> new LinkedList<>());
SchedulingRequest matchingReq = null;
for (SchedulingRequest schedReq : schedulingRequests) {
if (isMatchingSchedulingRequests(req, schedReq)) {
matchingReq = schedReq;
break;
}
}
if (matchingReq != null) {
matchingReq.getResourceSizing()
.setNumAllocations(req.getResourceSizing().getNumAllocations());
} else {
schedulingRequests.add(req);
}
}
}
public static boolean isMatchingSchedulingRequests(
SchedulingRequest schedReq1, SchedulingRequest schedReq2) {
return schedReq1.getPriority().equals(schedReq2.getPriority()) &&
schedReq1.getExecutionType().getExecutionType().equals(
schedReq1.getExecutionType().getExecutionType()) &&
schedReq1.getAllocationRequestId() ==
schedReq2.getAllocationRequestId();
}
public static void removeFromOutstandingSchedulingRequests(
Collection<Container> containers,
Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests) {
if (containers == null || containers.isEmpty()) {
return;
}
for (Container container : containers) {
if (container.getAllocationTags() != null
&& !container.getAllocationTags().isEmpty()) {
List<SchedulingRequest> schedReqs =
outstandingSchedRequests.get(container.getAllocationTags());
if (schedReqs != null && !schedReqs.isEmpty()) {
Iterator<SchedulingRequest> iter = schedReqs.iterator();
while (iter.hasNext()) {
SchedulingRequest schedReq = iter.next();
if (schedReq.getPriority().equals(container.getPriority())
&& schedReq.getAllocationRequestId() == container
.getAllocationRequestId()) {
int numAllocations =
schedReq.getResourceSizing().getNumAllocations();
numAllocations--;
if (numAllocations == 0) {
iter.remove();
} else {
schedReq.getResourceSizing().setNumAllocations(numAllocations);
}
}
}
}
}
}
}
}