blob: 4c85cf31cc487a0d4f353d78cf4dc50e6a8f5c8d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.yarn.appMaster;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.drill.yarn.core.DoYUtil;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
/**
* Manages the set of tasks associated with a scheduler. The scheduler decides
* which tasks to run or stop; the task group manages the life-cycle of the
* tasks for the given scheduler.
* <p>
* Schedulers, and hence their groups, define a priority. When starting, higher
* priority (lower priority value) groups run before lower priority groups.
* Similarly, when shrinking the cluster, lower priority groups shrink before
* higher priority groups.
*/
public final class SchedulerStateImpl
    implements SchedulerState, SchedulerStateActions {
  static final Log LOG = LogFactory.getLog(SchedulerStateImpl.class);

  /** Scheduler whose tasks are tracked by this group. */
  private final Scheduler scheduler;

  /** Controller that owns this group and is notified of task transitions. */
  private final ClusterControllerImpl controller;

  /**
   * Tasks that are to be started but on which no work has been done yet
   * (State == PENDING).
   */
  protected List<Task> pendingTasks = new LinkedList<>();

  /**
   * Tasks for which a container has been requested but not yet granted
   * (State == REQUESTED).
   */
  protected List<Task> allocatingTasks = new LinkedList<>();

  /**
   * Tasks that hold a container: starting, running, failed or ended. While in
   * these states a task is identified by its container, hence the map keyed by
   * container ID (State == LAUNCHING, RUNNING or ENDING).
   */
  protected Map<ContainerId, Task> activeContainers = new HashMap<>();

  /**
   * Creates the state for one scheduler and registers this state with that
   * scheduler.
   *
   * @param controller the controller that owns this group
   * @param scheduler the scheduler whose tasks this group manages
   */
  public SchedulerStateImpl(ClusterControllerImpl controller,
      Scheduler scheduler) {
    this.scheduler = scheduler;
    this.controller = controller;
    scheduler.registerState(this);
  }

  /**
   * @return the name reported by the underlying scheduler
   */
  @Override
  public String getName() {
    return scheduler.getName();
  }

  /**
   * @return the maximum retry count, as configured on the controller
   */
  public int getMaxRetries() {
    return controller.getMaxRetries();
  }

  /**
   * @return the stop timeout in milliseconds, as configured on the controller
   */
  public int getStopTimeoutMs() {
    return controller.getStopTimeoutMs();
  }

  /**
   * @return the scheduler associated with this group
   */
  @Override
  public Scheduler getScheduler() {
    return scheduler;
  }

  /**
   * Adopts a new task into this group and places it on the pending queue so
   * that a container will be requested for it. The task must not already
   * belong to a group.
   *
   * @param task the new task to start
   */
  @Override
  public void start(Task task) {
    assert task.getGroup() == null;
    task.setGroup(this);
    enqueuePendingRequest(task);
  }

  /**
   * Appends a task to the queue of tasks waiting to send a container request
   * to YARN, then fires the {@code CREATED} life-cycle event for it through
   * the controller.
   *
   * @param task the task to queue; must not be tracked in any collection yet
   */
  public void enqueuePendingRequest(Task task) {
    assertUntracked(task);
    pendingTasks.add(task);

    // Special initial-state notification
    final EventContext createdContext = new EventContext(controller, task);
    controller.fireLifecycleChange(TaskLifecycleListener.Event.CREATED,
        createdContext);
  }

  /**
   * @return the maximum number of concurrent allocations permitted by the
   *         scheduler's task manager
   */
  public int maxCurrentRequests() {
    return scheduler.getTaskManager().maxConcurrentAllocs();
  }

  /**
   * Issues container requests for pending tasks. The number of requests is
   * capped by {@code maxRequests} and by {@link #maxCurrentRequests()}, and is
   * then reduced by the number of requests already in flight.
   * <p>
   * Each iteration hands the head of the pending queue to its task state;
   * presumably that state removes the task from the pending queue (confirm
   * against the task state implementation).
   *
   * @param context event context reused for each task
   * @param maxRequests upper bound on requests imposed by the caller
   * @return false if there were no pending tasks on entry, true otherwise
   *         (even when the remaining budget allowed no request to be issued)
   */
  @Override
  public boolean requestContainers(EventContext context, int maxRequests) {
    if (pendingTasks.isEmpty()) {
      return false;
    }

    // Cap by the scheduler's own limit, then subtract requests in flight.
    final int schedulerCap = Math.min(maxRequests, maxCurrentRequests());
    final int budget = schedulerCap - allocatingTasks.size();

    int issued = 0;
    while (issued < budget && !pendingTasks.isEmpty()) {
      context.setTask(pendingTasks.get(0));
      context.getState().requestContainer(context);
      issued++;
    }
    return true;
  }

  /**
   * Takes a task off the queue of tasks waiting to send a container request.
   * The caller is responsible for moving the task to its next home: the
   * allocating queue or the completed task list.
   *
   * @param task the task to remove; must currently be pending
   */
  public void dequeuePendingRequest(Task task) {
    assert !activeContainers.containsValue(task);
    assert !allocatingTasks.contains(task);
    assert pendingTasks.contains(task);
    pendingTasks.remove(task);
  }

  /**
   * Appends a task to the queue of tasks awaiting an allocation response from
   * YARN.
   *
   * @param task the task to queue; must not be tracked in any collection yet
   */
  public void enqueueAllocatingTask(Task task) {
    assertUntracked(task);
    allocatingTasks.add(task);
  }

  /**
   * Handles a container granted by YARN. A container that is already tracked
   * is reported as an error and ignored. If no task is waiting for an
   * allocation the container is released; otherwise it is handed to the task
   * at the head of the allocating queue.
   *
   * @param context event context used to dispatch to the task state
   * @param container the newly allocated container
   */
  @Override
  public void containerAllocated(EventContext context, Container container) {
    final ContainerId containerId = container.getId();
    if (activeContainers.containsKey(containerId)) {
      LOG.error("Container allocated again: " + DoYUtil.labelContainer(container));
      return;
    }

    if (!allocatingTasks.isEmpty()) {
      context.setTask(allocatingTasks.get(0));
      context.getState().containerAllocated(context, container);
      return;
    }

    // Nobody is waiting for this container. The cause is not fully understood;
    // it may occur only when debugging, where one thread is paused while the
    // RM heartbeat keeps resending our request. One known case: the user
    // requests a container and then cancels the task while YARN is still
    // considering the request.
    LOG.warn("Releasing unwanted container: " + DoYUtil.labelContainer(container) );
    context.yarn.releaseContainer(container);
  }

  /**
   * Delivers a tick to every tracked task: allocating tasks first, then
   * pending tasks, then tasks with containers. Each collection is copied just
   * before it is walked because a tick may trigger a timeout that modifies the
   * collections.
   *
   * @param context event context reused for each task
   * @param curTime the time value passed through to each task state's tick
   */
  @Override
  public void checkTasks(EventContext context, long curTime) {
    tickEach(context, new ArrayList<>(allocatingTasks), curTime);
    tickEach(context, new ArrayList<>(pendingTasks), curTime);
    tickEach(context, new ArrayList<>(activeContainers.values()), curTime);
  }

  /**
   * Takes a task off the list of those waiting for a container allocation,
   * whether the allocation completed or was cancelled. The caller is
   * responsible for moving the task to its next collection.
   *
   * @param task the task to remove; must currently be allocating
   */
  public void dequeueAllocatingTask(Task task) {
    assert allocatingTasks.contains(task);
    allocatingTasks.remove(task);
  }

  /**
   * Records that a task now holds a container and is tracked by its container
   * ID from here on, then notifies the controller.
   *
   * @param task the task that received a container; must not be tracked in
   *          any collection yet
   */
  public void containerAllocated(Task task) {
    assertUntracked(task);
    activeContainers.put(task.getContainerId(), task);
    controller.containerAllocated(task);
  }

  /**
   * Records that a task no longer holds its container: the task stops being
   * tracked by container ID and the controller is notified.
   *
   * @param task the task whose container was released; must currently be
   *          tracked by its container ID
   */
  public void containerReleased(Task task) {
    final ContainerId containerId = task.getContainerId();
    assert activeContainers.containsKey(containerId);
    activeContainers.remove(containerId);
    controller.containerReleased(task);
  }

  /**
   * Records that a task has ended, successfully or not as given by the task's
   * disposition. The scheduler and then the controller are informed; if this
   * group is done as a result, the controller is also told that the group has
   * completed.
   *
   * @param task the task that ended
   */
  public void taskEnded(Task task) {
    scheduler.completed(task);
    controller.taskEnded(task);
    final boolean groupFinished = isDone();
    if (groupFinished) {
      controller.taskGroupCompleted(this);
    }
    LOG.info(task.toString() + " - Task completed" );
  }

  /**
   * Notifies the controller that a task is about to be retried. The task still
   * holds its state from the current try.
   *
   * @param task the task being retried
   */
  public void taskRetried(Task task) {
    controller.taskRetried(task);
  }

  /**
   * Cancels every task that is not already cancelled: starting tasks (pending
   * and allocating) first, then tasks holding containers.
   *
   * @param context event context reused for each task
   */
  @Override
  public void shutDown(EventContext context) {
    cancelEach(context, getStartingTasks());
    cancelEach(context, getActiveTasks());
  }

  /**
   * Reports whether this group tracks any task that is pending, allocating or
   * holding a container.
   *
   * @return true if at least one task is tracked
   */
  public boolean hasTasks() {
    final int tracked = getTaskCount();
    return tracked != 0;
  }

  /**
   * @return true when no task is tracked and the scheduler reports that it has
   *         no more tasks
   */
  @Override
  public boolean isDone() {
    if (hasTasks()) {
      return false;
    }
    return !scheduler.hasMoreTasks();
  }

  /**
   * Asks the scheduler to adjust its tasks.
   */
  @Override
  public void adjustTasks() {
    scheduler.adjust();
  }

  /**
   * Requests a graceful stop of the task by delegating to the scheduler's task
   * manager.
   *
   * @param task the task to stop
   * @return true if the graceful stop request was sent, false if not, or if
   *         this task type has no graceful stop
   */
  public boolean requestStop(Task task) {
    return scheduler.getTaskManager().stop(task);
  }

  /**
   * @return the total number of tracked tasks: pending, allocating and holding
   *         a container
   */
  @Override
  public int getTaskCount() {
    final int waiting = pendingTasks.size() + allocatingTasks.size();
    return waiting + activeContainers.size();
  }

  /**
   * Counts the tracked tasks that are flagged as cancelled.
   *
   * @return the number of cancelled tasks across all three collections
   */
  @Override
  public int getCancelledTaskCount() {
    // TODO Crude first cut. This value should be maintained
    // as a count.
    int cancelled = countCancelled(pendingTasks);
    cancelled += countCancelled(allocatingTasks);
    cancelled += countCancelled(activeContainers.values());
    return cancelled;
  }

  /**
   * @return a new list of the pending tasks followed by the allocating tasks,
   *         omitting cancelled ones
   */
  @Override
  public List<Task> getStartingTasks() {
    final List<Task> starting = new ArrayList<>();
    appendUncancelled(pendingTasks, starting);
    appendUncancelled(allocatingTasks, starting);
    return starting;
  }

  /**
   * @return a new list of the tasks holding containers, omitting cancelled
   *         ones
   */
  @Override
  public List<Task> getActiveTasks() {
    final List<Task> active = new ArrayList<>();
    appendUncancelled(activeContainers.values(), active);
    return active;
  }

  /**
   * Cancels a single task by dispatching a cancel event to its current state.
   *
   * @param task the task to cancel
   */
  @Override
  public void cancel(Task task) {
    final EventContext cancelContext = new EventContext(controller, task);
    LOG.info( task.getLabel() + " Task cancelled" );
    cancelContext.getState().cancel(cancelContext);
  }

  /**
   * @return the number of tasks holding containers that report themselves as
   *         live
   */
  @Override
  public int getLiveCount() {
    int live = 0;
    for (Task candidate : activeContainers.values()) {
      live += candidate.isLive() ? 1 : 0;
    }
    return live;
  }

  /**
   * Presents every tracked task to the visitor: pending tasks, then
   * allocating tasks, then tasks holding containers.
   *
   * @param visitor the visitor to invoke once per task
   */
  @Override
  public void visitTaskModels(TaskVisitor visitor) {
    visitEach(pendingTasks, visitor);
    visitEach(allocatingTasks, visitor);
    visitEach(activeContainers.values(), visitor);
  }

  /**
   * Looks up a tracked task by its ID, searching pending tasks, then
   * allocating tasks, then tasks holding containers.
   *
   * @param id the task ID to find
   * @return the matching task, or null if no tracked task has that ID
   */
  @Override
  public Task getTask(int id) {
    Task match = findById(pendingTasks, id);
    if (match == null) {
      match = findById(allocatingTasks, id);
    }
    if (match == null) {
      match = findById(activeContainers.values(), id);
    }
    return match;
  }

  /**
   * @return the number of tasks currently awaiting a container allocation
   */
  @Override
  public int getRequestCount() {
    return allocatingTasks.size();
  }

  /**
   * @return the controller that owns this group
   */
  @Override
  public ClusterController getController() {
    return controller;
  }

  /** Asserts that the task is in none of the three tracking collections. */
  private void assertUntracked(Task task) {
    assert !activeContainers.containsValue(task);
    assert !allocatingTasks.contains(task);
    assert !pendingTasks.contains(task);
  }

  /** Sends a tick to the state of each task in the given snapshot. */
  private static void tickEach(EventContext context, List<Task> snapshot,
      long curTime) {
    for (Task current : snapshot) {
      context.setTask(current);
      context.getState().tick(context, curTime);
    }
  }

  /** Sends a cancel event to the state of each task in the given list. */
  private static void cancelEach(EventContext context, List<Task> tasks) {
    for (Task current : tasks) {
      context.setTask(current);
      context.getState().cancel(context);
    }
  }

  /** Counts the tasks in the given sequence that are flagged as cancelled. */
  private static int countCancelled(Iterable<Task> tasks) {
    int cancelled = 0;
    for (Task current : tasks) {
      if (current.isCancelled()) {
        cancelled++;
      }
    }
    return cancelled;
  }

  /** Appends to the sink, in order, every source task that is not cancelled. */
  private static void appendUncancelled(Iterable<Task> source,
      List<Task> sink) {
    for (Task current : source) {
      if (current.isCancelled()) {
        continue;
      }
      sink.add(current);
    }
  }

  /** Invokes the visitor once for each task in the given sequence. */
  private static void visitEach(Iterable<Task> tasks, TaskVisitor visitor) {
    for (Task current : tasks) {
      visitor.visit(current);
    }
  }

  /** Returns the first task in the sequence with the given ID, or null. */
  private static Task findById(Iterable<Task> tasks, int id) {
    for (Task current : tasks) {
      if (current.getId() == id) {
        return current;
      }
    }
    return null;
  }
}