| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.client.api.impl; |
| |
| import java.io.IOException; |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeSet; |
| import java.util.AbstractMap.SimpleEntry; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceStability.Unstable; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.ipc.RPC; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.ContainerUpdateType; |
| import org.apache.hadoop.yarn.api.records.ExecutionType; |
| import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; |
| import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; |
| import org.apache.hadoop.yarn.api.records.NMToken; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.ProfileCapability; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.api.records.Token; |
| import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; |
| import org.apache.hadoop.yarn.api.records.UpdatedContainer; |
| import org.apache.hadoop.yarn.client.ClientRMProxy; |
| import org.apache.hadoop.yarn.client.api.AMRMClient; |
| import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; |
| import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; |
| import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; |
| import org.apache.hadoop.yarn.util.RackResolver; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Preconditions; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| |
| @Private |
| @Unstable |
| public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { |
| |
| private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class); |
| private static final List<String> ANY_LIST = |
| Collections.singletonList(ResourceRequest.ANY); |
| |
| private int lastResponseId = 0; |
| |
| protected String appHostName; |
| protected int appHostPort; |
| protected String appTrackingUrl; |
| |
| protected ApplicationMasterProtocol rmClient; |
| protected Resource clusterAvailableResources; |
| protected int clusterNodeCount; |
| |
| // blacklistedNodes is required for keeping history of blacklisted nodes that |
| // are sent to RM. On RESYNC command from RM, blacklistedNodes are used to get |
| // current blacklisted nodes and send back to RM. |
| protected final Set<String> blacklistedNodes = new HashSet<String>(); |
| protected final Set<String> blacklistAdditions = new HashSet<String>(); |
| protected final Set<String> blacklistRemovals = new HashSet<String>(); |
| |
| protected Map<String, Resource> resourceProfilesMap; |
| |
| static class ResourceRequestInfo<T> { |
| ResourceRequest remoteRequest; |
| LinkedHashSet<T> containerRequests; |
| |
| ResourceRequestInfo(Long allocationRequestId, Priority priority, |
| String resourceName, Resource capability, boolean relaxLocality, |
| String resourceProfile) { |
| ProfileCapability profileCapability = ProfileCapability |
| .newInstance(resourceProfile, capability); |
| remoteRequest = ResourceRequest.newBuilder().priority(priority) |
| .resourceName(resourceName).capability(capability).numContainers(0) |
| .allocationRequestId(allocationRequestId).relaxLocality(relaxLocality) |
| .profileCapability(profileCapability).build(); |
| containerRequests = new LinkedHashSet<T>(); |
| } |
| } |
| |
| /** |
| * Class compares Resource by memory, then cpu and then the remaining resource |
| * types in reverse order. |
| */ |
| static class ProfileCapabilityComparator<T extends ProfileCapability> |
| implements Comparator<T> { |
| |
| HashMap<String, Resource> resourceProfilesMap; |
| |
| public ProfileCapabilityComparator( |
| HashMap<String, Resource> resourceProfileMap) { |
| this.resourceProfilesMap = resourceProfileMap; |
| } |
| |
| public int compare(T arg0, T arg1) { |
| Resource resource0 = |
| ProfileCapability.toResource(arg0, resourceProfilesMap); |
| Resource resource1 = |
| ProfileCapability.toResource(arg1, resourceProfilesMap); |
| return resource1.compareTo(resource0); |
| } |
| } |
| |
| boolean canFit(ProfileCapability arg0, ProfileCapability arg1) { |
| Resource resource0 = |
| ProfileCapability.toResource(arg0, resourceProfilesMap); |
| Resource resource1 = |
| ProfileCapability.toResource(arg1, resourceProfilesMap); |
| return Resources.fitsIn(resource0, resource1); |
| |
| } |
| |
| private final Map<Long, RemoteRequestsTable<T>> remoteRequests = |
| new HashMap<>(); |
| |
| protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>( |
| new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator()); |
| protected final Set<ContainerId> release = new TreeSet<ContainerId>(); |
| // pendingRelease holds history of release requests. |
| // request is removed only if RM sends completedContainer. |
| // How it different from release? --> release is for per allocate() request. |
| protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>(); |
| // change map holds container resource change requests between two allocate() |
| // calls, and are cleared after each successful allocate() call. |
| protected final Map<ContainerId, SimpleEntry<Container, Resource>> change = |
| new HashMap<>(); |
| // pendingChange map holds history of container resource change requests in |
| // case AM needs to reregister with the ResourceManager. |
| // Change requests are removed from this map if RM confirms the change |
| // through allocate response, or if RM confirms that the container has been |
| // completed. |
| protected final Map<ContainerId, SimpleEntry<Container, Resource>> |
| pendingChange = new HashMap<>(); |
| |
/**
 * Creates a client named after this class. The RM proxy is created later,
 * in {@code serviceStart()}.
 */
public AMRMClientImpl() {
  super(AMRMClientImpl.class.getName());
}
| |
/**
 * Creates a client that talks to the given protocol instance instead of
 * creating its own RM proxy on start.
 *
 * @param protocol protocol implementation to use as the RM client
 */
@VisibleForTesting
AMRMClientImpl(ApplicationMasterProtocol protocol) {
  super(AMRMClientImpl.class.getName());
  rmClient = protocol;
}
| |
| @Override |
| protected void serviceInit(Configuration conf) throws Exception { |
| RackResolver.init(conf); |
| super.serviceInit(conf); |
| } |
| |
| @Override |
| protected void serviceStart() throws Exception { |
| final YarnConfiguration conf = new YarnConfiguration(getConfig()); |
| try { |
| if (rmClient == null) { |
| rmClient = ClientRMProxy.createRMProxy( |
| conf, ApplicationMasterProtocol.class); |
| } |
| } catch (IOException e) { |
| throw new YarnRuntimeException(e); |
| } |
| super.serviceStart(); |
| } |
| |
| @Override |
| protected void serviceStop() throws Exception { |
| if (this.rmClient != null) { |
| RPC.stopProxy(this.rmClient); |
| } |
| super.serviceStop(); |
| } |
| |
| @Override |
| public RegisterApplicationMasterResponse registerApplicationMaster( |
| String appHostName, int appHostPort, String appTrackingUrl) |
| throws YarnException, IOException { |
| this.appHostName = appHostName; |
| this.appHostPort = appHostPort; |
| this.appTrackingUrl = appTrackingUrl; |
| Preconditions.checkArgument(appHostName != null, |
| "The host name should not be null"); |
| Preconditions.checkArgument(appHostPort >= -1, "Port number of the host" |
| + " should be any integers larger than or equal to -1"); |
| |
| return registerApplicationMaster(); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private RegisterApplicationMasterResponse registerApplicationMaster() |
| throws YarnException, IOException { |
| RegisterApplicationMasterRequest request = |
| RegisterApplicationMasterRequest.newInstance(this.appHostName, |
| this.appHostPort, this.appTrackingUrl); |
| RegisterApplicationMasterResponse response = |
| rmClient.registerApplicationMaster(request); |
| synchronized (this) { |
| lastResponseId = 0; |
| if (!response.getNMTokensFromPreviousAttempts().isEmpty()) { |
| populateNMTokens(response.getNMTokensFromPreviousAttempts()); |
| } |
| this.resourceProfilesMap = response.getResourceProfiles(); |
| } |
| return response; |
| } |
| |
| @Override |
| public AllocateResponse allocate(float progressIndicator) |
| throws YarnException, IOException { |
| Preconditions.checkArgument(progressIndicator >= 0, |
| "Progress indicator should not be negative"); |
| AllocateResponse allocateResponse = null; |
| List<ResourceRequest> askList = null; |
| List<ContainerId> releaseList = null; |
| AllocateRequest allocateRequest = null; |
| List<String> blacklistToAdd = new ArrayList<String>(); |
| List<String> blacklistToRemove = new ArrayList<String>(); |
| Map<ContainerId, SimpleEntry<Container, Resource>> oldChange = |
| new HashMap<>(); |
| try { |
| synchronized (this) { |
| askList = cloneAsks(); |
| // Save the current change for recovery |
| oldChange.putAll(change); |
| List<UpdateContainerRequest> updateList = createUpdateList(); |
| releaseList = new ArrayList<ContainerId>(release); |
| // optimistically clear this collection assuming no RPC failure |
| ask.clear(); |
| release.clear(); |
| change.clear(); |
| |
| blacklistToAdd.addAll(blacklistAdditions); |
| blacklistToRemove.addAll(blacklistRemovals); |
| |
| ResourceBlacklistRequest blacklistRequest = |
| ResourceBlacklistRequest.newInstance(blacklistToAdd, |
| blacklistToRemove); |
| |
| allocateRequest = AllocateRequest.newBuilder() |
| .responseId(lastResponseId).progress(progressIndicator) |
| .askList(askList).resourceBlacklistRequest(blacklistRequest) |
| .releaseList(releaseList).updateRequests(updateList).build(); |
| // clear blacklistAdditions and blacklistRemovals before |
| // unsynchronized part |
| blacklistAdditions.clear(); |
| blacklistRemovals.clear(); |
| } |
| |
| try { |
| allocateResponse = rmClient.allocate(allocateRequest); |
| } catch (ApplicationMasterNotRegisteredException e) { |
| LOG.warn("ApplicationMaster is out of sync with ResourceManager," |
| + " hence resyncing."); |
| synchronized (this) { |
| release.addAll(this.pendingRelease); |
| blacklistAdditions.addAll(this.blacklistedNodes); |
| for (RemoteRequestsTable remoteRequestsTable : |
| remoteRequests.values()) { |
| @SuppressWarnings("unchecked") |
| Iterator<ResourceRequestInfo<T>> reqIter = |
| remoteRequestsTable.iterator(); |
| while (reqIter.hasNext()) { |
| addResourceRequestToAsk(reqIter.next().remoteRequest); |
| } |
| } |
| change.putAll(this.pendingChange); |
| } |
| // re register with RM |
| registerApplicationMaster(); |
| allocateResponse = allocate(progressIndicator); |
| return allocateResponse; |
| } |
| |
| synchronized (this) { |
| // update these on successful RPC |
| clusterNodeCount = allocateResponse.getNumClusterNodes(); |
| lastResponseId = allocateResponse.getResponseId(); |
| clusterAvailableResources = allocateResponse.getAvailableResources(); |
| if (!allocateResponse.getNMTokens().isEmpty()) { |
| populateNMTokens(allocateResponse.getNMTokens()); |
| } |
| if (allocateResponse.getAMRMToken() != null) { |
| updateAMRMToken(allocateResponse.getAMRMToken()); |
| } |
| if (!pendingRelease.isEmpty() |
| && !allocateResponse.getCompletedContainersStatuses().isEmpty()) { |
| removePendingReleaseRequests(allocateResponse |
| .getCompletedContainersStatuses()); |
| } |
| if (!pendingChange.isEmpty()) { |
| List<ContainerStatus> completed = |
| allocateResponse.getCompletedContainersStatuses(); |
| List<UpdatedContainer> changed = new ArrayList<>(); |
| changed.addAll(allocateResponse.getUpdatedContainers()); |
| // remove all pending change requests that belong to the completed |
| // containers |
| for (ContainerStatus status : completed) { |
| ContainerId containerId = status.getContainerId(); |
| pendingChange.remove(containerId); |
| } |
| // remove all pending change requests that have been satisfied |
| if (!changed.isEmpty()) { |
| removePendingChangeRequests(changed); |
| } |
| } |
| } |
| } finally { |
| // TODO how to differentiate remote yarn exception vs error in rpc |
| if(allocateResponse == null) { |
| // we hit an exception in allocate() |
| // preserve ask and release for next call to allocate() |
| synchronized (this) { |
| release.addAll(releaseList); |
| // requests could have been added or deleted during call to allocate |
| // If requests were added/removed then there is nothing to do since |
| // the ResourceRequest object in ask would have the actual new value. |
| // If ask does not have this ResourceRequest then it was unchanged and |
| // so we can add the value back safely. |
| // This assumes that there will no concurrent calls to allocate() and |
| // so we dont have to worry about ask being changed in the |
| // synchronized block at the beginning of this method. |
| for(ResourceRequest oldAsk : askList) { |
| if(!ask.contains(oldAsk)) { |
| ask.add(oldAsk); |
| } |
| } |
| // change requests could have been added during the allocate call. |
| // Those are the newest requests which take precedence |
| // over requests cached in the oldChange map. |
| // |
| // Only insert entries from the cached oldChange map |
| // that do not exist in the current change map: |
| for (Map.Entry<ContainerId, SimpleEntry<Container, Resource>> entry : |
| oldChange.entrySet()) { |
| ContainerId oldContainerId = entry.getKey(); |
| Container oldContainer = entry.getValue().getKey(); |
| Resource oldResource = entry.getValue().getValue(); |
| if (change.get(oldContainerId) == null) { |
| change.put( |
| oldContainerId, new SimpleEntry<>(oldContainer, oldResource)); |
| } |
| } |
| blacklistAdditions.addAll(blacklistToAdd); |
| blacklistRemovals.addAll(blacklistToRemove); |
| } |
| } |
| } |
| return allocateResponse; |
| } |
| |
| private List<UpdateContainerRequest> createUpdateList() { |
| List<UpdateContainerRequest> updateList = new ArrayList<>(); |
| for (Map.Entry<ContainerId, SimpleEntry<Container, Resource>> entry : |
| change.entrySet()) { |
| Resource targetCapability = entry.getValue().getValue(); |
| Resource currCapability = entry.getValue().getKey().getResource(); |
| int version = entry.getValue().getKey().getVersion(); |
| ContainerUpdateType updateType = |
| ContainerUpdateType.INCREASE_RESOURCE; |
| if (Resources.fitsIn(targetCapability, currCapability)) { |
| updateType = ContainerUpdateType.DECREASE_RESOURCE; |
| } |
| updateList.add( |
| UpdateContainerRequest.newInstance(version, entry.getKey(), |
| updateType, targetCapability, null)); |
| } |
| return updateList; |
| } |
| |
| private List<ResourceRequest> cloneAsks() { |
| List<ResourceRequest> askList = new ArrayList<ResourceRequest>(ask.size()); |
| for(ResourceRequest r : ask) { |
| // create a copy of ResourceRequest as we might change it while the |
| // RPC layer is using it to send info across |
| ResourceRequest rr = |
| ResourceRequest.newBuilder().priority(r.getPriority()) |
| .resourceName(r.getResourceName()).capability(r.getCapability()) |
| .numContainers(r.getNumContainers()) |
| .relaxLocality(r.getRelaxLocality()) |
| .nodeLabelExpression(r.getNodeLabelExpression()) |
| .executionTypeRequest(r.getExecutionTypeRequest()) |
| .allocationRequestId(r.getAllocationRequestId()) |
| .profileCapability(r.getProfileCapability()).build(); |
| askList.add(rr); |
| } |
| return askList; |
| } |
| |
| protected void removePendingReleaseRequests( |
| List<ContainerStatus> completedContainersStatuses) { |
| for (ContainerStatus containerStatus : completedContainersStatuses) { |
| pendingRelease.remove(containerStatus.getContainerId()); |
| } |
| } |
| |
| protected void removePendingChangeRequests( |
| List<UpdatedContainer> changedContainers) { |
| for (UpdatedContainer changedContainer : changedContainers) { |
| ContainerId containerId = changedContainer.getContainer().getId(); |
| if (pendingChange.get(containerId) == null) { |
| continue; |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("RM has confirmed changed resource allocation for " |
| + "container " + containerId + ". Current resource allocation:" |
| + changedContainer.getContainer().getResource() |
| + ". Remove pending change request:" |
| + pendingChange.get(containerId).getValue()); |
| } |
| pendingChange.remove(containerId); |
| } |
| } |
| |
| @Private |
| @VisibleForTesting |
| protected void populateNMTokens(List<NMToken> nmTokens) { |
| for (NMToken token : nmTokens) { |
| String nodeId = token.getNodeId().toString(); |
| if (LOG.isDebugEnabled()) { |
| if (getNMTokenCache().containsToken(nodeId)) { |
| LOG.debug("Replacing token for : " + nodeId); |
| } else { |
| LOG.debug("Received new token for : " + nodeId); |
| } |
| } |
| getNMTokenCache().setToken(nodeId, token.getToken()); |
| } |
| } |
| |
| @Override |
| public void unregisterApplicationMaster(FinalApplicationStatus appStatus, |
| String appMessage, String appTrackingUrl) throws YarnException, |
| IOException { |
| Preconditions.checkArgument(appStatus != null, |
| "AppStatus should not be null."); |
| FinishApplicationMasterRequest request = |
| FinishApplicationMasterRequest.newInstance(appStatus, appMessage, |
| appTrackingUrl); |
| try { |
| while (true) { |
| FinishApplicationMasterResponse response = |
| rmClient.finishApplicationMaster(request); |
| if (response.getIsUnregistered()) { |
| break; |
| } |
| LOG.info("Waiting for application to be successfully unregistered."); |
| Thread.sleep(100); |
| } |
| } catch (InterruptedException e) { |
| LOG.info("Interrupted while waiting for application" |
| + " to be removed from RMStateStore"); |
| } catch (ApplicationMasterNotRegisteredException e) { |
| LOG.warn("ApplicationMaster is out of sync with ResourceManager," |
| + " hence resyncing."); |
| // re register with RM |
| registerApplicationMaster(); |
| unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl); |
| } |
| } |
| |
| @Override |
| public synchronized void addContainerRequest(T req) { |
| Preconditions.checkArgument(req != null, |
| "Resource request can not be null."); |
| ProfileCapability profileCapability = ProfileCapability |
| .newInstance(req.getResourceProfile(), req.getCapability()); |
| Set<String> dedupedRacks = new HashSet<String>(); |
| if (req.getRacks() != null) { |
| dedupedRacks.addAll(req.getRacks()); |
| if(req.getRacks().size() != dedupedRacks.size()) { |
| Joiner joiner = Joiner.on(','); |
| LOG.warn("ContainerRequest has duplicate racks: " |
| + joiner.join(req.getRacks())); |
| } |
| } |
| Set<String> inferredRacks = resolveRacks(req.getNodes()); |
| inferredRacks.removeAll(dedupedRacks); |
| |
| checkResourceProfile(req.getResourceProfile()); |
| |
| // check that specific and non-specific requests cannot be mixed within a |
| // priority |
| checkLocalityRelaxationConflict(req.getAllocationRequestId(), |
| req.getPriority(), ANY_LIST, req.getRelaxLocality()); |
| // check that specific rack cannot be mixed with specific node within a |
| // priority. If node and its rack are both specified then they must be |
| // in the same request. |
| // For explicitly requested racks, we set locality relaxation to true |
| checkLocalityRelaxationConflict(req.getAllocationRequestId(), |
| req.getPriority(), dedupedRacks, true); |
| checkLocalityRelaxationConflict(req.getAllocationRequestId(), |
| req.getPriority(), inferredRacks, req.getRelaxLocality()); |
| // check if the node label expression specified is valid |
| checkNodeLabelExpression(req); |
| |
| if (req.getNodes() != null) { |
| HashSet<String> dedupedNodes = new HashSet<String>(req.getNodes()); |
| if(dedupedNodes.size() != req.getNodes().size()) { |
| Joiner joiner = Joiner.on(','); |
| LOG.warn("ContainerRequest has duplicate nodes: " |
| + joiner.join(req.getNodes())); |
| } |
| for (String node : dedupedNodes) { |
| addResourceRequest(req.getPriority(), node, |
| req.getExecutionTypeRequest(), profileCapability, req, true, |
| req.getNodeLabelExpression()); |
| } |
| } |
| |
| for (String rack : dedupedRacks) { |
| addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(), |
| profileCapability, req, true, req.getNodeLabelExpression()); |
| } |
| |
| // Ensure node requests are accompanied by requests for |
| // corresponding rack |
| for (String rack : inferredRacks) { |
| addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(), |
| profileCapability, req, req.getRelaxLocality(), |
| req.getNodeLabelExpression()); |
| } |
| // Off-switch |
| addResourceRequest(req.getPriority(), ResourceRequest.ANY, |
| req.getExecutionTypeRequest(), profileCapability, req, |
| req.getRelaxLocality(), req.getNodeLabelExpression()); |
| } |
| |
| @Override |
| public synchronized void removeContainerRequest(T req) { |
| Preconditions.checkArgument(req != null, |
| "Resource request can not be null."); |
| ProfileCapability profileCapability = ProfileCapability |
| .newInstance(req.getResourceProfile(), req.getCapability()); |
| Set<String> allRacks = new HashSet<String>(); |
| if (req.getRacks() != null) { |
| allRacks.addAll(req.getRacks()); |
| } |
| allRacks.addAll(resolveRacks(req.getNodes())); |
| |
| // Update resource requests |
| if (req.getNodes() != null) { |
| for (String node : new HashSet<String>(req.getNodes())) { |
| decResourceRequest(req.getPriority(), node, |
| req.getExecutionTypeRequest(), profileCapability, req); |
| } |
| } |
| |
| for (String rack : allRacks) { |
| decResourceRequest(req.getPriority(), rack, |
| req.getExecutionTypeRequest(), profileCapability, req); |
| } |
| |
| decResourceRequest(req.getPriority(), ResourceRequest.ANY, |
| req.getExecutionTypeRequest(), profileCapability, req); |
| } |
| |
| @Override |
| public synchronized void requestContainerResourceChange( |
| Container container, Resource capability) { |
| validateContainerResourceChangeRequest( |
| container.getId(), container.getResource(), capability); |
| if (change.get(container.getId()) == null) { |
| change.put(container.getId(), |
| new SimpleEntry<>(container, capability)); |
| } else { |
| change.get(container.getId()).setValue(capability); |
| } |
| if (pendingChange.get(container.getId()) == null) { |
| pendingChange.put(container.getId(), |
| new SimpleEntry<>(container, capability)); |
| } else { |
| pendingChange.get(container.getId()).setValue(capability); |
| } |
| } |
| |
| @Override |
| public synchronized void releaseAssignedContainer(ContainerId containerId) { |
| Preconditions.checkArgument(containerId != null, |
| "ContainerId can not be null."); |
| pendingRelease.add(containerId); |
| release.add(containerId); |
| pendingChange.remove(containerId); |
| } |
| |
| @Override |
| public synchronized Resource getAvailableResources() { |
| return clusterAvailableResources; |
| } |
| |
| @Override |
| public synchronized int getClusterNodeCount() { |
| return clusterNodeCount; |
| } |
| |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| public Collection<T> getMatchingRequests(long allocationRequestId) { |
| RemoteRequestsTable remoteRequestsTable = getTable(allocationRequestId); |
| LinkedHashSet<T> list = new LinkedHashSet<>(); |
| |
| if (remoteRequestsTable != null) { |
| Iterator<ResourceRequestInfo<T>> reqIter = |
| remoteRequestsTable.iterator(); |
| while (reqIter.hasNext()) { |
| ResourceRequestInfo<T> resReqInfo = reqIter.next(); |
| list.addAll(resReqInfo.containerRequests); |
| } |
| } |
| return list; |
| } |
| |
| @Override |
| public synchronized List<? extends Collection<T>> getMatchingRequests( |
| Priority priority, |
| String resourceName, |
| Resource capability) { |
| return getMatchingRequests(priority, resourceName, |
| ExecutionType.GUARANTEED, capability); |
| } |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| public synchronized List<? extends Collection<T>> getMatchingRequests( |
| Priority priority, String resourceName, ExecutionType executionType, |
| Resource capability) { |
| ProfileCapability profileCapability = |
| ProfileCapability.newInstance(capability); |
| return getMatchingRequests(priority, resourceName, executionType, |
| profileCapability); |
| } |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| public synchronized List<? extends Collection<T>> getMatchingRequests( |
| Priority priority, String resourceName, ExecutionType executionType, |
| ProfileCapability capability) { |
| Preconditions.checkArgument(capability != null, |
| "The Resource to be requested should not be null "); |
| Preconditions.checkArgument(priority != null, |
| "The priority at which to request containers should not be null "); |
| List<LinkedHashSet<T>> list = new LinkedList<LinkedHashSet<T>>(); |
| |
| RemoteRequestsTable remoteRequestsTable = getTable(0); |
| |
| if (null != remoteRequestsTable) { |
| List<ResourceRequestInfo<T>> matchingRequests = remoteRequestsTable |
| .getMatchingRequests(priority, resourceName, executionType, |
| capability); |
| if (null != matchingRequests) { |
| // If no exact match. Container may be larger than what was requested. |
| // get all resources <= capability. map is reverse sorted. |
| for (ResourceRequestInfo<T> resReqInfo : matchingRequests) { |
| if (canFit(resReqInfo.remoteRequest.getProfileCapability(), |
| capability) && !resReqInfo.containerRequests.isEmpty()) { |
| list.add(resReqInfo.containerRequests); |
| } |
| } |
| } |
| } |
| // no match found |
| return list; |
| } |
| |
| private Set<String> resolveRacks(List<String> nodes) { |
| Set<String> racks = new HashSet<String>(); |
| if (nodes != null) { |
| for (String node : nodes) { |
| // Ensure node requests are accompanied by requests for |
| // corresponding rack |
| String rack = RackResolver.resolve(node).getNetworkLocation(); |
| if (rack == null) { |
| LOG.warn("Failed to resolve rack for node " + node + "."); |
| } else { |
| racks.add(rack); |
| } |
| } |
| } |
| |
| return racks; |
| } |
| |
| /** |
| * ContainerRequests with locality relaxation cannot be made at the same |
| * priority as ContainerRequests without locality relaxation. |
| */ |
| private void checkLocalityRelaxationConflict(Long allocationReqId, |
| Priority priority, Collection<String> locations, boolean relaxLocality) { |
| // Locality relaxation will be set to relaxLocality for all implicitly |
| // requested racks. Make sure that existing rack requests match this. |
| |
| RemoteRequestsTable<T> remoteRequestsTable = getTable(allocationReqId); |
| if (remoteRequestsTable != null) { |
| @SuppressWarnings("unchecked") |
| List<ResourceRequestInfo> allCapabilityMaps = |
| remoteRequestsTable.getAllResourceRequestInfos(priority, locations); |
| for (ResourceRequestInfo reqs : allCapabilityMaps) { |
| ResourceRequest remoteRequest = reqs.remoteRequest; |
| boolean existingRelaxLocality = remoteRequest.getRelaxLocality(); |
| if (relaxLocality != existingRelaxLocality) { |
| throw new InvalidContainerRequestException("Cannot submit a " |
| + "ContainerRequest asking for location " |
| + remoteRequest.getResourceName() + " with locality relaxation " |
| + relaxLocality + " when it has already been requested" |
| + "with locality relaxation " + existingRelaxLocality); |
| } |
| } |
| } |
| } |
| |
| private void checkResourceProfile(String profile) { |
| if (resourceProfilesMap != null && !resourceProfilesMap.isEmpty() |
| && !resourceProfilesMap.containsKey(profile)) { |
| throw new InvalidContainerRequestException( |
| "Invalid profile name, valid profile names are " + resourceProfilesMap |
| .keySet()); |
| } |
| } |
| |
| /** |
| * Valid if a node label expression specified on container request is valid or |
| * not |
| * |
| * @param containerRequest |
| */ |
| private void checkNodeLabelExpression(T containerRequest) { |
| String exp = containerRequest.getNodeLabelExpression(); |
| |
| if (null == exp || exp.isEmpty()) { |
| return; |
| } |
| |
| // Don't support specifying >= 2 node labels in a node label expression now |
| if (exp.contains("&&") || exp.contains("||")) { |
| throw new InvalidContainerRequestException( |
| "Cannot specify more than two node labels" |
| + " in a single node label expression"); |
| } |
| } |
| |
| private void validateContainerResourceChangeRequest( |
| ContainerId containerId, Resource original, Resource target) { |
| Preconditions.checkArgument(containerId != null, |
| "ContainerId cannot be null"); |
| Preconditions.checkArgument(original != null, |
| "Original resource capability cannot be null"); |
| Preconditions.checkArgument(!Resources.equals(Resources.none(), original) |
| && Resources.fitsIn(Resources.none(), original), |
| "Original resource capability must be greater than 0"); |
| Preconditions.checkArgument(target != null, |
| "Target resource capability cannot be null"); |
| Preconditions.checkArgument(!Resources.equals(Resources.none(), target) |
| && Resources.fitsIn(Resources.none(), target), |
| "Target resource capability must be greater than 0"); |
| } |
| |
| private void addResourceRequestToAsk(ResourceRequest remoteRequest) { |
| // This code looks weird but is needed because of the following scenario. |
| // A ResourceRequest is removed from the remoteRequestTable. A 0 container |
| // request is added to 'ask' to notify the RM about not needing it any more. |
| // Before the call to allocate, the user now requests more containers. If |
| // the locations of the 0 size request and the new request are the same |
| // (with the difference being only container count), then the set comparator |
| // will consider both to be the same and not add the new request to ask. So |
| // we need to check for the "same" request being present and remove it and |
| // then add it back. The comparator is container count agnostic. |
| // This should happen only rarely but we do need to guard against it. |
| if(ask.contains(remoteRequest)) { |
| ask.remove(remoteRequest); |
| } |
| ask.add(remoteRequest); |
| } |
| |
| private void addResourceRequest(Priority priority, String resourceName, |
| ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req, |
| boolean relaxLocality, String labelExpression) { |
| RemoteRequestsTable<T> remoteRequestsTable = |
| getTable(req.getAllocationRequestId()); |
| if (remoteRequestsTable == null) { |
| remoteRequestsTable = new RemoteRequestsTable<T>(); |
| if (this.resourceProfilesMap instanceof HashMap) { |
| remoteRequestsTable.setResourceComparator( |
| new ProfileCapabilityComparator((HashMap) resourceProfilesMap)); |
| } |
| putTable(req.getAllocationRequestId(), remoteRequestsTable); |
| } |
| @SuppressWarnings("unchecked") |
| ResourceRequestInfo resourceRequestInfo = remoteRequestsTable |
| .addResourceRequest(req.getAllocationRequestId(), priority, |
| resourceName, execTypeReq, capability, req, relaxLocality, |
| labelExpression); |
| |
| // Note this down for next interaction with ResourceManager |
| addResourceRequestToAsk(resourceRequestInfo.remoteRequest); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Adding request to ask " + resourceRequestInfo.remoteRequest); |
| LOG.debug("addResourceRequest:" + " applicationId=" |
| + " priority=" + priority.getPriority() |
| + " resourceName=" + resourceName + " numContainers=" |
| + resourceRequestInfo.remoteRequest.getNumContainers() |
| + " #asks=" + ask.size()); |
| } |
| } |
| |
| private void decResourceRequest(Priority priority, String resourceName, |
| ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req) { |
| RemoteRequestsTable<T> remoteRequestsTable = |
| getTable(req.getAllocationRequestId()); |
| if (remoteRequestsTable != null) { |
| @SuppressWarnings("unchecked") |
| ResourceRequestInfo resourceRequestInfo = |
| remoteRequestsTable.decResourceRequest(priority, resourceName, |
| execTypeReq, capability, req); |
| // send the ResourceRequest to RM even if is 0 because it needs to |
| // override a previously sent value. If ResourceRequest was not sent |
| // previously then sending 0 aught to be a no-op on RM |
| if (resourceRequestInfo != null) { |
| addResourceRequestToAsk(resourceRequestInfo.remoteRequest); |
| |
| // delete entry from map if no longer needed |
| if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) { |
| remoteRequestsTable.remove(priority, resourceName, |
| execTypeReq.getExecutionType(), capability); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("AFTER decResourceRequest:" |
| + " allocationRequestId=" + req.getAllocationRequestId() |
| + " priority=" + priority.getPriority() |
| + " resourceName=" + resourceName + " numContainers=" |
| + resourceRequestInfo.remoteRequest.getNumContainers() |
| + " #asks=" + ask.size()); |
| } |
| } |
| } else { |
| LOG.info("No remoteRequestTable found with allocationRequestId=" |
| + req.getAllocationRequestId()); |
| } |
| } |
| |
| @Override |
| public synchronized void updateBlacklist(List<String> blacklistAdditions, |
| List<String> blacklistRemovals) { |
| |
| if (blacklistAdditions != null) { |
| this.blacklistAdditions.addAll(blacklistAdditions); |
| this.blacklistedNodes.addAll(blacklistAdditions); |
| // if some resources are also in blacklistRemovals updated before, we |
| // should remove them here. |
| this.blacklistRemovals.removeAll(blacklistAdditions); |
| } |
| |
| if (blacklistRemovals != null) { |
| this.blacklistRemovals.addAll(blacklistRemovals); |
| this.blacklistedNodes.removeAll(blacklistRemovals); |
| // if some resources are in blacklistAdditions before, we should remove |
| // them here. |
| this.blacklistAdditions.removeAll(blacklistRemovals); |
| } |
| |
| if (blacklistAdditions != null && blacklistRemovals != null |
| && blacklistAdditions.removeAll(blacklistRemovals)) { |
| // we allow resources to appear in addition list and removal list in the |
| // same invocation of updateBlacklist(), but should get a warn here. |
| LOG.warn("The same resources appear in both blacklistAdditions and " + |
| "blacklistRemovals in updateBlacklist."); |
| } |
| } |
| |
| private void updateAMRMToken(Token token) throws IOException { |
| org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken = |
| new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token |
| .getIdentifier().array(), token.getPassword().array(), new Text( |
| token.getKind()), new Text(token.getService())); |
| // Preserve the token service sent by the RM when adding the token |
| // to ensure we replace the previous token setup by the RM. |
| // Afterwards we can update the service address for the RPC layer. |
| UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser(); |
| LOG.info("Updating with new AMRMToken"); |
| currentUGI.addToken(amrmToken); |
| amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig())); |
| } |
| |
| @VisibleForTesting |
| RemoteRequestsTable<T> getTable(long allocationRequestId) { |
| return remoteRequests.get(Long.valueOf(allocationRequestId)); |
| } |
| |
| RemoteRequestsTable<T> putTable(long allocationRequestId, |
| RemoteRequestsTable<T> table) { |
| return remoteRequests.put(Long.valueOf(allocationRequestId), table); |
| } |
| } |