/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.yarn.appMaster;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.drill.yarn.appMaster.Task.Disposition;
import org.apache.drill.yarn.core.DoYUtil;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
/**
* Represents the behaviors associated with each state in the lifecycle
* of a task.
* <p>
* Startup process:
* <dl>
* <dt>START --> REQUESTING</dt>
* <dd>New task sends a container request to YARN.</dd>
* <dt>REQUESTING --> LAUNCHING</dt>
* <dd>Container received from YARN, launching the task's process.</dd>
* <dt>LAUNCHING --> RUNNING</dt>
* <dd>Task launched and needs no start Ack.</dd>
* <dt>LAUNCHING --> WAIT_START_ACK</dt>
* <dd>Task launched and needs a start Ack.</dd>
* <dt>WAIT_START_ACK --> RUNNING</dt>
* <dd>Start Ack received.</dd>
* </dl>
* <p>
* Shutdown process:
* <dl>
* <dt>RUNNING --> WAIT_END_ACK | END</dt>
* <dd>The resource manager reported task completion.</dd>
* <dt>RUNNING --> ENDING</dt>
* <dd>Request sent to the task for a graceful shutdown.</dd>
* <dt>RUNNING --> KILLING</dt>
* <dd>Request sent to the node manager to forcibly kill the task.</dd>
* <dt>ENDING --> WAIT_END_ACK | END</dt>
* <dd>The task gracefully exited as reported by the resource manager.</dd>
* <dt>ENDING --> KILLING</dt>
* <dd>The wait for a graceful exit timed out; a forced kill message was
* sent to the node manager.</dd>
* <dt>KILLING --> WAIT_END_ACK | END</dt>
* <dd>The task exited as reported by the resource manager.</dd>
* <dt>WAIT_END_ACK --> END</dt>
* <dd>The end-ack is received or the wait timed out.</dd>
* </dl>
* <p>
* This is a do-it-yourself enum. Java enum values are instances of a single
* class. In this version, each enum value is the sole instance of a separate
* class, allowing each state to have its own behavior.
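* <p>
* A simplified sketch of the pattern as used here (an illustrative excerpt,
* not additional members):
* <pre>{@code
* public abstract class TaskState {
*   // Each "value" is the sole instance of its own subclass...
*   public static final TaskState RUNNING = new RunningState();
*   public static final TaskState ENDING = new EndingState();
*   // ...so each value overrides only the events that are legal in that state.
*   public void cancel(EventContext context) { illegalState(context, "cancel"); }
* }
* }</pre>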
*/
public abstract class TaskState {
/**
* Task that is newly created and needs a container allocated. No messages
* have yet been sent to YARN for the task.
*/
private static class StartState extends TaskState {
protected StartState() { super(false, TaskLifecycleListener.Event.CREATED, true); }
@Override
public void requestContainer(EventContext context) {
Task task = context.task;
task.tryCount++;
context.group.dequeuePendingRequest(task);
if (task.cancelled) {
taskStartFailed(context, Disposition.CANCELLED);
} else {
transition(context, REQUESTING);
context.group.enqueueAllocatingTask(task);
task.containerRequest = context.yarn
.requestContainer(task.getContainerSpec());
}
}
/**
* Cancellation is trivial: just drop the task; no need to coordinate
* with YARN.
*/
@Override
public void cancel(EventContext context) {
Task task = context.task;
assert !task.cancelled;
context.group.dequeuePendingRequest(task);
task.cancel();
taskStartFailed(context, Disposition.CANCELLED);
}
}
/**
* Task for which a container request has been sent but not yet received.
*/
private static class RequestingState extends TaskState {
protected RequestingState() {
super(false, TaskLifecycleListener.Event.CREATED, true);
}
/**
* Handle REQUESTING --> LAUNCHING. Indicates that we've asked YARN to start
* the task on the allocated container.
*/
@Override
public void containerAllocated(EventContext context, Container container) {
Task task = context.task;
LOG.info(task.getLabel() + " - Received container: "
+ DoYUtil.describeContainer(container));
context.group.dequeueAllocatingTask(task);
// No matter what happens below, we don't want to ask for this
// container again. The RM async API is a bit bizarre in this
// regard: it will keep asking for the container over and over until
// we tell it to stop.
context.yarn.removeContainerRequest(task.containerRequest);
// The container is needed both in the normal and in the cancellation
// paths, so set it here.
task.container = container;
if (task.cancelled) {
context.yarn.releaseContainer(container);
taskStartFailed(context, Disposition.CANCELLED);
return;
}
task.error = null;
task.completionStatus = null;
transition(context, LAUNCHING);
// The pool that manages this task wants to know that we have
// a container. The task manager may want to do some task-
// specific setup.
context.group.containerAllocated(context.task);
context.getTaskManager().allocated(context);
// Go ahead and launch a task in the container using the launch
// specification provided by the task group (pool).
try {
context.yarn.launchContainer(container, task.getLaunchSpec());
task.launchTime = System.currentTimeMillis();
} catch (YarnFacadeException e) {
LOG.error("Container launch failed: " + task.getContainerId(), e);
// This may not be the right response. RM may still think
// we have the container if the above is a local failure.
task.error = e;
context.group.containerReleased(task);
task.container = null;
taskStartFailed(context, Disposition.LAUNCH_FAILED);
}
}
/**
* Cancel the container request. We must wait for the response from YARN to
* do the actual cancellation. For now, just mark the task as cancelled.
*/
@Override
public void cancel(EventContext context) {
Task task = context.task;
context.task.cancel();
LOG.info(task.getLabel() + " - Cancelled at user request");
context.yarn.removeContainerRequest(task.containerRequest);
context.group.dequeueAllocatingTask(task);
task.disposition = Task.Disposition.CANCELLED;
task.completionTime = System.currentTimeMillis();
transition(context, END);
context.group.taskEnded(context.task);
}
/**
* The task is requesting a container. If the request takes too long,
* cancel the request and shrink the target task count. This event
* generally indicates that the user wants to run more tasks than
* the cluster has capacity.
*/
@Override
public void tick(EventContext context, long curTime) {
Task task = context.task;
int timeoutSec = task.scheduler.getRequestTimeoutSec();
if (timeoutSec == 0) {
return;
}
if (task.stateStartTime + timeoutSec * 1000 > curTime) {
return;
}
LOG.info(task.getLabel() + " - Request timed out after + "
+ timeoutSec + " secs.");
context.yarn.removeContainerRequest(task.containerRequest);
context.group.dequeueAllocatingTask(task);
task.disposition = Task.Disposition.LAUNCH_FAILED;
task.completionTime = System.currentTimeMillis();
transition(context, END);
context.group.taskEnded(context.task);
task.scheduler.requestTimedOut();
}
}
/**
* Task for which a container has been allocated and the task launch request
* sent. Awaiting confirmation that the task is running.
*/
private static class LaunchingState extends TaskState {
protected LaunchingState() {
super(true, TaskLifecycleListener.Event.ALLOCATED, true);
}
/**
* Handle launch failure. Results in a LAUNCHING --> END transition or
* restart.
* <p>
* This situation can occur, when debugging, if a timeout occurs after the
* allocation message, such as when sitting in the debugger on the
* allocation event.
*/
@Override
public void launchFailed(EventContext context, Throwable t) {
Task task = context.task;
LOG.info(task.getLabel() + " - Container start failed");
context.task.error = t;
launchFailed(context);
}
/**
* Handle LAUNCHING --> RUNNING | WAIT_START_ACK. Indicates that YARN has confirmed
* that the task is, indeed, running.
*/
@Override
public void containerStarted(EventContext context) {
Task task = context.task;
// If this task is tracked (that is, it is a Drillbit which
// we monitor using ZK) then we have to decide if we've
// seen the task in the tracker yet. If we have, then the
// task is fully running. If we haven't, then we need to
// wait for the start acknowledgement.
if (task.trackingState == Task.TrackingState.NEW) {
transition(context, WAIT_START_ACK);
} else {
transition(context, RUNNING);
}
task.error = null;
// If someone came along and marked the task as cancelled,
// we are now done waiting for YARN so we can immediately
// turn around and kill the task. (Can't kill the task,
// however, until YARN starts it, hence the need to wait
// for YARN to start the task before killing it.)
if (task.cancelled) {
transition(context, KILLING);
context.yarn.killContainer(task.getContainer());
}
}
/**
* Out-of-order start ACK, perhaps due to network latency. Handle by staying
* in this state, but later jump directly<br>
* LAUNCHING --> RUNNING
*/
@Override
public void startAck(EventContext context) {
context.task.trackingState = Task.TrackingState.START_ACK;
}
@Override
public void containerCompleted(EventContext context,
ContainerStatus status) {
// Seen on Mac when putting machine to sleep.
// Handle by failing & retrying.
completed(context, status);
endOrAck(context);
}
@Override
public void cancel(EventContext context) {
context.task.cancel();
context.yarn.killContainer(context.task.getContainer());
}
@Override
public void tick(EventContext context, long curTime) {
// If we are canceling the task, and YARN has not reported container
// completion after some amount of time, just force failure.
Task task = context.task;
if (task.isCancelled()
&& task.cancellationTime + Task.MAX_CANCELLATION_TIME < curTime) {
LOG.error(task.getLabel() + " - Launch timed out after "
+ Task.MAX_CANCELLATION_TIME / 1000 + " secs.");
launchFailed(context);
}
}
private void launchFailed(EventContext context) {
Task task = context.task;
task.completionTime = System.currentTimeMillis();
// Not sure if releasing the container is needed...
context.yarn.releaseContainer(task.container);
context.group.containerReleased(task);
task.container = null;
taskStartFailed(context, Disposition.LAUNCH_FAILED);
}
}
/**
* Task has been launched, is tracked, but we've not yet received a start ack.
*/
private static class WaitStartAckState extends TaskState {
protected WaitStartAckState() {
super(true, TaskLifecycleListener.Event.RUNNING, true);
}
@Override
public void startAck(EventContext context) {
context.task.trackingState = Task.TrackingState.START_ACK;
transition(context, RUNNING);
}
@Override
public void cancel(EventContext context) {
RUNNING.cancel(context);
}
// @Override
// public void containerStopped(EventContext context) {
// transition(context, WAIT_COMPLETE );
// }
@Override
public void containerCompleted(EventContext context,
ContainerStatus status) {
completed(context, status);
taskTerminated(context);
}
// TODO: Timeout in this state.
}
/**
* Task in the normal running state.
*/
private static class RunningState extends TaskState {
protected RunningState() {
super(true, TaskLifecycleListener.Event.RUNNING, true);
}
/**
* Normal task completion. Implements the RUNNING --> END transition.
*
* @param status
*/
@Override
public void containerCompleted(EventContext context,
ContainerStatus status) {
completed(context, status);
endOrAck(context);
}
@Override
public void cancel(EventContext context) {
Task task = context.task;
task.cancel();
if (context.group.requestStop(task)) {
transition(context, ENDING);
} else {
context.yarn.killContainer(task.container);
transition(context, KILLING);
}
}
/**
* The task claims that it is complete, but we think it is running. Assume
* that the task has started its own graceful shutdown (or the
* equivalent).<br>
* RUNNING --> ENDING
*/
@Override
public void completionAck(EventContext context) {
context.task.trackingState = Task.TrackingState.END_ACK;
transition(context, ENDING);
}
}
/**
* Task for which a termination request has been sent to the Drill-bit, but
* confirmation has not yet been received from the Node Manager. (Not yet
* supported in the Drill-bit.)
*/
public static class EndingState extends TaskState {
protected EndingState() { super(true, TaskLifecycleListener.Event.RUNNING, false); }
/*
* Normal ENDING --> WAIT_COMPLETE transition, awaiting Resource Manager
* confirmation.
*/
// @Override
// public void containerStopped(EventContext context) {
// transition(context, WAIT_COMPLETE);
// }
/**
* Normal ENDING --> WAIT_END_ACK | END transition.
*
* @param status
*/
@Override
public void containerCompleted(EventContext context,
ContainerStatus status) {
completed(context, status);
endOrAck(context);
}
@Override
public void cancel(EventContext context) {
context.task.cancel();
}
/**
* If the graceful stop process exceeds the maximum timeout, go ahead and
* forcibly kill the process.
*/
@Override
public void tick(EventContext context, long curTime) {
Task task = context.task;
if (curTime - task.stateStartTime > task.taskGroup.getStopTimeoutMs()) {
context.yarn.killContainer(task.container);
transition(context, KILLING);
}
}
@Override
public void completionAck(EventContext context) {
context.task.trackingState = Task.TrackingState.END_ACK;
}
}
/**
* Task for which a forced termination request has been sent to the Node
* Manager, but a stop message has not yet been received.
*/
public static class KillingState extends TaskState {
protected KillingState() { super(true, TaskLifecycleListener.Event.RUNNING, false); }
/*
* Normal KILLING --> WAIT_COMPLETE transition, awaiting Resource Manager
* confirmation.
*/
// @Override
// public void containerStopped(EventContext context) {
// transition(context, WAIT_COMPLETE);
// }
/**
* Normal KILLING --> WAIT_END_ACK | END transition.
*
* @param status
*/
@Override
public void containerCompleted(EventContext context,
ContainerStatus status) {
completed(context, status);
endOrAck(context);
}
@Override
public void cancel(EventContext context) {
context.task.cancel();
}
@Override
public void startAck(EventContext context) {
// Better late than never... Happens during debugging sessions
// when order of messages is scrambled.
context.task.trackingState = Task.TrackingState.START_ACK;
}
@Override
public void completionAck(EventContext context) {
context.task.trackingState = Task.TrackingState.END_ACK;
}
@Override
public void stopTaskFailed(EventContext context, Throwable t) {
assert false;
// What to do?
}
}
/**
* Task exited, but we are waiting for confirmation from Zookeeper that
* the Drillbit registration has been removed. Required to associate
* ZK registrations with Drillbits. Ensures that we don't try to
* start a new Drillbit on a node until the previous Drillbit
* has completely shut down, including dropping out of ZK.
*/
private static class WaitEndAckState extends TaskState {
protected WaitEndAckState() {
super(false, TaskLifecycleListener.Event.RUNNING, false);
}
@Override
public void cancel(EventContext context) {
context.task.cancel();
}
@Override
public void completionAck(EventContext context) {
context.task.trackingState = Task.TrackingState.END_ACK;
taskTerminated(context);
}
/**
* Periodically check if the process is still live. We are supposed to
* receive events when the task becomes deregistered. But we've seen
* cases where the task hangs in this state forever. Try to resolve
* the issue by polling periodically.
*/
@Override
public void tick(EventContext context, long curTime) {
if (!context.getTaskManager().isLive(context)) {
taskTerminated(context);
}
}
}
/**
* Task is completed or failed. The disposition field gives the details of the
* completion type. The task is not active on YARN, but could be retried.
*/
private static class EndState extends TaskState {
protected EndState() {
super(false, TaskLifecycleListener.Event.ENDED, false);
}
/*
* Ignore out-of-order Node Manager completion notices.
*/
// @Override
// public void containerStopped(EventContext context) {
// }
@Override
public void cancel(EventContext context) {
}
}
private static final Log LOG = LogFactory.getLog(TaskState.class);
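// The sole instance of each state; a task's current state always refers
// to one of these singletons.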
public static final TaskState START = new StartState();
public static final TaskState REQUESTING = new RequestingState();
public static final TaskState LAUNCHING = new LaunchingState();
public static final TaskState WAIT_START_ACK = new WaitStartAckState();
public static final TaskState RUNNING = new RunningState();
public static final TaskState ENDING = new EndingState();
public static final TaskState KILLING = new KillingState();
public static final TaskState WAIT_END_ACK = new WaitEndAckState();
public static final TaskState END = new EndState();
protected final boolean hasContainer;
protected final TaskLifecycleListener.Event lifeCycleEvent;
protected final String label;
protected final boolean cancellable;
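/**
* @param hasContainer whether a task in this state holds a YARN container
* @param lcEvent lifecycle event reported to listeners when a task enters
* this state (fired only if it differs from the prior state's event)
* @param cancellable whether cancellation is offered for tasks in this state
*/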
public TaskState(boolean hasContainer, TaskLifecycleListener.Event lcEvent,
boolean cancellable) {
this.hasContainer = hasContainer;
lifeCycleEvent = lcEvent;
this.cancellable = cancellable;
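// Derive the display label from the subclass name: drop the "State"
// suffix, insert underscores at camel-case boundaries, then upper-case
// (e.g. WaitStartAckState --> WAIT_START_ACK).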
String name = toString();
name = name.replace("State", "");
name = name.replaceAll("([a-z]+)([A-Z])", "$1_$2");
label = name.toUpperCase();
}
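/**
* Implements the transition to either WAIT_END_ACK or END: if the task is
* tracked and its start ack was seen, wait for the matching end ack;
* otherwise the task is fully terminated.
*/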
protected void endOrAck(EventContext context) {
if (context.task.trackingState == Task.TrackingState.START_ACK) {
transition(context, WAIT_END_ACK);
} else {
taskTerminated(context);
}
}
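/**
* Cluster manager wishes to request a YARN container for this task.
* Valid only in the START state; the default implementation reports an
* illegal state.
*/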
public void requestContainer(EventContext context) {
illegalState(context, "requestContainer");
}
/**
* Resource Manager reports that the task has been allocated a container.
*
* @param context
* @param container
*/
public void containerAllocated(EventContext context, Container container) {
illegalState(context, "containerAllocated");
}
/**
* The launch of the container failed.
*
* @param context
* @param t
*/
public void launchFailed(EventContext context, Throwable t) {
illegalState(context, "launchFailed");
}
/**
* Node Manager reports that the task has started execution.
*
* @param context
*/
public void containerStarted(EventContext context) {
illegalState(context, "containerStarted");
}
/**
* The monitoring plugin has detected that the task has confirmed that it is
* fully started.
*/
public void startAck(EventContext context) {
illegalState(context, "startAck");
}
/**
* The node manager request to stop a task failed.
*
* @param context
* @param t
*/
public void stopTaskFailed(EventContext context, Throwable t) {
illegalState(context, "stopTaskFailed");
}
/**
* The monitoring plugin has detected that the task has confirmed that it has
* started shutdown.
*/
public void completionAck(EventContext context) {
illegalState(context, "completionAck");
}
/**
* Node Manager reports that the task has stopped execution. We don't yet know
* if this was a success or failure.
*
* @param context
*/
public void containerStopped(EventContext context) {
illegalState(context, "containerStopped");
}
/**
* Resource Manager reports that the task has completed execution and provided
* the completion status.
*
* @param context
* @param status
*/
public void containerCompleted(EventContext context, ContainerStatus status) {
completed(context, status);
illegalState(context, "containerCompleted");
}
/**
* Cluster manager wishes to cancel this task.
*
* @param context
*/
public void cancel(EventContext context) {
illegalState(context, "cancel");
}
public void tick(EventContext context, long curTime) {
// Ignore by default
}
/**
* Implement a state transition, alerting any life cycle listeners and
* updating the log file. Marks the start time of the new state in support of
* states that implement a timeout.
*
* @param context
* @param newState
*/
protected void transition(EventContext context, TaskState newState) {
TaskState oldState = context.task.state;
LOG.info(context.task.getLabel() + " " + oldState.toString() + " --> "
+ newState.toString());
context.task.state = newState;
if (newState.lifeCycleEvent != oldState.lifeCycleEvent) {
context.controller.fireLifecycleChange(newState.lifeCycleEvent, context);
}
context.task.stateStartTime = System.currentTimeMillis();
}
/**
* Task failed when starting. No container has been allocated. The task
* will go from:<br>
* * --> END
* <p>
* If the run failed, and the task can be retried, it may
* then move from<br>
* END --> START
* @param context
* @param disposition
*/
protected void taskStartFailed(EventContext context,
Disposition disposition) {
// No container, so don't alert the task manager.
assert context.task.container == null;
context.getTaskManager().completed(context);
taskEnded(context, disposition);
retryTask(context);
}
/**
* A running task terminated. It may have succeeded or failed,
* this method will determine which.
* <p>
* Every task goes from:<br>
* * --> END
* <p>
* If the run failed, and the task can be retried, it may
* then move from<br>
* END --> START
*
* @param context
*/
protected void taskTerminated(EventContext context) {
Task task = context.task;
// Give the task manager a peek at the completed task.
// The task manager can override retry behavior. To
// cancel a task that would otherwise be retried, call
// cancel( ) on the task.
context.getTaskManager().completed(context);
context.group.containerReleased(task);
assert task.completionStatus != null;
if (task.completionStatus.getExitStatus() == 0) {
taskEnded(context, Disposition.COMPLETED);
context.group.taskEnded(context.task);
} else {
taskEnded(context, Disposition.RUN_FAILED);
retryTask(context);
}
}
/**
* Implements the details of marking a task as ended. Note that this method
* does not deregister the task from the scheduler state; we keep it
* registered in case we decide to retry.
*
* @param context
* @param disposition
*/
private void taskEnded(EventContext context, Disposition disposition) {
Task task = context.task;
if (disposition == null) {
assert task.disposition != null;
} else {
task.disposition = disposition;
}
task.completionTime = System.currentTimeMillis();
transition(context, END);
}
/**
* Retry a task. Requires that the task currently be in the END state to provide
* clean state transitions. Will deregister the task if it cannot be retried
* because the cluster is ending or the task has failed too many times.
* Otherwise, starts the whole life cycle over again.
*
* @param context
*/
private void retryTask(EventContext context) {
Task task = context.task;
assert task.state == END;
if (!context.controller.isLive() || !task.retryable()) {
context.group.taskEnded(task);
return;
}
if (task.tryCount > task.taskGroup.getMaxRetries()) {
LOG.error(task.getLabel() + " - Too many retries: " + task.tryCount);
task.disposition = Disposition.TOO_MANY_RETRIES;
context.group.taskEnded(task);
return;
}
LOG.info(task.getLabel() + " - Retrying task, try " + task.tryCount);
context.group.taskRetried(task);
task.reset();
transition(context, START);
context.group.enqueuePendingRequest(task);
}
/**
* An event is called in a state where it is not expected. Log it, ignore it
* and hope it goes away.
*
* @param action
*/
private void illegalState(EventContext context, String action) {
// Intentionally assert: fails during debugging, soldiers on in production.
assert false;
LOG.error(context.task.getLabel() + " - Action " + action
+ " in wrong state: " + toString(),
new IllegalStateException("Action in wrong state"));
}
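/**
* Record the completion status reported by the Resource Manager so that
* {@link #taskTerminated(EventContext)} can distinguish success from
* failure.
*/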
protected void completed(EventContext context, ContainerStatus status) {
Task task = context.task;
String diag = status.getDiagnostics();
LOG.trace(
task.getLabel() + " Completed, exit status: " + status.getExitStatus()
+ (DoYUtil.isBlank(diag) ? "" : ": " + diag));
task.completionStatus = status;
}
@Override
public String toString() { return getClass().getSimpleName(); }
public boolean hasContainer() { return hasContainer; }
public String getLabel() { return label; }
public boolean isCancellable() {
return cancellable;
}
}