blob: a5bda0a7f62008cc50385c873bbee81340d00376 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.runtime.common.evaluator.task;
import org.apache.reef.annotations.audience.EvaluatorSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.task.TaskConfigurationOptions;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.evaluator.HeartBeatManager;
import org.apache.reef.runtime.common.evaluator.task.exceptions.*;
import org.apache.reef.tang.InjectionFuture;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.task.Task;
import org.apache.reef.task.events.CloseEvent;
import org.apache.reef.task.events.DriverMessage;
import org.apache.reef.task.events.SuspendEvent;
import org.apache.reef.util.Optional;
import org.apache.reef.wake.EventHandler;
import javax.inject.Inject;
import javax.xml.bind.DatatypeConverter;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* The execution environment for a Task.
*/
@Private
@EvaluatorSide
public final class TaskRuntime implements Runnable {
private static final Logger LOG = Logger.getLogger(TaskRuntime.class.getName());
/**
* User supplied Task code.
*/
private final Task task;
private final InjectionFuture<EventHandler<CloseEvent>> fCloseHandler;
private final InjectionFuture<EventHandler<SuspendEvent>> fSuspendHandler;
private final InjectionFuture<EventHandler<DriverMessage>> fMessageHandler;
private final TaskLifeCycleHandlers taskLifeCycleHandlers;
/**
* The memento given by the task configuration.
*/
private final Optional<byte[]> memento;
/**
* Heart beat manager to trigger on heartbeats.
*/
private final HeartBeatManager heartBeatManager;
private final TaskStatus currentStatus;
@Inject
private TaskRuntime(
final HeartBeatManager heartBeatManager,
final Task task,
final TaskStatus currentStatus,
@Parameter(TaskConfigurationOptions.CloseHandler.class)
final InjectionFuture<EventHandler<CloseEvent>> fCloseHandler,
@Parameter(TaskConfigurationOptions.SuspendHandler.class)
final InjectionFuture<EventHandler<SuspendEvent>> fSuspendHandler,
@Parameter(TaskConfigurationOptions.MessageHandler.class)
final InjectionFuture<EventHandler<DriverMessage>> fMessageHandler,
final TaskLifeCycleHandlers taskLifeCycleHandlers) {
this(heartBeatManager, task, currentStatus, fCloseHandler, fSuspendHandler, fMessageHandler, null,
taskLifeCycleHandlers);
}
@Inject
private TaskRuntime(
final HeartBeatManager heartBeatManager,
final Task task,
final TaskStatus currentStatus,
@Parameter(TaskConfigurationOptions.CloseHandler.class)
final InjectionFuture<EventHandler<CloseEvent>> fCloseHandler,
@Parameter(TaskConfigurationOptions.SuspendHandler.class)
final InjectionFuture<EventHandler<SuspendEvent>> fSuspendHandler,
@Parameter(TaskConfigurationOptions.MessageHandler.class)
final InjectionFuture<EventHandler<DriverMessage>> fMessageHandler,
@Parameter(TaskConfigurationOptions.Memento.class) final String memento,
final TaskLifeCycleHandlers taskLifeCycleHandlers) {
this.heartBeatManager = heartBeatManager;
this.task = task;
this.taskLifeCycleHandlers = taskLifeCycleHandlers;
this.memento = null == memento ? Optional.<byte[]>empty() :
Optional.of(DatatypeConverter.parseBase64Binary(memento));
this.fCloseHandler = fCloseHandler;
this.fSuspendHandler = fSuspendHandler;
this.fMessageHandler = fMessageHandler;
this.currentStatus = currentStatus;
}
/**
* This method needs to be called before a Task can be run().
* It informs the Driver that the Task is initializing.
*/
public void initialize() {
this.currentStatus.setInit();
}
/**
* Run the task: Fire TaskStart, call Task.call(), fire TaskStop.
*/
@Override
public void run() {
try {
// Change state and inform the Driver
this.taskLifeCycleHandlers.beforeTaskStart();
LOG.log(Level.FINEST, "Informing registered EventHandler<TaskStart>.");
this.currentStatus.setRunning();
// Call Task.call()
final byte[] result = this.runTask();
// Inform the Driver about it
this.currentStatus.setResult(result);
LOG.log(Level.FINEST, "Informing registered EventHandler<TaskStop>.");
this.taskLifeCycleHandlers.afterTaskExit();
} catch (final TaskStartHandlerFailure taskStartHandlerFailure) {
LOG.log(Level.WARNING, "Caught an exception during TaskStart handler execution.", taskStartHandlerFailure);
this.currentStatus.setException(taskStartHandlerFailure.getCause());
} catch (final TaskStopHandlerFailure taskStopHandlerFailure) {
LOG.log(Level.WARNING, "Caught an exception during TaskStop handler execution.", taskStopHandlerFailure);
this.currentStatus.setException(taskStopHandlerFailure.getCause());
} catch (final TaskCallFailure e) {
LOG.log(Level.WARNING, "Caught an exception during Task.call().", e.getCause());
this.currentStatus.setException(e);
}
}
/**
* Called by heartbeat manager.
*
* @return current TaskStatusProto
*/
public ReefServiceProtos.TaskStatusProto getStatusProto() {
return this.currentStatus.toProto();
}
/**
* @return true, if the Task is no longer running, either because it is crashed or exited cleanly
*/
public boolean hasEnded() {
return this.currentStatus.hasEnded();
}
/**
* @return the ID of the task.
*/
public String getTaskId() {
return this.currentStatus.getTaskId();
}
public String getId() {
return "TASK:" + this.task.getClass().getSimpleName() + ':' + this.currentStatus.getTaskId();
}
/**
* Close the Task. This calls the configured close handler.
*
* @param message the optional message for the close handler or null if there none.
*/
public void close(final byte[] message) {
LOG.log(Level.FINEST, "Triggering Task close.");
synchronized (this.heartBeatManager) {
if (this.currentStatus.isNotRunning()) {
LOG.log(Level.WARNING, "Trying to close a task that is in state: {0}. Ignoring.",
this.currentStatus.getState());
} else {
try {
this.closeTask(message);
this.currentStatus.setCloseRequested();
} catch (final TaskCloseHandlerFailure taskCloseHandlerFailure) {
LOG.log(Level.WARNING, "Exception while executing task close handler.",
taskCloseHandlerFailure.getCause());
this.currentStatus.setException(taskCloseHandlerFailure.getCause());
}
}
}
}
/**
* Suspend the Task. This calls the configured suspend handler.
*
* @param message the optional message for the suspend handler or null if there none.
*/
public void suspend(final byte[] message) {
synchronized (this.heartBeatManager) {
if (this.currentStatus.isNotRunning()) {
LOG.log(Level.WARNING, "Trying to suspend a task that is in state: {0}. Ignoring.",
this.currentStatus.getState());
} else {
try {
this.suspendTask(message);
this.currentStatus.setSuspendRequested();
} catch (final TaskSuspendHandlerFailure taskSuspendHandlerFailure) {
LOG.log(Level.WARNING, "Exception while executing task suspend handler.",
taskSuspendHandlerFailure.getCause());
this.currentStatus.setException(taskSuspendHandlerFailure.getCause());
}
}
}
}
/**
* Deliver a message to the Task. This calls into the user supplied message handler.
*
* @param message the message to be delivered.
*/
public void deliver(final byte[] message) {
synchronized (this.heartBeatManager) {
if (this.currentStatus.isNotRunning()) {
LOG.log(Level.WARNING,
"Trying to send a message to a task that is in state: {0}. Ignoring.",
this.currentStatus.getState());
} else {
try {
this.deliverMessageToTask(message);
} catch (final TaskMessageHandlerFailure taskMessageHandlerFailure) {
LOG.log(Level.WARNING, "Exception while executing task close handler.",
taskMessageHandlerFailure.getCause());
this.currentStatus.setException(taskMessageHandlerFailure.getCause());
}
}
}
}
/**
* @return the ID of the Context this task is executing in.
*/
private String getContextID() {
return this.currentStatus.getContextId();
}
/**
* Calls the Task.call() method and catches exceptions it may throw.
*
* @return the return value of Task.call()
* @throws TaskCallFailure if any Throwable was caught from the Task.call() method.
* That throwable would be the cause of the TaskCallFailure.
*/
@SuppressWarnings("checkstyle:illegalcatch")
private byte[] runTask() throws TaskCallFailure {
try {
final byte[] result;
if (this.memento.isPresent()) {
LOG.log(Level.FINEST, "Calling Task.call() with a memento");
result = this.task.call(this.memento.get());
} else {
LOG.log(Level.FINEST, "Calling Task.call() without a memento");
result = this.task.call(null);
}
LOG.log(Level.FINEST, "Task.call() exited cleanly.");
return result;
} catch (final Throwable throwable) {
throw new TaskCallFailure(throwable);
}
}
/**
* Calls the configured Task close handler and catches exceptions it may throw.
*/
@SuppressWarnings("checkstyle:illegalcatch")
private void closeTask(final byte[] message) throws TaskCloseHandlerFailure {
LOG.log(Level.FINEST, "Invoking close handler.");
try {
this.fCloseHandler.get().onNext(new CloseEventImpl(message));
} catch (final Throwable throwable) {
throw new TaskCloseHandlerFailure(throwable);
}
}
/**
* Calls the configured Task message handler and catches exceptions it may throw.
*/
@SuppressWarnings("checkstyle:illegalcatch")
private void deliverMessageToTask(final byte[] message) throws TaskMessageHandlerFailure {
try {
this.fMessageHandler.get().onNext(new DriverMessageImpl(message));
} catch (final Throwable throwable) {
throw new TaskMessageHandlerFailure(throwable);
}
}
/**
* Calls the configured Task suspend handler and catches exceptions it may throw.
*/
@SuppressWarnings("checkstyle:illegalcatch")
private void suspendTask(final byte[] message) throws TaskSuspendHandlerFailure {
try {
this.fSuspendHandler.get().onNext(new SuspendEventImpl(message));
} catch (final Throwable throwable) {
throw new TaskSuspendHandlerFailure(throwable);
}
}
}