blob: bfa636d49e6f48d227fc7f0c428f40a0309958ba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.yarn.appMaster;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Abstract base class for schedulers that work with persistent
* (long-running) tasks. Such tasks are intended to run until
* explicitly shut down (unlike batch tasks that run until
* some expected completion.)
* <p>
* Provides a target quantity of tasks
* (see {@link #getTarget()}), along with operations to increase,
* decrease or set the target number.
* <p>
* The scheduler acts as a controller: starting new tasks as needed to
* match the desired target, or stopping tasks as needed when the
* target level is reduced.
*/
public abstract class PersistentTaskScheduler extends AbstractScheduler {
  private static final Log LOG = LogFactory.getLog(PersistentTaskScheduler.class);

  /**
   * Target number of tasks this scheduler tries to keep running. The
   * methods of this class keep the value at zero or above.
   */
  protected int quantity;

  /**
   * Creates a persistent scheduler with an initial target task count.
   *
   * @param type
   *          scheduler type, passed through to the base class
   * @param name
   *          scheduler name, passed through to the base class
   * @param quantity
   *          the initial target number of tasks; a negative value is
   *          treated as zero, consistent with {@link #resize(int)}
   */
  public PersistentTaskScheduler(String type, String name, int quantity) {
    super(type, name);
    this.quantity = Math.max(0, quantity);
  }

  /**
   * Set the target number of tasks to the quantity given. This only
   * records the new target; tasks are started or stopped on the next
   * call to {@link #adjust()}.
   *
   * @param level
   *          the target number of tasks; a negative value is treated
   *          as zero
   * @return the target actually applied
   */
  @Override
  public int resize(int level) {
    quantity = Math.max(0, level);
    return quantity;
  }

  /**
   * @return the current target number of tasks
   */
  @Override
  public int getTarget() { return quantity; }

  /**
   * Indicate that a task is completed. Normally occurs only
   * when shutting down excess tasks. This implementation does nothing.
   *
   * @param task
   *          the task that completed
   */
  @Override
  public void completed(Task task) { }

  /**
   * Progress for persistent tasks defaults to the ratio of
   * running tasks to target level. Thus, a persistent cluster
   * will normally report 100% progress.
   *
   * @return a two-element array: the task count reported by the
   *         scheduler state, capped at the target, followed by the
   *         target itself
   */
  @Override
  public int[] getProgress() {
    int activeCount = state.getTaskCount();
    return new int[] { Math.min(activeCount, quantity), quantity };
  }

  /**
   * Adjust the number of running tasks to better match the target
   * by starting or stopping tasks as needed.
   */
  @Override
  public void adjust() {
    int activeCount = state.getTaskCount();
    int delta = quantity - activeCount;
    if (delta > 0) {
      addTasks(delta);
    } else if (delta < 0) {
      cancelTasks(activeCount);
    }
  }

  /**
   * Cancel enough tasks to bring the task count down to the target. We
   * exclude any tasks that are already in the process of being cancelled.
   * Because we ignore those tasks, it might be that we want to reduce the
   * task count, but there is nothing left to cancel.
   * <p>
   * Starting tasks are cancelled first, then active tasks.
   *
   * @param activeCount
   *          the current task count as reported by the scheduler state
   *          (not the number of tasks to cancel)
   */
  private void cancelTasks(int activeCount) {
    int cancelled = state.getCancelledTaskCount();
    int cancellable = activeCount - cancelled;
    // Number of tasks that still must be cancelled to reach the target.
    int toCancel = cancellable - quantity;
    LOG.info("[" + getName() + "] - " + activeCount + " tasks active, target is " +
             quantity + ". " + cancelled + " are already cancelled, " +
             Math.max(toCancel, 0) + " more will be cancelled.");
    if (toCancel <= 0) {
      return;
    }

    int remaining = toCancel;
    for (Task task : state.getStartingTasks()) {
      state.cancel(task);
      if (--remaining == 0) {
        return;
      }
    }
    for (Task task : state.getActiveTasks()) {
      state.cancel(task);
      if (--remaining == 0) {
        return;
      }
    }

    // If we get here it means something has gotten out of whack.
    LOG.error("Tried to cancel " + toCancel + " tasks, but " + remaining + " could not be cancelled.");
    assert false;
  }

  /**
   * The persistent scheduler has no fixed sequence of tasks to run, it launches
   * a set and is never "done". For purposes of completion tracking claim we
   * have no further tasks.
   *
   * @return false
   */
  @Override
  public boolean hasMoreTasks() { return false; }

  /**
   * Handle a container request that timed out by lowering the target by
   * one. If the target is already zero, an error is logged instead (and
   * the assertion fails when assertions are enabled).
   */
  @Override
  public void requestTimedOut() {
    // We requested a node a while back, requested a container from YARN,
    // but waited too long to receive it. Most likely cause is that we
    // want a container on a node that either does not exist, or is too
    // heavily loaded. (That is, we have a 3-node cluster and are requesting
    // a 4th node. Or, we have 2 nodes but node 3 has insufficient resources.)
    // In either case, we're not likely to ever get the container, so just
    // reduce the target size to what we can get.
    assert quantity > 0;
    if (quantity == 0) {
      LOG.error("Container timed out, but target quantity is already 0!");
    } else {
      quantity--;
      LOG.info("Container request timed out. Reducing target container count by 1 to " + quantity);
    }
  }
}