blob: 31a7142987c41b6eb0aaa58d4669db7008d23fc7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.rm;
import com.google.common.annotations.VisibleForTesting;
import org.apache.tez.common.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.math3.random.RandomDataGenerator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.serviceplugins.api.DagInfo;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AMState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* A YARN task scheduler that is aware of the dependencies between vertices
* in the DAG and takes them into account when deciding how to schedule
* and preempt tasks.
*
* This scheduler makes the assumption that vertex IDs start at 0 and are
* densely allocated (i.e.: there are no "gaps" in the vertex ID space).
*/
public class DagAwareYarnTaskScheduler extends TaskScheduler
implements AMRMClientAsync.CallbackHandler {
private static final Logger LOG = LoggerFactory.getLogger(DagAwareYarnTaskScheduler.class);
private static final Comparator<HeldContainer> PREEMPT_ORDER_COMPARATOR = new PreemptOrderComparator();
private final RandomDataGenerator random = new RandomDataGenerator();
private AMRMClientAsyncWrapper client;
private ScheduledExecutorService reuseExecutor;
private ResourceCalculator resourceCalculator;
private int numHeartbeats = 0;
private Resource totalResources = Resource.newInstance(0, 0);
@GuardedBy("this")
private Resource allocatedResources = Resource.newInstance(0, 0);
private final Set<NodeId> blacklistedNodes = Collections.newSetFromMap(new ConcurrentHashMap<NodeId, Boolean>());
private final ContainerSignatureMatcher signatureMatcher;
@GuardedBy("this")
private final RequestTracker requestTracker = new RequestTracker();
@GuardedBy("this")
private final Map<ContainerId, HeldContainer> heldContainers = new HashMap<>();
@GuardedBy("this")
private final IdleContainerTracker idleTracker = new IdleContainerTracker();
@GuardedBy("this")
private final Map<Object, HeldContainer> taskAssignments = new HashMap<>();
/** A mapping from the vertex ID to the set of containers assigned to tasks for that vertex */
@GuardedBy("this")
private final Map<Integer, Set<HeldContainer>> vertexAssignments = new HashMap<>();
/** If vertex N has at least one task assigned to a container then the corresponding bit at index N is set */
@GuardedBy("this")
private final BitSet assignedVertices = new BitSet();
/**
* Tracks assigned tasks for released containers so the app can be notified properly when the
* container completion event finally arrives.
*/
@GuardedBy("this")
private final Map<ContainerId, Object> releasedContainers = new HashMap<>();
@GuardedBy("this")
private final Set<HeldContainer> sessionContainers = new HashSet<>();
/**
* Tracks the set of descendant vertices in the DAG for each vertex. The BitSet for descendants of vertex N
* are at array index N. If a bit is set at index X in the descendants BitSet then vertex X is a descendant
* of vertex N in the DAG.
*/
@GuardedBy("this")
private ArrayList<BitSet> vertexDescendants = null;
private volatile boolean stopRequested = false;
private volatile boolean shouldUnregister = false;
private volatile boolean hasUnregistered = false;
// cached configuration parameters
private boolean shouldReuseContainers;
private boolean reuseRackLocal;
private boolean reuseNonLocal;
private boolean reuseNewContainers;
private long localitySchedulingDelay;
private long idleContainerTimeoutMin;
private long idleContainerTimeoutMax;
private int sessionNumMinHeldContainers;
private int preemptionPercentage;
private int numHeartbeatsBetweenPreemptions;
private int lastPreemptionHeartbeat = 0;
private long preemptionMaxWaitTime;
public DagAwareYarnTaskScheduler(TaskSchedulerContext taskSchedulerContext) {
super(taskSchedulerContext);
signatureMatcher = taskSchedulerContext.getContainerSignatureMatcher();
}
@Override
public void initialize() throws Exception {
initialize(new AMRMClientAsyncWrapper(new AMRMClientImpl<TaskRequest>(), 1000, this));
}
void initialize(AMRMClientAsyncWrapper client) throws Exception {
super.initialize();
this.client = client;
Configuration conf = TezUtils.createConfFromUserPayload(getContext().getInitialUserPayload());
client.init(conf);
int heartbeatIntervalMax = conf.getInt(
TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX,
TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX_DEFAULT);
client.setHeartbeatInterval(heartbeatIntervalMax);
shouldReuseContainers = conf.getBoolean(
TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED,
TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED_DEFAULT);
reuseRackLocal = conf.getBoolean(
TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED,
TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED_DEFAULT);
reuseNonLocal = conf
.getBoolean(
TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED,
TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED_DEFAULT);
Preconditions.checkArgument(
((!reuseRackLocal && !reuseNonLocal) || (reuseRackLocal)),
"Re-use Rack-Local cannot be disabled if Re-use Non-Local has been"
+ " enabled");
reuseNewContainers = shouldReuseContainers && conf.getBoolean(
TezConfiguration.TEZ_AM_CONTAINER_REUSE_NEW_CONTAINERS_ENABLED,
TezConfiguration.TEZ_AM_CONTAINER_REUSE_NEW_CONTAINERS_ENABLED_DEFAULT);
localitySchedulingDelay = conf.getLong(
TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS,
TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS_DEFAULT);
Preconditions.checkArgument(localitySchedulingDelay >= 0,
"Locality Scheduling delay should be >=0");
idleContainerTimeoutMin = conf.getLong(
TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS,
TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS_DEFAULT);
Preconditions.checkArgument(idleContainerTimeoutMin >= 0 || idleContainerTimeoutMin == -1,
"Idle container release min timeout should be either -1 or >=0");
idleContainerTimeoutMax = conf.getLong(
TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS,
TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS_DEFAULT);
Preconditions.checkArgument(
idleContainerTimeoutMax >= 0 && idleContainerTimeoutMax >= idleContainerTimeoutMin,
"Idle container release max timeout should be >=0 and >= " +
TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS);
sessionNumMinHeldContainers = conf.getInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS,
TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS_DEFAULT);
Preconditions.checkArgument(sessionNumMinHeldContainers >= 0,
"Session minimum held containers should be >=0");
preemptionPercentage = conf.getInt(TezConfiguration.TEZ_AM_PREEMPTION_PERCENTAGE,
TezConfiguration.TEZ_AM_PREEMPTION_PERCENTAGE_DEFAULT);
Preconditions.checkArgument(preemptionPercentage >= 0 && preemptionPercentage <= 100,
"Preemption percentage should be between 0-100");
numHeartbeatsBetweenPreemptions = conf.getInt(
TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS,
TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS_DEFAULT);
Preconditions.checkArgument(numHeartbeatsBetweenPreemptions >= 1,
"Heartbeats between preemptions should be >=1");
preemptionMaxWaitTime = conf.getInt(TezConfiguration.TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS,
TezConfiguration.TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS_DEFAULT);
Preconditions.checkArgument(preemptionMaxWaitTime >=0, "Preemption max wait time must be >=0");
LOG.info("scheduler initialized with maxRMHeartbeatInterval:" + heartbeatIntervalMax +
" reuseEnabled:" + shouldReuseContainers +
" reuseRack:" + reuseRackLocal +
" reuseAny:" + reuseNonLocal +
" localityDelay:" + localitySchedulingDelay +
" preemptPercentage:" + preemptionPercentage +
" preemptMaxWaitTime:" + preemptionMaxWaitTime +
" numHeartbeatsBetweenPreemptions:" + numHeartbeatsBetweenPreemptions +
" idleContainerMinTimeout:" + idleContainerTimeoutMin +
" idleContainerMaxTimeout:" + idleContainerTimeoutMax +
" sessionMinHeldContainers:" + sessionNumMinHeldContainers);
}
@Override
public void start() throws Exception {
super.start();
client.start();
if (shouldReuseContainers) {
reuseExecutor = createExecutor();
}
TaskSchedulerContext ctx = getContext();
RegisterApplicationMasterResponse response = client.registerApplicationMaster(
ctx.getAppHostName(), ctx.getAppClientPort(), ctx.getAppTrackingUrl());
ctx.setApplicationRegistrationData(response.getMaximumResourceCapability(),
response.getApplicationACLs(), response.getClientToAMTokenMasterKey(),
response.getQueue());
if (response.getSchedulerResourceTypes().contains(SchedulerResourceTypes.CPU)) {
resourceCalculator = new MemCpuResourceCalculator();
} else {
resourceCalculator = new MemResourceCalculator();
}
}
protected ScheduledExecutorService createExecutor() {
return new ReuseContainerExecutor();
}
protected long now() {
return Time.monotonicNow();
}
@Override
public void initiateStop() {
super.initiateStop();
LOG.debug("Initiating stop of task scheduler");
stopRequested = true;
List<ContainerId> releasedLaunchedContainers;
synchronized (this) {
releasedLaunchedContainers = new ArrayList<>(heldContainers.size());
List<HeldContainer> heldList = new ArrayList<>(heldContainers.values());
for (HeldContainer hc : heldList) {
if (releaseContainer(hc)) {
releasedLaunchedContainers.add(hc.getId());
}
}
List<Object> tasks = requestTracker.getTasks();
for (Object task : tasks) {
removeTaskRequest(task);
}
}
// perform app callback outside of locks
for (ContainerId id : releasedLaunchedContainers) {
getContext().containerBeingReleased(id);
}
}
@Override
public void shutdown() throws Exception {
super.shutdown();
if (reuseExecutor != null) {
reuseExecutor.shutdown();
reuseExecutor.awaitTermination(2, TimeUnit.SECONDS);
}
synchronized (this) {
if (shouldUnregister && !hasUnregistered) {
TaskSchedulerContext.AppFinalStatus status = getContext().getFinalAppStatus();
LOG.info("Unregistering from RM, exitStatus={} exitMessage={} trackingURL={}",
status.exitStatus, status.exitMessage, status.postCompletionTrackingUrl);
client.unregisterApplicationMaster(status.exitStatus,
status.exitMessage,
status.postCompletionTrackingUrl);
hasUnregistered = true;
}
}
client.stop();
}
@Override
public void onContainersAllocated(List<Container> containers) {
AMState appState = getContext().getAMState();
if (stopRequested || appState == AMState.COMPLETED) {
LOG.info("Ignoring {} allocations since app is terminating", containers.size());
for (Container c : containers) {
client.releaseAssignedContainer(c.getId());
}
return;
}
List<Assignment> assignments = assignNewContainers(containers, getContext().getAMState(), getContext().isSession());
informAppAboutAssignments(assignments);
}
private synchronized List<Assignment> assignNewContainers(List<Container> newContainers,
AMState appState, boolean isSession) {
// try to assign the containers as node-local
List<Assignment> assignments = new ArrayList<>(newContainers.size());
List<HeldContainer> unassigned = new ArrayList<>(newContainers.size());
for (Container c : newContainers) {
HeldContainer hc = new HeldContainer(c);
heldContainers.put(hc.getId(), hc);
Resources.addTo(allocatedResources, c.getResource());
tryAssignNewContainer(hc, hc.getHost(), assignments, unassigned);
}
// try to assign the remaining containers as rack-local
List<HeldContainer> containers = unassigned;
unassigned = new ArrayList<>(containers.size());
for (HeldContainer hc : containers) {
tryAssignNewContainer(hc, hc.getRack(), assignments, unassigned);
}
// try to assign the remaining containers without locality
containers = unassigned;
unassigned = new ArrayList<>(containers.size());
for (HeldContainer hc : containers) {
tryAssignNewContainer(hc, ResourceRequest.ANY, assignments, unassigned);
}
for (HeldContainer hc : unassigned) {
if (reuseNewContainers) {
idleTracker.add(hc);
TaskRequest assigned = tryAssignReuseContainer(hc, appState, isSession);
if (assigned != null) {
assignments.add(new Assignment(assigned, hc.getContainer()));
}
} else {
releaseContainer(hc);
}
}
return assignments;
}
/**
* Try to assign a newly acquired container to a task of the same priority.
*
* @param hc the container to assign
* @param location the locality to consider for assignment
* @param assignments list to update if container is assigned
* @param unassigned list to update if container is not assigned
*/
@GuardedBy("this")
private void tryAssignNewContainer(HeldContainer hc, String location,
List<Assignment> assignments, List<HeldContainer> unassigned) {
List<? extends Collection<TaskRequest>> results = client.getMatchingRequests(hc.getPriority(),
location, hc.getCapability());
if (!results.isEmpty()) {
for (Collection<TaskRequest> requests : results) {
if (!requests.isEmpty()) {
TaskRequest request = requests.iterator().next();
if (maybeChangeNode(request, hc.getContainer().getNodeId())) {
continue;
}
assignContainer(request, hc, location);
assignments.add(new Assignment(request, hc.getContainer()));
return;
}
}
}
unassigned.add(hc);
}
@GuardedBy("this")
@Nullable
private TaskRequest tryAssignReuseContainer(HeldContainer hc,
AMState appState, boolean isSession) {
if (stopRequested) {
return null;
}
TaskRequest assignedRequest = null;
switch (appState) {
case IDLE:
handleReuseContainerWhenIdle(hc, isSession);
break;
case RUNNING_APP:
if (requestTracker.isEmpty()) {
// treat no requests as if app is idle
handleReuseContainerWhenIdle(hc, isSession);
} else {
assignedRequest = tryAssignReuseContainerAppRunning(hc);
if (assignedRequest == null) {
if (hc.atMaxMatchLevel()) {
LOG.info("Releasing idle container {} due to pending requests", hc.getId());
releaseContainer(hc);
} else {
hc.scheduleForReuse(localitySchedulingDelay);
}
}
}
break;
case COMPLETED:
LOG.info("Releasing container {} because app has completed", hc.getId());
releaseContainer(hc);
break;
default:
throw new IllegalStateException("Unexpected app state " + appState);
}
return assignedRequest;
}
@GuardedBy("this")
private void handleReuseContainerWhenIdle(HeldContainer hc, boolean isSession) {
if (isSession && sessionContainers.isEmpty() && sessionNumMinHeldContainers > 0) {
computeSessionContainers();
}
if (sessionContainers.contains(hc)) {
LOG.info("Retaining container {} since it is a session container");
hc.resetMatchingLevel();
} else {
long now = now();
long expiration = hc.getIdleExpirationTimestamp(now);
if (now >= expiration) {
LOG.info("Releasing expired idle container {}", hc.getId());
releaseContainer(hc);
} else {
hc.scheduleForReuse(expiration - now);
}
}
}
@GuardedBy("this")
@Nullable
private TaskRequest tryAssignReuseContainerAppRunning(HeldContainer hc) {
if (!hc.isAssignable()) {
LOG.debug("Skipping scheduling of container {} because it state is {}", hc.getId(), hc.getState());
return null;
}
TaskRequest assignedRequest = tryAssignReuseContainerForAffinity(hc);
if (assignedRequest != null) {
return assignedRequest;
}
for (Entry<Priority,RequestPriorityStats> entry : requestTracker.getStatsEntries()) {
Priority priority = entry.getKey();
RequestPriorityStats stats = entry.getValue();
if (!stats.allowedVertices.intersects(stats.vertices)) {
LOG.debug("Skipping requests at priority {} because all requesting vertices are blocked by higher priority requests",
priority);
continue;
}
String matchLocation = hc.getMatchingLocation();
if (stats.localityCount <= 0) {
LOG.debug("Overriding locality match of container {} to ANY since there are no locality requests at priority {}",
hc.getId(), priority);
matchLocation = ResourceRequest.ANY;
}
assignedRequest = tryAssignReuseContainerForPriority(hc, matchLocation,
priority, stats.allowedVertices);
if (assignedRequest != null) {
break;
}
}
return assignedRequest;
}
@GuardedBy("this")
@Nullable
private TaskRequest tryAssignReuseContainerForAffinity(HeldContainer hc) {
Collection<TaskRequest> affinities = hc.getAffinities();
if (affinities != null) {
for (TaskRequest request : affinities) {
if (requestTracker.isRequestBlocked(request)) {
LOG.debug("Cannot assign task {} to container {} since vertex {} is a descendant of pending tasks",
request.getTask(), hc.getId(), request.getVertexIndex());
} else if (maybeChangeNode(request, hc.getContainer().getNodeId())) {
LOG.debug("Cannot assign task {} to container {} since node {} is running sibling attempts",
request.getTask(), hc.getId(), request.getVertexIndex());
} else {
assignContainer(request, hc, hc.getId());
return request;
}
}
}
return null;
}
@GuardedBy("this")
@Nullable
private TaskRequest tryAssignReuseContainerForPriority(HeldContainer hc, String matchLocation,
Priority priority, BitSet allowedVertices) {
List<? extends Collection<TaskRequest>> results = client.getMatchingRequests(priority, matchLocation, hc.getCapability());
if (results.isEmpty()) {
return null;
}
for (Collection<TaskRequest> requests : results) {
for (TaskRequest request : requests) {
final int vertexIndex = request.getVertexIndex();
if (!allowedVertices.get(vertexIndex)) {
LOG.debug("Not assigning task {} since it is a descendant of a pending vertex", request.getTask());
continue;
}
Object signature = hc.getSignature();
if (signature == null || signatureMatcher.isSuperSet(signature, request.getContainerSignature())) {
if (!maybeChangeNode(request, hc.getContainer().getNodeId())) {
assignContainer(request, hc, matchLocation);
return request;
}
}
}
}
return null;
}
private void informAppAboutAssignments(List<Assignment> assignments) {
if (!assignments.isEmpty()) {
for (Assignment a : assignments) {
informAppAboutAssignment(a.request, a.container);
}
}
}
/**
* Inform the app about a task assignment. This should not be called with
* any locks held.
*
* @param request the corresponding task request
* @param container the container assigned to the task
*/
private void informAppAboutAssignment(TaskRequest request, Container container) {
if (blacklistedNodes.contains(container.getNodeId())) {
Object task = request.getTask();
LOG.info("Container {} allocated for task {} on blacklisted node {}",
container.getId(), container.getNodeId(), task);
deallocateContainer(container.getId());
// its ok to submit the same request again because the RM will not give us
// the bad/unhealthy nodes again. The nodes may become healthy/unblacklisted
// and so its better to give the RM the full information.
allocateTask(task, request.getCapability(),
(request.getNodes() == null ? null :
request.getNodes().toArray(new String[request.getNodes().size()])),
(request.getRacks() == null ? null :
request.getRacks().toArray(new String[request.getRacks().size()])),
request.getPriority(),
request.getContainerSignature(),
request.getCookie());
} else {
getContext().taskAllocated(request.getTask(), request.getCookie(), container);
}
}
@GuardedBy("this")
private void computeSessionContainers() {
Map<String, MutableInt> rackHeldNumber = new HashMap<>();
Map<String, List<HeldContainer>> nodeHeldContainers = new HashMap<>();
for(HeldContainer heldContainer : heldContainers.values()) {
if (heldContainer.getSignature() == null) {
// skip containers that have not been launched as there is no process to reuse
continue;
}
MutableInt count = rackHeldNumber.get(heldContainer.getRack());
if (count == null) {
count = new MutableInt(0);
rackHeldNumber.put(heldContainer.getRack(), count);
}
count.increment();
String host = heldContainer.getHost();
List<HeldContainer> nodeContainers = nodeHeldContainers.get(host);
if (nodeContainers == null) {
nodeContainers = new LinkedList<>();
nodeHeldContainers.put(host, nodeContainers);
}
nodeContainers.add(heldContainer);
}
Map<String, MutableInt> rackToHoldNumber = new HashMap<>();
for (String rack : rackHeldNumber.keySet()) {
rackToHoldNumber.put(rack, new MutableInt(0));
}
// distribute evenly across nodes
// the loop assigns 1 container per rack over all racks
int containerCount = 0;
while (containerCount < sessionNumMinHeldContainers && !rackHeldNumber.isEmpty()) {
Iterator<Entry<String, MutableInt>> iter = rackHeldNumber.entrySet().iterator();
while (containerCount < sessionNumMinHeldContainers && iter.hasNext()) {
Entry<String, MutableInt> entry = iter.next();
MutableInt rackCount = entry.getValue();
rackCount.decrement();
if (rackCount.intValue() >=0) {
containerCount++;
rackToHoldNumber.get(entry.getKey()).increment();
} else {
iter.remove();
}
}
}
// distribute containers evenly across nodes while not exceeding rack limit
// the loop assigns 1 container per node over all nodes
containerCount = 0;
while (containerCount < sessionNumMinHeldContainers && !nodeHeldContainers.isEmpty()) {
Iterator<Entry<String, List<HeldContainer>>> iter = nodeHeldContainers.entrySet().iterator();
while (containerCount < sessionNumMinHeldContainers && iter.hasNext()) {
List<HeldContainer> nodeContainers = iter.next().getValue();
if (nodeContainers.isEmpty()) {
// node is empty. remove it.
iter.remove();
continue;
}
HeldContainer heldContainer = nodeContainers.remove(nodeContainers.size() - 1);
MutableInt holdCount = rackToHoldNumber.get(heldContainer.getRack());
holdCount.decrement();
if (holdCount.intValue() >= 0) {
// rack can hold a container
containerCount++;
sessionContainers.add(heldContainer);
} else {
// rack limit reached. remove node.
iter.remove();
}
}
}
LOG.info("Identified {} session containers out of {} total containers",
sessionContainers.size(), heldContainers.size());
}
@GuardedBy("this")
private void activateSessionContainers() {
if (!sessionContainers.isEmpty()) {
for (HeldContainer hc : sessionContainers) {
if (hc.isAssignable()) {
hc.scheduleForReuse(localitySchedulingDelay);
}
}
sessionContainers.clear();
}
}
@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {
if (stopRequested) {
return;
}
List<TaskStatus> taskStatusList = new ArrayList<>(statuses.size());
synchronized (this) {
for (ContainerStatus status : statuses) {
ContainerId cid = status.getContainerId();
LOG.info("Container {} completed with status {}", cid, status);
Object task = releasedContainers.remove(cid);
if (task == null) {
HeldContainer hc = heldContainers.get(cid);
if (hc != null) {
task = containerCompleted(hc);
}
}
if (task != null) {
taskStatusList.add(new TaskStatus(task, status));
}
}
}
// perform app callback outside of locks
for (TaskStatus taskStatus : taskStatusList) {
getContext().containerCompleted(taskStatus.task, taskStatus.status);
}
}
@Override
public void onNodesUpdated(List<NodeReport> updatedNodes) {
if (!stopRequested) {
getContext().nodesUpdated(updatedNodes);
}
}
@Override
public float getProgress() {
if (stopRequested) {
return 1;
}
Collection<ContainerId> preemptedContainers;
synchronized (this) {
Resource freeResources = getAvailableResources();
if (totalResources.getMemory() == 0) {
// assume this is the first allocate callback. nothing is allocated.
// available resource = totalResource
// TODO this will not handle dynamic changes in resources
totalResources = Resources.clone(freeResources);
LOG.info("App total resource memory: {} cpu: {} activeAssignments: {}",
totalResources.getMemory(), totalResources.getVirtualCores(), taskAssignments.size());
}
++numHeartbeats;
if (LOG.isDebugEnabled() || numHeartbeats % 50 == 1) {
LOG.info(constructPeriodicLog(freeResources));
}
preemptedContainers = maybePreempt(freeResources);
if (preemptedContainers != null && !preemptedContainers.isEmpty()) {
lastPreemptionHeartbeat = numHeartbeats;
}
}
// perform app callback outside of locks
if (preemptedContainers != null && !preemptedContainers.isEmpty()) {
for (ContainerId cid : preemptedContainers) {
LOG.info("Preempting container {} currently allocated to a task", cid);
getContext().preemptContainer(cid);
}
}
return getContext().getProgress();
}
@Override
public void onShutdownRequest() {
if (!stopRequested) {
getContext().appShutdownRequested();
}
}
@Override
public void onError(Throwable e) {
LOG.error("Error from ARMRMClient", e);
if (!stopRequested) {
getContext().reportError(YarnTaskSchedulerServiceError.RESOURCEMANAGER_ERROR,
StringUtils.stringifyException(e), null);
}
}
@Override
public Resource getAvailableResources() {
return client.getAvailableResources();
}
@Override
public Resource getTotalResources() {
return totalResources;
}
@Override
public int getClusterNodeCount() {
return client.getClusterNodeCount();
}
@Override
public synchronized void blacklistNode(NodeId nodeId) {
LOG.info("Blacklisting node: {}", nodeId);
blacklistedNodes.add(nodeId);
client.updateBlacklist(Collections.singletonList(nodeId.getHost()), null);
}
@Override
public synchronized void unblacklistNode(NodeId nodeId) {
if (blacklistedNodes.remove(nodeId)) {
LOG.info("Removing blacklist for node: {}", nodeId);
client.updateBlacklist(null, Collections.singletonList(nodeId.getHost()));
}
}
@Override
public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks,
Priority priority, Object containerSignature, Object clientCookie) {
int vertexIndex = getContext().getVertexIndexForTask(task);
TaskRequest request = new TaskRequest(task, vertexIndex, capability, hosts, racks,
priority, containerSignature, clientCookie);
addTaskRequest(request);
}
@Override
public void allocateTask(Object task, Resource capability, ContainerId containerId,
Priority priority, Object containerSignature, Object clientCookie) {
String[] hosts = null;
synchronized (this) {
HeldContainer held = heldContainers.get(containerId);
if (held != null) {
if (held.canFit(capability)) {
hosts = new String[]{held.getHost()};
} else {
LOG.warn("Match request to container {} but {} does not fit in {}",
containerId, capability, held.getCapability());
containerId = null;
}
} else {
LOG.info("Ignoring match request to unknown container {}", containerId);
containerId = null;
}
}
int vertexIndex = getContext().getVertexIndexForTask(task);
TaskRequest request = new TaskRequest(task, vertexIndex, capability, hosts, null,
priority, containerSignature, clientCookie, containerId);
addTaskRequest(request);
}
@Override
public boolean deallocateTask(Object task, boolean taskSucceeded,
TaskAttemptEndReason endReason, String diagnostics) {
ContainerId releasedLaunchedContainer = null;
AMState appState = getContext().getAMState();
boolean isSession = getContext().isSession();
TaskRequest newAssignment = null;
HeldContainer hc;
synchronized (this) {
TaskRequest request = removeTaskRequest(task);
if (request != null) {
LOG.debug("Deallocating task {} before it was allocated", task);
return false;
}
hc = removeTaskAssignment(task);
if (hc != null) {
if (taskSucceeded && shouldReuseContainers) {
idleTracker.add(hc);
newAssignment = tryAssignReuseContainer(hc, appState, isSession);
if (newAssignment == null && hc.isReleasedAndUsed()) {
releasedLaunchedContainer = hc.getId();
}
} else {
if (releaseContainer(hc)) {
releasedLaunchedContainer = hc.getId();
}
}
}
}
// perform app callback outside of locks
if (newAssignment != null) {
informAppAboutAssignment(newAssignment, hc.getContainer());
return true;
}
if (releasedLaunchedContainer != null) {
getContext().containerBeingReleased(releasedLaunchedContainer);
return true;
}
return hc != null;
}
@Override
public Object deallocateContainer(ContainerId containerId) {
Object task = null;
ContainerId releasedLaunchedContainer = null;
synchronized (this) {
HeldContainer hc = heldContainers.remove(containerId);
if (hc != null) {
task = hc.getAssignedTask();
if (task != null) {
LOG.info("Deallocated container {} from task {}", containerId, task);
}
if (releaseContainer(hc)) {
releasedLaunchedContainer = hc.getId();
}
} else {
LOG.info("Ignoring deallocation of unknown container {}", containerId);
}
}
// perform app callback outside of locks
if (releasedLaunchedContainer != null) {
getContext().containerBeingReleased(releasedLaunchedContainer);
}
return task;
}
@GuardedBy("this")
private void assignContainer(TaskRequest request, HeldContainer hc, Object match) {
LOG.info("Assigning container {} to task {} host={} priority={} capability={} match={} lastTask={}",
hc.getId(), request.getTask(), hc.getHost(), hc.getPriority(), hc.getCapability(), match, hc.getLastTask());
removeTaskRequest(request.getTask());
addTaskAssignment(request, hc);
idleTracker.remove(hc);
}
private synchronized boolean releaseContainer(HeldContainer hc) {
Object task = containerCompleted(hc);
client.releaseAssignedContainer(hc.getId());
if (task != null) {
releasedContainers.put(hc.getId(), task);
return true;
}
return false;
}
@GuardedBy("this")
private void addTaskAssignment(TaskRequest request, HeldContainer hc) {
HeldContainer oldContainer = taskAssignments.put(request.getTask(), hc);
if (oldContainer != null) {
LOG.error("Task {} being assigned to container {} but was already assigned to container {}",
request.getTask(), hc.getId(), oldContainer.getId());
}
Integer vertexIndex = request.vertexIndex;
Set<HeldContainer> cset = vertexAssignments.get(vertexIndex);
if (cset == null) {
cset = new HashSet<>();
vertexAssignments.put(vertexIndex, cset);
assignedVertices.set(vertexIndex);
}
cset.add(hc);
hc.assignTask(request);
}
@GuardedBy("this")
private HeldContainer removeTaskAssignment(Object task) {
HeldContainer hc = taskAssignments.remove(task);
if (hc != null) {
TaskRequest request = hc.removeAssignment();
if (request != null) {
Integer vertexIndex = request.vertexIndex;
Set<HeldContainer> cset = vertexAssignments.get(vertexIndex);
if (cset != null && cset.remove(hc) && cset.isEmpty()) {
vertexAssignments.remove(vertexIndex);
assignedVertices.clear(vertexIndex);
}
} else {
LOG.error("Container {} had assigned task {} but no request?!?", hc.getId(), task);
}
}
return hc;
}
@GuardedBy("this")
@Nullable
private Object containerCompleted(HeldContainer hc) {
idleTracker.remove(hc);
heldContainers.remove(hc.getId());
Resources.subtractFrom(allocatedResources, hc.getCapability());
removeTaskAssignment(hc.getAssignedTask());
hc.released();
return hc.getLastTask();
}
@GuardedBy("this")
private void ensureVertexDescendants() {
if (vertexDescendants == null) {
DagInfo info = getContext().getCurrentDagInfo();
if (info == null) {
throw new IllegalStateException("Scheduling tasks but no current DAG info?");
}
int numVertices = info.getTotalVertices();
ArrayList<BitSet> descendants = new ArrayList<>(numVertices);
for (int i = 0; i < numVertices; ++i) {
descendants.add(info.getVertexDescendants(i));
}
vertexDescendants = descendants;
}
}
private void addTaskRequest(TaskRequest request) {
Container assignedContainer = null;
synchronized (this) {
if (shouldReuseContainers && !stopRequested && getContext().getAMState() != AMState.COMPLETED) {
ensureVertexDescendants();
activateSessionContainers();
HeldContainer hc = tryAssignTaskToIdleContainer(request);
if (hc != null) {
assignedContainer = hc.getContainer();
}
}
if (assignedContainer == null) {
ensureVertexDescendants();
TaskRequest old = requestTracker.add(request);
if (old != null) {
removeTaskRequestByRequest(request);
}
client.addContainerRequest(request);
HeldContainer hc = heldContainers.get(request.getAffinity());
if (hc != null) {
hc.addAffinity(request);
}
}
}
// perform app callback outside of locks
if (assignedContainer != null) {
informAppAboutAssignment(request, assignedContainer);
}
}
@Nullable
private synchronized TaskRequest removeTaskRequest(Object task) {
TaskRequest request = requestTracker.remove(task);
if (request != null) {
removeTaskRequestByRequest(request);
}
return request;
}
@GuardedBy("this")
private void removeTaskRequestByRequest(TaskRequest request) {
client.removeContainerRequest(request);
HeldContainer hc = heldContainers.get(request.getAffinity());
if (hc != null) {
hc.removeAffinity(request);
}
}
@GuardedBy("this")
@Nullable
private HeldContainer tryAssignTaskToIdleContainer(TaskRequest request) {
if (requestTracker.isRequestBlocked(request)) {
LOG.debug("Cannot assign task {} to an idle container since vertex {} is a descendant of pending tasks",
request.getTask(), request.getVertexIndex());
return null;
}
// check if container affinity can be satisfied immediately
ContainerId affinity = request.getAffinity();
if (affinity != null) {
HeldContainer hc = heldContainers.get(affinity);
if (hc != null && hc.isAssignable() && !maybeChangeNode(request, hc.getContainer().getNodeId())) {
assignContainer(request, hc, affinity);
return hc;
}
}
// try to match the task against idle containers in order from best locality to worst
HeldContainer hc;
if (request.hasLocality()) {
hc = tryAssignTaskToIdleContainer(request, request.getNodes(), HeldContainerState.MATCHES_LOCAL_STATES);
if (hc == null) {
hc = tryAssignTaskToIdleContainer(request, request.getRacks(), HeldContainerState.MATCHES_RACK_STATES);
if (hc == null) {
hc = tryAssignTaskToIdleContainer(request, ResourceRequest.ANY, HeldContainerState.MATCHES_ANY_STATES);
}
}
} else {
hc = tryAssignTaskToIdleContainer(request, ResourceRequest.ANY, HeldContainerState.MATCHES_LOCAL_STATES);
}
return hc;
}
@GuardedBy("this")
@Nullable
private HeldContainer tryAssignTaskToIdleContainer(TaskRequest request,
List<String> locations, EnumSet<HeldContainerState> eligibleStates) {
if (locations != null && !locations.isEmpty()) {
for (String location : locations) {
HeldContainer hc = tryAssignTaskToIdleContainer(request, location, eligibleStates);
if (hc != null) {
return hc;
}
}
}
return null;
}
@GuardedBy("this")
@Nullable
private HeldContainer tryAssignTaskToIdleContainer(TaskRequest request,
String location, EnumSet<HeldContainerState> eligibleStates) {
Set<HeldContainer> containers = idleTracker.getByLocation(location);
HeldContainer bestMatch = null;
if (containers != null && !containers.isEmpty()) {
for (HeldContainer hc : containers) {
if (eligibleStates.contains(hc.getState())) {
Object csig = hc.getSignature();
if (csig == null || signatureMatcher.isSuperSet(csig, request.getContainerSignature())) {
boolean needToChangeNode = maybeChangeNode(request, hc.getContainer().getNodeId());
int numAffinities = hc.getNumAffinities();
if (numAffinities == 0 && !needToChangeNode) {
bestMatch = hc;
break;
}
if ((bestMatch == null || numAffinities < bestMatch.getNumAffinities()) && !needToChangeNode) {
bestMatch = hc;
}
} else {
LOG.debug("Unable to assign task {} to container {} due to signature mismatch", request.getTask(), hc.getId());
}
}
}
}
if (bestMatch != null) {
assignContainer(request, bestMatch, location);
}
return bestMatch;
}
private boolean maybeChangeNode(TaskRequest request, NodeId nodeId) {
Object task = request.getTask();
if (task instanceof TaskAttempt) {
Set<NodeId> nodesWithSiblingRunningAttempts = ((TaskAttempt) task).getTask().getNodesWithRunningAttempts();
if (nodesWithSiblingRunningAttempts != null
&& nodesWithSiblingRunningAttempts.contains(nodeId)) {
return true;
}
}
return false;
}
@Override
public void setShouldUnregister() {
shouldUnregister = true;
}
@Override
public boolean hasUnregistered() {
return hasUnregistered;
}
@Override
public synchronized void dagComplete() {
for (HeldContainer hc : sessionContainers) {
hc.resetMatchingLevel();
}
vertexDescendants = null;
}
@GuardedBy("this")
@Nullable
private Collection<ContainerId> maybePreempt(Resource freeResources) {
if (preemptionPercentage == 0 || numHeartbeats - lastPreemptionHeartbeat < numHeartbeatsBetweenPreemptions) {
return null;
}
if (!requestTracker.isPreemptionDeadlineExpired() && requestTracker.fitsHighestPriorityRequest(freeResources)) {
if (numHeartbeats % 50 == 1) {
LOG.info("Highest priority request fits in free resources {}", freeResources);
}
return null;
}
int numIdleContainers = idleTracker.getNumContainers();
if (numIdleContainers > 0) {
if (numHeartbeats % 50 == 1) {
LOG.info("Avoiding preemption since there are {} idle containers", numIdleContainers);
}
return null;
}
BitSet blocked = requestTracker.createVertexBlockedSet();
if (!blocked.intersects(assignedVertices)) {
if (numHeartbeats % 50 == 1) {
LOG.info("Avoiding preemption since there are no descendants of the highest priority requests running");
}
return null;
}
Resource preemptLeft = requestTracker.getAmountToPreempt(preemptionPercentage);
if (!resourceCalculator.anyAvailable(preemptLeft)) {
if (numHeartbeats % 50 == 1) {
LOG.info("Avoiding preemption since amount to preempt is {}", preemptLeft);
}
return null;
}
PriorityQueue<HeldContainer> candidates = new PriorityQueue<>(11, PREEMPT_ORDER_COMPARATOR);
blocked.and(assignedVertices);
for (int i = blocked.nextSetBit(0); i >= 0; i = blocked.nextSetBit(i + 1)) {
Collection<HeldContainer> containers = vertexAssignments.get(i);
if (containers != null) {
candidates.addAll(containers);
} else {
LOG.error("Vertex {} in assignedVertices but no assignments?", i);
}
}
ArrayList<ContainerId> preemptedContainers = new ArrayList<>();
HeldContainer hc;
while ((hc = candidates.poll()) != null) {
LOG.info("Preempting container {} currently allocated to task {}", hc.getId(), hc.getAssignedTask());
preemptedContainers.add(hc.getId());
resourceCalculator.deductFrom(preemptLeft, hc.getCapability());
if (!resourceCalculator.anyAvailable(preemptLeft)) {
break;
}
}
return preemptedContainers;
}
@GuardedBy("this")
private String constructPeriodicLog(Resource freeResource) {
Priority highestPriority = requestTracker.getHighestPriority();
return "Allocated: " + allocatedResources +
" Free: " + freeResource +
" pendingRequests: " + requestTracker.getNumRequests() +
" heldContainers: " + heldContainers.size() +
" heartbeats: " + numHeartbeats +
" lastPreemptionHeartbeat: " + lastPreemptionHeartbeat +
((highestPriority != null) ?
(" highestWaitingRequestWaitStartTime: " + requestTracker.getHighestPriorityWaitTimestamp() +
" highestWaitingRequestPriority: " + highestPriority) : "");
}
@VisibleForTesting
int getNumBlacklistedNodes() {
return blacklistedNodes.size();
}
@VisibleForTesting
Collection<HeldContainer> getSessionContainers() {
return sessionContainers;
}
// Wrapper class to work around lack of blacklisting APIs in async client.
// This can be removed once Tez requires YARN >= 2.7.0
static class AMRMClientAsyncWrapper extends AMRMClientAsyncImpl<TaskRequest> {
AMRMClientAsyncWrapper(AMRMClient<TaskRequest> syncClient, int intervalMs, CallbackHandler handler) {
super(syncClient, intervalMs, handler);
}
public void updateBlacklist(List<String> additions, List<String> removals) {
client.updateBlacklist(additions, removals);
}
}
/**
* A utility class to track a task allocation.
*/
static class TaskRequest extends AMRMClient.ContainerRequest {
final Object task;
final int vertexIndex;
final Object signature;
final Object cookie;
final ContainerId affinityContainerId;
TaskRequest(Object task, int vertexIndex, Resource capability, String[] hosts, String[] racks,
Priority priority, Object signature, Object cookie) {
this(task, vertexIndex, capability, hosts, racks, priority, signature, cookie, null);
}
TaskRequest(Object task, int vertexIndex, Resource capability, String[] hosts, String[] racks,
Priority priority, Object signature, Object cookie, ContainerId affinityContainerId) {
super(capability, hosts, racks, priority);
this.task = task;
this.vertexIndex = vertexIndex;
this.signature = signature;
this.cookie = cookie;
this.affinityContainerId = affinityContainerId;
}
Object getTask() {
return task;
}
int getVertexIndex() {
return vertexIndex;
}
Object getContainerSignature() {
return signature;
}
Object getCookie() {
return cookie;
}
@Nullable
ContainerId getAffinity() {
return affinityContainerId;
}
boolean hasLocality() {
List<String> nodes = getNodes();
List<String> racks = getRacks();
return (nodes != null && !nodes.isEmpty()) || (racks != null && !racks.isEmpty());
}
}
private enum HeldContainerState {
MATCHING_LOCAL(true),
MATCHING_RACK(true),
MATCHING_ANY(true),
ASSIGNED(false),
RELEASED(false);
private static final EnumSet<HeldContainerState> MATCHES_LOCAL_STATES = EnumSet.of(
HeldContainerState.MATCHING_LOCAL, HeldContainerState.MATCHING_RACK, HeldContainerState.MATCHING_ANY);
private static final EnumSet<HeldContainerState> MATCHES_RACK_STATES = EnumSet.of(
HeldContainerState.MATCHING_RACK, HeldContainerState.MATCHING_ANY);
private static final EnumSet<HeldContainerState> MATCHES_ANY_STATES = EnumSet.of(HeldContainerState.MATCHING_ANY);
private final boolean assignable;
HeldContainerState(boolean assignable) {
this.assignable = assignable;
}
boolean isAssignable() {
return assignable;
}
}
/**
* Tracking for an allocated container.
*/
@VisibleForTesting
class HeldContainer implements Callable<Void> {
final Container container;
final String rack;
@GuardedBy("DagAwareYarnTaskScheduler.this")
HeldContainerState state = HeldContainerState.MATCHING_LOCAL;
/** The Future received when scheduling an idle container for re-allocation at a later time. */
@GuardedBy("DagAwareYarnTaskScheduler.this")
Future<Void> future = null;
/** The collection of task requests that have specified this container as a scheduling affinity. */
@GuardedBy("DagAwareYarnTaskScheduler.this")
Collection<TaskRequest> affinities = null;
/**
* The task request corresponding to the currently assigned task to this container.
* This field is null when the container is not currently assigned.
*/
@GuardedBy("DagAwareYarnTaskScheduler.this")
TaskRequest assignedRequest = null;
/** The task request corresponding to the last task that was assigned to this container. */
@GuardedBy("DagAwareYarnTaskScheduler.this")
TaskRequest lastRequest = null;
/** The timestamp when the idle container will expire. 0 if the container is not idle. */
@GuardedBy("DagAwareYarnTaskScheduler.this")
long idleExpirationTimestamp = 0;
/** The timestamp when this container was assigned. 0 if the container is not assigned. */
@GuardedBy("DagAwareYarnTaskScheduler.this")
long assignmentTimestamp = 0;
HeldContainer(Container container) {
this.container = container;
this.rack = RackResolver.resolve(container.getNodeId().getHost()).getNetworkLocation();
}
HeldContainerState getState() {
return state;
}
boolean isAssignable() {
return state.isAssignable();
}
boolean isReleasedAndUsed() {
return state == HeldContainerState.RELEASED && getLastTask() != null;
}
Container getContainer() {
return container;
}
ContainerId getId() {
return container.getId();
}
String getHost() {
return container.getNodeId().getHost();
}
String getRack() {
return rack;
}
Priority getPriority() {
return container.getPriority();
}
Resource getCapability() {
return container.getResource();
}
@Nullable
Object getAssignedTask() {
return assignedRequest != null ? assignedRequest.getTask() : null;
}
void assignTask(TaskRequest request) {
assert state != HeldContainerState.ASSIGNED && state != HeldContainerState.RELEASED;
if (assignedRequest != null) {
LOG.error("Container {} assigned task {} but already running task {}",
getId(), request.getTask(), assignedRequest.getTask());
}
assignedRequest = request;
lastRequest = request;
state = HeldContainerState.ASSIGNED;
idleExpirationTimestamp = 0;
assignmentTimestamp = now();
if (future != null) {
future.cancel(false);
future = null;
}
}
TaskRequest removeAssignment() {
assert state == HeldContainerState.ASSIGNED;
TaskRequest result = assignedRequest;
assignedRequest = null;
assignmentTimestamp = 0;
state = HeldContainerState.MATCHING_LOCAL;
return result;
}
void addAffinity(TaskRequest request) {
if (affinities == null) {
affinities = new HashSet<>();
}
affinities.add(request);
}
void removeAffinity(TaskRequest request) {
if (affinities != null && affinities.remove(request) && affinities.isEmpty()) {
affinities = null;
}
}
int getNumAffinities() {
return affinities != null ? affinities.size() : 0;
}
@Nullable
Collection<TaskRequest> getAffinities() {
return affinities;
}
void scheduleForReuse(long delayMillis) {
assert state != HeldContainerState.ASSIGNED && state != HeldContainerState.RELEASED;
try {
if (future != null) {
future.cancel(false);
}
future = reuseExecutor.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
if (!stopRequested) {
LOG.error("Container {} could not be scheduled for reuse!", getId(), e);
}
}
}
@Nullable
Object getSignature() {
return lastRequest != null ? lastRequest.getContainerSignature() : null;
}
@Nullable
Object getLastTask() {
return lastRequest != null ? lastRequest.getTask() : null;
}
String getMatchingLocation() {
switch (state) {
case MATCHING_LOCAL:
return getHost();
case MATCHING_RACK:
return getRack();
case MATCHING_ANY:
return ResourceRequest.ANY;
default:
throw new IllegalStateException("Container " + getId() + " trying to match in state " + state);
}
}
void moveToNextMatchingLevel() {
switch (state) {
case MATCHING_LOCAL:
if (reuseRackLocal) {
state = HeldContainerState.MATCHING_RACK;
}
break;
case MATCHING_RACK:
if (reuseNonLocal) {
state = HeldContainerState.MATCHING_ANY;
}
break;
case MATCHING_ANY:
break;
default:
throw new IllegalStateException("Container " + getId() + " trying to match in state " + state);
}
}
boolean atMaxMatchLevel() {
switch (state) {
case MATCHING_LOCAL:
return !reuseRackLocal;
case MATCHING_RACK:
return !reuseNonLocal;
case MATCHING_ANY:
return true;
default:
throw new IllegalStateException("Container " + getId() + " trying to match in state " + state);
}
}
void resetMatchingLevel() {
if (isAssignable()) {
state = HeldContainerState.MATCHING_LOCAL;
}
}
long getIdleExpirationTimestamp(long now) {
if (idleExpirationTimestamp == 0) {
if (idleContainerTimeoutMin > 0) {
idleExpirationTimestamp = now + (idleContainerTimeoutMin == idleContainerTimeoutMax ? idleContainerTimeoutMin
: random.nextLong(idleContainerTimeoutMin, idleContainerTimeoutMax));
} else {
idleExpirationTimestamp = Long.MAX_VALUE;
}
}
return idleExpirationTimestamp;
}
long getAssignmentTimestamp() {
return assignmentTimestamp;
}
boolean canFit(Resource capability) {
Resource cr = container.getResource();
return cr.getMemory() >= capability.getMemory() && cr.getVirtualCores() >= capability.getVirtualCores();
}
@Override
public Void call() throws Exception {
AMState appState = getContext().getAMState();
boolean isSession = getContext().isSession();
TaskRequest assigned = null;
ContainerId released = null;
synchronized (DagAwareYarnTaskScheduler.this) {
future = null;
if (isAssignable()) {
moveToNextMatchingLevel();
assigned = tryAssignReuseContainer(this, appState, isSession);
if (assigned == null && isReleasedAndUsed()) {
released = getId();
}
}
}
if (assigned != null) {
informAppAboutAssignment(assigned, container);
}
if (released != null) {
getContext().containerBeingReleased(released);
}
return null;
}
void released() {
assert state != HeldContainerState.RELEASED;
state = HeldContainerState.RELEASED;
if (future != null) {
future.cancel(false);
}
future = null;
}
}
/**
* Utility comparator to order containers by assignment timestamp from
* most recent to least recent.
*/
private static class PreemptOrderComparator implements Comparator<HeldContainer> {
@Override
public int compare(HeldContainer o1, HeldContainer o2) {
long timestamp1 = o1.getAssignmentTimestamp();
if (timestamp1 == 0) {
timestamp1 = Long.MAX_VALUE;
}
long timestamp2 = o2.getAssignmentTimestamp();
if (timestamp2 == 0) {
timestamp2 = Long.MAX_VALUE;
}
return Long.compare(timestamp2, timestamp1);
}
}
/**
* Utility class for a request, container pair
*/
private static class Assignment {
final TaskRequest request;
final Container container;
Assignment(TaskRequest request, Container container) {
this.request = request;
this.container = container;
}
}
/**
* Utility class for a task, container exit status pair
*/
private static class TaskStatus {
final Object task;
final ContainerStatus status;
TaskStatus(Object task, ContainerStatus status) {
this.task = task;
this.status = status;
}
}
/**
* The task allocation request tracker tracks task allocations
* and keeps statistics on which priorities have requests and which vertices
* should be blocked from container reuse due to DAG topology.
*/
private class RequestTracker {
private final Map<Object, TaskRequest> requests = new HashMap<>();
/** request map ordered by priority with highest priority first */
private final NavigableMap<Priority, RequestPriorityStats> priorityStats =
new TreeMap<>(Collections.reverseOrder());
private Priority highestPriority = null;
private long highestPriorityWaitTimestamp = 0;
@GuardedBy("DagAwareYarnTaskScheduler.this")
@Nullable
TaskRequest add(TaskRequest request) {
TaskRequest oldRequest = requests.put(request.getTask(), request);
Priority priority = request.getPriority();
RequestPriorityStats stats = priorityStats.get(priority);
if (stats == null) {
stats = addStatsForPriority(priority);
}
++stats.requestCount;
if (request.hasLocality()) {
++stats.localityCount;
}
incrVertexTaskCount(priority, stats, request.getVertexIndex());
if (oldRequest != null) {
updateStatsForRemoval(oldRequest);
}
return oldRequest;
}
@GuardedBy("DagAwareYarnTaskScheduler.this")
@Nullable
TaskRequest remove(Object task) {
TaskRequest request = requests.remove(task);
if (request != null) {
updateStatsForRemoval(request);
return request;
}
return null;
}
private RequestPriorityStats addStatsForPriority(Priority priority) {
BitSet allowedVerts = new BitSet(vertexDescendants.size());
Entry<Priority,RequestPriorityStats> lowerEntry = priorityStats.lowerEntry(priority);
if (lowerEntry != null) {
// initialize the allowed vertices BitSet using the information derived
// from the next higher priority entry
RequestPriorityStats priorStats = lowerEntry.getValue();
allowedVerts.or(priorStats.allowedVertices);
allowedVerts.andNot(priorStats.descendants);
} else {
// no higher priority entry so this priority is currently the highest
highestPriority = priority;
highestPriorityWaitTimestamp = now();
allowedVerts.set(0, vertexDescendants.size());
}
RequestPriorityStats stats = new RequestPriorityStats(vertexDescendants.size(), allowedVerts);
priorityStats.put(priority, stats);
return stats;
}
private void updateStatsForRemoval(TaskRequest request) {
Priority priority = request.getPriority();
RequestPriorityStats stats = priorityStats.get(priority);
decrVertexTaskCount(priority, stats, request.getVertexIndex());
--stats.requestCount;
if (request.hasLocality()) {
--stats.localityCount;
}
if (stats.requestCount == 0) {
priorityStats.remove(priority);
if (highestPriority.equals(priority)) {
if (priorityStats.isEmpty()) {
highestPriority = null;
highestPriorityWaitTimestamp = 0;
} else {
highestPriority = priorityStats.firstKey();
highestPriorityWaitTimestamp = now();
}
}
}
}
@GuardedBy("DagAwareYarnTaskScheduler.this")
boolean isEmpty() {
return requests.isEmpty();
}
@GuardedBy("DagAwareYarnTaskScheduler.this")
int getNumRequests() {
return requests.size();
}
@GuardedBy("DagAwareYarnTaskScheduler.this")
List<Object> getTasks() {
return new ArrayList<>(requests.keySet());
}
@GuardedBy("DagAwareYarnTaskScheduler.this")
Collection<Entry<Priority, RequestPriorityStats>> getStatsEntries() {
return priorityStats.entrySet();
}
@GuardedBy("DagAwareYarnTaskScheduler.this")
@Nullable
Priority getHighestPriority() {
if (priorityStats.isEmpty()) {
return null;
}
return priorityStats.firstKey();
}
@GuardedBy("DagAwareYarnTaskScheduler.this")
long getHighestPriorityWaitTimestamp() {
return highestPriorityWaitTimestamp;
}
@GuardedBy("DagAwareYarnTaskScheduler.this")
boolean isRequestBlocked(TaskRequest request) {
Entry<Priority, RequestPriorityStats> entry = priorityStats.floorEntry(request.getPriority());
if (entry != null) {
RequestPriorityStats stats = entry.getValue();
int vertexIndex = request.getVertexIndex();
return !stats.allowedVertices.get(vertexIndex) || stats.descendants.get(vertexIndex);
}
return false;
}
private void incrVertexTaskCount(Priority priority, RequestPriorityStats stats, int vertexIndex) {
Integer vertexIndexInt = vertexIndex;
MutableInt taskCount = stats.vertexTaskCount.get(vertexIndexInt);
if (taskCount != null) {
taskCount.increment();
} else {
addVertexToRequestStats(priority, stats, vertexIndexInt);
}
}
private void decrVertexTaskCount(Priority priority, RequestPriorityStats stats, int vertexIndex) {
Integer vertexIndexInt = vertexIndex;
MutableInt taskCount = stats.vertexTaskCount.get(vertexIndexInt);
taskCount.decrement();
if (taskCount.intValue() <= 0) {
removeVertexFromRequestStats(priority, stats, vertexIndexInt);
}
}
/**
* Add a new vertex to a RequestPriorityStats.
*
* Adding a vertex to the request stats requires updating the stats descendants bitmask to include the descendants
* of the new vertex and also updating the allowedVertices bitmask for all lower priority requests to prevent any
* task request from a descendant vertex in the DAG from being allocated. This avoids assigning allocations to
* lower priority requests when a higher priority request of an ancestor is still pending, but it allows lower
* priority requests to be satisfied if higher priority requests are not ancestors. This is particularly useful
* for DAGs that have independent trees of vertices or significant, parallel branches within a tree.
*
* Requests are blocked by taking the specified vertex's full descendant vertex bitmask in vertexDescendants and
* clearing those bits for all lower priority requests. For the following example DAG where each vertex index
* corresponds to its letter position (i.e.: A=0, B=1, C=2, etc.)
*
* A
* |
* C---B----E
* | |
* D F
* |
* G---H
*
* Vertices F, G, and H are descendants of E but all other vertices are not. The vertexDescendants bitmask for
* vertex E is therefore 11100000b or 0xE0. When the first vertex E task request arrives we need to disallow
* requests for all descendants of E. That is accomplished by iterating through the request stats for all lower
* priority requests and clearing the allowedVertex bits corresponding to the descendants,
* i.e: allowedVertices = allowedVertices & ~descendants
*/
private void addVertexToRequestStats(Priority priority, RequestPriorityStats stats, Integer vertexIndexInt) {
// Creating a new vertex entry for this priority, so the allowed vertices for all
// lower priorities need to be updated based on the descendants of the new vertex.
stats.vertexTaskCount.put(vertexIndexInt, new MutableInt(1));
int vertexIndex = vertexIndexInt;
stats.vertices.set(vertexIndex);
BitSet d = vertexDescendants.get(vertexIndex);
stats.descendants.or(d);
for (RequestPriorityStats lowerStat : priorityStats.tailMap(priority, false).values()) {
lowerStat.allowedVertices.andNot(d);
}
}
/**
* Removes a vertex from a RequestPriorityStats.
*
* Removing a vertex is more expensive than adding a vertex. The stats contain bitmasks which only store on/off
* values rather than reference counts. Therefore we must rebuild the descendants bitmasks from the remaining
* vertices in the request stats. Once the new descendants mask is computed we then need to rebuild the
* allowedVertices BitSet for all lower priority request stats in case the removal of this vertex unblocks lower
* priority requests of a descendant vertex.
*
* Rebuilding allowedVertices for the lower priorities involves starting with the allowedVertices mask at the
* current priority then masking off the descendants at each priority level encountered, accumulating the results.
* Any descendants of a level will be blocked at all lower levels. See the addVertexToRequestStats documentation
* for details on how vertices map to the descendants and allowedVertices bit masks.
*/
private void removeVertexFromRequestStats(Priority priority, RequestPriorityStats stats, Integer vertexIndexInt) {
stats.vertexTaskCount.remove(vertexIndexInt);
int vertexIndex = vertexIndexInt;
stats.vertices.clear(vertexIndex);
// Rebuild the descendants BitSet for the remaining vertices at this priority.
stats.descendants.clear();
for (Integer vIndex : stats.vertexTaskCount.keySet()) {
stats.descendants.or(vertexDescendants.get(vIndex));
}
// The allowedVertices for all lower priorities need to be recalculated where the vertex descendants at each
// level are removed from the list of allowed vertices at all subsequent levels.
Collection<RequestPriorityStats> tailStats = priorityStats.tailMap(priority, false).values();
if (!tailStats.isEmpty()) {
BitSet cumulativeAllowed = new BitSet(vertexDescendants.size());
cumulativeAllowed.or(stats.allowedVertices);
cumulativeAllowed.andNot(stats.descendants);
for (RequestPriorityStats s : tailStats) {
s.allowedVertices.clear();
s.allowedVertices.or(cumulativeAllowed);
cumulativeAllowed.andNot(s.descendants);
}
}
}
@GuardedBy("DagAwareYarnTaskScheduler.this")
boolean isPreemptionDeadlineExpired() {
return highestPriorityWaitTimestamp != 0
&& now() - highestPriorityWaitTimestamp > preemptionMaxWaitTime;
}
@GuardedBy("DagAwareYarnTaskScheduler.this")
boolean fitsHighestPriorityRequest(Resource freeResources) {
if (priorityStats.isEmpty()) {
return true;
}
Priority priority = priorityStats.firstKey();
List<? extends Collection> requestsList = client.getMatchingRequests(
priority, ResourceRequest.ANY, freeResources);
return !requestsList.isEmpty();
}
@GuardedBy("DagAwareYarnTaskScheduler.this")
Resource getAmountToPreempt(int preemptionPercentage) {
if (priorityStats.isEmpty()) {
return Resources.none();
}
Priority priority = priorityStats.firstKey();
List<? extends Collection<TaskRequest>> requestsList = client.getMatchingRequests(
priority, ResourceRequest.ANY, Resources.unbounded());
int numRequests = 0;
for (Collection<TaskRequest> requests : requestsList) {
numRequests += requests.size();
}
numRequests = (int) Math.ceil(numRequests * (preemptionPercentage / 100.f));
Resource toPreempt = Resource.newInstance(0, 0);
if (numRequests != 0) {
outer_loop:
for (Collection<TaskRequest> requests : requestsList) {
for (TaskRequest request : requests) {
Resources.addTo(toPreempt, request.getCapability());
if (--numRequests == 0) {
break outer_loop;
}
}
}
}
return toPreempt;
}
// Create a new BitSet that represents all of the vertices that should not be
// scheduled due to outstanding requests from higher priority predecessor vertices.
@GuardedBy("DagAwareYarnTaskScheduler.this")
BitSet createVertexBlockedSet() {
BitSet blocked = new BitSet(vertexDescendants.size());
Entry<Priority, RequestPriorityStats> entry = priorityStats.lastEntry();
if (entry != null) {
RequestPriorityStats stats = entry.getValue();
blocked.or(stats.allowedVertices);
blocked.flip(0, blocked.size());
blocked.or(stats.descendants);
}
return blocked;
}
}
/**
* Tracks statistics on vertices that are requesting tasks at a particular priority
*/
private static class RequestPriorityStats {
/** Map from vertex ID to number of task requests for that vertex */
final Map<Integer, MutableInt> vertexTaskCount = new HashMap<>();
/** BitSet of vertices that have oustanding requests at this priority */
final BitSet vertices;
/** BitSet of vertices that are descendants of this vertex */
final BitSet descendants;
/**
* BitSet of vertices that are allowed to be scheduled at this priority
* (i.e.: no oustanding predecessors requesting at higher priorities)
*/
final BitSet allowedVertices;
int requestCount = 0;
int localityCount = 0;
RequestPriorityStats(int numTotalVertices, BitSet allowedVertices) {
this.vertices = new BitSet(numTotalVertices);
this.descendants = new BitSet(numTotalVertices);
this.allowedVertices = allowedVertices;
}
}
/**
* Tracks idle containers and facilitates faster matching of task requests
* against those containers given a desired location.
*/
private static class IdleContainerTracker {
/**
* Map of location ID (e.g.: a specific host, rack, or ANY) to set of
* idle containers matching that location
*/
final Map<String, Set<HeldContainer>> containersByLocation = new HashMap<>();
int numContainers = 0;
@GuardedBy("DagAwareYarnTaskScheduler.this")
void add(HeldContainer hc) {
add(hc, hc.getHost());
add(hc, hc.getRack());
add(hc, ResourceRequest.ANY);
++numContainers;
}
@GuardedBy("DagAwareYarnTaskScheduler.this")
void remove(HeldContainer hc) {
remove(hc, hc.getHost());
remove(hc, hc.getRack());
remove(hc, ResourceRequest.ANY);
--numContainers;
}
@GuardedBy("DagAwareYarnTaskScheduler.this")
int getNumContainers() {
return numContainers;
}
private void add(HeldContainer hc, String location) {
Set<HeldContainer> containers = containersByLocation.get(location);
if (containers == null) {
containers = new HashSet<>();
containersByLocation.put(location, containers);
}
containers.add(hc);
}
private void remove(HeldContainer hc, String location) {
Set<HeldContainer> containers = containersByLocation.get(location);
if (containers != null) {
if (containers.remove(hc) && containers.isEmpty()) {
containersByLocation.remove(location);
}
}
}
@GuardedBy("DagAwareYarnTaskScheduler.this")
@Nullable
Set<HeldContainer> getByLocation(String location) {
return containersByLocation.get(location);
}
}
private interface ResourceCalculator {
boolean anyAvailable(Resource rsrc);
void deductFrom(Resource total, Resource toSubtract);
}
/**
* ResourceCalculator for memory-only allocation
*/
private static class MemResourceCalculator implements ResourceCalculator {
@Override
public boolean anyAvailable(Resource rsrc) {
return rsrc.getMemory() > 0;
}
@Override
public void deductFrom(Resource total, Resource toSubtract) {
total.setMemory(total.getMemory() - toSubtract.getMemory());
}
}
/**
* ResourceCalculator for memory and vcore allocation
*/
private static class MemCpuResourceCalculator extends MemResourceCalculator {
@Override
public boolean anyAvailable(Resource rsrc) {
return super.anyAvailable(rsrc) || rsrc.getVirtualCores() > 0;
}
@Override
public void deductFrom(Resource total, Resource toSubtract) {
super.deductFrom(total, toSubtract);
total.setVirtualCores(total.getVirtualCores() - toSubtract.getVirtualCores());
}
}
/**
* Scheduled thread pool executor that logs any errors that escape the worker thread.
* This can be replaced with HadoopThreadPoolExecutor once Tez requires Hadoop 2.8 or later.
*/
static class ReuseContainerExecutor extends ScheduledThreadPoolExecutor {
ReuseContainerExecutor() {
super(1, new ThreadFactoryBuilder().setNameFormat("ReuseContainerExecutor #%d").build());
setRemoveOnCancelPolicy(true);
setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
((Future<?>) r).get();
} catch (ExecutionException ee) {
LOG.warn("Execution exception when running task in {}", Thread.currentThread().getName());
t = ee.getCause();
} catch (InterruptedException ie) {
LOG.warn("Thread ({}) interrupted: ", Thread.currentThread(), ie);
Thread.currentThread().interrupt();
} catch (Throwable throwable) {
t = throwable;
}
}
if (t != null) {
LOG.warn("Caught exception in thread {}", Thread.currentThread().getName(), t);
}
}
}
}