/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.rm;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.util.StringUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AMState;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.math3.random.RandomDataGenerator;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.common.ContainerSignatureMatcher;
import com.google.common.annotations.VisibleForTesting;
import org.apache.tez.common.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/* TODO not yet updating cluster nodes on every allocate response
* from RMContainerRequestor
import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
if (clusterNmCount != lastClusterNmCount) {
LOG.info("Num cluster nodes changed from " + lastClusterNmCount + " to "
+ clusterNmCount);
eventHandler.handle(new AMNodeEventNodeCountUpdated(clusterNmCount));
}
*/
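/**
 * {@link TaskScheduler} implementation that obtains containers from the YARN
 * ResourceManager via {@link AMRMClientAsync}. Supports container re-use with
 * locality-aware delayed scheduling, idle container timeouts, minimum held
 * containers for sessions, and percentage-based preemption of lower priority
 * running tasks.
 */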
public class YarnTaskSchedulerService extends TaskScheduler
implements AMRMClientAsync.CallbackHandler {
private static final Logger LOG = LoggerFactory.getLogger(YarnTaskSchedulerService.class);
final TezAMRMClientAsync<CookieContainerRequest> amRmClient;
final ContainerSignatureMatcher containerSignatureMatcher;
// Container Re-Use configuration
private boolean shouldReuseContainers;
private boolean reuseRackLocal;
private boolean reuseNonLocal;
// type is linked hash map to maintain order of incoming requests
Map<Object, CookieContainerRequest> taskRequests =
new LinkedHashMap<Object, CookieContainerRequest>();
// LinkedHashMap is needed in getProgress()
LinkedHashMap<Object, Container> taskAllocations =
new LinkedHashMap<Object, Container>();
/**
* Tracks last task assigned to a known container.
*/
Map<ContainerId, Object> containerAssignments =
new HashMap<ContainerId, Object>();
// Remove inUse depending on resolution of TEZ-1129
Set<ContainerId> inUseContainers = Sets.newHashSet();
HashMap<ContainerId, Object> releasedContainers =
new HashMap<ContainerId, Object>();
/**
* Map of containers currently being held by the TaskScheduler.
*/
Map<ContainerId, HeldContainer> heldContainers =
new HashMap<ContainerId, HeldContainer>();
Set<Priority> priorityHasAffinity = Sets.newHashSet();
Set<NodeId> blacklistedNodes = Collections
.newSetFromMap(new ConcurrentHashMap<NodeId, Boolean>());
Resource totalResources = Resource.newInstance(0, 0);
Resource allocatedResources = Resource.newInstance(0, 0);
long numHeartbeats = 0;
long heartbeatAtLastPreemption = 0;
int numHeartbeatsBetweenPreemptions = 0;
final String appHostName;
final int appHostPort;
final String appTrackingUrl;
private AtomicBoolean hasUnregistered = new AtomicBoolean(false);
AtomicBoolean isStopStarted = new AtomicBoolean(false);
private ContainerAssigner NODE_LOCAL_ASSIGNER;
private ContainerAssigner RACK_LOCAL_ASSIGNER;
private ContainerAssigner NON_LOCAL_ASSIGNER;
DelayedContainerManager delayedContainerManager;
long localitySchedulingDelay;
long idleContainerTimeoutMin;
long idleContainerTimeoutMax = 0;
int sessionNumMinHeldContainers = 0;
int preemptionPercentage = 0;
long preemptionMaxWaitTime = 0;
long highestWaitingRequestWaitStartTime = 0;
Priority highestWaitingRequestPriority = null;
Set<ContainerId> sessionMinHeldContainers = Sets.newHashSet();
RandomDataGenerator random = new RandomDataGenerator();
private final Configuration conf;
@VisibleForTesting
protected AtomicBoolean shouldUnregister = new AtomicBoolean(false);
static class CRCookie {
// Do not use these variables directly. Can cause mocked unit tests to fail.
private Object task;
private Object appCookie;
private Object containerSignature;
CRCookie(Object task, Object appCookie, Object containerSignature) {
this.task = task;
this.appCookie = appCookie;
this.containerSignature = containerSignature;
}
Object getTask() {
return task;
}
Object getAppCookie() {
return appCookie;
}
Object getContainerSignature() {
return containerSignature;
}
}
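/**
 * A {@link ContainerRequest} that also carries the request's {@link CRCookie}
 * (task, app cookie and container signature) and an optional container id the
 * request is affinitized to.
 */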
class CookieContainerRequest extends ContainerRequest {
CRCookie cookie;
ContainerId affinitizedContainerId;
public CookieContainerRequest(
Resource capability,
String[] hosts,
String[] racks,
Priority priority,
CRCookie cookie) {
super(capability, hosts, racks, priority);
this.cookie = cookie;
}
public CookieContainerRequest(
Resource capability,
ContainerId containerId,
String[] hosts,
String[] racks,
Priority priority,
CRCookie cookie) {
this(capability, hosts, racks, priority, cookie);
this.affinitizedContainerId = containerId;
}
CRCookie getCookie() {
return cookie;
}
ContainerId getAffinitizedContainer() {
return affinitizedContainerId;
}
}
public YarnTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
super(taskSchedulerContext);
this.containerSignatureMatcher = taskSchedulerContext.getContainerSignatureMatcher();
this.amRmClient = TezAMRMClientAsync.createAMRMClientAsync(1000, this);
this.appHostName = taskSchedulerContext.getAppHostName();
this.appHostPort = taskSchedulerContext.getAppClientPort();
this.appTrackingUrl = taskSchedulerContext.getAppTrackingUrl();
try {
this.conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload());
} catch (IOException e) {
throw new TezUncheckedException(
"Failed to deserialize payload for " + YarnTaskSchedulerService.class.getSimpleName(),
e);
}
}
@Private
@VisibleForTesting
YarnTaskSchedulerService(TaskSchedulerContext taskSchedulerContext,
TezAMRMClientAsync<CookieContainerRequest> client) {
super(taskSchedulerContext);
this.containerSignatureMatcher = taskSchedulerContext.getContainerSignatureMatcher();
this.amRmClient = client;
this.appHostName = taskSchedulerContext.getAppHostName();
this.appHostPort = taskSchedulerContext.getAppClientPort();
this.appTrackingUrl = taskSchedulerContext.getAppTrackingUrl();
try {
this.conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload());
} catch (IOException e) {
throw new TezUncheckedException(
"Failed to deserialize payload for " + YarnTaskSchedulerService.class.getSimpleName(),
e);
}
}
@Override
public Resource getAvailableResources() {
return amRmClient.getAvailableResources();
}
@Override
public int getClusterNodeCount() {
// this can potentially be cheaper after YARN-1722
return amRmClient.getClusterNodeCount();
}
@Override
public void setShouldUnregister() {
this.shouldUnregister.set(true);
}
@Override
public boolean hasUnregistered() {
return hasUnregistered.get();
}
// AbstractService methods
@Override
public synchronized void initialize() {
// TODO Post TEZ-2003. Make all of these final fields.
amRmClient.init(conf);
int heartbeatIntervalMax = conf.getInt(
TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX,
TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX_DEFAULT);
amRmClient.setHeartbeatInterval(heartbeatIntervalMax);
shouldReuseContainers = conf.getBoolean(
TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED,
TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED_DEFAULT);
reuseRackLocal = conf.getBoolean(
TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED,
TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED_DEFAULT);
reuseNonLocal = conf
.getBoolean(
TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED,
TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED_DEFAULT);
Preconditions.checkArgument(
((!reuseRackLocal && !reuseNonLocal) || (reuseRackLocal)),
"Re-use Rack-Local cannot be disabled if Re-use Non-Local has been"
+ " enabled");
localitySchedulingDelay = conf.getLong(
TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS,
TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS_DEFAULT);
Preconditions.checkArgument(localitySchedulingDelay >= 0,
"Locality Scheduling delay should be >=0");
idleContainerTimeoutMin = conf.getLong(
TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS,
TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS_DEFAULT);
Preconditions.checkArgument(idleContainerTimeoutMin >= 0 || idleContainerTimeoutMin == -1,
"Idle container release min timeout should be either -1 or >=0");
idleContainerTimeoutMax = conf.getLong(
TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS,
TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS_DEFAULT);
Preconditions.checkArgument(
idleContainerTimeoutMax >= 0 && idleContainerTimeoutMax >= idleContainerTimeoutMin,
"Idle container release max timeout should be >=0 and >= " +
TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS);
sessionNumMinHeldContainers = conf.getInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS,
TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS_DEFAULT);
Preconditions.checkArgument(sessionNumMinHeldContainers >= 0,
"Session minimum held containers should be >=0");
preemptionPercentage = conf.getInt(TezConfiguration.TEZ_AM_PREEMPTION_PERCENTAGE,
TezConfiguration.TEZ_AM_PREEMPTION_PERCENTAGE_DEFAULT);
Preconditions.checkArgument(preemptionPercentage >= 0 && preemptionPercentage <= 100,
"Preemption percentage should be between 0-100");
numHeartbeatsBetweenPreemptions = conf.getInt(
TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS,
TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS_DEFAULT);
Preconditions.checkArgument(numHeartbeatsBetweenPreemptions >= 1,
"Heartbeats between preemptions should be >=1");
preemptionMaxWaitTime = conf.getInt(TezConfiguration.TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS,
TezConfiguration.TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS_DEFAULT);
Preconditions.checkArgument(preemptionMaxWaitTime >=0, "Preemption max wait time must be >=0");
delayedContainerManager = new DelayedContainerManager();
NODE_LOCAL_ASSIGNER = new NodeLocalContainerAssigner();
RACK_LOCAL_ASSIGNER = new RackLocalContainerAssigner();
NON_LOCAL_ASSIGNER = new NonLocalContainerAssigner();
LOG.info("YarnTaskScheduler initialized with configuration: " +
"maxRMHeartbeatInterval: " + heartbeatIntervalMax +
", containerReuseEnabled: " + shouldReuseContainers +
", reuseRackLocal: " + reuseRackLocal +
", reuseNonLocal: " + reuseNonLocal +
", localitySchedulingDelay: " + localitySchedulingDelay +
", preemptionPercentage: " + preemptionPercentage +
", preemptionMaxWaitTime: " + preemptionMaxWaitTime +
", numHeartbeatsBetweenPreemptions: " + numHeartbeatsBetweenPreemptions +
", idleContainerMinTimeout: " + idleContainerTimeoutMin +
", idleContainerMaxTimeout: " + idleContainerTimeoutMax +
", sessionMinHeldContainers: " + sessionNumMinHeldContainers);
}
@Override
public void start() {
try {
RegisterApplicationMasterResponse response;
synchronized (this) {
amRmClient.start();
response = amRmClient.registerApplicationMaster(appHostName,
appHostPort,
appTrackingUrl);
}
// upcall to app outside locks
getContext().setApplicationRegistrationData(
response.getMaximumResourceCapability(),
response.getApplicationACLs(),
response.getClientToAMTokenMasterKey(),
response.getQueue());
delayedContainerManager.start();
} catch (YarnException e) {
LOG.error("Yarn Exception while registering", e);
throw new TezUncheckedException(e);
} catch (IOException e) {
LOG.error("IO Exception while registering", e);
throw new TezUncheckedException(e);
}
}
@Override
public void shutdown() throws InterruptedException {
// upcall to app outside of locks
try {
delayedContainerManager.shutdown();
// Wait for containers to be released.
delayedContainerManager.join(2000l);
synchronized (this) {
if (shouldUnregister.get()) {
AppFinalStatus status = getContext().getFinalAppStatus();
LOG.info("Unregistering application from RM"
+ ", exitStatus=" + status.exitStatus
+ ", exitMessage=" + status.exitMessage
+ ", trackingURL=" + status.postCompletionTrackingUrl);
amRmClient.unregisterApplicationMaster(status.exitStatus,
status.exitMessage,
status.postCompletionTrackingUrl);
LOG.info("Successfully unregistered application from RM");
hasUnregistered.set(true);
}
}
// call client.stop() without the lock. The client will attempt to stop the
// callback operation and, at the same time, the callback operation might be
// trying to acquire our lock.
amRmClient.stop();
} catch (YarnException e) {
LOG.error("Yarn Exception while unregistering ", e);
throw new TezUncheckedException(e);
} catch (IOException e) {
LOG.error("IOException while unregistering ", e);
throw new TezUncheckedException(e);
}
}
// AMRMClientAsync interface methods
@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {
if (isStopStarted.get()) {
if (LOG.isDebugEnabled()) {
for (ContainerStatus status : statuses) {
LOG.debug("Container " + status.getContainerId() + " is completed with ContainerStatus=" +
status);
}
}
return;
}
Map<Object, ContainerStatus> appContainerStatus =
new HashMap<Object, ContainerStatus>(statuses.size());
synchronized (this) {
for(ContainerStatus containerStatus : statuses) {
ContainerId completedId = containerStatus.getContainerId();
HeldContainer delayedContainer = heldContainers.get(completedId);
Object task = releasedContainers.remove(completedId);
if(task != null){
if (delayedContainer != null) {
LOG.warn("Held container should be null since releasedContainer is not");
}
// TODO later we may want to check if the exit code matched expectations
// e.g. a successful container should not come back with a failed exit code
// after being released
// completion of a container we had released earlier. notify app
if (LOG.isDebugEnabled()) {
LOG.debug("Released container completed:" + completedId +
" last allocated to task: " + task);
}
appContainerStatus.put(task, containerStatus);
continue;
}
// not found in released containers. check currently allocated containers
// no need to release this container as the RM has already completed it
task = unAssignContainer(completedId, false);
if (delayedContainer != null) {
heldContainers.remove(completedId);
Resources.subtract(allocatedResources, delayedContainer.getContainer().getResource());
} else {
LOG.warn("Held container expected to be not null for a non-AM-released container");
}
if(task != null) {
// completion of a container we have currently allocated.
// notify app. This will cause the attempt to get killed
LOG.info(
"Allocated container completed:" + completedId + " last allocated to task: " + task);
appContainerStatus.put(task, containerStatus);
continue;
}
// container neither allocated nor released
if (delayedContainer != null) {
LOG.info("Delayed container {} completed", containerStatus.getContainerId());
maybeRescheduleContainerAtPriority(delayedContainer.getContainer().getPriority());
} else {
LOG.info("Ignoring unknown container: " + containerStatus.getContainerId());
}
}
}
// upcall to app must be outside locks
for (Entry<Object, ContainerStatus> entry : appContainerStatus.entrySet()) {
getContext().containerCompleted(entry.getKey(), entry.getValue());
}
}
@Override
public void onContainersAllocated(List<Container> containers) {
if (isStopStarted.get()) {
LOG.info("Ignoring container allocations because application is shutting down. Num " +
containers.size());
for (Container container : containers) {
if (LOG.isDebugEnabled()) {
LOG.debug("Release container:" + container.getId() + ", because App is shutting down.");
}
releaseContainer(container.getId());
}
return;
}
Map<CookieContainerRequest, Container> assignedContainers;
if (LOG.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
for (Container container: containers) {
sb.append(container.getId()).append(", ");
}
LOG.debug("Assigned New Containers: " + sb.toString());
}
synchronized (this) {
if (!shouldReuseContainers) {
List<Container> modifiableContainerList = Lists.newLinkedList(containers);
assignedContainers = assignNewlyAllocatedContainers(
modifiableContainerList);
} else {
// unify allocations
pushNewContainerToDelayed(containers);
return;
}
}
// upcall to app must be outside locks
informAppAboutAssignments(assignedContainers);
}
/**
* Tries assigning the specified newly allocated containers to pending
* requests, matching node-local, then rack-local, then non-local requests.
* Containers that cannot be assigned are released back to the RM.
*
* @param containers
* The list of containers to be assigned. The list *may* be modified
* in place based on allocations and releases.
* @return Assignments.
*/
private synchronized Map<CookieContainerRequest, Container>
assignNewlyAllocatedContainers(Iterable<Container> containers) {
boolean amInCompletionState = (getContext().getAMState() == AMState.COMPLETED);
Map<CookieContainerRequest, Container> assignedContainers =
new HashMap<CookieContainerRequest, Container>();
if (!amInCompletionState) {
assignNewContainersWithLocation(containers,
NODE_LOCAL_ASSIGNER, assignedContainers);
assignNewContainersWithLocation(containers,
RACK_LOCAL_ASSIGNER, assignedContainers);
assignNewContainersWithLocation(containers,
NON_LOCAL_ASSIGNER, assignedContainers);
}
// Release any unassigned containers given by the RM
if (containers.iterator().hasNext()) {
LOG.info("Releasing newly assigned containers which could not be allocated");
}
releaseUnassignedContainers(containers);
return assignedContainers;
}
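/**
 * Tries to assign the given re-used containers to pending requests, matching
 * node-local, then rack-local, then non-local requests in that order.
 * @return the successful assignments
 */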
private synchronized Map<CookieContainerRequest, Container>
tryAssignReUsedContainers(Iterable<Container> containers) {
boolean amInCompletionState = (getContext().getAMState() == AMState.COMPLETED);
Map<CookieContainerRequest, Container> assignedContainers =
new HashMap<CookieContainerRequest, Container>();
// Honor locality and match as many as possible
if (!amInCompletionState) {
assignReUsedContainersWithLocation(containers,
NODE_LOCAL_ASSIGNER, assignedContainers, true);
assignReUsedContainersWithLocation(containers,
RACK_LOCAL_ASSIGNER, assignedContainers, true);
assignReUsedContainersWithLocation(containers,
NON_LOCAL_ASSIGNER, assignedContainers, true);
}
return assignedContainers;
}
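/**
 * Computes the expiry time for an idle held container. When both min and max
 * idle timeouts are configured, a random point between them is chosen so that
 * idle containers are not all released at the same instant.
 */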
@VisibleForTesting
long getHeldContainerExpireTime(long startTime) {
// expire time is at least extended by min time.
// corner case when min time = -1 but then it does not matter because
// expire time is irrelevant at that point.
long expireTime = (startTime + idleContainerTimeoutMin);
if (idleContainerTimeoutMin != -1 && idleContainerTimeoutMin < idleContainerTimeoutMax) {
long expireTimeMax = startTime + idleContainerTimeoutMax;
expireTime = random.nextLong(expireTime, expireTimeMax);
}
return expireTime;
}
/**
* Try to assign a re-used container to pending tasks
* @param heldContainer the held container to be assigned
* @return Assigned container map
*/
private synchronized Map<CookieContainerRequest, Container>
assignDelayedContainer(HeldContainer heldContainer) {
AMState state = getContext().getAMState();
boolean isNew = heldContainer.isNew();
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to assign a delayed container"
+ ", containerId=" + heldContainer.getContainer().getId()
+ ", nextScheduleTime=" + heldContainer.getNextScheduleTime()
+ ", containerExpiryTime=" + heldContainer.getContainerExpiryTime()
+ ", AMState=" + state
+ ", matchLevel=" + heldContainer.getLocalityMatchLevel()
+ ", taskRequestsCount=" + taskRequests.size()
+ ", heldContainers=" + heldContainers.size()
+ ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
+ ", isNew=" + isNew);
}
if (state.equals(AMState.IDLE) || taskRequests.isEmpty()) {
// reset locality level on held container
// if sessionDelay defined, push back into delayed queue if not already
// done so
// Compute min held containers.
if (getContext().isSession() && sessionNumMinHeldContainers > 0 &&
sessionMinHeldContainers.isEmpty()) {
// session mode and need to hold onto containers and not done so already
determineMinHeldContainers();
}
heldContainer.resetLocalityMatchLevel();
long currentTime = System.currentTimeMillis();
boolean releaseContainer = false;
if (isNew || (heldContainer.getContainerExpiryTime() <= currentTime
&& idleContainerTimeoutMin != -1)) {
// container idle timeout has expired or is a new unused container.
// new container is possibly a spurious race condition allocation.
if (getContext().isSession()
&& sessionMinHeldContainers.contains(heldContainer.getContainer().getId())) {
// There are no outstanding requests, so it's safe to hold on to new containers.
// We may have received more containers than necessary and some are unused.
// In session mode, if the container is in the set of chosen min held containers,
// increase its idle expiry time to stay consistent with the rest of the code.
heldContainer.setContainerExpiryTime(getHeldContainerExpireTime(currentTime));
} else {
releaseContainer = true;
}
}
if (releaseContainer) {
LOG.info("No taskRequests. Container's idle timeout delay expired or is new. " +
"Releasing container"
+ ", containerId=" + heldContainer.getContainer().getId()
+ ", containerExpiryTime="
+ heldContainer.getContainerExpiryTime()
+ ", idleTimeout=" + idleContainerTimeoutMin
+ ", taskRequestsCount=" + taskRequests.size()
+ ", heldContainers=" + heldContainers.size()
+ ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
+ ", isNew=" + isNew);
releaseUnassignedContainers(
Collections.singletonList((heldContainer.getContainer())));
} else {
// no outstanding work and container idle timeout not expired
if (LOG.isDebugEnabled()) {
LOG.debug("Holding onto idle container with no work. CId: "
+ heldContainer.getContainer().getId() + " with expiry: "
+ heldContainer.getContainerExpiryTime() + " currentTime: "
+ currentTime + " next look: "
+ (currentTime + localitySchedulingDelay));
}
// put back and wait for new requests until expiry
heldContainer.resetLocalityMatchLevel();
delayedContainerManager.addDelayedContainer(
heldContainer.getContainer(), currentTime
+ localitySchedulingDelay);
}
} else if (state.equals(AMState.RUNNING_APP)) {
// clear min held containers since we need to allocate to tasks
if (!sessionMinHeldContainers.isEmpty()) {
// update the expire time of min held containers so that they are
// not released immediately, when new requests come in, if they come in
// just before these containers are about to expire (race condition)
long currentTime = System.currentTimeMillis();
for (ContainerId minHeldCId : sessionMinHeldContainers) {
HeldContainer minHeldContainer = heldContainers.get(minHeldCId);
if (minHeldContainer != null) {
// check in case it got removed because of external reasons
minHeldContainer.setContainerExpiryTime(getHeldContainerExpireTime(currentTime));
}
}
sessionMinHeldContainers.clear();
}
HeldContainer.LocalityMatchLevel localityMatchLevel =
heldContainer.getLocalityMatchLevel();
Map<CookieContainerRequest, Container> assignedContainers =
new HashMap<CookieContainerRequest, Container>();
Container containerToAssign = heldContainer.container;
heldContainer.incrementAssignmentAttempts();
// Each time a container is seen, we try node, rack and non-local in that
// order depending on matching level allowed
// if match level is NEW or NODE, match only at node-local
// always try node local matches for other levels
if (isNew
|| localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.NEW)
|| localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.NODE)
|| localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.RACK)
|| localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.NON_LOCAL)) {
assignReUsedContainerWithLocation(containerToAssign,
NODE_LOCAL_ASSIGNER, assignedContainers, true);
if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
LOG.debug("Failed to assign tasks to delayed container using node"
+ ", containerId=" + heldContainer.getContainer().getId());
}
}
// if re-use allowed at rack
// match against rack if match level is RACK or NON-LOCAL
// if scheduling delay is 0, match at RACK allowed without a sleep
if (assignedContainers.isEmpty()) {
if ((reuseRackLocal || isNew) && (localitySchedulingDelay == 0 ||
(localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.RACK)
|| localityMatchLevel.equals(
HeldContainer.LocalityMatchLevel.NON_LOCAL)))) {
assignReUsedContainerWithLocation(containerToAssign,
RACK_LOCAL_ASSIGNER, assignedContainers, false);
if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
LOG.debug("Failed to assign tasks to delayed container using rack"
+ ", containerId=" + heldContainer.getContainer().getId());
}
}
}
// if re-use allowed at non-local
// match against rack if match level is NON-LOCAL
// if scheduling delay is 0, match at NON-LOCAL allowed without a sleep
if (assignedContainers.isEmpty()) {
if ((reuseNonLocal || isNew) && (localitySchedulingDelay == 0
|| localityMatchLevel.equals(
HeldContainer.LocalityMatchLevel.NON_LOCAL))) {
assignReUsedContainerWithLocation(containerToAssign,
NON_LOCAL_ASSIGNER, assignedContainers, false);
if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
LOG.debug("Failed to assign tasks to delayed container using non-local"
+ ", containerId=" + heldContainer.getContainer().getId());
}
}
}
if (assignedContainers.isEmpty()) {
long currentTime = System.currentTimeMillis();
// Release container if final expiry time is reached
// Don't release a new container. The RM may not give us new ones
// The assumption is that the expire time is larger than the sum of all
// locality delays. So if we hit the expire time then we have already
// tried to assign at all locality levels.
// We run the risk of not being able to retain min held containers but
// if we are not able to assign containers to pending tasks then
// we cannot avoid releasing containers. Otherwise we may not be able to
// get new containers from YARN to match the pending request
if (!isNew && heldContainer.getContainerExpiryTime() <= currentTime
&& idleContainerTimeoutMin != -1) {
LOG.info("Container's idle timeout expired. Releasing container"
+ ", containerId=" + heldContainer.container.getId()
+ ", containerExpiryTime="
+ heldContainer.getContainerExpiryTime()
+ ", idleTimeoutMin=" + idleContainerTimeoutMin);
releaseUnassignedContainers(
Lists.newArrayList(heldContainer.container));
} else {
// Let's decide if this container has hit the end of the road
// EOL true if container's match level is NON-LOCAL
boolean hitFinalMatchLevel = localityMatchLevel.equals(
HeldContainer.LocalityMatchLevel.NON_LOCAL);
if (!hitFinalMatchLevel) {
// EOL also true if locality delay is 0
// or rack-local or non-local is disabled
heldContainer.incrementLocalityMatchLevel();
if (localitySchedulingDelay == 0 ||
(!reuseRackLocal
|| (!reuseNonLocal &&
heldContainer.getLocalityMatchLevel().equals(
HeldContainer.LocalityMatchLevel.NON_LOCAL)))) {
hitFinalMatchLevel = true;
}
// the above if-stmt does not apply to new containers since they will
// be matched at all locality levels. So their finalMatchLevel cannot
// be short-circuited
if (localitySchedulingDelay > 0 && isNew) {
hitFinalMatchLevel = false;
}
}
if (hitFinalMatchLevel) {
boolean safeToRelease = true;
Priority topPendingPriority = amRmClient.getTopPriority();
Priority containerPriority = heldContainer.container.getPriority();
if (isNew && topPendingPriority != null &&
containerPriority.compareTo(topPendingPriority) < 0) {
// this container is of lower priority and given to us by the RM for
// a task that will be matched after the current top priority. Keep
// this container for those pending tasks since the RM is not going
// to give this container to us again
safeToRelease = false;
}
// Are there any pending requests at any priority?
// release if there are tasks or this is not a session
if (safeToRelease &&
(!taskRequests.isEmpty() || !getContext().isSession())) {
LOG.info("Releasing held container as either there are pending but "
+ " unmatched requests or this is not a session"
+ ", containerId=" + heldContainer.container.getId()
+ ", pendingTasks=" + taskRequests.size()
+ ", isSession=" + getContext().isSession()
+ ". isNew=" + isNew);
releaseUnassignedContainers(
Lists.newArrayList(heldContainer.container));
} else {
// if no tasks, treat this like an idle session
heldContainer.resetLocalityMatchLevel();
delayedContainerManager.addDelayedContainer(
heldContainer.getContainer(),
currentTime + localitySchedulingDelay);
}
} else {
// Schedule delay container to match at a later try
delayedContainerManager.addDelayedContainer(
heldContainer.getContainer(),
currentTime + localitySchedulingDelay);
}
}
} else if (LOG.isDebugEnabled()) {
LOG.debug("Delayed container assignment successful"
+ ", containerId=" + heldContainer.getContainer().getId());
}
return assignedContainers;
} else {
// ignore all other cases?
LOG.warn("Received a request to assign re-used containers when AM was "
+ " in state: " + state + ". Ignoring request and releasing container"
+ ": " + heldContainer.getContainer().getId());
releaseUnassignedContainers(Lists.newArrayList(heldContainer.container));
}
return null;
}
@Override
public synchronized void dagComplete() {
for (HeldContainer heldContainer : heldContainers.values()) {
heldContainer.resetLocalityMatchLevel();
}
synchronized(delayedContainerManager) {
delayedContainerManager.notify();
}
}
@Override
public void onShutdownRequest() {
if (isStopStarted.get()) {
return;
}
// upcall to app must be outside locks
getContext().appShutdownRequested();
}
@Override
public void onNodesUpdated(List<NodeReport> updatedNodes) {
if (isStopStarted.get()) {
return;
}
// ignore bad nodes for now
// upcall to app must be outside locks
getContext().nodesUpdated(updatedNodes);
}
@Override
public float getProgress() {
if (isStopStarted.get()) {
return 1;
}
if(totalResources.getMemory() == 0) {
// assume this is the first allocate callback. nothing is allocated.
// available resource = totalResource
// TODO this will not handle dynamic changes in resources
totalResources = Resources.clone(getAvailableResources());
LOG.info("App total resource memory: " + totalResources.getMemory() +
" cpu: " + totalResources.getVirtualCores() +
" taskAllocations: " + taskAllocations.size());
}
synchronized (this) {
numHeartbeats++;
if (preemptIfNeeded()) {
heartbeatAtLastPreemption = numHeartbeats;
}
}
return getContext().getProgress();
}
@Override
public void onError(Throwable t) {
if (isStopStarted.get()) {
LOG.error("Got TaskSchedulerError, " + ExceptionUtils.getStackTrace(t));
return;
}
LOG.error("Got Error from RMClient", t);
getContext().reportError(YarnTaskSchedulerServiceError.RESOURCEMANAGER_ERROR, StringUtils.stringifyException(t),
null);
}
@Override
public Resource getTotalResources() {
return totalResources;
}
@Override
public synchronized void blacklistNode(NodeId nodeId) {
LOG.info("Blacklisting node: " + nodeId);
amRmClient.addNodeToBlacklist(nodeId);
blacklistedNodes.add(nodeId);
}
@Override
public synchronized void unblacklistNode(NodeId nodeId) {
if (blacklistedNodes.remove(nodeId)) {
LOG.info("UnBlacklisting node: " + nodeId);
amRmClient.removeNodeFromBlacklist(nodeId);
}
}
@Override
public synchronized void allocateTask(
Object task,
Resource capability,
String[] hosts,
String[] racks,
Priority priority,
Object containerSignature,
Object clientCookie) {
// XXX Have ContainerContext implement an interface defined by TaskScheduler.
// TODO check for nulls etc
// TODO extra memory allocation
CRCookie cookie = new CRCookie(task, clientCookie, containerSignature);
CookieContainerRequest request = new CookieContainerRequest(
capability, hosts, racks, priority, cookie);
addRequestAndTrigger(task, request, hosts, racks);
}
@Override
public synchronized void allocateTask(
Object task,
Resource capability,
ContainerId containerId,
Priority priority,
Object containerSignature,
Object clientCookie) {
HeldContainer heldContainer = heldContainers.get(containerId);
String[] hosts = null;
String[] racks = null;
if (heldContainer != null) {
Container container = heldContainer.getContainer();
if (canFit(capability, container.getResource())) {
// just specify node and use YARN's soft locality constraint for the rest
hosts = new String[1];
hosts[0] = container.getNodeId().getHost();
priorityHasAffinity.add(priority);
} else {
LOG.warn("Matching requested to container: " + containerId +
" but requested capability: " + capability +
" does not fit in container resource: " + container.getResource());
}
} else {
LOG.warn("Matching requested to unknown container: " + containerId);
}
CRCookie cookie = new CRCookie(task, clientCookie, containerSignature);
CookieContainerRequest request = new CookieContainerRequest(
capability, containerId, hosts, racks, priority, cookie);
addRequestAndTrigger(task, request, hosts, racks);
}
private void addRequestAndTrigger(Object task, CookieContainerRequest request,
String[] hosts, String[] racks) {
addTaskRequest(task, request);
// See if any of the delayedContainers can be used for this task.
delayedContainerManager.triggerScheduling(true);
LOG.info("Allocation request for task: " + task +
" with request: " + request +
" host: " + ((hosts != null && hosts.length > 0) ? hosts[0] : "null") +
" rack: " + ((racks != null && racks.length > 0) ? racks[0] : "null"));
}
/**
* @param task
* the task to de-allocate.
* @param taskSucceeded
* specify whether the task succeeded or failed.
* @param endReason
* reason for the task ending
* @return true if a container is assigned to this task.
*/
@Override
public boolean deallocateTask(Object task, boolean taskSucceeded,
TaskAttemptEndReason endReason,
String diagnostics) {
Map<CookieContainerRequest, Container> assignedContainers = null;
synchronized (this) {
CookieContainerRequest request = removeTaskRequest(task);
if (request != null) {
// task not allocated yet
LOG.info("Deallocating task: " + task + " before allocation");
return false;
}
// task request not present. Look in allocations
Container container = doBookKeepingForTaskDeallocate(task);
if (container == null) {
// task neither requested nor allocated.
LOG.info("Ignoring removal of unknown task: " + task);
return false;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Deallocated task: " + task + " from container: "
+ container.getId());
}
if (!taskSucceeded || !shouldReuseContainers) {
if (LOG.isDebugEnabled()) {
LOG.debug("Releasing container, containerId=" + container.getId()
+ ", taskSucceeded=" + taskSucceeded
+ ", reuseContainersFlag=" + shouldReuseContainers);
}
releaseContainer(container.getId());
} else {
// Don't attempt to delay containers if delay is 0.
HeldContainer heldContainer = heldContainers.get(container.getId());
if (heldContainer != null) {
heldContainer.resetLocalityMatchLevel();
long currentTime = System.currentTimeMillis();
if (idleContainerTimeoutMin > 0) {
heldContainer.setContainerExpiryTime(getHeldContainerExpireTime(currentTime));
}
assignedContainers = assignDelayedContainer(heldContainer);
} else {
// this is a non standard situation
LOG.info("Skipping container after task deallocate as container is"
+ " no longer running, containerId=" + container.getId());
}
}
}
}
// up call outside of the lock.
if (assignedContainers != null && assignedContainers.size() == 1) {
informAppAboutAssignments(assignedContainers);
}
return true;
}
@Override
public synchronized Object deallocateContainer(ContainerId containerId) {
Object task = unAssignContainer(containerId, true);
if(task != null) {
// non-standard case for the app layer to deallocate container
LOG.info("Deallocated container: " + containerId +
" from task: " + task);
return task;
}
LOG.info("Ignoring dealloction of unknown container: " + containerId);
return null;
}
@Override
public synchronized void initiateStop() {
LOG.info("Initiating stop of YarnTaskScheduler");
// release held containers
LOG.info("Releasing held containers");
isStopStarted.set(true);
// Create a new list for containerIds to iterate, otherwise it would cause ConcurrentModificationException
// because method releaseContainer will change heldContainers.
List<ContainerId> heldContainerIds = new ArrayList<ContainerId>(heldContainers.size());
for (ContainerId containerId : heldContainers.keySet()) {
heldContainerIds.add(containerId);
}
for (ContainerId containerId : heldContainerIds) {
releaseContainer(containerId);
}
// remove taskRequest from AMRMClient to avoid allocating new containers in the next heartbeat
LOG.info("Removing all pending taskRequests");
// Create a new list for tasks to avoid ConcurrentModificationException
List<Object> tasks = new ArrayList<Object>(taskRequests.size());
for (Object task : taskRequests.keySet()) {
tasks.add(task);
}
for (Object task : tasks) {
removeTaskRequest(task);
}
}
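// Returns true if the memory and virtual cores of arg0 both fit within arg1.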
boolean canFit(Resource arg0, Resource arg1) {
int mem0 = arg0.getMemory();
int mem1 = arg1.getMemory();
int cpu0 = arg0.getVirtualCores();
int cpu1 = arg1.getVirtualCores();
return mem0 <= mem1 && cpu0 <= cpu1;
}
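// Scales a request count by the preemption percentage, rounding up:
// ceil(original * percent / 100).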
static int scaleDownByPreemptionPercentage(int original, int percent) {
return (int) Math.ceil((original * percent)/100.f);
}
private String constructPreemptionPeriodicLog(Resource freeResource) {
return "Allocated: " + allocatedResources +
" Free: " + freeResource +
" pendingRequests: " + taskRequests.size() +
" delayedContainers: " + delayedContainerManager.delayedContainers.size() +
" heartbeats: " + numHeartbeats +
" lastPreemptionHeartbeat: " + heartbeatAtLastPreemption +
((highestWaitingRequestPriority != null) ?
(" highestWaitingRequestWaitStartTime: " + highestWaitingRequestWaitStartTime +
" highestWaitingRequestPriority: " + highestWaitingRequestPriority.toString()) : "");
}
private void resetHighestWaitingPriority(Priority newPri) {
highestWaitingRequestPriority = newPri;
highestWaitingRequestWaitStartTime = 0;
}
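/**
 * Checks whether the highest priority pending request fits into the free
 * resources. If it does not (or the preemption wait deadline has passed while
 * lower priority tasks are running), releases unused new containers or
 * preempts running lower priority tasks, scaled by the preemption percentage.
 * @return true if the last-preemption heartbeat marker should be advanced
 */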
boolean preemptIfNeeded() {
if (preemptionPercentage == 0) {
// turned off
return true;
}
ContainerId[] preemptedContainers = null;
int numPendingRequestsToService = 0;
synchronized (this) {
Resource freeResources = amRmClient.getAvailableResources();
if (LOG.isDebugEnabled()) {
LOG.debug(constructPreemptionPeriodicLog(freeResources));
} else {
if (numHeartbeats % 50 == 1) {
LOG.info(constructPreemptionPeriodicLog(freeResources));
}
}
assert freeResources.getMemory() >= 0;
CookieContainerRequest highestPriRequest = null;
int numHighestPriRequests = 0;
for(CookieContainerRequest request : taskRequests.values()) {
if(highestPriRequest == null) {
highestPriRequest = request;
numHighestPriRequests = 1;
} else if(isHigherPriority(request.getPriority(),
highestPriRequest.getPriority())){
highestPriRequest = request;
numHighestPriRequests = 1;
} else if (request.getPriority().equals(highestPriRequest.getPriority())) {
numHighestPriRequests++;
}
}
if (highestPriRequest == null) {
// nothing pending
resetHighestWaitingPriority(null);
return true;
}
// reset the wait time when waiting priority changes to prevent carry over of the value
if (highestWaitingRequestPriority == null ||
!highestPriRequest.getPriority().equals(highestWaitingRequestPriority)) {
resetHighestWaitingPriority(highestPriRequest.getPriority());
}
long currTime = System.currentTimeMillis();
if (highestWaitingRequestWaitStartTime == 0) {
highestWaitingRequestWaitStartTime = currTime;
}
boolean preemptionWaitDeadlineCrossed =
(currTime - highestWaitingRequestWaitStartTime) > preemptionMaxWaitTime;
if(!preemptionWaitDeadlineCrossed &&
fitsIn(highestPriRequest.getCapability(), freeResources)) {
if (LOG.isDebugEnabled()) {
LOG.debug(highestPriRequest + " fits in free resources");
} else {
if (numHeartbeats % 50 == 1) {
LOG.info(highestPriRequest + " fits in free resources");
}
}
return true;
}
if (preemptionWaitDeadlineCrossed) {
// check if anything lower priority is running - priority inversion
// this check could have been done earlier but in the common case
// this would be unnecessary since there are usually requests pending
// in the normal case without priority inversion. So do this expensive
// iteration now
boolean lowerPriRunning = false;
for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo();
Priority taskPriority = lastTaskInfo.getPriority();
Object signature = lastTaskInfo.getCookie().getContainerSignature();
if(isHigherPriority(highestPriRequest.getPriority(), taskPriority)) {
// lower priority task is running
if (containerSignatureMatcher.isExactMatch(
highestPriRequest.getCookie().getContainerSignature(),
signature)) {
// exact match with different priorities
continue;
}
lowerPriRunning = true;
break;
}
}
if (!lowerPriRunning) {
// nothing lower priority running
// normal case of many pending request without priority inversion
resetHighestWaitingPriority(null);
return true;
}
LOG.info("Preemption deadline crossed at pri: " + highestPriRequest.getPriority()
+ " numRequests: " + numHighestPriRequests + ". "
+ constructPreemptionPeriodicLog(freeResources));
}
// highest priority request will not fit in existing free resources
// free up some more
// TODO this is subject to error wrt RM resource normalization
numPendingRequestsToService = scaleDownByPreemptionPercentage(numHighestPriRequests,
preemptionPercentage);
if (numPendingRequestsToService < 1) {
// nothing to preempt - reset preemption last heartbeat
return true;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to service " + numPendingRequestsToService + " out of total "
+ numHighestPriRequests + " pending requests at pri: "
+ highestPriRequest.getPriority());
}
for (int i=0; i<numPendingRequestsToService; ++i) {
// This request must have been considered for matching with all existing
// containers when request was made.
Container lowestPriNewContainer = null;
// could not find anything to preempt. Check if we can release unused
// containers
for (HeldContainer heldContainer : delayedContainerManager.delayedContainers) {
if (!heldContainer.isNew()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Reused container exists. Wait for assignment loop to release it. "
+ heldContainer.getContainer().getId());
}
return true;
}
if (heldContainer.geNumAssignmentAttempts() < 3) {
// we haven't tried to assign this container at node/rack/ANY
if (LOG.isDebugEnabled()) {
LOG.debug("Brand new container. Wait for assignment loop to match it. "
+ heldContainer.getContainer().getId());
}
return true;
}
Container container = heldContainer.getContainer();
if (lowestPriNewContainer == null ||
isHigherPriority(lowestPriNewContainer.getPriority(), container.getPriority())){
// there is a lower priority new container
lowestPriNewContainer = container;
}
}
if (lowestPriNewContainer != null) {
LOG.info("Preempting new container: " + lowestPriNewContainer.getId() +
" with priority: " + lowestPriNewContainer.getPriority() +
" to free resource for request: " + highestPriRequest +
" . Current free resources: " + freeResources);
numPendingRequestsToService--;
releaseUnassignedContainers(Collections.singletonList(lowestPriNewContainer));
// We are returning an unused resource back to the RM. The RM thinks it
// has serviced our initial request and will not re-allocate this back
// to us anymore. So we need to ask for this again. If there is no
// outstanding request at that priority then it's fine to not ask again.
// See TEZ-915 for more details
maybeRescheduleContainerAtPriority(lowestPriNewContainer.getPriority());
// come back and free more new containers if needed
continue;
}
}
if (numPendingRequestsToService < 1) {
return true;
}
// there are no reused or new containers to release. try to preempt running containers
// this assert will be a no-op in production but can help identify
// invalid assumptions during testing
assert delayedContainerManager.delayedContainers.isEmpty();
if (!delayedContainerManager.delayedContainers.isEmpty()) {
LOG.warn("Expected delayed containers to be empty. "
+ constructPreemptionPeriodicLog(freeResources));
}
Priority preemptedTaskPriority = null;
int numEntriesAtPreemptedPriority = 0;
for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo();
Priority taskPriority = lastTaskInfo.getPriority();
Object signature = lastTaskInfo.getCookie().getContainerSignature();
if(!isHigherPriority(highestPriRequest.getPriority(), taskPriority)) {
// higher or same priority
continue;
}
if (containerSignatureMatcher.isExactMatch(
highestPriRequest.getCookie().getContainerSignature(),
signature)) {
// exact match with different priorities
continue;
}
if (preemptedTaskPriority == null ||
!isHigherPriority(taskPriority, preemptedTaskPriority)) {
// keep the lower priority
if (taskPriority.equals(preemptedTaskPriority)) {
numEntriesAtPreemptedPriority++;
} else {
// this is at a lower priority than existing
numEntriesAtPreemptedPriority = 1;
}
preemptedTaskPriority = taskPriority;
}
}
if(preemptedTaskPriority != null) {
int newNumPendingRequestsToService = scaleDownByPreemptionPercentage(Math.min(
numEntriesAtPreemptedPriority, numHighestPriRequests), preemptionPercentage);
numPendingRequestsToService = Math.min(newNumPendingRequestsToService,
numPendingRequestsToService);
if (numPendingRequestsToService < 1) {
return true;
}
// wait for enough heartbeats since this request became active for preemption
if ((numHeartbeats - heartbeatAtLastPreemption) < numHeartbeatsBetweenPreemptions) {
// do not update the last preemption heartbeat marker
return false;
}
LOG.info("Trying to service " + numPendingRequestsToService + " out of total "
+ numHighestPriRequests + " pending requests at pri: "
+ highestPriRequest.getPriority() + " by preempting from "
+ numEntriesAtPreemptedPriority + " running tasks at priority: " + preemptedTaskPriority);
// found something to preempt. get others of the same priority
preemptedContainers = new ContainerId[numPendingRequestsToService];
int currIndex = 0;
for (Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo();
Priority taskPriority = lastTaskInfo.getPriority();
Container container = entry.getValue();
if (preemptedTaskPriority.equals(taskPriority)) {
// taskAllocations map will iterate from oldest to newest assigned containers
// keep the N newest containerIds with the matching priority
preemptedContainers[currIndex++ % numPendingRequestsToService] = container.getId();
}
}
// app client will be notified after the container is killed
// and we get its completed container status
}
}
// upcall outside locks
if (preemptedContainers != null) {
for(int i=0; i<numPendingRequestsToService; ++i) {
ContainerId cId = preemptedContainers[i];
if (cId != null) {
LOG.info("Preempting container: " + cId + " currently allocated to a task.");
getContext().preemptContainer(cId);
}
}
}
return true;
}
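// Re-issues the first pending task request at the given priority by
// deallocating and re-allocating it, so that the RM is asked again for a
// container that was returned unused. See TEZ-915.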
private void maybeRescheduleContainerAtPriority(Priority priority) {
for (Map.Entry<Object, CookieContainerRequest> entry : taskRequests.entrySet()) {
Object task = entry.getKey();
CookieContainerRequest request = entry.getValue();
if (request.getPriority().equals(priority)) {
LOG.info("Resending request for task again: " + task);
deallocateTask(task, true, null, null);
allocateTask(task, request.getCapability(),
(request.getNodes() == null ? null :
request.getNodes().toArray(new String[request.getNodes().size()])),
(request.getRacks() == null ? null :
request.getRacks().toArray(new String[request.getRacks().size()])),
request.getPriority(),
request.getCookie().getContainerSignature(),
request.getCookie().getAppCookie());
break;
}
}
}
private boolean fitsIn(Resource toFit, Resource resource) {
// YARN-893 prevents using correct library code
//return Resources.fitsIn(toFit, resource);
return resource.getMemory() >= toFit.getMemory();
}
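// Returns the first pending request at the container's priority that matches
// the given location and capability and can be assigned to this container,
// or null if none matches.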
private CookieContainerRequest getMatchingRequestWithPriority(
Container container,
String location) {
Priority priority = container.getPriority();
Resource capability = container.getResource();
List<? extends Collection<CookieContainerRequest>> requestsList =
amRmClient.getMatchingRequests(priority, location, capability);
if (!requestsList.isEmpty()) {
// pick first one
for (Collection<CookieContainerRequest> requests : requestsList) {
for (CookieContainerRequest cookieContainerRequest : requests) {
if (canAssignTaskToContainer(cookieContainerRequest, container)) {
return cookieContainerRequest;
}
}
}
}
return null;
}
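// Returns a pending request at the current top priority that matches the given
// location. When container affinity is considered, a request affinitized to
// this exact container wins and requests affinitized to other available held
// containers are skipped; otherwise the first compatible request is returned.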
private CookieContainerRequest getMatchingRequestWithoutPriority(
Container container,
String location,
boolean considerContainerAffinity) {
Resource capability = container.getResource();
List<? extends Collection<CookieContainerRequest>> pRequestsList =
amRmClient.getMatchingRequestsForTopPriority(location, capability);
if (considerContainerAffinity &&
!priorityHasAffinity.contains(amRmClient.getTopPriority())) {
considerContainerAffinity = false;
}
if (pRequestsList == null || pRequestsList.isEmpty()) {
return null;
}
CookieContainerRequest firstMatch = null;
for (Collection<CookieContainerRequest> requests : pRequestsList) {
for (CookieContainerRequest cookieContainerRequest : requests) {
if (firstMatch == null || // we don't have a match. So look for one
// we have a match but are looking for a better container level match.
// skip the expensive canAssignTaskToContainer() if the request is
// not affinitized to the container
container.getId().equals(cookieContainerRequest.getAffinitizedContainer())
) {
if (canAssignTaskToContainer(cookieContainerRequest, container)) {
// request matched to container
if (!considerContainerAffinity) {
return cookieContainerRequest;
}
ContainerId affCId = cookieContainerRequest.getAffinitizedContainer();
boolean canMatchTaskWithAffinity = true;
if (affCId == null ||
!heldContainers.containsKey(affCId) ||
inUseContainers.contains(affCId)) {
// affinity not specified
// affinitized container is no longer held
// affinitized container is in use
canMatchTaskWithAffinity = false;
}
if (canMatchTaskWithAffinity) {
if (container.getId().equals(
cookieContainerRequest.getAffinitizedContainer())) {
// container level match
if (LOG.isDebugEnabled()) {
LOG.debug("Matching with affinity for request: "
+ cookieContainerRequest + " container: " + affCId);
}
return cookieContainerRequest;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping request for container " + container.getId()
+ " due to affinity. Request: " + cookieContainerRequest
+ " affContainer: " + affCId);
}
} else {
firstMatch = cookieContainerRequest;
}
}
}
}
}
return firstMatch;
}
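// A task can be assigned to a container if no other attempt of the same task
// is already running on that container's node and, for a previously used
// container, the container's last assigned signature is a superset of the
// request's signature.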
private boolean canAssignTaskToContainer(
CookieContainerRequest cookieContainerRequest, Container container) {
HeldContainer heldContainer = heldContainers.get(container.getId());
Object task = getTask(cookieContainerRequest);
if (task instanceof TaskAttempt
&& ((TaskAttempt) task).getTask() != null
&& ((TaskAttempt) task).getTask().getNodesWithRunningAttempts().contains(container.getNodeId())) {
return false;
}
if (heldContainer == null || heldContainer.isNew()) { // New container.
return true;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to match task to a held container, "
+ " containerId=" + heldContainer.container.getId());
}
if (containerSignatureMatcher.isSuperSet(heldContainer
.getLastAssignedContainerSignature(), cookieContainerRequest.getCookie()
.getContainerSignature())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Matched delayed container to task"
+ " containerId=" + heldContainer.container.getId());
}
return true;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to match delayed container to task"
+ " containerId=" + heldContainer.container.getId());
}
return false;
}
private Object getTask(CookieContainerRequest request) {
return request.getCookie().getTask();
}
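// Releases a container, updating held-container and allocated-resource
// bookkeeping, returning it to the RM where appropriate, and notifying the
// app if a task had ever been assigned to it.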
private void releaseContainer(ContainerId containerId) {
Object assignedTask = containerAssignments.remove(containerId);
if (assignedTask != null) {
// A task was assigned to this container at some point. Inform the app.
getContext().containerBeingReleased(containerId);
}
HeldContainer delayedContainer = heldContainers.remove(containerId);
if (delayedContainer != null) {
Resources.subtractFrom(allocatedResources,
delayedContainer.getContainer().getResource());
}
if (delayedContainer != null || !shouldReuseContainers) {
amRmClient.releaseAssignedContainer(containerId);
}
if (assignedTask != null) {
// A task was assigned at some point. Add to release list since we are
// releasing the container.
releasedContainers.put(containerId, assignedTask);
}
}
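// Records the assignment of a task to a container: removes the pending
// request, updates the allocation maps and refreshes the held container's
// last task info.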
private void assignContainer(Object task,
Container container,
CookieContainerRequest assigned) {
CookieContainerRequest request = removeTaskRequest(task);
assert request != null;
//assert assigned.equals(request);
Container result = taskAllocations.put(task, container);
assert result == null;
inUseContainers.add(container.getId());
containerAssignments.put(container.getId(), task);
HeldContainer heldContainer = heldContainers.get(container.getId());
if (!shouldReuseContainers && heldContainer == null) {
heldContainers.put(container.getId(), new HeldContainer(container,
-1, -1, assigned, this.containerSignatureMatcher));
Resources.addTo(allocatedResources, container.getResource());
} else {
if (heldContainer.isNew()) {
// check for existence before adding since the first container potentially
// has the broadest signature as subsequent uses don't expand any dimension.
// This will need to be enhanced to track other signatures too when we
// think about preferring within vertex matching etc.
heldContainers.put(container.getId(),
new HeldContainer(container, heldContainer.getNextScheduleTime(),
heldContainer.getContainerExpiryTime(), assigned, this.containerSignatureMatcher));
}
heldContainer.setLastTaskInfo(assigned);
}
}
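// Registers newly allocated containers as held containers and queues them in
// the DelayedContainerManager so that the scheduling loop can match them to
// pending requests with locality delays.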
private void pushNewContainerToDelayed(List<Container> containers){
long expireTime = getHeldContainerExpireTime(System.currentTimeMillis());
synchronized (delayedContainerManager) {
for (Container container : containers) {
if (heldContainers.put(container.getId(), new HeldContainer(container,
-1, expireTime, null, this.containerSignatureMatcher)) != null) {
throw new TezUncheckedException("New container " + container.getId()
+ " is already held.");
}
long nextScheduleTime = delayedContainerManager.maxScheduleTimeSeen;
if (delayedContainerManager.maxScheduleTimeSeen == -1) {
nextScheduleTime = System.currentTimeMillis();
}
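        // Schedule the new container strictly after the latest schedule time
        // handed out so far. The delay queue is ordered by nextScheduleTime, so
        // freshly allocated containers line up behind those already waiting.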
Resources.addTo(allocatedResources, container.getResource());
delayedContainerManager.addDelayedContainer(container,
nextScheduleTime + 1);
}
}
delayedContainerManager.triggerScheduling(false);
}
private CookieContainerRequest removeTaskRequest(Object task) {
CookieContainerRequest request = taskRequests.remove(task);
if(request != null) {
// remove all references of the request from AMRMClient
amRmClient.removeContainerRequest(request);
}
return request;
}
private void addTaskRequest(Object task,
CookieContainerRequest request) {
CookieContainerRequest oldRequest = taskRequests.put(task, request);
if (oldRequest != null) {
// remove all references of the request from AMRMClient
amRmClient.removeContainerRequest(oldRequest);
}
amRmClient.addContainerRequest(request);
}
private Container doBookKeepingForTaskDeallocate(Object task) {
Container container = taskAllocations.remove(task);
if (container == null) {
return null;
}
inUseContainers.remove(container.getId());
return container;
}
private Object unAssignContainer(ContainerId containerId,
boolean releaseIfFound) {
// Not removing. containerAssignments tracks the last task run on a
// container.
Object task = containerAssignments.get(containerId);
if(task == null) {
return null;
}
Container container = taskAllocations.remove(task);
assert container != null;
inUseContainers.remove(containerId);
if(releaseIfFound) {
releaseContainer(containerId);
}
return task;
}
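  // Note: priorities are inverted here - a numerically smaller Priority value
  // means a higher priority, hence the '<' comparison below.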
private boolean isHigherPriority(Priority lhs, Priority rhs) {
return lhs.getPriority() < rhs.getPriority();
}
private synchronized void assignNewContainersWithLocation(
Iterable<Container> containers,
ContainerAssigner assigner,
Map<CookieContainerRequest, Container> assignedContainers) {
Iterator<Container> containerIterator = containers.iterator();
while (containerIterator.hasNext()) {
Container container = containerIterator.next();
CookieContainerRequest assigned =
assigner.assignNewContainer(container);
if (assigned != null) {
assignedContainers.put(assigned, container);
containerIterator.remove();
}
}
}
private synchronized void assignReUsedContainersWithLocation(
Iterable<Container> containers,
ContainerAssigner assigner,
Map<CookieContainerRequest, Container> assignedContainers,
boolean honorLocality) {
Iterator<Container> containerIterator = containers.iterator();
while (containerIterator.hasNext()) {
Container container = containerIterator.next();
if (assignReUsedContainerWithLocation(container, assigner,
assignedContainers, honorLocality)) {
containerIterator.remove();
}
}
}
private synchronized boolean assignReUsedContainerWithLocation(
Container container,
ContainerAssigner assigner,
Map<CookieContainerRequest, Container> assignedContainers,
boolean honorLocality) {
Priority containerPriority = container.getPriority();
Priority topPendingTaskPriority = amRmClient.getTopPriority();
if (topPendingTaskPriority == null) {
// nothing left to assign
return false;
}
if (topPendingTaskPriority.compareTo(containerPriority) > 0 &&
heldContainers.get(container.getId()).isNew()) {
      // If the next task to assign is higher priority than the container, then
      // don't assign this container to that task.
      // If task and container are of equal priority, then it's a first use or a
      // reuse within the same priority - safe to use.
      // If the task is lower priority than the container, then we are using a
      // container that is no longer needed by higher-priority tasks; all those
      // higher-priority tasks have already been assigned resources - safe to use
      // (first use or reuse).
      // If the task is higher priority than the container, then we may end up
      // using a container that the RM assigned for a lower-priority pending task
      // which will be assigned after this higher-priority task. If we use that
      // task's container now, we may not be able to match this container to that
      // task later on. However, the RM has already assigned us all containers and
      // is not going to give us new ones, so we would get stuck waiting for
      // resources.
      // The above applies to new containers only. If a container has already been
      // re-used, this is not relevant.
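      // Illustrative example: with a pending task at priority 3 and a new
      // (never reused) container allocated at priority 5, the task outranks the
      // container, so the container is held back for a later priority-5 task
      // instead of being consumed here.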
return false;
}
CookieContainerRequest assigned =
assigner.assignReUsedContainer(container, honorLocality);
if (assigned != null) {
assignedContainers.put(assigned, container);
return true;
}
return false;
}
private void releaseUnassignedContainers(Iterable<Container> containers) {
for (Container container : containers) {
if (LOG.isDebugEnabled()) {
LOG.debug("Releasing unused container: " + container.getId());
}
releaseContainer(container.getId());
}
}
private void informAppAboutAssignment(CookieContainerRequest assigned,
Container container) {
getContext().taskAllocated(getTask(assigned),
assigned.getCookie().getAppCookie(), container);
}
private void informAppAboutAssignments(
Map<CookieContainerRequest, Container> assignedContainers) {
if (assignedContainers == null || assignedContainers.isEmpty()) {
return;
}
for (Entry<CookieContainerRequest, Container> entry : assignedContainers
.entrySet()) {
Container container = entry.getValue();
// check for blacklisted nodes. There may be race conditions between
// setting blacklist and receiving allocations
if (blacklistedNodes.contains(container.getNodeId())) {
CookieContainerRequest request = entry.getKey();
Object task = getTask(request);
LOG.info("Container: " + container.getId() +
" allocated on blacklisted node: " + container.getNodeId() +
" for task: " + task);
Object deAllocTask = deallocateContainer(container.getId());
assert deAllocTask.equals(task);
        // It's okay to submit the same request again because the RM will not give
        // us the bad/unhealthy nodes again. The nodes may later become healthy or
        // un-blacklisted, so it's better to give the RM the full information.
allocateTask(task, request.getCapability(),
(request.getNodes() == null ? null :
request.getNodes().toArray(new String[request.getNodes().size()])),
(request.getRacks() == null ? null :
request.getRacks().toArray(new String[request.getRacks().size()])),
request.getPriority(),
request.getCookie().getContainerSignature(),
request.getCookie().getAppCookie());
} else {
informAppAboutAssignment(entry.getKey(), container);
}
}
}
private abstract class ContainerAssigner {
protected final String locality;
protected ContainerAssigner(String locality) {
this.locality = locality;
}
public abstract CookieContainerRequest assignNewContainer(
Container container);
public abstract CookieContainerRequest assignReUsedContainer(
Container container, boolean honorLocality);
public void doBookKeepingForAssignedContainer(
CookieContainerRequest assigned, Container container,
String matchedLocation, boolean honorLocalityFlags) {
if (assigned == null) {
return;
}
Object task = getTask(assigned);
assert task != null;
LOG.info("Assigning container to task: "
+ "containerId=" + container.getId()
+ ", task=" + task
+ ", containerHost=" + container.getNodeId()
+ ", containerPriority= " + container.getPriority()
+ ", containerResources=" + container.getResource()
+ ", localityMatchType=" + locality
+ ", matchedLocation=" + matchedLocation
+ ", honorLocalityFlags=" + honorLocalityFlags
+ ", reusedContainer=" + containerAssignments.containsKey(container.getId())
+ ", delayedContainers=" + delayedContainerManager.delayedContainers.size());
assignContainer(task, container, assigned);
}
}
private class NodeLocalContainerAssigner extends ContainerAssigner {
NodeLocalContainerAssigner() {
super("NodeLocal");
}
@Override
public CookieContainerRequest assignNewContainer(Container container) {
String location = container.getNodeId().getHost();
CookieContainerRequest assigned = getMatchingRequestWithPriority(
container, location);
doBookKeepingForAssignedContainer(assigned, container, location, false);
return assigned;
}
@Override
public CookieContainerRequest assignReUsedContainer(Container container,
boolean honorLocality) {
String location = container.getNodeId().getHost();
CookieContainerRequest assigned = getMatchingRequestWithoutPriority(
container, location, true);
doBookKeepingForAssignedContainer(assigned, container, location, true);
return assigned;
}
}
private class RackLocalContainerAssigner extends ContainerAssigner {
RackLocalContainerAssigner() {
super("RackLocal");
}
@Override
public CookieContainerRequest assignNewContainer(Container container) {
String location = RackResolver.resolve(container.getNodeId().getHost())
.getNetworkLocation();
CookieContainerRequest assigned = getMatchingRequestWithPriority(container,
location);
doBookKeepingForAssignedContainer(assigned, container, location, false);
return assigned;
}
@Override
public CookieContainerRequest assignReUsedContainer(
Container container, boolean honorLocality) {
      // TEZ-586: this does not match an actual rack-local request unless
      // honorLocality is false. This method is a no-op when honorLocality=true.
if (!honorLocality) {
String location = heldContainers.get(container.getId()).getRack();
CookieContainerRequest assigned = getMatchingRequestWithoutPriority(
container, location, false);
doBookKeepingForAssignedContainer(assigned, container, location,
honorLocality);
return assigned;
}
return null;
}
}
private class NonLocalContainerAssigner extends ContainerAssigner {
NonLocalContainerAssigner() {
super("NonLocal");
}
@Override
public CookieContainerRequest assignNewContainer(Container container) {
String location = ResourceRequest.ANY;
CookieContainerRequest assigned = getMatchingRequestWithPriority(container,
location);
doBookKeepingForAssignedContainer(assigned, container, location, false);
return assigned;
}
@Override
public CookieContainerRequest assignReUsedContainer(Container container,
boolean honorLocality) {
if (!honorLocality) {
String location = ResourceRequest.ANY;
CookieContainerRequest assigned = getMatchingRequestWithoutPriority(
container, location, false);
doBookKeepingForAssignedContainer(assigned, container, location,
honorLocality);
return assigned;
}
return null;
}
}
@VisibleForTesting
class DelayedContainerManager extends Thread {
class HeldContainerTimerComparator implements Comparator<HeldContainer> {
@Override
public int compare(HeldContainer c1,
HeldContainer c2) {
        // Long.compare avoids int overflow when the timestamps are far apart.
        return Long.compare(c1.getNextScheduleTime(), c2.getNextScheduleTime());
}
}
PriorityBlockingQueue<HeldContainer> delayedContainers =
new PriorityBlockingQueue<HeldContainer>(20,
new HeldContainerTimerComparator());
private volatile boolean tryAssigningAll = false;
private volatile boolean running = true;
private long maxScheduleTimeSeen = -1;
// used for testing only
@VisibleForTesting
volatile AtomicBoolean drainedDelayedContainersForTest = null;
DelayedContainerManager() {
super.setName("DelayedContainerManager");
}
@Override
public void run() {
try {
mainLoop();
} catch (Throwable e) {
ExitUtil.terminate(1, e);
}
}
private void mainLoop() {
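      // Loop outline: (1) honor an explicit assign-all request, (2) if nothing
      // is queued, wait for a trigger, (3) once the head of the delay queue is
      // due, try to assign it while holding the scheduler lock, otherwise wait
      // until it becomes due, (4) inform the app about assignments outside the
      // lock.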
while(running) {
// Try assigning all containers if there's a request to do so.
if (tryAssigningAll) {
doAssignAll();
tryAssigningAll = false;
}
// Try allocating containers which have timed out.
// Required since these containers may get assigned without
// locality at this point.
synchronized(this) {
if (delayedContainers.peek() == null) {
try {
// test only signaling to make TestTaskScheduler work
if (drainedDelayedContainersForTest != null) {
synchronized (drainedDelayedContainersForTest) {
drainedDelayedContainersForTest.set(true);
drainedDelayedContainersForTest.notifyAll();
}
}
this.wait();
              // Re-loop to see if tryAssigningAll is set.
continue;
} catch (InterruptedException e) {
LOG.info("AllocatedContainerManager Thread interrupted");
}
}
}
// test only sleep to prevent tight loop cycling that makes tests stall
if (drainedDelayedContainersForTest != null) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
HeldContainer delayedContainer = delayedContainers.peek();
if (delayedContainer == null) {
continue;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Considering HeldContainer: "
+ delayedContainer + " for assignment");
}
long currentTs = System.currentTimeMillis();
long nextScheduleTs = delayedContainer.getNextScheduleTime();
if (currentTs >= nextScheduleTs) {
Map<CookieContainerRequest, Container> assignedContainers = null;
synchronized(YarnTaskSchedulerService.this) {
// Remove the container and try scheduling it.
// TEZ-587 what if container is released by RM after this
// in onContainerCompleted()
delayedContainer = delayedContainers.poll();
if (delayedContainer == null) {
continue;
}
if (null !=
heldContainers.get(delayedContainer.getContainer().getId())) {
assignedContainers = assignDelayedContainer(delayedContainer);
} else {
// non standard scenario
LOG.info("Skipping delayed container as container is no longer"
+ " running, containerId="
+ delayedContainer.getContainer().getId());
}
}
          // Informing the app should be done outside of the lock
informAppAboutAssignments(assignedContainers);
} else {
synchronized(this) {
try {
// Wait for the next container to be assignable
delayedContainer = delayedContainers.peek();
long diff = localitySchedulingDelay;
if (delayedContainer != null) {
diff = delayedContainer.getNextScheduleTime() - currentTs;
}
if (diff > 0) {
this.wait(diff);
}
} catch (InterruptedException e) {
LOG.info("AllocatedContainerManager Thread interrupted");
}
}
}
}
releasePendingContainers();
}
private void doAssignAll() {
      // The delayedContainers queue should not be modified in the middle of an
      // iteration over it. Synchronizing on YarnTaskSchedulerService.this
      // prevents that from happening.
      // The assignment call made from within this method should NOT add any
      // elements back to the delayedContainers queue. Since they're all
      // delayed elements, de-allocation should not happen either - leaving the
      // queue of delayed containers intact, except for the containers which end
      // up getting assigned.
if (delayedContainers.isEmpty()) {
return;
}
Map<CookieContainerRequest, Container> assignedContainers;
synchronized(YarnTaskSchedulerService.this) {
        // Honor reuse-locality flags (containers have not timed out yet), don't
        // re-queue (already in the queue), and don't release (release happens
        // when containers time out).
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to assign all delayed containers to newly received"
+ " tasks");
}
Iterator<HeldContainer> iter = delayedContainers.iterator();
while(iter.hasNext()) {
HeldContainer delayedContainer = iter.next();
if (!heldContainers.containsKey(delayedContainer.getContainer().getId())) {
// this container is no longer held by us
// non standard scenario
LOG.info("AssignAll - Skipping delayed container as container is no longer"
+ " running, containerId="
+ delayedContainer.getContainer().getId());
iter.remove();
}
}
assignedContainers = tryAssignReUsedContainers(
new ContainerIterable(delayedContainers));
}
// Inform app
informAppAboutAssignments(assignedContainers);
}
    /**
     * Indicates that an attempt should be made to allocate all available containers.
     * Intended to be used when new container requests come in.
     *
     * @param scheduleAll if true, try to assign every delayed container on the
     *                    next iteration of the scheduling loop
     */
public void triggerScheduling(boolean scheduleAll) {
synchronized(this) {
this.tryAssigningAll = scheduleAll;
this.notify();
}
}
public void shutdown() {
this.running = false;
this.interrupt();
}
private void releasePendingContainers() {
List<HeldContainer> pendingContainers = Lists.newArrayListWithCapacity(
delayedContainers.size());
delayedContainers.drainTo(pendingContainers);
releaseUnassignedContainers(new ContainerIterable(pendingContainers));
}
@VisibleForTesting
void addDelayedContainer(Container container,
long nextScheduleTime) {
HeldContainer delayedContainer = heldContainers.get(container.getId());
if (delayedContainer == null) {
LOG.warn("Attempting to add a non-running container to the"
+ " delayed container list, containerId=" + container.getId());
return;
} else {
delayedContainer.setNextScheduleTime(nextScheduleTime);
}
if (maxScheduleTimeSeen < nextScheduleTime) {
maxScheduleTimeSeen = nextScheduleTime;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Adding container to delayed queue"
+ ", containerId=" + delayedContainer.getContainer().getId()
+ ", nextScheduleTime=" + delayedContainer.getNextScheduleTime()
+ ", containerExpiry=" + delayedContainer.getContainerExpiryTime());
}
boolean added = false;
synchronized(this) {
added = delayedContainers.offer(delayedContainer);
if (drainedDelayedContainersForTest != null) {
synchronized (drainedDelayedContainersForTest) {
drainedDelayedContainersForTest.set(false);
}
}
this.notify();
}
if (!added) {
releaseUnassignedContainers(Lists.newArrayList(container));
}
}
}
synchronized void determineMinHeldContainers() {
sessionMinHeldContainers.clear();
if (sessionNumMinHeldContainers <= 0) {
return;
}
if (heldContainers.size() <= sessionNumMinHeldContainers) {
sessionMinHeldContainers.addAll(heldContainers.keySet());
}
Map<String, AtomicInteger> rackHeldNumber = Maps.newHashMap();
Map<String, List<HeldContainer>> nodeHeldContainers = Maps.newHashMap();
for(HeldContainer heldContainer : heldContainers.values()) {
AtomicInteger count = rackHeldNumber.get(heldContainer.getRack());
if (count == null) {
count = new AtomicInteger(0);
rackHeldNumber.put(heldContainer.getRack(), count);
}
count.incrementAndGet();
List<HeldContainer> nodeContainers = nodeHeldContainers.get(heldContainer.getNode());
if (nodeContainers == null) {
nodeContainers = Lists.newLinkedList();
nodeHeldContainers.put(heldContainer.getNode(), nodeContainers);
}
nodeContainers.add(heldContainer);
}
Map<String, AtomicInteger> rackToHoldNumber = Maps.newHashMap();
for (String rack : rackHeldNumber.keySet()) {
rackToHoldNumber.put(rack, new AtomicInteger(0));
}
    // distribute the quota evenly across racks
    // each pass of the loop assigns 1 container slot per rack over all racks
int containerCount = 0;
while (containerCount < sessionNumMinHeldContainers && !rackHeldNumber.isEmpty()) {
Iterator<Entry<String, AtomicInteger>> iter = rackHeldNumber.entrySet().iterator();
while (containerCount < sessionNumMinHeldContainers && iter.hasNext()) {
Entry<String, AtomicInteger> entry = iter.next();
if (entry.getValue().decrementAndGet() >=0) {
containerCount++;
rackToHoldNumber.get(entry.getKey()).incrementAndGet();
} else {
iter.remove();
}
}
}
// distribute containers evenly across nodes while not exceeding rack limit
// the loop assigns 1 container per node over all nodes
containerCount = 0;
while (containerCount < sessionNumMinHeldContainers && !nodeHeldContainers.isEmpty()) {
Iterator<Entry<String, List<HeldContainer>>> iter = nodeHeldContainers.entrySet().iterator();
while (containerCount < sessionNumMinHeldContainers && iter.hasNext()) {
List<HeldContainer> nodeContainers = iter.next().getValue();
if (nodeContainers.isEmpty()) {
// node is empty. remove it.
iter.remove();
continue;
}
HeldContainer heldContainer = nodeContainers.remove(nodeContainers.size() - 1);
if (rackToHoldNumber.get(heldContainer.getRack()).decrementAndGet() >= 0) {
// rack can hold a container
containerCount++;
sessionMinHeldContainers.add(heldContainer.getContainer().getId());
} else {
// rack limit reached. remove node.
iter.remove();
}
}
}
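    // Illustrative walk-through (hypothetical numbers): with
    // sessionNumMinHeldContainers=3 and held containers on rack r1 (node n1=3,
    // node n2=1) and rack r2 (node n3=1), the first loop yields rack quotas
    // r1=2, r2=1; the second loop then keeps one container each from n1, n2 and
    // n3, spreading the retained containers across both racks and all three
    // nodes.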
LOG.info("Holding on to " + sessionMinHeldContainers.size() + " containers"
+ " out of total held containers: " + heldContainers.size());
}
private static class ContainerIterable implements Iterable<Container> {
private final Iterable<HeldContainer> delayedContainers;
ContainerIterable(Iterable<HeldContainer> delayedContainers) {
this.delayedContainers = delayedContainers;
}
@Override
public Iterator<Container> iterator() {
final Iterator<HeldContainer> delayedContainerIterator = delayedContainers
.iterator();
return new Iterator<Container>() {
@Override
public boolean hasNext() {
return delayedContainerIterator.hasNext();
}
@Override
public Container next() {
return delayedContainerIterator.next().getContainer();
}
@Override
public void remove() {
delayedContainerIterator.remove();
}
};
}
}
static class HeldContainer {
enum LocalityMatchLevel {
NEW,
NODE,
RACK,
NON_LOCAL
}
final Container container;
final private String rack;
private long nextScheduleTime;
private LocalityMatchLevel localityMatchLevel;
private long containerExpiryTime;
private CookieContainerRequest lastTaskInfo;
private int numAssignmentAttempts = 0;
private Object lastAssignedContainerSignature;
final ContainerSignatureMatcher signatureMatcher;
HeldContainer(Container container,
long nextScheduleTime,
long containerExpiryTime,
CookieContainerRequest firstTaskInfo,
ContainerSignatureMatcher signatureMatcher) {
this.container = container;
this.nextScheduleTime = nextScheduleTime;
if (firstTaskInfo != null) {
this.lastTaskInfo = firstTaskInfo;
this.lastAssignedContainerSignature = firstTaskInfo.getCookie().getContainerSignature();
}
this.localityMatchLevel = LocalityMatchLevel.NODE;
this.containerExpiryTime = containerExpiryTime;
this.rack = RackResolver.resolve(container.getNodeId().getHost())
.getNetworkLocation();
this.signatureMatcher = signatureMatcher;
}
boolean isNew() {
return lastTaskInfo == null;
}
String getRack() {
return this.rack;
}
String getNode() {
return this.container.getNodeId().getHost();
}
int geNumAssignmentAttempts() {
return numAssignmentAttempts;
}
void incrementAssignmentAttempts() {
numAssignmentAttempts++;
}
public Container getContainer() {
return this.container;
}
public long getNextScheduleTime() {
return this.nextScheduleTime;
}
public void setNextScheduleTime(long nextScheduleTime) {
this.nextScheduleTime = nextScheduleTime;
}
public long getContainerExpiryTime() {
return this.containerExpiryTime;
}
public void setContainerExpiryTime(long containerExpiryTime) {
this.containerExpiryTime = containerExpiryTime;
}
public Object getLastAssignedContainerSignature() {
return this.lastAssignedContainerSignature;
}
public CookieContainerRequest getLastTaskInfo() {
return this.lastTaskInfo;
}
public void setLastTaskInfo(CookieContainerRequest taskInfo) {
// Merge the container signatures to account for any changes to the container
// footprint. For example, re-localization of additional resources will
// cause the held container's signature to change.
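      // For example (illustrative resource names): if the container previously
      // localized {a.jar} and the new task's signature adds {b.jar}, the merged
      // signature covers {a.jar, b.jar}, so later superset checks account for
      // everything ever localized in this container.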
if (lastAssignedContainerSignature != null) {
lastAssignedContainerSignature = signatureMatcher.union(
lastAssignedContainerSignature,
taskInfo.getCookie().getContainerSignature());
} else {
lastAssignedContainerSignature = taskInfo.getCookie().getContainerSignature();
}
lastTaskInfo = taskInfo;
}
public synchronized void resetLocalityMatchLevel() {
localityMatchLevel = LocalityMatchLevel.NEW;
}
public synchronized void incrementLocalityMatchLevel() {
if (localityMatchLevel.equals(LocalityMatchLevel.NEW)) {
localityMatchLevel = LocalityMatchLevel.NODE;
} else if (localityMatchLevel.equals(LocalityMatchLevel.NODE)) {
localityMatchLevel = LocalityMatchLevel.RACK;
} else if (localityMatchLevel.equals(LocalityMatchLevel.RACK)) {
localityMatchLevel = LocalityMatchLevel.NON_LOCAL;
} else if (localityMatchLevel.equals(LocalityMatchLevel.NON_LOCAL)) {
throw new TezUncheckedException("Cannot increment locality level "
+ " from current NON_LOCAL for container: " + container.getId());
}
}
public LocalityMatchLevel getLocalityMatchLevel() {
return this.localityMatchLevel;
}
@Override
public String toString() {
return "HeldContainer: id: " + container.getId()
+ ", nextScheduleTime: " + nextScheduleTime
+ ", localityMatchLevel=" + localityMatchLevel
+ ", signature: "
+ (lastAssignedContainerSignature != null? lastAssignedContainerSignature.toString()
: "null");
}
}
}