blob: de9877b85aef603ea1e3ca315d8bf7d722a69a6d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.AbstractMap.SimpleEntry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.RackResolver;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.util.resource.Resources;
@Private
@Unstable
public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
private static final List<String> ANY_LIST =
Collections.singletonList(ResourceRequest.ANY);
private int lastResponseId = 0;
protected String appHostName;
protected int appHostPort;
protected String appTrackingUrl;
protected String newTrackingUrl;
protected ApplicationMasterProtocol rmClient;
protected Resource clusterAvailableResources;
protected int clusterNodeCount;
// blacklistedNodes is required for keeping history of blacklisted nodes that
// are sent to RM. On RESYNC command from RM, blacklistedNodes are used to get
// current blacklisted nodes and send back to RM.
protected final Set<String> blacklistedNodes = new HashSet<String>();
protected final Set<String> blacklistAdditions = new HashSet<String>();
protected final Set<String> blacklistRemovals = new HashSet<String>();
static class ResourceRequestInfo<T> {
ResourceRequest remoteRequest;
LinkedHashSet<T> containerRequests;
ResourceRequestInfo(Long allocationRequestId, Priority priority,
String resourceName, Resource capability, boolean relaxLocality) {
remoteRequest = ResourceRequest.newBuilder().priority(priority)
.resourceName(resourceName).capability(capability).numContainers(0)
.allocationRequestId(allocationRequestId)
.relaxLocality(relaxLocality).build();
containerRequests = new LinkedHashSet<T>();
}
}
/**
* Class compares Resource by memory then cpu in reverse order
*/
static class ResourceReverseMemoryThenCpuComparator implements
Comparator<Resource>, Serializable {
static final long serialVersionUID = 12345L;
@Override
public int compare(Resource arg0, Resource arg1) {
long mem0 = arg0.getMemorySize();
long mem1 = arg1.getMemorySize();
long cpu0 = arg0.getVirtualCores();
long cpu1 = arg1.getVirtualCores();
if(mem0 == mem1) {
if(cpu0 == cpu1) {
return 0;
}
if(cpu0 < cpu1) {
return 1;
}
return -1;
}
if(mem0 < mem1) {
return 1;
}
return -1;
}
}
static boolean canFit(Resource arg0, Resource arg1) {
return Resources.fitsIn(arg0, arg1);
}
private final Map<Long, RemoteRequestsTable<T>> remoteRequests =
new HashMap<>();
protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
protected final Set<ContainerId> release = new TreeSet<ContainerId>();
// pendingRelease holds history of release requests.
// request is removed only if RM sends completedContainer.
// How it different from release? --> release is for per allocate() request.
protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
// change map holds container resource change requests between two allocate()
// calls, and are cleared after each successful allocate() call.
protected final Map<ContainerId,
SimpleEntry<Container, UpdateContainerRequest>> change = new HashMap<>();
// pendingChange map holds history of container resource change requests in
// case AM needs to reregister with the ResourceManager.
// Change requests are removed from this map if RM confirms the change
// through allocate response, or if RM confirms that the container has been
// completed.
protected final Map<ContainerId,
SimpleEntry<Container, UpdateContainerRequest>> pendingChange =
new HashMap<>();
public AMRMClientImpl() {
super(AMRMClientImpl.class.getName());
}
@VisibleForTesting
AMRMClientImpl(ApplicationMasterProtocol protocol) {
super(AMRMClientImpl.class.getName());
this.rmClient = protocol;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
RackResolver.init(conf);
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
final YarnConfiguration conf = new YarnConfiguration(getConfig());
try {
if (rmClient == null) {
rmClient = ClientRMProxy.createRMProxy(
conf, ApplicationMasterProtocol.class);
}
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
if (this.rmClient != null) {
RPC.stopProxy(this.rmClient);
}
super.serviceStop();
}
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
String appHostName, int appHostPort, String appTrackingUrl)
throws YarnException, IOException {
this.appHostName = appHostName;
this.appHostPort = appHostPort;
this.appTrackingUrl = appTrackingUrl;
Preconditions.checkArgument(appHostName != null,
"The host name should not be null");
Preconditions.checkArgument(appHostPort >= -1, "Port number of the host"
+ " should be any integers larger than or equal to -1");
return registerApplicationMaster();
}
private RegisterApplicationMasterResponse registerApplicationMaster()
throws YarnException, IOException {
RegisterApplicationMasterRequest request =
RegisterApplicationMasterRequest.newInstance(this.appHostName,
this.appHostPort, this.appTrackingUrl);
RegisterApplicationMasterResponse response =
rmClient.registerApplicationMaster(request);
synchronized (this) {
lastResponseId = 0;
if (!response.getNMTokensFromPreviousAttempts().isEmpty()) {
populateNMTokens(response.getNMTokensFromPreviousAttempts());
}
}
return response;
}
@Override
public AllocateResponse allocate(float progressIndicator)
throws YarnException, IOException {
Preconditions.checkArgument(progressIndicator >= 0,
"Progress indicator should not be negative");
AllocateResponse allocateResponse = null;
List<ResourceRequest> askList = null;
List<ContainerId> releaseList = null;
AllocateRequest allocateRequest = null;
List<String> blacklistToAdd = new ArrayList<String>();
List<String> blacklistToRemove = new ArrayList<String>();
Map<ContainerId, SimpleEntry<Container, UpdateContainerRequest>> oldChange =
new HashMap<>();
try {
synchronized (this) {
askList = cloneAsks();
// Save the current change for recovery
oldChange.putAll(change);
List<UpdateContainerRequest> updateList = createUpdateList();
releaseList = new ArrayList<ContainerId>(release);
// optimistically clear this collection assuming no RPC failure
ask.clear();
release.clear();
change.clear();
blacklistToAdd.addAll(blacklistAdditions);
blacklistToRemove.addAll(blacklistRemovals);
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(blacklistToAdd,
blacklistToRemove);
allocateRequest = AllocateRequest.newBuilder()
.responseId(lastResponseId).progress(progressIndicator)
.askList(askList).resourceBlacklistRequest(blacklistRequest)
.releaseList(releaseList).updateRequests(updateList).build();
if (this.newTrackingUrl != null) {
allocateRequest.setTrackingUrl(this.newTrackingUrl);
this.appTrackingUrl = this.newTrackingUrl;
this.newTrackingUrl = null;
}
// clear blacklistAdditions and blacklistRemovals before
// unsynchronized part
blacklistAdditions.clear();
blacklistRemovals.clear();
}
try {
allocateResponse = rmClient.allocate(allocateRequest);
} catch (ApplicationMasterNotRegisteredException e) {
LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
+ " hence resyncing.");
synchronized (this) {
release.addAll(this.pendingRelease);
blacklistAdditions.addAll(this.blacklistedNodes);
for (RemoteRequestsTable remoteRequestsTable :
remoteRequests.values()) {
@SuppressWarnings("unchecked")
Iterator<ResourceRequestInfo<T>> reqIter =
remoteRequestsTable.iterator();
while (reqIter.hasNext()) {
addResourceRequestToAsk(reqIter.next().remoteRequest);
}
}
change.putAll(this.pendingChange);
}
// re register with RM
registerApplicationMaster();
allocateResponse = allocate(progressIndicator);
return allocateResponse;
}
synchronized (this) {
// update these on successful RPC
clusterNodeCount = allocateResponse.getNumClusterNodes();
lastResponseId = allocateResponse.getResponseId();
clusterAvailableResources = allocateResponse.getAvailableResources();
if (!allocateResponse.getNMTokens().isEmpty()) {
populateNMTokens(allocateResponse.getNMTokens());
}
if (allocateResponse.getAMRMToken() != null) {
updateAMRMToken(allocateResponse.getAMRMToken());
}
if (!pendingRelease.isEmpty()
&& !allocateResponse.getCompletedContainersStatuses().isEmpty()) {
removePendingReleaseRequests(allocateResponse
.getCompletedContainersStatuses());
}
if (!pendingChange.isEmpty()) {
List<ContainerStatus> completed =
allocateResponse.getCompletedContainersStatuses();
List<UpdatedContainer> changed = new ArrayList<>();
changed.addAll(allocateResponse.getUpdatedContainers());
// remove all pending change requests that belong to the completed
// containers
for (ContainerStatus status : completed) {
ContainerId containerId = status.getContainerId();
pendingChange.remove(containerId);
}
// remove all pending change requests that have been satisfied
if (!changed.isEmpty()) {
removePendingChangeRequests(changed);
}
}
}
} finally {
// TODO how to differentiate remote yarn exception vs error in rpc
if(allocateResponse == null) {
// we hit an exception in allocate()
// preserve ask and release for next call to allocate()
synchronized (this) {
release.addAll(releaseList);
// requests could have been added or deleted during call to allocate
// If requests were added/removed then there is nothing to do since
// the ResourceRequest object in ask would have the actual new value.
// If ask does not have this ResourceRequest then it was unchanged and
// so we can add the value back safely.
// This assumes that there will no concurrent calls to allocate() and
// so we dont have to worry about ask being changed in the
// synchronized block at the beginning of this method.
for(ResourceRequest oldAsk : askList) {
if(!ask.contains(oldAsk)) {
ask.add(oldAsk);
}
}
// change requests could have been added during the allocate call.
// Those are the newest requests which take precedence
// over requests cached in the oldChange map.
//
// Only insert entries from the cached oldChange map
// that do not exist in the current change map:
for (Map.Entry<ContainerId,
SimpleEntry<Container, UpdateContainerRequest>> entry :
oldChange.entrySet()) {
ContainerId oldContainerId = entry.getKey();
Container oldContainer = entry.getValue().getKey();
UpdateContainerRequest oldupdate = entry.getValue().getValue();
if (change.get(oldContainerId) == null) {
change.put(
oldContainerId, new SimpleEntry<>(oldContainer, oldupdate));
}
}
blacklistAdditions.addAll(blacklistToAdd);
blacklistRemovals.addAll(blacklistToRemove);
}
}
}
return allocateResponse;
}
private List<UpdateContainerRequest> createUpdateList() {
List<UpdateContainerRequest> updateList = new ArrayList<>();
for (Map.Entry<ContainerId, SimpleEntry<Container,
UpdateContainerRequest>> entry : change.entrySet()) {
Resource targetCapability = entry.getValue().getValue().getCapability();
ExecutionType targetExecType =
entry.getValue().getValue().getExecutionType();
ContainerUpdateType updateType =
entry.getValue().getValue().getContainerUpdateType();
int version = entry.getValue().getKey().getVersion();
updateList.add(
UpdateContainerRequest.newInstance(version, entry.getKey(),
updateType, targetCapability, targetExecType));
}
return updateList;
}
private List<ResourceRequest> cloneAsks() {
List<ResourceRequest> askList = new ArrayList<ResourceRequest>(ask.size());
for(ResourceRequest r : ask) {
// create a copy of ResourceRequest as we might change it while the
// RPC layer is using it to send info across
askList.add(ResourceRequest.clone(r));
}
return askList;
}
protected void removePendingReleaseRequests(
List<ContainerStatus> completedContainersStatuses) {
for (ContainerStatus containerStatus : completedContainersStatuses) {
pendingRelease.remove(containerStatus.getContainerId());
}
}
protected void removePendingChangeRequests(
List<UpdatedContainer> changedContainers) {
for (UpdatedContainer changedContainer : changedContainers) {
ContainerId containerId = changedContainer.getContainer().getId();
if (pendingChange.get(containerId) == null) {
continue;
}
if (LOG.isDebugEnabled()) {
LOG.debug("RM has confirmed changed resource allocation for "
+ "container " + containerId + ". Current resource allocation:"
+ changedContainer.getContainer().getResource()
+ ". Remove pending change request:"
+ pendingChange.get(containerId).getValue());
}
pendingChange.remove(containerId);
}
}
@Private
@VisibleForTesting
protected void populateNMTokens(List<NMToken> nmTokens) {
for (NMToken token : nmTokens) {
String nodeId = token.getNodeId().toString();
if (LOG.isDebugEnabled()) {
if (getNMTokenCache().containsToken(nodeId)) {
LOG.debug("Replacing token for : " + nodeId);
} else {
LOG.debug("Received new token for : " + nodeId);
}
}
getNMTokenCache().setToken(nodeId, token.getToken());
}
}
@Override
public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
String appMessage, String appTrackingUrl) throws YarnException,
IOException {
Preconditions.checkArgument(appStatus != null,
"AppStatus should not be null.");
FinishApplicationMasterRequest request =
FinishApplicationMasterRequest.newInstance(appStatus, appMessage,
appTrackingUrl);
try {
while (true) {
FinishApplicationMasterResponse response =
rmClient.finishApplicationMaster(request);
if (response.getIsUnregistered()) {
break;
}
LOG.info("Waiting for application to be successfully unregistered.");
Thread.sleep(100);
}
} catch (InterruptedException e) {
LOG.info("Interrupted while waiting for application"
+ " to be removed from RMStateStore");
} catch (ApplicationMasterNotRegisteredException e) {
LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
+ " hence resyncing.");
// re register with RM
registerApplicationMaster();
unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
}
}
@Override
public synchronized void addContainerRequest(T req) {
Preconditions.checkArgument(req != null,
"Resource request can not be null.");
Set<String> dedupedRacks = new HashSet<String>();
if (req.getRacks() != null) {
dedupedRacks.addAll(req.getRacks());
if(req.getRacks().size() != dedupedRacks.size()) {
Joiner joiner = Joiner.on(',');
LOG.warn("ContainerRequest has duplicate racks: "
+ joiner.join(req.getRacks()));
}
}
Set<String> inferredRacks = resolveRacks(req.getNodes());
inferredRacks.removeAll(dedupedRacks);
// check that specific and non-specific requests cannot be mixed within a
// priority
checkLocalityRelaxationConflict(req.getAllocationRequestId(),
req.getPriority(), ANY_LIST, req.getRelaxLocality());
// check that specific rack cannot be mixed with specific node within a
// priority. If node and its rack are both specified then they must be
// in the same request.
// For explicitly requested racks, we set locality relaxation to true
checkLocalityRelaxationConflict(req.getAllocationRequestId(),
req.getPriority(), dedupedRacks, true);
checkLocalityRelaxationConflict(req.getAllocationRequestId(),
req.getPriority(), inferredRacks, req.getRelaxLocality());
// check if the node label expression specified is valid
checkNodeLabelExpression(req);
if (req.getNodes() != null) {
HashSet<String> dedupedNodes = new HashSet<String>(req.getNodes());
if(dedupedNodes.size() != req.getNodes().size()) {
Joiner joiner = Joiner.on(',');
LOG.warn("ContainerRequest has duplicate nodes: "
+ joiner.join(req.getNodes()));
}
for (String node : dedupedNodes) {
addResourceRequest(req.getPriority(), node,
req.getExecutionTypeRequest(), req.getCapability(), req, true,
req.getNodeLabelExpression());
}
}
for (String rack : dedupedRacks) {
addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
req.getCapability(), req, true, req.getNodeLabelExpression());
}
// Ensure node requests are accompanied by requests for
// corresponding rack
for (String rack : inferredRacks) {
addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
req.getCapability(), req, req.getRelaxLocality(),
req.getNodeLabelExpression());
}
// Off-switch
addResourceRequest(req.getPriority(), ResourceRequest.ANY,
req.getExecutionTypeRequest(), req.getCapability(), req,
req.getRelaxLocality(), req.getNodeLabelExpression());
}
@Override
public synchronized void removeContainerRequest(T req) {
Preconditions.checkArgument(req != null,
"Resource request can not be null.");
Set<String> allRacks = new HashSet<String>();
if (req.getRacks() != null) {
allRacks.addAll(req.getRacks());
}
allRacks.addAll(resolveRacks(req.getNodes()));
// Update resource requests
if (req.getNodes() != null) {
for (String node : new HashSet<String>(req.getNodes())) {
decResourceRequest(req.getPriority(), node,
req.getExecutionTypeRequest(), req.getCapability(), req);
}
}
for (String rack : allRacks) {
decResourceRequest(req.getPriority(), rack,
req.getExecutionTypeRequest(), req.getCapability(), req);
}
decResourceRequest(req.getPriority(), ResourceRequest.ANY,
req.getExecutionTypeRequest(), req.getCapability(), req);
}
@Override
public synchronized void requestContainerUpdate(
Container container, UpdateContainerRequest updateContainerRequest) {
Preconditions.checkNotNull(container, "Container cannot be null!!");
Preconditions.checkNotNull(updateContainerRequest,
"UpdateContainerRequest cannot be null!!");
LOG.info("Requesting Container update : " +
"container=" + container + ", " +
"updateType=" + updateContainerRequest.getContainerUpdateType() + ", " +
"targetCapability=" + updateContainerRequest.getCapability() + ", " +
"targetExecType=" + updateContainerRequest.getExecutionType());
if (updateContainerRequest.getCapability() != null &&
updateContainerRequest.getExecutionType() == null) {
validateContainerResourceChangeRequest(
updateContainerRequest.getContainerUpdateType(),
container.getId(), container.getResource(),
updateContainerRequest.getCapability());
} else if (updateContainerRequest.getExecutionType() != null &&
updateContainerRequest.getCapability() == null) {
validateContainerExecTypeChangeRequest(
updateContainerRequest.getContainerUpdateType(),
container.getId(), container.getExecutionType(),
updateContainerRequest.getExecutionType());
} else if (updateContainerRequest.getExecutionType() == null &&
updateContainerRequest.getCapability() == null) {
throw new IllegalArgumentException("Both target Capability and" +
"target Execution Type are null");
} else {
throw new IllegalArgumentException("Support currently exists only for" +
" EITHER update of Capability OR update of Execution Type NOT both");
}
if (change.get(container.getId()) == null) {
change.put(container.getId(),
new SimpleEntry<>(container, updateContainerRequest));
} else {
change.get(container.getId()).setValue(updateContainerRequest);
}
if (pendingChange.get(container.getId()) == null) {
pendingChange.put(container.getId(),
new SimpleEntry<>(container, updateContainerRequest));
} else {
pendingChange.get(container.getId()).setValue(updateContainerRequest);
}
}
@Override
public synchronized void releaseAssignedContainer(ContainerId containerId) {
Preconditions.checkArgument(containerId != null,
"ContainerId can not be null.");
pendingRelease.add(containerId);
release.add(containerId);
pendingChange.remove(containerId);
}
@Override
public synchronized Resource getAvailableResources() {
return clusterAvailableResources;
}
@Override
public synchronized int getClusterNodeCount() {
return clusterNodeCount;
}
@Override
@SuppressWarnings("unchecked")
public Collection<T> getMatchingRequests(long allocationRequestId) {
RemoteRequestsTable remoteRequestsTable = getTable(allocationRequestId);
LinkedHashSet<T> list = new LinkedHashSet<>();
if (remoteRequestsTable != null) {
Iterator<ResourceRequestInfo<T>> reqIter =
remoteRequestsTable.iterator();
while (reqIter.hasNext()) {
ResourceRequestInfo<T> resReqInfo = reqIter.next();
list.addAll(resReqInfo.containerRequests);
}
}
return list;
}
@Override
public synchronized List<? extends Collection<T>> getMatchingRequests(
Priority priority,
String resourceName,
Resource capability) {
return getMatchingRequests(priority, resourceName,
ExecutionType.GUARANTEED, capability);
}
@Override
@SuppressWarnings("unchecked")
public synchronized List<? extends Collection<T>> getMatchingRequests(
Priority priority, String resourceName, ExecutionType executionType,
Resource capability) {
Preconditions.checkArgument(capability != null,
"The Resource to be requested should not be null ");
Preconditions.checkArgument(priority != null,
"The priority at which to request containers should not be null ");
List<LinkedHashSet<T>> list = new LinkedList<LinkedHashSet<T>>();
RemoteRequestsTable remoteRequestsTable = getTable(0);
if (null != remoteRequestsTable) {
List<ResourceRequestInfo<T>> matchingRequests =
remoteRequestsTable.getMatchingRequests(priority, resourceName,
executionType, capability);
if (null != matchingRequests) {
// If no exact match. Container may be larger than what was requested.
// get all resources <= capability. map is reverse sorted.
for (ResourceRequestInfo<T> resReqInfo : matchingRequests) {
if (canFit(resReqInfo.remoteRequest.getCapability(), capability) &&
!resReqInfo.containerRequests.isEmpty()) {
list.add(resReqInfo.containerRequests);
}
}
}
}
// no match found
return list;
}
private Set<String> resolveRacks(List<String> nodes) {
Set<String> racks = new HashSet<String>();
if (nodes != null) {
for (String node : nodes) {
// Ensure node requests are accompanied by requests for
// corresponding rack
String rack = RackResolver.resolve(node).getNetworkLocation();
if (rack == null) {
LOG.warn("Failed to resolve rack for node " + node + ".");
} else {
racks.add(rack);
}
}
}
return racks;
}
/**
* ContainerRequests with locality relaxation cannot be made at the same
* priority as ContainerRequests without locality relaxation.
*/
private void checkLocalityRelaxationConflict(Long allocationReqId,
Priority priority, Collection<String> locations, boolean relaxLocality) {
// Locality relaxation will be set to relaxLocality for all implicitly
// requested racks. Make sure that existing rack requests match this.
RemoteRequestsTable<T> remoteRequestsTable = getTable(allocationReqId);
if (remoteRequestsTable != null) {
@SuppressWarnings("unchecked")
List<ResourceRequestInfo> allCapabilityMaps =
remoteRequestsTable.getAllResourceRequestInfos(priority, locations);
for (ResourceRequestInfo reqs : allCapabilityMaps) {
ResourceRequest remoteRequest = reqs.remoteRequest;
boolean existingRelaxLocality = remoteRequest.getRelaxLocality();
if (relaxLocality != existingRelaxLocality) {
throw new InvalidContainerRequestException("Cannot submit a "
+ "ContainerRequest asking for location "
+ remoteRequest.getResourceName() + " with locality relaxation "
+ relaxLocality + " when it has already been requested"
+ "with locality relaxation " + existingRelaxLocality);
}
}
}
}
/**
* Valid if a node label expression specified on container request is valid or
* not
*
* @param containerRequest
*/
private void checkNodeLabelExpression(T containerRequest) {
String exp = containerRequest.getNodeLabelExpression();
if (null == exp || exp.isEmpty()) {
return;
}
// Don't support specifying > 1 node labels in a node label expression now
if (exp.contains("&&") || exp.contains("||")) {
throw new InvalidContainerRequestException(
"Cannot specify more than one node label"
+ " in a single node label expression");
}
}
private void validateContainerResourceChangeRequest(
ContainerUpdateType updateType, ContainerId containerId,
Resource original, Resource target) {
Preconditions.checkArgument(containerId != null,
"ContainerId cannot be null");
Preconditions.checkArgument(original != null,
"Original resource capability cannot be null");
Preconditions.checkArgument(!Resources.equals(Resources.none(), original)
&& Resources.fitsIn(Resources.none(), original),
"Original resource capability must be greater than 0");
Preconditions.checkArgument(target != null,
"Target resource capability cannot be null");
Preconditions.checkArgument(!Resources.equals(Resources.none(), target)
&& Resources.fitsIn(Resources.none(), target),
"Target resource capability must be greater than 0");
if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
Preconditions.checkArgument(Resources.fitsIn(target, original),
"Target resource capability must fit in Original capability");
} else {
Preconditions.checkArgument(Resources.fitsIn(original, target),
"Target resource capability must be more than Original capability");
}
}
private void validateContainerExecTypeChangeRequest(
ContainerUpdateType updateType, ContainerId containerId,
ExecutionType original, ExecutionType target) {
Preconditions.checkArgument(containerId != null,
"ContainerId cannot be null");
Preconditions.checkArgument(original != null,
"Original Execution Type cannot be null");
Preconditions.checkArgument(target != null,
"Target Execution Type cannot be null");
if (ContainerUpdateType.DEMOTE_EXECUTION_TYPE == updateType) {
Preconditions.checkArgument(target == ExecutionType.OPPORTUNISTIC
&& original == ExecutionType.GUARANTEED,
"Incorrect Container update request, target should be" +
" OPPORTUNISTIC and original should be GUARANTEED");
} else {
Preconditions.checkArgument(target == ExecutionType.GUARANTEED
&& original == ExecutionType.OPPORTUNISTIC,
"Incorrect Container update request, target should be" +
" GUARANTEED and original should be OPPORTUNISTIC");
}
}
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
// This code looks weird but is needed because of the following scenario.
// A ResourceRequest is removed from the remoteRequestTable. A 0 container
// request is added to 'ask' to notify the RM about not needing it any more.
// Before the call to allocate, the user now requests more containers. If
// the locations of the 0 size request and the new request are the same
// (with the difference being only container count), then the set comparator
// will consider both to be the same and not add the new request to ask. So
// we need to check for the "same" request being present and remove it and
// then add it back. The comparator is container count agnostic.
// This should happen only rarely but we do need to guard against it.
if(ask.contains(remoteRequest)) {
ask.remove(remoteRequest);
}
ask.add(remoteRequest);
}
private void addResourceRequest(Priority priority, String resourceName,
ExecutionTypeRequest execTypeReq, Resource capability, T req,
boolean relaxLocality, String labelExpression) {
RemoteRequestsTable<T> remoteRequestsTable =
getTable(req.getAllocationRequestId());
if (remoteRequestsTable == null) {
remoteRequestsTable = new RemoteRequestsTable<T>();
putTable(req.getAllocationRequestId(), remoteRequestsTable);
}
@SuppressWarnings("unchecked")
ResourceRequestInfo resourceRequestInfo = remoteRequestsTable
.addResourceRequest(req.getAllocationRequestId(), priority,
resourceName, execTypeReq, capability, req, relaxLocality,
labelExpression);
// Note this down for next interaction with ResourceManager
addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
if (LOG.isDebugEnabled()) {
LOG.debug("addResourceRequest:" + " applicationId="
+ " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers="
+ resourceRequestInfo.remoteRequest.getNumContainers()
+ " #asks=" + ask.size());
}
}
private void decResourceRequest(Priority priority, String resourceName,
ExecutionTypeRequest execTypeReq, Resource capability, T req) {
RemoteRequestsTable<T> remoteRequestsTable =
getTable(req.getAllocationRequestId());
if (remoteRequestsTable != null) {
@SuppressWarnings("unchecked")
ResourceRequestInfo resourceRequestInfo =
remoteRequestsTable.decResourceRequest(priority, resourceName,
execTypeReq, capability, req);
// send the ResourceRequest to RM even if is 0 because it needs to
// override a previously sent value. If ResourceRequest was not sent
// previously then sending 0 ought to be a no-op on RM
if (resourceRequestInfo != null) {
addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
// delete entry from map if no longer needed
if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
remoteRequestsTable.remove(priority, resourceName,
execTypeReq.getExecutionType(), capability);
}
if (LOG.isDebugEnabled()) {
LOG.debug("AFTER decResourceRequest:"
+ " allocationRequestId=" + req.getAllocationRequestId()
+ " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers="
+ resourceRequestInfo.remoteRequest.getNumContainers()
+ " #asks=" + ask.size());
}
}
} else {
LOG.info("No remoteRequestTable found with allocationRequestId="
+ req.getAllocationRequestId());
}
}
@Override
public synchronized void updateBlacklist(List<String> blacklistAdditions,
List<String> blacklistRemovals) {
if (blacklistAdditions != null) {
this.blacklistAdditions.addAll(blacklistAdditions);
this.blacklistedNodes.addAll(blacklistAdditions);
// if some resources are also in blacklistRemovals updated before, we
// should remove them here.
this.blacklistRemovals.removeAll(blacklistAdditions);
}
if (blacklistRemovals != null) {
this.blacklistRemovals.addAll(blacklistRemovals);
this.blacklistedNodes.removeAll(blacklistRemovals);
// if some resources are in blacklistAdditions before, we should remove
// them here.
this.blacklistAdditions.removeAll(blacklistRemovals);
}
if (blacklistAdditions != null && blacklistRemovals != null
&& blacklistAdditions.removeAll(blacklistRemovals)) {
// we allow resources to appear in addition list and removal list in the
// same invocation of updateBlacklist(), but should get a warn here.
LOG.warn("The same resources appear in both blacklistAdditions and " +
"blacklistRemovals in updateBlacklist.");
}
}
@Override
public synchronized void updateTrackingUrl(String trackingUrl) {
this.newTrackingUrl = trackingUrl;
}
private void updateAMRMToken(Token token) throws IOException {
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token
.getIdentifier().array(), token.getPassword().array(), new Text(
token.getKind()), new Text(token.getService()));
// Preserve the token service sent by the RM when adding the token
// to ensure we replace the previous token setup by the RM.
// Afterwards we can update the service address for the RPC layer.
UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser();
LOG.info("Updating with new AMRMToken");
currentUGI.addToken(amrmToken);
amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig()));
}
@VisibleForTesting
RemoteRequestsTable<T> getTable(long allocationRequestId) {
return remoteRequests.get(Long.valueOf(allocationRequestId));
}
RemoteRequestsTable<T> putTable(long allocationRequestId,
RemoteRequestsTable<T> table) {
return remoteRequests.put(Long.valueOf(allocationRequestId), table);
}
}