| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.dag.app.rm; |
| |
| import java.io.IOException; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.PriorityBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.commons.math3.random.RandomDataGenerator; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.NodeReport; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; |
| import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.util.RackResolver; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| import org.apache.tez.dag.api.TezConfiguration; |
| import org.apache.tez.dag.api.TezUncheckedException; |
| import org.apache.tez.dag.app.AppContext; |
| import org.apache.tez.dag.app.DAGAppMasterState; |
| import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus; |
| import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| /* TODO not yet updating cluster nodes on every allocate response |
| * from RMContainerRequestor |
| import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated; |
| if (clusterNmCount != lastClusterNmCount) { |
| LOG.info("Num cluster nodes changed from " + lastClusterNmCount + " to " |
| + clusterNmCount); |
| eventHandler.handle(new AMNodeEventNodeCountUpdated(clusterNmCount)); |
| } |
| */ |
| public class YarnTaskSchedulerService extends TaskSchedulerService |
| implements AMRMClientAsync.CallbackHandler { |
| private static final Log LOG = LogFactory.getLog(YarnTaskSchedulerService.class); |
| |
| |
| |
| final TezAMRMClientAsync<CookieContainerRequest> amRmClient; |
| final TaskSchedulerAppCallback realAppClient; |
| final TaskSchedulerAppCallback appClientDelegate; |
| final ContainerSignatureMatcher containerSignatureMatcher; |
| ExecutorService appCallbackExecutor; |
| |
| // Container re-use configuration. All three flags are loaded from the Configuration in serviceInit(), which rejects reuseNonLocal=true combined with reuseRackLocal=false. |
| private boolean shouldReuseContainers; |
| private boolean reuseRackLocal; |
| private boolean reuseNonLocal; |
| |
| Map<Object, CookieContainerRequest> taskRequests = |
| new HashMap<Object, CookieContainerRequest>(); |
| // LinkedHashMap is needed in getProgress() |
| LinkedHashMap<Object, Container> taskAllocations = |
| new LinkedHashMap<Object, Container>(); |
| /** |
| * Tracks last task assigned to a known container. |
| * Keyed by container id; the value is the task, which this class handles |
| * as an opaque Object. |
| */ |
| Map<ContainerId, Object> containerAssignments = |
| new HashMap<ContainerId, Object>(); |
| // Remove inUse depending on resolution of TEZ-1129 |
| Set<ContainerId> inUseContainers = Sets.newHashSet(); |
| HashMap<ContainerId, Object> releasedContainers = |
| new HashMap<ContainerId, Object>(); |
| /** |
| * Map of containers currently being held by the TaskScheduler. |
| * onContainersCompleted() drops the entry when a container that was not |
| * previously released by the AM is reported as completed. |
| */ |
| Map<ContainerId, HeldContainer> heldContainers = |
| new HashMap<ContainerId, HeldContainer>(); |
| |
| Set<Priority> priorityHasAffinity = Sets.newHashSet(); |
| |
| Set<NodeId> blacklistedNodes = Collections |
| .newSetFromMap(new ConcurrentHashMap<NodeId, Boolean>()); |
| |
| Resource totalResources = Resource.newInstance(0, 0); |
| Resource allocatedResources = Resource.newInstance(0, 0); |
| long numHeartbeats = 0; |
| long heartbeatAtLastPreemption = 0; |
| int numHeartbeatsBetweenPreemptions = 0; |
| |
| final String appHostName; |
| final int appHostPort; |
| final String appTrackingUrl; |
| final AppContext appContext; |
| private AtomicBoolean hasUnregistered = new AtomicBoolean(false); |
| |
| AtomicBoolean isStopped = new AtomicBoolean(false); |
| |
| private ContainerAssigner NODE_LOCAL_ASSIGNER = new NodeLocalContainerAssigner(); |
| private ContainerAssigner RACK_LOCAL_ASSIGNER = new RackLocalContainerAssigner(); |
| private ContainerAssigner NON_LOCAL_ASSIGNER = new NonLocalContainerAssigner(); |
| |
| DelayedContainerManager delayedContainerManager; |
| long localitySchedulingDelay; |
| long idleContainerTimeoutMin; |
| long idleContainerTimeoutMax = 0; |
| int sessionNumMinHeldContainers = 0; |
| int preemptionPercentage = 0; |
| |
| Set<ContainerId> sessionMinHeldContainers = Sets.newHashSet(); |
| |
| RandomDataGenerator random = new RandomDataGenerator(); |
| |
| @VisibleForTesting |
| protected AtomicBoolean shouldUnregister = new AtomicBoolean(false); |
| |
| /** |
|  * Immutable-by-convention bundle carried on a container request: the task, |
|  * the application's cookie and the container signature. |
|  */ |
| class CRCookie { |
|   // Read the state through the getters only; touching the fields directly |
|   // can cause mocked unit tests to fail. |
|   private Object task; |
|   private Object appCookie; |
|   private Object containerSignature; |
| |
|   /** |
|    * @param task task the request was made for |
|    * @param appCookie opaque cookie supplied by the application |
|    * @param containerSignature signature associated with the request |
|    */ |
|   CRCookie(Object task, Object appCookie, Object containerSignature) { |
|     this.containerSignature = containerSignature; |
|     this.appCookie = appCookie; |
|     this.task = task; |
|   } |
| |
|   /** @return the task given at construction */ |
|   Object getTask() { |
|     return this.task; |
|   } |
| |
|   /** @return the application cookie given at construction */ |
|   Object getAppCookie() { |
|     return this.appCookie; |
|   } |
| |
|   /** @return the container signature given at construction */ |
|   Object getContainerSignature() { |
|     return this.containerSignature; |
|   } |
| } |
| |
| /** |
|  * A ContainerRequest that additionally carries a {@link CRCookie} and, |
|  * optionally, the id of a container the request is affinitized to. |
|  */ |
| class CookieContainerRequest extends ContainerRequest { |
|   CRCookie cookie; |
|   ContainerId affinitizedContainerId; |
| |
|   /** |
|    * Creates a request without container affinity |
|    * ({@link #getAffinitizedContainer()} returns null). |
|    */ |
|   public CookieContainerRequest( |
|       Resource capability, |
|       String[] hosts, |
|       String[] racks, |
|       Priority priority, |
|       CRCookie cookie) { |
|     this(capability, null, hosts, racks, priority, cookie); |
|   } |
| |
|   /** |
|    * Creates a request that remembers {@code containerId} as the container |
|    * it is affinitized to. |
|    */ |
|   public CookieContainerRequest( |
|       Resource capability, |
|       ContainerId containerId, |
|       String[] hosts, |
|       String[] racks, |
|       Priority priority, |
|       CRCookie cookie) { |
|     super(capability, hosts, racks, priority); |
|     this.cookie = cookie; |
|     this.affinitizedContainerId = containerId; |
|   } |
| |
|   /** @return the cookie attached to this request */ |
|   CRCookie getCookie() { |
|     return this.cookie; |
|   } |
| |
|   /** @return the affinitized container id, or null when none was given */ |
|   ContainerId getAffinitizedContainer() { |
|     return this.affinitizedContainerId; |
|   } |
| } |
| |
| /** |
|  * Creates the scheduler with an RM client obtained from |
|  * TezAMRMClientAsync.createAMRMClientAsync, registering this object as the |
|  * client's callback handler. |
|  */ |
| public YarnTaskSchedulerService(TaskSchedulerAppCallback appClient, |
|     ContainerSignatureMatcher containerSignatureMatcher, |
|     String appHostName, |
|     int appHostPort, |
|     String appTrackingUrl, |
|     AppContext appContext) { |
|   super(YarnTaskSchedulerService.class.getName()); |
| |
|   // Callback plumbing. The executor has to exist before the delegate is |
|   // built because createAppCallbackDelegate() hands it to the wrapper. |
|   this.realAppClient = appClient; |
|   this.appCallbackExecutor = createAppCallbackExecutorService(); |
|   this.containerSignatureMatcher = containerSignatureMatcher; |
|   this.appClientDelegate = createAppCallbackDelegate(appClient); |
| |
|   // NOTE(review): 1000 is presumably an initial heartbeat interval in ms; |
|   // serviceInit() sets the interval from configuration afterwards - confirm. |
|   this.amRmClient = TezAMRMClientAsync.createAMRMClientAsync(1000, this); |
| |
|   // Values later used when registering with the RM. |
|   this.appContext = appContext; |
|   this.appTrackingUrl = appTrackingUrl; |
|   this.appHostPort = appHostPort; |
|   this.appHostName = appHostName; |
| } |
| |
| /** |
|  * Test-only variant that uses the supplied RM client instead of creating |
|  * one. |
|  */ |
| @Private |
| @VisibleForTesting |
| YarnTaskSchedulerService(TaskSchedulerAppCallback appClient, |
|     ContainerSignatureMatcher containerSignatureMatcher, |
|     String appHostName, |
|     int appHostPort, |
|     String appTrackingUrl, |
|     TezAMRMClientAsync<CookieContainerRequest> client, |
|     AppContext appContext) { |
|   super(YarnTaskSchedulerService.class.getName()); |
| |
|   // Callback plumbing. The executor has to exist before the delegate is |
|   // built because createAppCallbackDelegate() hands it to the wrapper. |
|   this.realAppClient = appClient; |
|   this.appCallbackExecutor = createAppCallbackExecutorService(); |
|   this.containerSignatureMatcher = containerSignatureMatcher; |
|   this.appClientDelegate = createAppCallbackDelegate(appClient); |
| |
|   // Injected RM client. |
|   this.amRmClient = client; |
| |
|   // Values later used when registering with the RM. |
|   this.appContext = appContext; |
|   this.appTrackingUrl = appTrackingUrl; |
|   this.appHostPort = appHostPort; |
|   this.appHostName = appHostName; |
| } |
| |
| /** |
|  * Builds the single-threaded executor handed to the app callback wrapper. |
|  * Its threads are daemons named "TaskSchedulerAppCaller #n". |
|  * |
|  * @return a new single-thread executor |
|  */ |
| @VisibleForTesting |
| ExecutorService createAppCallbackExecutorService() { |
|   ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder(); |
|   factoryBuilder.setNameFormat("TaskSchedulerAppCaller #%d"); |
|   factoryBuilder.setDaemon(true); |
|   return Executors.newSingleThreadExecutor(factoryBuilder.build()); |
| } |
| |
| /** |
|  * @return the available resources as reported by the RM client |
|  */ |
| @Override |
| public Resource getAvailableResources() { |
|   final Resource available = amRmClient.getAvailableResources(); |
|   return available; |
| } |
| |
| /** |
|  * @return the cluster node count as reported by the RM client |
|  */ |
| @Override |
| public int getClusterNodeCount() { |
|   // this can potentially be cheaper after YARN-1722 |
|   final int nodeCount = amRmClient.getClusterNodeCount(); |
|   return nodeCount; |
| } |
| |
| TaskSchedulerAppCallback createAppCallbackDelegate( |
| TaskSchedulerAppCallback realAppClient) { |
| return new TaskSchedulerAppCallbackWrapper(realAppClient, |
| appCallbackExecutor); |
| } |
| |
| /** |
|  * Marks that serviceStop() should unregister the application from the RM. |
|  */ |
| @Override |
| public void setShouldUnregister() { |
|   shouldUnregister.set(true); |
| } |
| |
| /** |
|  * @return true once serviceStop() has completed the unregister call |
|  */ |
| @Override |
| public boolean hasUnregistered() { |
|   final boolean unregistered = hasUnregistered.get(); |
|   return unregistered; |
| } |
| |
| // AbstractService methods |
| /** |
|  * Initializes the RM client and reads the scheduler settings from the |
|  * supplied configuration: maximum RM heartbeat interval, container re-use |
|  * flags, locality scheduling delay, idle-release timeouts, minimum held |
|  * containers for sessions and the preemption settings. Also creates the |
|  * DelayedContainerManager (it is started in serviceStart()). |
|  * |
|  * @param conf configuration the TezConfiguration keys are read from |
|  * @throws IllegalArgumentException (via Preconditions.checkArgument) when a |
|  *         value is out of range, or when non-local re-use is enabled while |
|  *         rack-local re-use is disabled |
|  */ |
| @Override |
| public synchronized void serviceInit(Configuration conf) { |
| |
|   amRmClient.init(conf); |
|   int heartbeatIntervalMax = conf.getInt( |
|       TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, |
|       TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX_DEFAULT); |
|   amRmClient.setHeartbeatInterval(heartbeatIntervalMax); |
| |
|   shouldReuseContainers = conf.getBoolean( |
|       TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, |
|       TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED_DEFAULT); |
|   reuseRackLocal = conf.getBoolean( |
|       TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, |
|       TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED_DEFAULT); |
|   reuseNonLocal = conf.getBoolean( |
|       TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, |
|       TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED_DEFAULT); |
|   // Equivalent to the longer form |
|   // (!reuseRackLocal && !reuseNonLocal) || reuseRackLocal: |
|   // the only rejected combination is non-local on with rack-local off. |
|   Preconditions.checkArgument( |
|       reuseRackLocal || !reuseNonLocal, |
|       "Re-use Rack-Local cannot be disabled if Re-use Non-Local has been" |
|       + " enabled"); |
| |
|   localitySchedulingDelay = conf.getLong( |
|       TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, |
|       TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS_DEFAULT); |
|   Preconditions.checkArgument(localitySchedulingDelay >= 0, |
|       "Locality Scheduling delay should be >=0"); |
| |
|   // -1 is accepted as a special value for the min timeout; elsewhere in this |
|   // class it is treated as "never release on idle timeout". |
|   idleContainerTimeoutMin = conf.getLong( |
|       TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, |
|       TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS_DEFAULT); |
|   Preconditions.checkArgument(idleContainerTimeoutMin >= 0 || idleContainerTimeoutMin == -1, |
|       "Idle container release min timeout should be either -1 or >=0"); |
| |
|   idleContainerTimeoutMax = conf.getLong( |
|       TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, |
|       TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS_DEFAULT); |
|   Preconditions.checkArgument( |
|       idleContainerTimeoutMax >= 0 && idleContainerTimeoutMax >= idleContainerTimeoutMin, |
|       "Idle container release max timeout should be >=0 and >= " + |
|       TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS); |
| |
|   sessionNumMinHeldContainers = conf.getInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, |
|       TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS_DEFAULT); |
|   Preconditions.checkArgument(sessionNumMinHeldContainers >= 0, |
|       "Session minimum held containers should be >=0"); |
| |
|   preemptionPercentage = conf.getInt(TezConfiguration.TEZ_AM_PREEMPTION_PERCENTAGE, |
|       TezConfiguration.TEZ_AM_PREEMPTION_PERCENTAGE_DEFAULT); |
|   Preconditions.checkArgument(preemptionPercentage >= 0 && preemptionPercentage <= 100, |
|       "Preemption percentage should be between 0-100"); |
| |
|   numHeartbeatsBetweenPreemptions = conf.getInt( |
|       TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS, |
|       TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS_DEFAULT); |
|   Preconditions.checkArgument(numHeartbeatsBetweenPreemptions >= 1, |
|       "Heartbeats between preemptions should be >=1"); |
| |
|   delayedContainerManager = new DelayedContainerManager(); |
|   // The numHeartbeatsBetweenPreemptions entry previously had no separator, |
|   // so its value was glued onto the key name in the log line. |
|   LOG.info("TaskScheduler initialized with configuration: " + |
|       "maxRMHeartbeatInterval: " + heartbeatIntervalMax + |
|       ", containerReuseEnabled: " + shouldReuseContainers + |
|       ", reuseRackLocal: " + reuseRackLocal + |
|       ", reuseNonLocal: " + reuseNonLocal + |
|       ", localitySchedulingDelay: " + localitySchedulingDelay + |
|       ", preemptionPercentage: " + preemptionPercentage + |
|       ", numHeartbeatsBetweenPreemptions: " + numHeartbeatsBetweenPreemptions + |
|       ", idleContainerMinTimeout=" + idleContainerTimeoutMin + |
|       ", idleContainerMaxTimeout=" + idleContainerTimeoutMax + |
|       ", sessionMinHeldContainers=" + sessionNumMinHeldContainers); |
| } |
| |
| @Override |
| public void serviceStart() { |
| try { |
| RegisterApplicationMasterResponse response; |
| synchronized (this) { |
| amRmClient.start(); |
| response = amRmClient.registerApplicationMaster(appHostName, |
| appHostPort, |
| appTrackingUrl); |
| } |
| // upcall to app outside locks |
| appClientDelegate.setApplicationRegistrationData( |
| response.getMaximumResourceCapability(), |
| response.getApplicationACLs(), |
| response.getClientToAMTokenMasterKey()); |
| |
| delayedContainerManager.start(); |
| } catch (YarnException e) { |
| LOG.error("Yarn Exception while registering", e); |
| throw new TezUncheckedException(e); |
| } catch (IOException e) { |
| LOG.error("IO Exception while registering", e); |
| throw new TezUncheckedException(e); |
| } |
| } |
| |
| @Override |
| public void serviceStop() throws InterruptedException { |
| // upcall to app outside of locks |
| try { |
| delayedContainerManager.shutdown(); |
| // Wait for contianers to be released. |
| delayedContainerManager.join(2000l); |
| synchronized (this) { |
| isStopped.set(true); |
| if (shouldUnregister.get()) { |
| AppFinalStatus status = appClientDelegate.getFinalAppStatus(); |
| LOG.info("Unregistering application from RM" |
| + ", exitStatus=" + status.exitStatus |
| + ", exitMessage=" + status.exitMessage |
| + ", trackingURL=" + status.postCompletionTrackingUrl); |
| amRmClient.unregisterApplicationMaster(status.exitStatus, |
| status.exitMessage, |
| status.postCompletionTrackingUrl); |
| LOG.info("Successfully unregistered application from RM"); |
| hasUnregistered.set(true); |
| } |
| } |
| |
| // call client.stop() without lock client will attempt to stop the callback |
| // operation and at the same time the callback operation might be trying |
| // to get our lock. |
| amRmClient.stop(); |
| appCallbackExecutor.shutdown(); |
| appCallbackExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS); |
| } catch (YarnException e) { |
| LOG.error("Yarn Exception while unregistering ", e); |
| throw new TezUncheckedException(e); |
| } catch (IOException e) { |
| LOG.error("IOException while unregistering ", e); |
| throw new TezUncheckedException(e); |
| } |
| } |
| |
| // AMRMClientAsync interface methods |
| /** |
|  * RM callback for completed containers. For each status the container is |
|  * looked up first among containers the AM released earlier and then among |
|  * currently assigned containers; when a task is found for it, the app is |
|  * told that the task's container completed. Containers known to neither |
|  * are logged and ignored. Does nothing once the service is stopped. |
|  * |
|  * <p>Bookkeeping is done under this object's lock; the upcalls to the app |
|  * are made after the lock is dropped. |
|  * |
|  * @param statuses completion statuses reported by the RM |
|  */ |
| @Override |
| public void onContainersCompleted(List<ContainerStatus> statuses) { |
|   if (isStopped.get()) { |
|     return; |
|   } |
|   Map<Object, ContainerStatus> appContainerStatus = |
|       new HashMap<Object, ContainerStatus>(statuses.size()); |
|   synchronized (this) { |
|     for (ContainerStatus containerStatus : statuses) { |
|       ContainerId completedId = containerStatus.getContainerId(); |
|       HeldContainer delayedContainer = heldContainers.get(completedId); |
| |
|       Object task = releasedContainers.remove(completedId); |
|       if (task != null) { |
|         if (delayedContainer != null) { |
|           LOG.warn("Held container should be null since releasedContainer is not"); |
|         } |
|         // TODO later we may want to check if exit code matched expectation |
|         // e.g. successful container should not come back fail exit code after |
|         // being released |
|         // completion of a container we had released earlier |
|         // an allocated container completed. notify app |
|         LOG.info("Released container completed:" + completedId + |
|             " last allocated to task: " + task); |
|         appContainerStatus.put(task, containerStatus); |
|         continue; |
|       } |
| |
|       // not found in released containers. check currently allocated containers |
|       // no need to release this container as the RM has already completed it |
|       task = unAssignContainer(completedId, false); |
|       if (delayedContainer != null) { |
|         heldContainers.remove(completedId); |
|         // Resources.subtract() works on a clone and returns it, so the |
|         // previous call left allocatedResources unchanged. subtractFrom() |
|         // updates allocatedResources in place. |
|         Resources.subtractFrom(allocatedResources, |
|             delayedContainer.getContainer().getResource()); |
|       } else { |
|         LOG.warn("Held container expected to be not null for a non-AM-released container"); |
|       } |
|       if (task != null) { |
|         // completion of a container we have allocated currently |
|         // an allocated container completed. notify app |
|         LOG.info("Allocated container completed:" + completedId + |
|             " last allocated to task: " + task); |
|         appContainerStatus.put(task, containerStatus); |
|         continue; |
|       } |
| |
|       // container neither allocated nor released |
|       LOG.info("Ignoring unknown container: " + containerStatus.getContainerId()); |
|     } |
|   } |
| |
|   // upcall to app must be outside locks |
|   for (Entry<Object, ContainerStatus> entry : appContainerStatus.entrySet()) { |
|     appClientDelegate.containerCompleted(entry.getKey(), entry.getValue()); |
|   } |
| } |
| |
| /** |
|  * RM callback for newly allocated containers. With container re-use enabled |
|  * the containers are handed to pushNewContainerToDelayed() and nothing else |
|  * happens here; otherwise they are matched against pending requests right |
|  * away and the app is informed of the resulting assignments. Does nothing |
|  * once the service is stopped. |
|  * |
|  * @param containers containers allocated by the RM |
|  */ |
| @Override |
| public void onContainersAllocated(List<Container> containers) { |
|   if (isStopped.get()) { |
|     return; |
|   } |
| |
|   if (LOG.isDebugEnabled()) { |
|     StringBuilder idList = new StringBuilder(); |
|     for (Container allocated : containers) { |
|       idList.append(allocated.getId()).append(", "); |
|     } |
|     LOG.debug("Assigned New Containers: " + idList.toString()); |
|   } |
| |
|   final Map<CookieContainerRequest, Container> assignments; |
|   synchronized (this) { |
|     if (shouldReuseContainers) { |
|       // unify allocations |
|       pushNewContainerToDelayed(containers); |
|       return; |
|     } |
|     // Work on a private copy of the RM's list. |
|     List<Container> mutableCopy = Lists.newLinkedList(containers); |
|     assignments = assignNewlyAllocatedContainers(mutableCopy); |
|   } |
| |
|   // upcall to app must be outside locks |
|   informAppAboutAssignments(assignments); |
| } |
| |
| /** |
| * Tries assigning the list of specified containers. Optionally, release |
| * containers or add them to the delayed container queue. |
| * |
| * The flags apply to all containers in the specified lists. So, separate |
| * calls should be made based on the expected behaviour. |
| * |
| * @param containers |
| * The list of containers to be assigned. The list *may* be modified |
| * in place based on allocations and releases. |
| * @return Assignments. |
| */ |
| private synchronized Map<CookieContainerRequest, Container> |
| assignNewlyAllocatedContainers(Iterable<Container> containers) { |
| |
| Map<CookieContainerRequest, Container> assignedContainers = |
| new HashMap<CookieContainerRequest, Container>(); |
| assignNewContainersWithLocation(containers, |
| NODE_LOCAL_ASSIGNER, assignedContainers); |
| assignNewContainersWithLocation(containers, |
| RACK_LOCAL_ASSIGNER, assignedContainers); |
| assignNewContainersWithLocation(containers, |
| NON_LOCAL_ASSIGNER, assignedContainers); |
| |
| // Release any unassigned containers given by the RM |
| releaseUnassignedContainers(containers); |
| |
| return assignedContainers; |
| } |
| |
| private synchronized Map<CookieContainerRequest, Container> |
| tryAssignReUsedContainers(Iterable<Container> containers) { |
| |
| Map<CookieContainerRequest, Container> assignedContainers = |
| new HashMap<CookieContainerRequest, Container>(); |
| |
| // Honor locality and match as many as possible |
| assignReUsedContainersWithLocation(containers, |
| NODE_LOCAL_ASSIGNER, assignedContainers, true); |
| assignReUsedContainersWithLocation(containers, |
| RACK_LOCAL_ASSIGNER, assignedContainers, true); |
| assignReUsedContainersWithLocation(containers, |
| NON_LOCAL_ASSIGNER, assignedContainers, true); |
| |
| return assignedContainers; |
| } |
| |
| @VisibleForTesting |
| long getHeldContainerExpireTime(long startTime) { |
| long expireTime = (startTime + idleContainerTimeoutMin); |
| if (idleContainerTimeoutMin != -1 && idleContainerTimeoutMin < idleContainerTimeoutMax) { |
| long expireTimeMax = startTime + idleContainerTimeoutMax; |
| expireTime = random.nextLong(expireTime, expireTimeMax); |
| } |
| |
| return expireTime; |
| } |
| |
| /** |
| * Try to assign a re-used container |
| * |
| * What happens depends on the AM state read from appContext.getAMState(): |
| * |
| * 1. State is IDLE, or there are no pending task requests: nothing is |
| * assigned. In session mode the set of minimum held containers is |
| * determined first if it is configured and still empty. The container is |
| * released when it is new, or when its expiry time has passed and idle |
| * release is not disabled (idleContainerTimeoutMin != -1). The exception is |
| * a non-new container that is in sessionMinHeldContainers in session mode: |
| * it gets a fresh expiry time instead. Containers that are not released are |
| * put back on the delayed queue to be looked at again after |
| * localitySchedulingDelay. |
| * |
| * 2. State is RUNNING with pending requests: sessionMinHeldContainers is |
| * cleared and assignment is attempted at node-local, then rack-local, then |
| * non-local level. Rack and non-local attempts are only made when the |
| * corresponding re-use flag is set or the container is new, and when either |
| * localitySchedulingDelay is 0 or the container's locality match level has |
| * already reached that level. If nothing could be assigned, the container |
| * is released when it is not new and has expired; otherwise its match level |
| * is escalated and it is either re-queued for a later attempt or, once the |
| * final level is reached, released or re-queued as an idle container. |
| * |
| * 3. Any other state: a warning is logged and the container is released. |
| * |
| * NOTE(review): the "Failed to assign ..." messages are logged with |
| * LOG.info but are guarded by LOG.isDebugEnabled(). |
| * NOTE(review): the node-local condition lists NEW, NODE, RACK and |
| * NON_LOCAL; if those are all the values of LocalityMatchLevel the branch |
| * always runs - confirm against HeldContainer. |
| * |
| * @param heldContainer Container to be used to assign to tasks |
| * @return Assigned container map (possibly empty) in case 2; null in cases |
| * 1 and 3 |
| */ |
| |
| private synchronized Map<CookieContainerRequest, Container> |
| assignDelayedContainer(HeldContainer heldContainer) { |
| |
| DAGAppMasterState state = appContext.getAMState(); |
| boolean isNew = heldContainer.isNew(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Trying to assign a delayed container" |
| + ", containerId=" + heldContainer.getContainer().getId() |
| + ", nextScheduleTime=" + heldContainer.getNextScheduleTime() |
| + ", containerExpiryTime=" + heldContainer.getContainerExpiryTime() |
| + ", AMState=" + state |
| + ", matchLevel=" + heldContainer.getLocalityMatchLevel() |
| + ", taskRequestsCount=" + taskRequests.size() |
| + ", heldContainers=" + heldContainers.size() |
| + ", delayedContainers=" + delayedContainerManager.delayedContainers.size() |
| + ", isNew=" + isNew); |
| } |
| |
| if (state.equals(DAGAppMasterState.IDLE) || taskRequests.isEmpty()) { |
| // reset locality level on held container |
| // if sessionDelay defined, push back into delayed queue if not already |
| // done so |
| |
| // Compute min held containers. |
| if (appContext.isSession() && sessionNumMinHeldContainers > 0 && |
| sessionMinHeldContainers.isEmpty()) { |
| // session mode and need to hold onto containers and not done so already |
| determineMinHeldContainers(); |
| } |
| |
| heldContainer.resetLocalityMatchLevel(); |
| long currentTime = System.currentTimeMillis(); |
| boolean releaseContainer = false; |
| |
| if (isNew || (heldContainer.getContainerExpiryTime() <= currentTime |
| && idleContainerTimeoutMin != -1)) { |
| // container idle timeout has expired or is a new unused container. |
| // new container is possibly a spurious race condition allocation. |
| if (!isNew && appContext.isSession() && |
| sessionMinHeldContainers.contains(heldContainer.getContainer().getId())) { |
| // Not a potentially spurious new container. |
| // In session mode and container in set of chosen min held containers |
| // increase the idle container expire time to maintain sanity with |
| // the rest of the code |
| heldContainer.setContainerExpiryTime(getHeldContainerExpireTime(currentTime)); |
| } else { |
| releaseContainer = true; |
| } |
| } |
| |
| if (releaseContainer) { |
| LOG.info("No taskRequests. Container's idle timeout delay expired or is new. " + |
| "Releasing container" |
| + ", containerId=" + heldContainer.container.getId() |
| + ", containerExpiryTime=" |
| + heldContainer.getContainerExpiryTime() |
| + ", idleTimeout=" + idleContainerTimeoutMin |
| + ", taskRequestsCount=" + taskRequests.size() |
| + ", heldContainers=" + heldContainers.size() |
| + ", delayedContainers=" + delayedContainerManager.delayedContainers.size() |
| + ", isNew=" + isNew); |
| releaseUnassignedContainers( |
| Lists.newArrayList(heldContainer.container)); |
| } else { |
| // no outstanding work and container idle timeout not expired |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Holding onto idle container with no work. CId: " |
| + heldContainer.getContainer().getId() + " with expiry: " |
| + heldContainer.getContainerExpiryTime() + " currentTime: " |
| + currentTime + " next look: " |
| + (currentTime + localitySchedulingDelay)); |
| } |
| // put back and wait for new requests until expiry |
| heldContainer.resetLocalityMatchLevel(); |
| delayedContainerManager.addDelayedContainer( |
| heldContainer.getContainer(), currentTime |
| + localitySchedulingDelay); |
| } |
| } else if (state.equals(DAGAppMasterState.RUNNING)) { |
| // clear min held containers since we need to allocate to tasks |
| sessionMinHeldContainers.clear(); |
| HeldContainer.LocalityMatchLevel localityMatchLevel = |
| heldContainer.getLocalityMatchLevel(); |
| Map<CookieContainerRequest, Container> assignedContainers = |
| new HashMap<CookieContainerRequest, Container>(); |
| |
| Container containerToAssign = heldContainer.container; |
| |
| heldContainer.incrementAssignmentAttempts(); |
| // Each time a container is seen, we try node, rack and non-local in that |
| // order depending on matching level allowed |
| |
| // if match level is NEW or NODE, match only at node-local |
| // always try node local matches for other levels |
| if (isNew |
| || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.NEW) |
| || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.NODE) |
| || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.RACK) |
| || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.NON_LOCAL)) { |
| assignReUsedContainerWithLocation(containerToAssign, |
| NODE_LOCAL_ASSIGNER, assignedContainers, true); |
| if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) { |
| LOG.info("Failed to assign tasks to delayed container using node" |
| + ", containerId=" + heldContainer.getContainer().getId()); |
| } |
| } |
| |
| // if re-use allowed at rack |
| // match against rack if match level is RACK or NON-LOCAL |
| // if scheduling delay is 0, match at RACK allowed without a sleep |
| if (assignedContainers.isEmpty()) { |
| if ((reuseRackLocal || isNew) && (localitySchedulingDelay == 0 || |
| (localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.RACK) |
| || localityMatchLevel.equals( |
| HeldContainer.LocalityMatchLevel.NON_LOCAL)))) { |
| assignReUsedContainerWithLocation(containerToAssign, |
| RACK_LOCAL_ASSIGNER, assignedContainers, false); |
| if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) { |
| LOG.info("Failed to assign tasks to delayed container using rack" |
| + ", containerId=" + heldContainer.getContainer().getId()); |
| } |
| } |
| } |
| |
| // if re-use allowed at non-local |
| // match against non-local if match level is NON-LOCAL |
| // if scheduling delay is 0, match at NON-LOCAL allowed without a sleep |
| if (assignedContainers.isEmpty()) { |
| if ((reuseNonLocal || isNew) && (localitySchedulingDelay == 0 |
| || localityMatchLevel.equals( |
| HeldContainer.LocalityMatchLevel.NON_LOCAL))) { |
| assignReUsedContainerWithLocation(containerToAssign, |
| NON_LOCAL_ASSIGNER, assignedContainers, false); |
| if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) { |
| LOG.info("Failed to assign tasks to delayed container using non-local" |
| + ", containerId=" + heldContainer.getContainer().getId()); |
| } |
| } |
| } |
| |
| if (assignedContainers.isEmpty()) { |
| |
| long currentTime = System.currentTimeMillis(); |
| |
| // Release container if final expiry time is reached |
| // Don't release a new container. The RM may not give us new ones |
| // The assumption is that the expire time is larger than the sum of all |
| // locality delays. So if we hit the expire time then we have already |
| // tried to assign at all locality levels. |
| // We run the risk of not being able to retain min held containers but |
| // if we are not being able to assign containers to pending tasks then |
| // we cannot avoid releasing containers. Or else we may not be able to |
| // get new containers from YARN to match the pending request |
| if (!isNew && heldContainer.getContainerExpiryTime() <= currentTime |
| && idleContainerTimeoutMin != -1) { |
| LOG.info("Container's idle timeout expired. Releasing container" |
| + ", containerId=" + heldContainer.container.getId() |
| + ", containerExpiryTime=" |
| + heldContainer.getContainerExpiryTime() |
| + ", idleTimeoutMin=" + idleContainerTimeoutMin); |
| releaseUnassignedContainers( |
| Lists.newArrayList(heldContainer.container)); |
| } else { |
| |
| // Let's decide if this container has hit the end of the road |
| |
| // EOL true if container's match level is NON-LOCAL |
| boolean hitFinalMatchLevel = localityMatchLevel.equals( |
| HeldContainer.LocalityMatchLevel.NON_LOCAL); |
| if (!hitFinalMatchLevel) { |
| // EOL also true if locality delay is 0 |
| // or rack-local or non-local is disabled |
| heldContainer.incrementLocalityMatchLevel(); |
| if (localitySchedulingDelay == 0 || |
| (!reuseRackLocal |
| || (!reuseNonLocal && |
| heldContainer.getLocalityMatchLevel().equals( |
| HeldContainer.LocalityMatchLevel.NON_LOCAL)))) { |
| hitFinalMatchLevel = true; |
| } |
| // the above if-stmt does not apply to new containers since they will |
| // be matched at all locality levels. So there finalMatchLevel cannot |
| // be short-circuited |
| if (localitySchedulingDelay > 0 && isNew) { |
| hitFinalMatchLevel = false; |
| } |
| } |
| |
| if (hitFinalMatchLevel) { |
| boolean safeToRelease = true; |
| Priority topPendingPriority = amRmClient.getTopPriority(); |
| Priority containerPriority = heldContainer.container.getPriority(); |
| if (isNew && topPendingPriority != null && |
| containerPriority.compareTo(topPendingPriority) < 0) { |
| // this container is of lower priority and given to us by the RM for |
| // a task that will be matched after the current top priority. Keep |
| // this container for those pending tasks since the RM is not going |
| // to give this container to us again |
| safeToRelease = false; |
| } |
| |
| // Are there any pending requests at any priority? |
| // release if there are tasks or this is not a session |
| if (safeToRelease && |
| (!taskRequests.isEmpty() || !appContext.isSession())) { |
| LOG.info("Releasing held container as either there are pending but " |
| + " unmatched requests or this is not a session" |
| + ", containerId=" + heldContainer.container.getId() |
| + ", pendingTasks=" + taskRequests.size() |
| + ", isSession=" + appContext.isSession() |
| + ". isNew=" + isNew); |
| releaseUnassignedContainers( |
| Lists.newArrayList(heldContainer.container)); |
| } else { |
| // if no tasks, treat this like an idle session |
| heldContainer.resetLocalityMatchLevel(); |
| delayedContainerManager.addDelayedContainer( |
| heldContainer.getContainer(), |
| currentTime + localitySchedulingDelay); |
| } |
| } else { |
| // Schedule delay container to match at a later try |
| delayedContainerManager.addDelayedContainer( |
| heldContainer.getContainer(), |
| currentTime + localitySchedulingDelay); |
| } |
| } |
| } else if (LOG.isDebugEnabled()) { |
| LOG.debug("Delayed container assignment successful" |
| + ", containerId=" + heldContainer.getContainer().getId()); |
| } |
| |
| return assignedContainers; |
| } else { |
| // ignore all other cases? |
| LOG.warn("Received a request to assign re-used containers when AM was " |
| + " in state: " + state + ". Ignoring request and releasing container" |
| + ": " + heldContainer.getContainer().getId()); |
| releaseUnassignedContainers(Lists.newArrayList(heldContainer.container)); |
| } |
| |
| return null; |
| } |
| |
| @Override |
| public synchronized void resetMatchLocalityForAllHeldContainers() { |
| for (HeldContainer heldContainer : heldContainers.values()) { |
| heldContainer.resetLocalityMatchLevel(); |
| } |
| synchronized(delayedContainerManager) { |
| delayedContainerManager.notify(); |
| } |
| } |
| |
| @Override |
| public void onShutdownRequest() { |
| if (isStopped.get()) { |
| return; |
| } |
| // upcall to app must be outside locks |
| appClientDelegate.appShutdownRequested(); |
| } |
| |
| @Override |
| public void onNodesUpdated(List<NodeReport> updatedNodes) { |
| if (isStopped.get()) { |
| return; |
| } |
| // ignore bad nodes for now |
| // upcall to app must be outside locks |
| appClientDelegate.nodesUpdated(updatedNodes); |
| } |
| |
| @Override |
| public float getProgress() { |
| if (isStopped.get()) { |
| return 1; |
| } |
| |
| if(totalResources.getMemory() == 0) { |
| // assume this is the first allocate callback. nothing is allocated. |
| // available resource = totalResource |
| // TODO this will not handle dynamic changes in resources |
| totalResources = Resources.clone(getAvailableResources()); |
| LOG.info("App total resource memory: " + totalResources.getMemory() + |
| " cpu: " + totalResources.getVirtualCores() + |
| " taskAllocations: " + taskAllocations.size()); |
| } |
| |
| numHeartbeats++; |
| preemptIfNeeded(); |
| |
| return appClientDelegate.getProgress(); |
| } |
| |
| @Override |
| public void onError(Throwable t) { |
| if (isStopped.get()) { |
| return; |
| } |
| appClientDelegate.onError(t); |
| } |
| |
| @Override |
| public Resource getTotalResources() { |
| return totalResources; |
| } |
| |
| @Override |
| public synchronized void blacklistNode(NodeId nodeId) { |
| LOG.info("Blacklisting node: " + nodeId); |
| amRmClient.addNodeToBlacklist(nodeId); |
| blacklistedNodes.add(nodeId); |
| } |
| |
| @Override |
| public synchronized void unblacklistNode(NodeId nodeId) { |
| if (blacklistedNodes.remove(nodeId)) { |
| LOG.info("UnBlacklisting node: " + nodeId); |
| amRmClient.removeNodeFromBlacklist(nodeId); |
| } |
| } |
| |
| @Override |
| public synchronized void allocateTask( |
| Object task, |
| Resource capability, |
| String[] hosts, |
| String[] racks, |
| Priority priority, |
| Object containerSignature, |
| Object clientCookie) { |
| |
| // XXX Have ContainerContext implement an interface defined by TaskScheduler. |
| // TODO check for nulls etc |
| // TODO extra memory allocation |
| CRCookie cookie = new CRCookie(task, clientCookie, containerSignature); |
| CookieContainerRequest request = new CookieContainerRequest( |
| capability, hosts, racks, priority, cookie); |
| |
| addRequestAndTrigger(task, request, hosts, racks); |
| } |
| |
| @Override |
| public synchronized void allocateTask( |
| Object task, |
| Resource capability, |
| ContainerId containerId, |
| Priority priority, |
| Object containerSignature, |
| Object clientCookie) { |
| |
| HeldContainer heldContainer = heldContainers.get(containerId); |
| String[] hosts = null; |
| String[] racks = null; |
| if (heldContainer != null) { |
| Container container = heldContainer.getContainer(); |
| if (canFit(capability, container.getResource())) { |
| // just specify node and use YARN's soft locality constraint for the rest |
| hosts = new String[1]; |
| hosts[0] = container.getNodeId().getHost(); |
| priorityHasAffinity.add(priority); |
| } else { |
| LOG.warn("Matching requested to container: " + containerId + |
| " but requested capability: " + capability + |
| " does not fit in container resource: " + container.getResource()); |
| } |
| } else { |
| LOG.warn("Matching requested to unknown container: " + containerId); |
| } |
| |
| CRCookie cookie = new CRCookie(task, clientCookie, containerSignature); |
| CookieContainerRequest request = new CookieContainerRequest( |
| capability, containerId, hosts, racks, priority, cookie); |
| |
| addRequestAndTrigger(task, request, hosts, racks); |
| } |
| |
| private void addRequestAndTrigger(Object task, CookieContainerRequest request, |
| String[] hosts, String[] racks) { |
| addTaskRequest(task, request); |
| // See if any of the delayedContainers can be used for this task. |
| delayedContainerManager.triggerScheduling(true); |
| LOG.info("Allocation request for task: " + task + |
| " with request: " + request + |
| " host: " + ((hosts!=null&&hosts.length>0)?hosts[0]:"null") + |
| " rack: " + ((racks!=null&&racks.length>0)?racks[0]:"null")); |
| } |
| |
| /** |
| * @param task |
| * the task to de-allocate. |
| * @param taskSucceeded |
| * specify whether the task succeeded or failed. |
| * @return true if a container is assigned to this task. |
| */ |
| @Override |
| public boolean deallocateTask(Object task, boolean taskSucceeded) { |
| Map<CookieContainerRequest, Container> assignedContainers = null; |
| |
| synchronized (this) { |
| CookieContainerRequest request = removeTaskRequest(task); |
| if (request != null) { |
| // task not allocated yet |
| LOG.info("Deallocating task: " + task + " before allocation"); |
| return false; |
| } |
| |
| // task request not present. Look in allocations |
| Container container = doBookKeepingForTaskDeallocate(task); |
| if (container == null) { |
| // task neither requested nor allocated. |
| LOG.info("Ignoring removal of unknown task: " + task); |
| return false; |
| } else { |
| LOG.info("Deallocated task: " + task + " from container: " |
| + container.getId()); |
| |
| if (!taskSucceeded || !shouldReuseContainers) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Releasing container, containerId=" + container.getId() |
| + ", taskSucceeded=" + taskSucceeded |
| + ", reuseContainersFlag=" + shouldReuseContainers); |
| } |
| releaseContainer(container.getId()); |
| } else { |
| // Don't attempt to delay containers if delay is 0. |
| HeldContainer heldContainer = heldContainers.get(container.getId()); |
| if (heldContainer != null) { |
| heldContainer.resetLocalityMatchLevel(); |
| long currentTime = System.currentTimeMillis(); |
| if (idleContainerTimeoutMin > 0) { |
| heldContainer.setContainerExpiryTime(getHeldContainerExpireTime(currentTime)); |
| } |
| assignedContainers = assignDelayedContainer(heldContainer); |
| } else { |
| LOG.info("Skipping container after task deallocate as container is" |
| + " no longer running, containerId=" + container.getId()); |
| } |
| } |
| } |
| } |
| |
| // up call outside of the lock. |
| if (assignedContainers != null && assignedContainers.size() == 1) { |
| informAppAboutAssignments(assignedContainers); |
| } |
| return true; |
| } |
| |
| @Override |
| public synchronized Object deallocateContainer(ContainerId containerId) { |
| Object task = unAssignContainer(containerId, true); |
| if(task != null) { |
| LOG.info("Deallocated container: " + containerId + |
| " from task: " + task); |
| return task; |
| } |
| |
| LOG.info("Ignoring dealloction of unknown container: " + containerId); |
| return null; |
| } |
| |
| boolean canFit(Resource arg0, Resource arg1) { |
| int mem0 = arg0.getMemory(); |
| int mem1 = arg1.getMemory(); |
| int cpu0 = arg0.getVirtualCores(); |
| int cpu1 = arg1.getVirtualCores(); |
| |
| if(mem0 <= mem1 && cpu0 <= cpu1) { |
| return true; |
| } |
| return false; |
| } |
| |
| private int scaleDownByPreemptionPercentage(int original) { |
| return (original + (preemptionPercentage - 1)) / preemptionPercentage; |
| } |
| |
| void preemptIfNeeded() { |
| if (preemptionPercentage == 0) { |
| // turned off |
| return; |
| } |
| ContainerId[] preemptedContainers = null; |
| int numPendingRequestsToService = 0; |
| synchronized (this) { |
| Resource freeResources = amRmClient.getAvailableResources(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Allocated resource memory: " + allocatedResources.getMemory() + |
| " cpu:" + allocatedResources.getVirtualCores() + |
| " delayedContainers: " + delayedContainerManager.delayedContainers.size() + |
| " heartbeats: " + numHeartbeats + " lastPreemptionHeartbeat: " + heartbeatAtLastPreemption); |
| } |
| assert freeResources.getMemory() >= 0; |
| |
| CookieContainerRequest highestPriRequest = null; |
| int numHighestPriRequests = 0; |
| for(CookieContainerRequest request : taskRequests.values()) { |
| if(highestPriRequest == null) { |
| highestPriRequest = request; |
| numHighestPriRequests = 1; |
| } else if(isHigherPriority(request.getPriority(), |
| highestPriRequest.getPriority())){ |
| highestPriRequest = request; |
| numHighestPriRequests = 1; |
| } else if (request.getPriority().equals(highestPriRequest.getPriority())) { |
| numHighestPriRequests++; |
| } |
| } |
| |
| if (highestPriRequest == null) { |
| // nothing pending |
| return; |
| } |
| |
| if(fitsIn(highestPriRequest.getCapability(), freeResources)) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Highest pri request: " + highestPriRequest + " fits in available resources " |
| + freeResources); |
| } |
| return; |
| } |
| // highest priority request will not fit in existing free resources |
| // free up some more |
| // TODO this is subject to error wrt RM resource normalization |
| |
| numPendingRequestsToService = scaleDownByPreemptionPercentage(numHighestPriRequests); |
| |
| if (numPendingRequestsToService < 1) { |
| return; |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Trying to service " + numPendingRequestsToService + " out of total " |
| + numHighestPriRequests + " pending requests at pri: " |
| + highestPriRequest.getPriority()); |
| } |
| |
| for (int i=0; i<numPendingRequestsToService; ++i) { |
| // This request must have been considered for matching with all existing |
| // containers when request was made. |
| Container lowestPriNewContainer = null; |
| // could not find anything to preempt. Check if we can release unused |
| // containers |
| for (HeldContainer heldContainer : delayedContainerManager.delayedContainers) { |
| if (!heldContainer.isNew()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Reused container exists. Wait for assignment loop to release it. " |
| + heldContainer.getContainer().getId()); |
| } |
| return; |
| } |
| if (heldContainer.geNumAssignmentAttempts() < 3) { |
| // we havent tried to assign this container at node/rack/ANY |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Brand new container. Wait for assignment loop to match it. " |
| + heldContainer.getContainer().getId()); |
| } |
| return; |
| } |
| Container container = heldContainer.getContainer(); |
| if (lowestPriNewContainer == null || |
| isHigherPriority(lowestPriNewContainer.getPriority(), container.getPriority())){ |
| // there is a lower priority new container |
| lowestPriNewContainer = container; |
| } |
| } |
| |
| if (lowestPriNewContainer != null) { |
| LOG.info("Preempting new container: " + lowestPriNewContainer.getId() + |
| " with priority: " + lowestPriNewContainer.getPriority() + |
| " to free resource for request: " + highestPriRequest + |
| " . Current free resources: " + freeResources); |
| numPendingRequestsToService--; |
| releaseUnassignedContainers(Collections.singletonList(lowestPriNewContainer)); |
| // We are returning an unused resource back the RM. The RM thinks it |
| // has serviced our initial request and will not re-allocate this back |
| // to us anymore. So we need to ask for this again. If there is no |
| // outstanding request at that priority then its fine to not ask again. |
| // See TEZ-915 for more details |
| for (Map.Entry<Object, CookieContainerRequest> entry : taskRequests.entrySet()) { |
| Object task = entry.getKey(); |
| CookieContainerRequest request = entry.getValue(); |
| if (request.getPriority().equals(lowestPriNewContainer.getPriority())) { |
| LOG.info("Resending request for task again: " + task); |
| deallocateTask(task, true); |
| allocateTask(task, request.getCapability(), |
| (request.getNodes() == null ? null : |
| request.getNodes().toArray(new String[request.getNodes().size()])), |
| (request.getRacks() == null ? null : |
| request.getRacks().toArray(new String[request.getRacks().size()])), |
| request.getPriority(), |
| request.getCookie().getContainerSignature(), |
| request.getCookie().getAppCookie()); |
| break; |
| } |
| } |
| // come back and free more new containers if needed |
| continue; |
| } |
| } |
| |
| if (numPendingRequestsToService < 1) { |
| return; |
| } |
| |
| // there are no reused or new containers to release. try to preempt running containers |
| // this assert will be a no-op in production but can help identify |
| // invalid assumptions during testing |
| assert delayedContainerManager.delayedContainers.isEmpty(); |
| |
| if ((numHeartbeats - heartbeatAtLastPreemption) <= numHeartbeatsBetweenPreemptions) { |
| return; |
| } |
| |
| Priority preemptedTaskPriority = null; |
| int numEntriesAtPreemptedPriority = 0; |
| for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) { |
| HeldContainer heldContainer = heldContainers.get(entry.getValue().getId()); |
| CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo(); |
| Priority taskPriority = lastTaskInfo.getPriority(); |
| Object signature = lastTaskInfo.getCookie().getContainerSignature(); |
| if(!isHigherPriority(highestPriRequest.getPriority(), taskPriority)) { |
| // higher or same priority |
| continue; |
| } |
| if (containerSignatureMatcher.isExactMatch( |
| highestPriRequest.getCookie().getContainerSignature(), |
| signature)) { |
| // exact match with different priorities |
| continue; |
| } |
| if (preemptedTaskPriority == null || |
| !isHigherPriority(taskPriority, preemptedTaskPriority)) { |
| // keep the lower priority |
| preemptedTaskPriority = taskPriority; |
| if (taskPriority.equals(preemptedTaskPriority)) { |
| numEntriesAtPreemptedPriority++; |
| } else { |
| // this is at a lower priority than existing |
| numEntriesAtPreemptedPriority = 1; |
| } |
| } |
| } |
| if(preemptedTaskPriority != null) { |
| int newNumPendingRequestsToService = scaleDownByPreemptionPercentage(Math.min( |
| numEntriesAtPreemptedPriority, numHighestPriRequests)); |
| numPendingRequestsToService = Math.min(newNumPendingRequestsToService, |
| numPendingRequestsToService); |
| if (numPendingRequestsToService < 1) { |
| return; |
| } |
| LOG.info("Trying to service " + numPendingRequestsToService + " out of total " |
| + numHighestPriRequests + " pending requests at pri: " |
| + highestPriRequest.getPriority() + " by preempting from " |
| + numEntriesAtPreemptedPriority + " running tasks at priority: " + preemptedTaskPriority); |
| // found something to preempt. get others of the same priority |
| preemptedContainers = new ContainerId[numPendingRequestsToService]; |
| int currIndex = 0; |
| for (Map.Entry<Object, Container> entry : taskAllocations.entrySet()) { |
| Container container = entry.getValue(); |
| if (preemptedTaskPriority.equals(container.getPriority())) { |
| // taskAllocations map will iterate from oldest to newest assigned containers |
| // keep the N newest containersIds with the matching priority |
| preemptedContainers[currIndex++ % numPendingRequestsToService] = container.getId(); |
| } |
| } |
| // app client will be notified when after container is killed |
| // and we get its completed container status |
| } |
| } |
| |
| // upcall outside locks |
| if (preemptedContainers != null) { |
| heartbeatAtLastPreemption = numHeartbeats; |
| for(int i=0; i<numPendingRequestsToService; ++i) { |
| ContainerId cId = preemptedContainers[i]; |
| if (cId != null) { |
| LOG.info("Preempting container: " + cId + " currently allocated to a task."); |
| appClientDelegate.preemptContainer(cId); |
| } |
| } |
| } |
| } |
| |
| private boolean fitsIn(Resource toFit, Resource resource) { |
| // YARN-893 prevents using correct library code |
| //return Resources.fitsIn(toFit, resource); |
| return resource.getMemory() >= toFit.getMemory(); |
| } |
| |
| private CookieContainerRequest getMatchingRequestWithPriority( |
| Container container, |
| String location) { |
| Priority priority = container.getPriority(); |
| Resource capability = container.getResource(); |
| List<? extends Collection<CookieContainerRequest>> requestsList = |
| amRmClient.getMatchingRequests(priority, location, capability); |
| |
| if (!requestsList.isEmpty()) { |
| // pick first one |
| for (Collection<CookieContainerRequest> requests : requestsList) { |
| for (CookieContainerRequest cookieContainerRequest : requests) { |
| if (canAssignTaskToContainer(cookieContainerRequest, container)) { |
| return cookieContainerRequest; |
| } |
| } |
| } |
| } |
| |
| return null; |
| } |
| |
| private CookieContainerRequest getMatchingRequestWithoutPriority( |
| Container container, |
| String location, |
| boolean considerContainerAffinity) { |
| Resource capability = container.getResource(); |
| List<? extends Collection<CookieContainerRequest>> pRequestsList = |
| amRmClient.getMatchingRequestsForTopPriority(location, capability); |
| if (considerContainerAffinity && |
| !priorityHasAffinity.contains(amRmClient.getTopPriority())) { |
| considerContainerAffinity = false; |
| } |
| if (pRequestsList == null || pRequestsList.isEmpty()) { |
| return null; |
| } |
| CookieContainerRequest firstMatch = null; |
| for (Collection<CookieContainerRequest> requests : pRequestsList) { |
| for (CookieContainerRequest cookieContainerRequest : requests) { |
| if (firstMatch == null || // we dont have a match. So look for one |
| // we have a match but are looking for a better container level match. |
| // skip the expensive canAssignTaskToContainer() if the request is |
| // not affinitized to the container |
| container.getId().equals(cookieContainerRequest.getAffinitizedContainer()) |
| ) { |
| if (canAssignTaskToContainer(cookieContainerRequest, container)) { |
| // request matched to container |
| if (!considerContainerAffinity) { |
| return cookieContainerRequest; |
| } |
| ContainerId affCId = cookieContainerRequest.getAffinitizedContainer(); |
| boolean canMatchTaskWithAffinity = true; |
| if (affCId == null || |
| !heldContainers.containsKey(affCId) || |
| inUseContainers.contains(affCId)) { |
| // affinity not specified |
| // affinitized container is no longer held |
| // affinitized container is in use |
| canMatchTaskWithAffinity = false; |
| } |
| if (canMatchTaskWithAffinity) { |
| if (container.getId().equals( |
| cookieContainerRequest.getAffinitizedContainer())) { |
| // container level match |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Matching with affinity for request: " |
| + cookieContainerRequest + " container: " + affCId); |
| } |
| return cookieContainerRequest; |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Skipping request for container " + container.getId() |
| + " due to affinity. Request: " + cookieContainerRequest |
| + " affContainer: " + affCId); |
| } |
| } else { |
| firstMatch = cookieContainerRequest; |
| } |
| } |
| } |
| } |
| } |
| |
| return firstMatch; |
| } |
| |
| private boolean canAssignTaskToContainer( |
| CookieContainerRequest cookieContainerRequest, Container container) { |
| HeldContainer heldContainer = heldContainers.get(container.getId()); |
| if (heldContainer == null || heldContainer.isNew()) { // New container. |
| return true; |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Trying to match task to a held container, " |
| + " containerId=" + heldContainer.container.getId()); |
| } |
| if (containerSignatureMatcher.isSuperSet(heldContainer |
| .getFirstContainerSignature(), cookieContainerRequest.getCookie() |
| .getContainerSignature())) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Matched delayed container to task" |
| + " containerId=" + heldContainer.container.getId()); |
| } |
| return true; |
| } |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Failed to match delayed container to task" |
| + " containerId=" + heldContainer.container.getId()); |
| } |
| return false; |
| } |
| |
| private Object getTask(CookieContainerRequest request) { |
| return request.getCookie().getTask(); |
| } |
| |
| private void releaseContainer(ContainerId containerId) { |
| Object assignedTask = containerAssignments.remove(containerId); |
| if (assignedTask != null) { |
| // A task was assigned to this container at some point. Inform the app. |
| appClientDelegate.containerBeingReleased(containerId); |
| } |
| HeldContainer delayedContainer = heldContainers.remove(containerId); |
| if (delayedContainer != null) { |
| Resources.subtractFrom(allocatedResources, |
| delayedContainer.getContainer().getResource()); |
| } |
| if (delayedContainer != null || !shouldReuseContainers) { |
| amRmClient.releaseAssignedContainer(containerId); |
| } |
| if (assignedTask != null) { |
| // A task was assigned at some point. Add to release list since we are |
| // releasing the container. |
| releasedContainers.put(containerId, assignedTask); |
| } |
| } |
| |
| private void assignContainer(Object task, |
| Container container, |
| CookieContainerRequest assigned) { |
| CookieContainerRequest request = removeTaskRequest(task); |
| assert request != null; |
| //assert assigned.equals(request); |
| |
| Container result = taskAllocations.put(task, container); |
| assert result == null; |
| inUseContainers.add(container.getId()); |
| containerAssignments.put(container.getId(), task); |
| HeldContainer heldContainer = heldContainers.get(container.getId()); |
| if (!shouldReuseContainers && heldContainer == null) { |
| heldContainers.put(container.getId(), new HeldContainer(container, |
| -1, -1, assigned)); |
| Resources.addTo(allocatedResources, container.getResource()); |
| } else { |
| if (heldContainer.isNew()) { |
| // check for existence before adding since the first container potentially |
| // has the broadest signature as subsequent uses dont expand any dimension. |
| // This will need to be enhanced to track other signatures too when we |
| // think about preferring within vertex matching etc. |
| heldContainers.put(container.getId(), |
| new HeldContainer(container, heldContainer.getNextScheduleTime(), |
| heldContainer.getContainerExpiryTime(), assigned)); |
| } |
| heldContainer.setLastTaskInfo(assigned); |
| } |
| } |
| |
| private void pushNewContainerToDelayed(List<Container> containers){ |
| long expireTime = -1; |
| if (idleContainerTimeoutMin > 0) { |
| long currentTime = System.currentTimeMillis(); |
| expireTime = currentTime + idleContainerTimeoutMin; |
| } |
| |
| synchronized (delayedContainerManager) { |
| for (Container container : containers) { |
| if (heldContainers.put(container.getId(), new HeldContainer(container, |
| -1, expireTime, null)) != null) { |
| throw new TezUncheckedException("New container " + container.getId() |
| + " is already held."); |
| } |
| long nextScheduleTime = delayedContainerManager.maxScheduleTimeSeen; |
| if (delayedContainerManager.maxScheduleTimeSeen == -1) { |
| nextScheduleTime = System.currentTimeMillis(); |
| } |
| Resources.addTo(allocatedResources, container.getResource()); |
| delayedContainerManager.addDelayedContainer(container, |
| nextScheduleTime + 1); |
| } |
| } |
| delayedContainerManager.triggerScheduling(false); |
| } |
| |
| private CookieContainerRequest removeTaskRequest(Object task) { |
| CookieContainerRequest request = taskRequests.remove(task); |
| if(request != null) { |
| // remove all references of the request from AMRMClient |
| amRmClient.removeContainerRequest(request); |
| } |
| return request; |
| } |
| |
| private void addTaskRequest(Object task, |
| CookieContainerRequest request) { |
| CookieContainerRequest oldRequest = taskRequests.put(task, request); |
| if (oldRequest != null) { |
| // remove all references of the request from AMRMClient |
| amRmClient.removeContainerRequest(oldRequest); |
| } |
| amRmClient.addContainerRequest(request); |
| } |
| |
| private Container doBookKeepingForTaskDeallocate(Object task) { |
| Container container = taskAllocations.remove(task); |
| if (container == null) { |
| return null; |
| } |
| inUseContainers.remove(container.getId()); |
| return container; |
| } |
| |
| private Object unAssignContainer(ContainerId containerId, |
| boolean releaseIfFound) { |
| // Not removing. containerAssignments tracks the last task run on a |
| // container. |
| Object task = containerAssignments.get(containerId); |
| if(task == null) { |
| return null; |
| } |
| Container container = taskAllocations.remove(task); |
| assert container != null; |
| inUseContainers.remove(containerId); |
| if(releaseIfFound) { |
| releaseContainer(containerId); |
| } |
| return task; |
| } |
| |
| private boolean isHigherPriority(Priority lhs, Priority rhs) { |
| return lhs.getPriority() < rhs.getPriority(); |
| } |
| |
| private synchronized void assignNewContainersWithLocation( |
| Iterable<Container> containers, |
| ContainerAssigner assigner, |
| Map<CookieContainerRequest, Container> assignedContainers) { |
| |
| Iterator<Container> containerIterator = containers.iterator(); |
| while (containerIterator.hasNext()) { |
| Container container = containerIterator.next(); |
| CookieContainerRequest assigned = |
| assigner.assignNewContainer(container); |
| if (assigned != null) { |
| assignedContainers.put(assigned, container); |
| containerIterator.remove(); |
| } |
| } |
| } |
| |
| private synchronized void assignReUsedContainersWithLocation( |
| Iterable<Container> containers, |
| ContainerAssigner assigner, |
| Map<CookieContainerRequest, Container> assignedContainers, |
| boolean honorLocality) { |
| |
| Iterator<Container> containerIterator = containers.iterator(); |
| while (containerIterator.hasNext()) { |
| Container container = containerIterator.next(); |
| if (assignReUsedContainerWithLocation(container, assigner, |
| assignedContainers, honorLocality)) { |
| containerIterator.remove(); |
| } |
| } |
| } |
| |
| private synchronized boolean assignReUsedContainerWithLocation( |
| Container container, |
| ContainerAssigner assigner, |
| Map<CookieContainerRequest, Container> assignedContainers, |
| boolean honorLocality) { |
| |
| Priority containerPriority = container.getPriority(); |
| Priority topPendingTaskPriority = amRmClient.getTopPriority(); |
| if (topPendingTaskPriority == null) { |
| // nothing left to assign |
| return false; |
| } |
| |
| if (topPendingTaskPriority.compareTo(containerPriority) > 0 && |
| heldContainers.get(container.getId()).isNew()) { |
| // if the next task to assign is higher priority than the container then |
| // dont assign this container to that task. |
| // if task and container are equal priority - then its first use or reuse |
| // within the same priority - safe to use |
| // if task is lower priority than container then if we use a container that |
| // is no longer needed by higher priority tasks All those higher pri tasks |
| // has been assigned resources - safe to use (first use or reuse) |
| // if task is higher priority than container then we may end up using a |
| // container that was assigned by the RM for a lower priority pending task |
| // that will be assigned after this higher priority task is assigned. If we |
| // use that task's container now then we may not be able to match this |
| // container to that task later on. However the RM has already assigned us |
| // all containers and is not going to give us new containers. We will get |
| // stuck for resources. |
| // the above applies for new containers. If a container has already been |
| // re-used then this is not relevant |
| return false; |
| } |
| |
| CookieContainerRequest assigned = |
| assigner.assignReUsedContainer(container, honorLocality); |
| if (assigned != null) { |
| assignedContainers.put(assigned, container); |
| return true; |
| } |
| return false; |
| } |
| |
| private void releaseUnassignedContainers(Iterable<Container> containers) { |
| for (Container container : containers) { |
| LOG.info("Releasing unused container: " |
| + container.getId()); |
| releaseContainer(container.getId()); |
| } |
| } |
| |
| private void informAppAboutAssignment(CookieContainerRequest assigned, |
| Container container) { |
| appClientDelegate.taskAllocated(getTask(assigned), |
| assigned.getCookie().getAppCookie(), container); |
| } |
| |
| private void informAppAboutAssignments( |
| Map<CookieContainerRequest, Container> assignedContainers) { |
| if (assignedContainers == null || assignedContainers.isEmpty()) { |
| return; |
| } |
| for (Entry<CookieContainerRequest, Container> entry : assignedContainers |
| .entrySet()) { |
| Container container = entry.getValue(); |
| // check for blacklisted nodes. There may be race conditions between |
| // setting blacklist and receiving allocations |
| if (blacklistedNodes.contains(container.getNodeId())) { |
| CookieContainerRequest request = entry.getKey(); |
| Object task = getTask(request); |
| LOG.info("Container: " + container.getId() + |
| " allocated on blacklisted node: " + container.getNodeId() + |
| " for task: " + task); |
| Object deAllocTask = deallocateContainer(container.getId()); |
| assert deAllocTask.equals(task); |
| // its ok to submit the same request again because the RM will not give us |
| // the bad/unhealthy nodes again. The nodes may become healthy/unblacklisted |
| // and so its better to give the RM the full information. |
| allocateTask(task, request.getCapability(), |
| (request.getNodes() == null ? null : |
| request.getNodes().toArray(new String[request.getNodes().size()])), |
| (request.getRacks() == null ? null : |
| request.getRacks().toArray(new String[request.getRacks().size()])), |
| request.getPriority(), |
| request.getCookie().getContainerSignature(), |
| request.getCookie().getAppCookie()); |
| } else { |
| informAppAboutAssignment(entry.getKey(), container); |
| } |
| } |
| } |
| |
| private abstract class ContainerAssigner { |
| |
| protected final String locality; |
| |
| protected ContainerAssigner(String locality) { |
| this.locality = locality; |
| } |
| |
| public abstract CookieContainerRequest assignNewContainer( |
| Container container); |
| |
| public abstract CookieContainerRequest assignReUsedContainer( |
| Container container, boolean honorLocality); |
| |
| public void doBookKeepingForAssignedContainer( |
| CookieContainerRequest assigned, Container container, |
| String matchedLocation, boolean honorLocalityFlags) { |
| if (assigned == null) { |
| return; |
| } |
| Object task = getTask(assigned); |
| assert task != null; |
| |
| LOG.info("Assigning container to task" |
| + ", container=" + container |
| + ", task=" + task |
| + ", containerHost=" + container.getNodeId().getHost() |
| + ", localityMatchType=" + locality |
| + ", matchedLocation=" + matchedLocation |
| + ", honorLocalityFlags=" + honorLocalityFlags |
| + ", reusedContainer=" |
| + containerAssignments.containsKey(container.getId()) |
| + ", delayedContainers=" + delayedContainerManager.delayedContainers.size() |
| + ", containerResourceMemory=" + container.getResource().getMemory() |
| + ", containerResourceVCores=" |
| + container.getResource().getVirtualCores()); |
| |
| assignContainer(task, container, assigned); |
| } |
| } |
| |
| private class NodeLocalContainerAssigner extends ContainerAssigner { |
| |
| NodeLocalContainerAssigner() { |
| super("NodeLocal"); |
| } |
| |
| @Override |
| public CookieContainerRequest assignNewContainer(Container container) { |
| String location = container.getNodeId().getHost(); |
| CookieContainerRequest assigned = getMatchingRequestWithPriority( |
| container, location); |
| doBookKeepingForAssignedContainer(assigned, container, location, false); |
| return assigned; |
| } |
| |
| @Override |
| public CookieContainerRequest assignReUsedContainer(Container container, |
| boolean honorLocality) { |
| String location = container.getNodeId().getHost(); |
| CookieContainerRequest assigned = getMatchingRequestWithoutPriority( |
| container, location, true); |
| doBookKeepingForAssignedContainer(assigned, container, location, true); |
| return assigned; |
| |
| } |
| } |
| |
| private class RackLocalContainerAssigner extends ContainerAssigner { |
| |
| RackLocalContainerAssigner() { |
| super("RackLocal"); |
| } |
| |
| @Override |
| public CookieContainerRequest assignNewContainer(Container container) { |
| String location = RackResolver.resolve(container.getNodeId().getHost()) |
| .getNetworkLocation(); |
| CookieContainerRequest assigned = getMatchingRequestWithPriority(container, |
| location); |
| doBookKeepingForAssignedContainer(assigned, container, location, false); |
| return assigned; |
| } |
| |
| @Override |
| public CookieContainerRequest assignReUsedContainer( |
| Container container, boolean honorLocality) { |
| // TEZ-586 this is not match an actual rackLocal request unless honorLocality |
| // is false. This method is useless if honorLocality=true |
| if (!honorLocality) { |
| String location = heldContainers.get(container.getId()).getRack(); |
| CookieContainerRequest assigned = getMatchingRequestWithoutPriority( |
| container, location, false); |
| doBookKeepingForAssignedContainer(assigned, container, location, |
| honorLocality); |
| return assigned; |
| } |
| return null; |
| } |
| } |
| |
| private class NonLocalContainerAssigner extends ContainerAssigner { |
| |
| NonLocalContainerAssigner() { |
| super("NonLocal"); |
| } |
| |
| @Override |
| public CookieContainerRequest assignNewContainer(Container container) { |
| String location = ResourceRequest.ANY; |
| CookieContainerRequest assigned = getMatchingRequestWithPriority(container, |
| location); |
| doBookKeepingForAssignedContainer(assigned, container, location, false); |
| return assigned; |
| } |
| |
| @Override |
| public CookieContainerRequest assignReUsedContainer(Container container, |
| boolean honorLocality) { |
| if (!honorLocality) { |
| String location = ResourceRequest.ANY; |
| CookieContainerRequest assigned = getMatchingRequestWithoutPriority( |
| container, location, false); |
| doBookKeepingForAssignedContainer(assigned, container, location, |
| honorLocality); |
| return assigned; |
| } |
| return null; |
| } |
| |
| } |
| |
| |
| @VisibleForTesting |
| class DelayedContainerManager extends Thread { |
| |
| class HeldContainerTimerComparator implements Comparator<HeldContainer> { |
| |
| @Override |
| public int compare(HeldContainer c1, |
| HeldContainer c2) { |
| return (int) (c1.getNextScheduleTime() - c2.getNextScheduleTime()); |
| } |
| } |
| |
| PriorityBlockingQueue<HeldContainer> delayedContainers = |
| new PriorityBlockingQueue<HeldContainer>(20, |
| new HeldContainerTimerComparator()); |
| |
| private volatile boolean tryAssigningAll = false; |
| private volatile boolean running = true; |
| private long maxScheduleTimeSeen = -1; |
| |
| // used for testing only |
| @VisibleForTesting |
| volatile AtomicBoolean drainedDelayedContainersForTest = null; |
| |
| DelayedContainerManager() { |
| super.setName("DelayedContainerManager"); |
| } |
| |
| @Override |
| public void run() { |
| while(running) { |
| // Try assigning all containers if there's a request to do so. |
| if (tryAssigningAll) { |
| doAssignAll(); |
| tryAssigningAll = false; |
| } |
| |
| // Try allocating containers which have timed out. |
| // Required since these containers may get assigned without |
| // locality at this point. |
| synchronized(this) { |
| if (delayedContainers.peek() == null) { |
| try { |
| // test only signaling to make TestTaskScheduler work |
| if (drainedDelayedContainersForTest != null) { |
| drainedDelayedContainersForTest.set(true); |
| synchronized (drainedDelayedContainersForTest) { |
| drainedDelayedContainersForTest.notifyAll(); |
| } |
| } |
| this.wait(); |
| // Re-loop to see if tryAssignAll is set. |
| continue; |
| } catch (InterruptedException e) { |
| LOG.info("AllocatedContainerManager Thread interrupted"); |
| } |
| } |
| } |
| // test only sleep to prevent tight loop cycling that makes tests stall |
| if (drainedDelayedContainersForTest != null) { |
| try { |
| Thread.sleep(100); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| } |
| HeldContainer delayedContainer = delayedContainers.peek(); |
| if (delayedContainer == null) { |
| continue; |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Considering HeldContainer: " |
| + delayedContainer + " for assignment"); |
| } |
| long currentTs = System.currentTimeMillis(); |
| long nextScheduleTs = delayedContainer.getNextScheduleTime(); |
| if (currentTs >= nextScheduleTs) { |
| // Remove the container and try scheduling it. |
| // TEZ-587 what if container is released by RM after this |
| // in onContainerCompleted() |
| delayedContainer = delayedContainers.poll(); |
| if (delayedContainer == null) { |
| continue; |
| } |
| Map<CookieContainerRequest, Container> assignedContainers = null; |
| synchronized(YarnTaskSchedulerService.this) { |
| if (null != |
| heldContainers.get(delayedContainer.getContainer().getId())) { |
| assignedContainers = assignDelayedContainer(delayedContainer); |
| } else { |
| LOG.info("Skipping delayed container as container is no longer" |
| + " running, containerId=" |
| + delayedContainer.getContainer().getId()); |
| } |
| } |
| // Inform App should be done outside of the lock |
| informAppAboutAssignments(assignedContainers); |
| } else { |
| synchronized(this) { |
| try { |
| // Wait for the next container to be assignable |
| delayedContainer = delayedContainers.peek(); |
| long diff = localitySchedulingDelay; |
| if (delayedContainer != null) { |
| diff = delayedContainer.getNextScheduleTime() - currentTs; |
| } |
| if (diff > 0) { |
| this.wait(diff); |
| } |
| } catch (InterruptedException e) { |
| LOG.info("AllocatedContainerManager Thread interrupted"); |
| } |
| } |
| } |
| } |
| releasePendingContainers(); |
| } |
| |
| private void doAssignAll() { |
| // The allocatedContainers queue should not be modified in the middle of an iteration over it. |
| // Synchronizing here on TaskScheduler.this to prevent this from happening. |
| // The call to assignAll from within this method should NOT add any |
| // elements back to the allocatedContainers list. Since they're all |
| // delayed elements, de-allocation should not happen either - leaving the |
| // list of delayed containers intact, except for the contaienrs which end |
| // up getting assigned. |
| if (delayedContainers.isEmpty()) { |
| return; |
| } |
| |
| Map<CookieContainerRequest, Container> assignedContainers; |
| synchronized(YarnTaskSchedulerService.this) { |
| // honor reuse-locality flags (container not timed out yet), Don't queue |
| // (already in queue), don't release (release happens when containers |
| // time-out) |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Trying to assign all delayed containers to newly received" |
| + " tasks"); |
| } |
| Iterator<HeldContainer> iter = delayedContainers.iterator(); |
| while(iter.hasNext()) { |
| HeldContainer delayedContainer = iter.next(); |
| if (!heldContainers.containsKey(delayedContainer.getContainer().getId())) { |
| // this container is no longer held by us |
| LOG.info("AssignAll - Skipping delayed container as container is no longer" |
| + " running, containerId=" |
| + delayedContainer.getContainer().getId()); |
| iter.remove(); |
| } |
| } |
| assignedContainers = tryAssignReUsedContainers( |
| new ContainerIterable(delayedContainers)); |
| } |
| // Inform app |
| informAppAboutAssignments(assignedContainers); |
| } |
| |
| /** |
| * Indicate that an attempt should be made to allocate all available containers. |
| * Intended to be used in cases where new Container requests come in |
| */ |
| public void triggerScheduling(boolean scheduleAll) { |
| this.tryAssigningAll = scheduleAll; |
| synchronized(this) { |
| this.notify(); |
| } |
| } |
| |
| public void shutdown() { |
| this.running = false; |
| this.interrupt(); |
| } |
| |
| private void releasePendingContainers() { |
| List<HeldContainer> pendingContainers = Lists.newArrayListWithCapacity( |
| delayedContainers.size()); |
| delayedContainers.drainTo(pendingContainers); |
| releaseUnassignedContainers(new ContainerIterable(pendingContainers)); |
| } |
| |
| private void addDelayedContainer(Container container, |
| long nextScheduleTime) { |
| HeldContainer delayedContainer = heldContainers.get(container.getId()); |
| if (delayedContainer == null) { |
| LOG.warn("Attempting to add a non-running container to the" |
| + " delayed container list, containerId=" + container.getId()); |
| return; |
| } else { |
| delayedContainer.setNextScheduleTime(nextScheduleTime); |
| } |
| if (maxScheduleTimeSeen < nextScheduleTime) { |
| maxScheduleTimeSeen = nextScheduleTime; |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Adding container to delayed queue" |
| + ", containerId=" + delayedContainer.getContainer().getId() |
| + ", nextScheduleTime=" + delayedContainer.getNextScheduleTime() |
| + ", containerExpiry=" + delayedContainer.getContainerExpiryTime()); |
| } |
| boolean added = false; |
| synchronized(this) { |
| added = delayedContainers.offer(delayedContainer); |
| this.notify(); |
| } |
| if (!added) { |
| releaseUnassignedContainers(Lists.newArrayList(container)); |
| } |
| } |
| |
| } |
| |
| synchronized void determineMinHeldContainers() { |
| sessionMinHeldContainers.clear(); |
| if (sessionNumMinHeldContainers <= 0) { |
| return; |
| } |
| |
| if (heldContainers.size() <= sessionNumMinHeldContainers) { |
| sessionMinHeldContainers.addAll(heldContainers.keySet()); |
| } |
| |
| Map<String, AtomicInteger> rackHeldNumber = Maps.newHashMap(); |
| Map<String, List<HeldContainer>> nodeHeldContainers = Maps.newHashMap(); |
| for(HeldContainer heldContainer : heldContainers.values()) { |
| AtomicInteger count = rackHeldNumber.get(heldContainer.getRack()); |
| if (count == null) { |
| count = new AtomicInteger(0); |
| rackHeldNumber.put(heldContainer.getRack(), count); |
| } |
| count.incrementAndGet(); |
| List<HeldContainer> nodeContainers = nodeHeldContainers.get(heldContainer.getNode()); |
| if (nodeContainers == null) { |
| nodeContainers = Lists.newLinkedList(); |
| nodeHeldContainers.put(heldContainer.getNode(), nodeContainers); |
| } |
| nodeContainers.add(heldContainer); |
| } |
| Map<String, AtomicInteger> rackToHoldNumber = Maps.newHashMap(); |
| for (String rack : rackHeldNumber.keySet()) { |
| rackToHoldNumber.put(rack, new AtomicInteger(0)); |
| } |
| |
| // distribute evenly across nodes |
| // the loop assigns 1 container per rack over all racks |
| int containerCount = 0; |
| while (containerCount < sessionNumMinHeldContainers && !rackHeldNumber.isEmpty()) { |
| Iterator<Entry<String, AtomicInteger>> iter = rackHeldNumber.entrySet().iterator(); |
| while (containerCount < sessionNumMinHeldContainers && iter.hasNext()) { |
| Entry<String, AtomicInteger> entry = iter.next(); |
| if (entry.getValue().decrementAndGet() >=0) { |
| containerCount++; |
| rackToHoldNumber.get(entry.getKey()).incrementAndGet(); |
| } else { |
| iter.remove(); |
| } |
| } |
| } |
| |
| // distribute containers evenly across nodes while not exceeding rack limit |
| // the loop assigns 1 container per node over all nodes |
| containerCount = 0; |
| while (containerCount < sessionNumMinHeldContainers && !nodeHeldContainers.isEmpty()) { |
| Iterator<Entry<String, List<HeldContainer>>> iter = nodeHeldContainers.entrySet().iterator(); |
| while (containerCount < sessionNumMinHeldContainers && iter.hasNext()) { |
| List<HeldContainer> nodeContainers = iter.next().getValue(); |
| if (nodeContainers.isEmpty()) { |
| // node is empty. remove it. |
| iter.remove(); |
| continue; |
| } |
| HeldContainer heldContainer = nodeContainers.remove(nodeContainers.size() - 1); |
| if (rackToHoldNumber.get(heldContainer.getRack()).decrementAndGet() >= 0) { |
| // rack can hold a container |
| containerCount++; |
| sessionMinHeldContainers.add(heldContainer.getContainer().getId()); |
| } else { |
| // rack limit reached. remove node. |
| iter.remove(); |
| } |
| } |
| } |
| |
| LOG.info("Holding on to " + sessionMinHeldContainers.size() + " containers"); |
| } |
| |
| private class ContainerIterable implements Iterable<Container> { |
| |
| private final Iterable<HeldContainer> delayedContainers; |
| |
| ContainerIterable(Iterable<HeldContainer> delayedContainers) { |
| this.delayedContainers = delayedContainers; |
| } |
| |
| @Override |
| public Iterator<Container> iterator() { |
| |
| final Iterator<HeldContainer> delayedContainerIterator = delayedContainers |
| .iterator(); |
| |
| return new Iterator<Container>() { |
| |
| @Override |
| public boolean hasNext() { |
| return delayedContainerIterator.hasNext(); |
| } |
| |
| @Override |
| public Container next() { |
| return delayedContainerIterator.next().getContainer(); |
| } |
| |
| @Override |
| public void remove() { |
| delayedContainerIterator.remove(); |
| } |
| }; |
| } |
| } |
| |
| static class HeldContainer { |
| |
| enum LocalityMatchLevel { |
| NEW, |
| NODE, |
| RACK, |
| NON_LOCAL |
| } |
| |
| Container container; |
| private String rack; |
| private long nextScheduleTime; |
| private Object firstContainerSignature; |
| private LocalityMatchLevel localityMatchLevel; |
| private long containerExpiryTime; |
| private CookieContainerRequest lastTaskInfo; |
| private int numAssignmentAttempts = 0; |
| |
| HeldContainer(Container container, |
| long nextScheduleTime, |
| long containerExpiryTime, |
| CookieContainerRequest firstTaskInfo) { |
| this.container = container; |
| this.nextScheduleTime = nextScheduleTime; |
| if (firstTaskInfo != null) { |
| this.lastTaskInfo = firstTaskInfo; |
| this.firstContainerSignature = firstTaskInfo.getCookie().getContainerSignature(); |
| } |
| this.localityMatchLevel = LocalityMatchLevel.NODE; |
| this.containerExpiryTime = containerExpiryTime; |
| this.rack = RackResolver.resolve(container.getNodeId().getHost()) |
| .getNetworkLocation(); |
| } |
| |
| boolean isNew() { |
| return firstContainerSignature == null; |
| } |
| |
| String getRack() { |
| return this.rack; |
| } |
| |
| String getNode() { |
| return this.container.getNodeId().getHost(); |
| } |
| |
| int geNumAssignmentAttempts() { |
| return numAssignmentAttempts; |
| } |
| |
| void incrementAssignmentAttempts() { |
| numAssignmentAttempts++; |
| } |
| |
| public Container getContainer() { |
| return this.container; |
| } |
| |
| public long getNextScheduleTime() { |
| return this.nextScheduleTime; |
| } |
| |
| public void setNextScheduleTime(long nextScheduleTime) { |
| this.nextScheduleTime = nextScheduleTime; |
| } |
| |
| public long getContainerExpiryTime() { |
| return this.containerExpiryTime; |
| } |
| |
| public void setContainerExpiryTime(long containerExpiryTime) { |
| this.containerExpiryTime = containerExpiryTime; |
| } |
| |
| public Object getFirstContainerSignature() { |
| return this.firstContainerSignature; |
| } |
| |
| public CookieContainerRequest getLastTaskInfo() { |
| return this.lastTaskInfo; |
| } |
| |
| public void setLastTaskInfo(CookieContainerRequest taskInfo) { |
| lastTaskInfo = taskInfo; |
| } |
| |
| public synchronized void resetLocalityMatchLevel() { |
| localityMatchLevel = LocalityMatchLevel.NEW; |
| } |
| |
| public synchronized void incrementLocalityMatchLevel() { |
| if (localityMatchLevel.equals(LocalityMatchLevel.NEW)) { |
| localityMatchLevel = LocalityMatchLevel.NODE; |
| } else if (localityMatchLevel.equals(LocalityMatchLevel.NODE)) { |
| localityMatchLevel = LocalityMatchLevel.RACK; |
| } else if (localityMatchLevel.equals(LocalityMatchLevel.RACK)) { |
| localityMatchLevel = LocalityMatchLevel.NON_LOCAL; |
| } else if (localityMatchLevel.equals(LocalityMatchLevel.NON_LOCAL)) { |
| throw new TezUncheckedException("Cannot increment locality level " |
| + " from current NON_LOCAL for container: " + container.getId()); |
| } |
| } |
| |
| public LocalityMatchLevel getLocalityMatchLevel() { |
| return this.localityMatchLevel; |
| } |
| |
| @Override |
| public String toString() { |
| return "HeldContainer: id: " + container.getId() |
| + ", nextScheduleTime: " + nextScheduleTime |
| + ", localityMatchLevel=" + localityMatchLevel |
| + ", signature: " |
| + (firstContainerSignature != null? firstContainerSignature.toString():"null"); |
| } |
| } |
| } |