blob: 61564b1bab626270685f268b4f5d2051cbaa75a0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.runtime.common.driver.evaluator;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.driver.parameters.EvaluatorConfigurationProviders;
import org.apache.reef.driver.restart.DriverRestartManager;
import org.apache.reef.driver.restart.EvaluatorRestartState;
import org.apache.reef.exception.NonSerializableException;
import org.apache.reef.runtime.common.driver.api.*;
import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO;
import org.apache.reef.runtime.common.driver.evaluator.pojos.EvaluatorStatusPOJO;
import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
import org.apache.reef.runtime.common.driver.evaluator.pojos.TaskStatusPOJO;
import org.apache.reef.tang.ConfigurationProvider;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
import org.apache.reef.driver.task.FailedTask;
import org.apache.reef.exception.EvaluatorException;
import org.apache.reef.exception.EvaluatorKilledByResourceManagerException;
import org.apache.reef.io.naming.Identifiable;
import org.apache.reef.proto.EvaluatorRuntimeProtocol;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.driver.evaluator.EvaluatorProcess;
import org.apache.reef.runtime.common.driver.context.ContextControlHandler;
import org.apache.reef.runtime.common.driver.context.ContextRepresenters;
import org.apache.reef.runtime.common.driver.idle.EventHandlerIdlenessSource;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
import org.apache.reef.runtime.common.driver.task.TaskRepresenter;
import org.apache.reef.runtime.common.utils.ExceptionCodec;
import org.apache.reef.runtime.common.utils.RemoteManager;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.Optional;
import org.apache.reef.util.logging.LoggingScopeFactory;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.RemoteMessage;
import org.apache.reef.wake.time.Clock;
import org.apache.reef.wake.time.event.Alarm;
import javax.inject.Inject;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Manages a single Evaluator instance including all lifecycle instances:
* (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator).
* <p>
* A (periodic) heartbeat channel is established from EvaluatorRuntime to EvaluatorManager.
* The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this
* heartbeat channel.
* <p>
* A (push-based) EventHandler channel is established from EvaluatorManager to EvaluatorRuntime.
* The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate
* control information (e.g., shutdown, suspend).
*/
@Private
@DriverSide
public final class EvaluatorManager implements Identifiable, AutoCloseable {
private static final Logger LOG = Logger.getLogger(EvaluatorManager.class.getName());
private final EvaluatorHeartBeatSanityChecker sanityChecker = new EvaluatorHeartBeatSanityChecker();
private final Clock clock;
private final ResourceReleaseHandler resourceReleaseHandler;
private final ResourceLaunchHandler resourceLaunchHandler;
private final String evaluatorId;
private final EvaluatorDescriptorImpl evaluatorDescriptor;
private final ContextRepresenters contextRepresenters;
private final EvaluatorMessageDispatcher messageDispatcher;
private final EvaluatorControlHandler evaluatorControlHandler;
private final ContextControlHandler contextControlHandler;
private final EvaluatorStatusManager stateManager;
private final ExceptionCodec exceptionCodec;
private final EventHandlerIdlenessSource idlenessSource;
private final RemoteManager remoteManager;
private final ConfigurationSerializer configurationSerializer;
private final LoggingScopeFactory loggingScopeFactory;
private final Set<ConfigurationProvider> evaluatorConfigurationProviders;
private final DriverRestartManager driverRestartManager;
private final EvaluatorIdlenessThreadPool idlenessThreadPool;
// Mutable fields
private Optional<TaskRepresenter> task = Optional.empty();
private boolean resourceNotReleased = true;
private boolean allocationNotFired = true;
@Inject
private EvaluatorManager(
@Parameter(EvaluatorIdentifier.class) final String evaluatorId,
@Parameter(EvaluatorDescriptorName.class) final EvaluatorDescriptorImpl evaluatorDescriptor,
@Parameter(EvaluatorConfigurationProviders.class)
final Set<ConfigurationProvider> evaluatorConfigurationProviders,
final Clock clock,
final RemoteManager remoteManager,
final ResourceReleaseHandler resourceReleaseHandler,
final ResourceLaunchHandler resourceLaunchHandler,
final ContextRepresenters contextRepresenters,
final ConfigurationSerializer configurationSerializer,
final EvaluatorMessageDispatcher messageDispatcher,
final EvaluatorControlHandler evaluatorControlHandler,
final ContextControlHandler contextControlHandler,
final EvaluatorStatusManager stateManager,
final ExceptionCodec exceptionCodec,
final EventHandlerIdlenessSource idlenessSource,
final LoggingScopeFactory loggingScopeFactory,
final DriverRestartManager driverRestartManager,
final EvaluatorIdlenessThreadPool idlenessThreadPool) {
LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator: {0}", evaluatorId);
this.evaluatorId = evaluatorId;
this.evaluatorDescriptor = evaluatorDescriptor;
this.evaluatorConfigurationProviders = evaluatorConfigurationProviders;
this.clock = clock;
this.contextRepresenters = contextRepresenters;
this.idlenessSource = idlenessSource;
this.resourceReleaseHandler = resourceReleaseHandler;
this.resourceLaunchHandler = resourceLaunchHandler;
this.messageDispatcher = messageDispatcher;
this.evaluatorControlHandler = evaluatorControlHandler;
this.contextControlHandler = contextControlHandler;
this.stateManager = stateManager;
this.exceptionCodec = exceptionCodec;
this.remoteManager = remoteManager;
this.configurationSerializer = configurationSerializer;
this.loggingScopeFactory = loggingScopeFactory;
this.driverRestartManager = driverRestartManager;
this.idlenessThreadPool = idlenessThreadPool;
LOG.log(Level.FINEST, "Instantiated 'EvaluatorManager' for evaluator: [{0}]", this.getId());
}
/**
* Get the id of current job/application.
*/
public static String getJobIdentifier() {
// TODO[JIRA REEF-818]: currently we obtain the job id directly by parsing execution (container) directory path
// #845 is open to get the id from RM properly
for (File directory = new File(System.getProperty("user.dir"));
directory != null; directory = directory.getParentFile()) {
final String currentDirectoryName = directory.getName();
if (currentDirectoryName.toLowerCase().contains("application_")) {
return currentDirectoryName;
}
}
// cannot find a directory that contains application_, presumably we are on local runtime
// again, this is a hack for now, we need #845 as a proper solution
return "REEF_LOCAL_RUNTIME";
}
/**
* Fires the EvaluatorAllocatedEvent to the handlers. Can only be done once.
*/
public synchronized void fireEvaluatorAllocatedEvent() {
if (this.stateManager.isAllocated() && this.allocationNotFired) {
final AllocatedEvaluator allocatedEvaluator =
new AllocatedEvaluatorImpl(this,
this.remoteManager.getMyIdentifier(),
this.configurationSerializer,
getJobIdentifier(),
this.loggingScopeFactory,
this.evaluatorConfigurationProviders);
LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator with ID [{0}]", this.evaluatorId);
this.messageDispatcher.onEvaluatorAllocated(allocatedEvaluator);
this.allocationNotFired = false;
} else {
LOG.log(Level.WARNING, "AllocatedEvaluator event fired more than once.");
}
}
@Override
public String getId() {
return this.evaluatorId;
}
public void setProcess(final EvaluatorProcess process) {
this.evaluatorDescriptor.setProcess(process);
}
public EvaluatorDescriptor getEvaluatorDescriptor() {
return this.evaluatorDescriptor;
}
@Override
public void close() {
LOG.log(Level.FINER, "Close EvaluatorManager {0} - begin", this.evaluatorId);
synchronized (this.evaluatorDescriptor) {
if (this.stateManager.isAvailable()) {
LOG.log(Level.WARNING, "Dirty shutdown of running evaluator id[{0}]", getId());
try {
if (this.stateManager.isRunning()) {
// Killing the evaluator means that it doesn't need to send a confirmation; it just dies.
this.sendEvaluatorControlMessage(
EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder()
.setTimestamp(System.currentTimeMillis())
.setIdentifier(getId())
.setKillEvaluator(EvaluatorRuntimeProtocol.KillEvaluatorProto.newBuilder().build())
.build());
this.stateManager.setClosing();
} else {
this.stateManager.setKilled();
}
} catch (Exception e) {
LOG.log(Level.WARNING, "Exception occurred when manager sends killing message to task.", e);
this.stateManager.setKilled();
}
}
if (this.resourceNotReleased) {
this.resourceNotReleased = false;
final ResourceReleaseEvent releaseEvent = ResourceReleaseEventImpl.newBuilder()
.setIdentifier(this.evaluatorId)
.setRuntimeName(this.getEvaluatorDescriptor().getRuntimeName())
.build();
try {
// We need to wait awhile before returning the container to the RM
// in order to give the EvaluatorRuntime (and Launcher) time to cleanly exit.
this.clock.scheduleAlarm(100, new EventHandler<Alarm>() {
@Override
public void onNext(final Alarm alarm) {
resourceReleaseHandler.onNext(releaseEvent);
}
});
} catch (final IllegalStateException e) {
LOG.log(Level.WARNING, "Force resource release because the client closed the clock.", e);
this.resourceReleaseHandler.onNext(releaseEvent);
}
}
}
this.idlenessThreadPool.runCheckAsync(this);
LOG.log(Level.FINER, "Close EvaluatorManager {0} - end", this.evaluatorId);
}
/**
* Close message dispatcher for the evaluator.
*/
public void shutdown() {
LOG.log(Level.FINEST, "Shutdown EvaluatorManager: {0}", this.evaluatorId);
this.messageDispatcher.close();
}
/**
* Return true if the state is DONE, FAILED, or KILLED,
* <em>and</em> there are no messages queued or in processing.
*/
public boolean isClosed() {
return this.messageDispatcher.isEmpty() && this.stateManager.isCompleted();
}
/**
* Return true if the state is CLOSING.
*/
public boolean isClosing() {
return this.stateManager.isClosing();
}
/**
* Return true if the state is DONE, FAILED, KILLED, or CLOSING.
*/
public boolean isClosedOrClosing() {
return isClosed() || isClosing();
}
/**
* Triggers a call to check the idleness of the Evaluator.
*/
void checkIdlenessSource() {
this.idlenessSource.check();
}
/**
* EvaluatorException will trigger is FailedEvaluator and state transition to FAILED.
*
* @param exception on the EvaluatorRuntime
*/
public void onEvaluatorException(final EvaluatorException exception) {
synchronized (this.evaluatorDescriptor) {
if (this.stateManager.isCompleted()) {
LOG.log(Level.FINE,
"Ignoring an exception received for Evaluator {0} which is already in state {1}.",
new Object[] {this.getId(), this.stateManager});
return;
}
LOG.log(Level.WARNING, "Failed evaluator: " + getId(), exception);
try {
final List<FailedContext> failedContextList = this.contextRepresenters.getFailedContextsForEvaluatorFailure();
final Optional<FailedTask> failedTaskOptional;
if (this.task.isPresent()) {
final String taskId = this.task.get().getId();
final Optional<ActiveContext> evaluatorContext = Optional.empty();
final Optional<byte[]> bytes = Optional.empty();
final Optional<Throwable> taskException = Optional.<Throwable>of(new Exception("Evaluator crash"));
final String message = "Evaluator crash";
final Optional<String> description = Optional.empty();
final FailedTask failedTask =
new FailedTask(taskId, message, description, taskException, bytes, evaluatorContext);
failedTaskOptional = Optional.of(failedTask);
} else {
failedTaskOptional = Optional.empty();
}
final FailedEvaluator failedEvaluator = new FailedEvaluatorImpl(
exception, failedContextList, failedTaskOptional, this.evaluatorId);
if (driverRestartManager.getEvaluatorRestartState(evaluatorId).isFailedOrExpired()) {
this.messageDispatcher.onDriverRestartEvaluatorFailed(failedEvaluator);
} else {
this.messageDispatcher.onEvaluatorFailed(failedEvaluator);
}
} catch (final Exception e) {
LOG.log(Level.SEVERE, "Exception while handling FailedEvaluator", e);
} finally {
this.stateManager.setFailed();
this.close();
}
}
}
/**
* Process an evaluator heartbeat message.
*/
public void onEvaluatorHeartbeatMessage(
final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatProtoRemoteMessage) {
final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto evaluatorHeartbeatProto =
evaluatorHeartbeatProtoRemoteMessage.getMessage();
LOG.log(Level.FINEST, "Evaluator heartbeat: {0}", evaluatorHeartbeatProto);
synchronized (this.evaluatorDescriptor) {
if (this.stateManager.isCompleted()) {
LOG.log(Level.FINE,
"Ignoring a heartbeat received for Evaluator {0} which is already in state {1}.",
new Object[] {this.getId(), this.stateManager});
return;
} else if (this.stateManager.isAvailable()) {
this.sanityChecker.check(this.evaluatorId, evaluatorHeartbeatProto.getTimestamp());
final String evaluatorRID = evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString();
final EvaluatorRestartState evaluatorRestartState =
this.driverRestartManager.getEvaluatorRestartState(this.evaluatorId);
/*
* First message from a running evaluator. The evaluator can be a new evaluator or be a previous evaluator
* from a separate application attempt. In the case of a previous evaluator, if the restart period has not
* yet expired, we should register it and trigger context active and task events. If the restart period has
* expired, we should return immediately after setting its remote ID in order to close it.
*/
if (this.stateManager.isSubmitted() ||
evaluatorRestartState == EvaluatorRestartState.REPORTED ||
evaluatorRestartState == EvaluatorRestartState.EXPIRED) {
this.evaluatorControlHandler.setRemoteID(evaluatorRID);
if (evaluatorRestartState == EvaluatorRestartState.EXPIRED) {
// Don't do anything if evaluator has expired. Close it immediately upon exit of this method.
return;
}
this.stateManager.setRunning();
LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId);
if (evaluatorRestartState == EvaluatorRestartState.REPORTED) {
this.driverRestartManager.setEvaluatorReregistered(this.evaluatorId);
}
}
}
// All messages from a heartbeat receive the heartbeat timestamp as a sequence number. See REEF-806.
final long messageSequenceNumber = evaluatorHeartbeatProto.getTimestamp();
// Process the Evaluator status message
if (evaluatorHeartbeatProto.hasEvaluatorStatus()) {
this.onEvaluatorStatusMessage(new EvaluatorStatusPOJO(evaluatorHeartbeatProto.getEvaluatorStatus()));
}
// Process the Context status message(s)
final boolean informClientOfNewContexts = !evaluatorHeartbeatProto.hasTaskStatus();
final List<ContextStatusPOJO> contextStatusList = new ArrayList<>();
for (ReefServiceProtos.ContextStatusProto proto : evaluatorHeartbeatProto.getContextStatusList()) {
contextStatusList.add(new ContextStatusPOJO(proto, messageSequenceNumber));
}
this.contextRepresenters.onContextStatusMessages(contextStatusList, informClientOfNewContexts);
// Process the Task status message
if (evaluatorHeartbeatProto.hasTaskStatus()) {
this.onTaskStatusMessage(new TaskStatusPOJO(evaluatorHeartbeatProto.getTaskStatus(), messageSequenceNumber));
}
LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", this.getId());
}
}
/**
* Process a evaluator status message.
*
* @param message
*/
private synchronized void onEvaluatorStatusMessage(final EvaluatorStatusPOJO message) {
switch (message.getState()) {
case DONE:
this.onEvaluatorDone(message);
break;
case FAILED:
this.onEvaluatorFailed(message);
break;
case KILLED:
this.onEvaluatorKilled(message);
break;
case INIT:
case RUNNING:
case SUSPEND:
break;
default:
throw new RuntimeException("Unknown state: " + message.getState());
}
}
/**
* Process an evaluator message that indicates that the evaluator shut down cleanly.
*
* @param message
*/
private synchronized void onEvaluatorDone(final EvaluatorStatusPOJO message) {
assert message.getState() == State.DONE;
LOG.log(Level.FINEST, "Evaluator {0} done.", getId());
// Send an ACK to the Evaluator.
sendEvaluatorControlMessage(
EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder()
.setTimestamp(System.currentTimeMillis())
.setIdentifier(getId())
.setDoneEvaluator(EvaluatorRuntimeProtocol.DoneEvaluatorProto.newBuilder().build())
.build());
this.stateManager.setDone();
this.messageDispatcher.onEvaluatorCompleted(new CompletedEvaluatorImpl(this.evaluatorId));
this.close();
}
/**
* Process an evaluator message that indicates a crash.
*
* @param evaluatorStatus
*/
private synchronized void onEvaluatorFailed(final EvaluatorStatusPOJO evaluatorStatus) {
assert evaluatorStatus.getState() == State.FAILED;
final EvaluatorException evaluatorException;
if (evaluatorStatus.hasError()) {
final Optional<Throwable> exception =
this.exceptionCodec.fromBytes(evaluatorStatus.getError());
evaluatorException = new EvaluatorException(getId(), exception.isPresent() ? exception.get() :
new NonSerializableException("Exception sent, but can't be deserialized", evaluatorStatus.getError()));
} else {
evaluatorException = new EvaluatorException(getId(), new Exception("No exception sent"));
}
this.onEvaluatorException(evaluatorException);
}
/**
* Process an evaluator message that indicates that the evaluator completed the unclean shut down request.
*
* @param message
*/
private synchronized void onEvaluatorKilled(final EvaluatorStatusPOJO message) {
assert message.getState() == State.KILLED;
assert this.stateManager.isClosing();
LOG.log(Level.WARNING, "Evaluator {0} killed completely.", getId());
this.stateManager.setKilled();
}
public void onResourceLaunch(final ResourceLaunchEvent resourceLaunchEvent) {
synchronized (this.evaluatorDescriptor) {
if (this.stateManager.isAllocated()) {
this.stateManager.setSubmitted();
this.resourceLaunchHandler.onNext(resourceLaunchEvent);
} else if (this.stateManager.isCompletedAbnormally()) {
LOG.log(Level.WARNING, "Evaluator manager expected {0} state but instead is in state {1}",
new Object[] {EvaluatorState.ALLOCATED, this.stateManager});
} else {
throw new RuntimeException("Evaluator manager expected " + EvaluatorState.ALLOCATED +
" state but instead is in state " + this.stateManager);
}
}
}
/**
* Packages the ContextControlProto in an EvaluatorControlProto and forward it to the EvaluatorRuntime.
*
* @param contextControlProto message contains context control info.
*/
public void sendContextControlMessage(final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto) {
synchronized (this.evaluatorDescriptor) {
LOG.log(Level.FINEST, "Context control message to {0}", this.evaluatorId);
this.contextControlHandler.send(contextControlProto);
}
}
/**
* Forward the EvaluatorControlProto to the EvaluatorRuntime.
*
* @param evaluatorControlProto message contains evaluator control information.
*/
void sendEvaluatorControlMessage(final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto) {
synchronized (this.evaluatorDescriptor) {
this.evaluatorControlHandler.send(evaluatorControlProto);
}
}
/**
* Handle task status messages.
*
* @param taskStatus message contains the current task status.
*/
private void onTaskStatusMessage(final TaskStatusPOJO taskStatus) {
if (!(this.task.isPresent() && this.task.get().getId().equals(taskStatus.getTaskId()))) {
final State state = taskStatus.getState();
if (state.isRestartable() ||
this.driverRestartManager.getEvaluatorRestartState(this.evaluatorId).isReregistered()) {
// [REEF-308] exposes a bug where the .NET evaluator does not send its states in the right order
// [REEF-289] is a related item which may fix the issue
if (state.isRunning()) {
LOG.log(Level.WARNING,
"Received a message of state {0} for Task {1} before receiving its {2} state",
new Object[] {State.RUNNING, taskStatus.getTaskId(), State.INIT});
}
// FAILED is a legal first state of a Task as it could have failed during construction.
this.task = Optional.of(
new TaskRepresenter(taskStatus.getTaskId(),
this.contextRepresenters.getContext(taskStatus.getContextId()),
this.messageDispatcher,
this,
this.exceptionCodec,
this.driverRestartManager));
} else {
throw new RuntimeException("Received a message of state " + state +
", not INIT, RUNNING, or FAILED for Task " + taskStatus.getTaskId() +
" which we haven't heard from before.");
}
}
this.task.get().onTaskStatusMessage(taskStatus);
if (this.task.get().isNotRunning()) {
LOG.log(Level.FINEST, "Task no longer running. De-registering it.");
this.task = Optional.empty();
}
}
/**
* Resource status information from the (actual) resource manager.
*/
public void onResourceStatusMessage(final ResourceStatusEvent resourceStatusEvent) {
synchronized (this.evaluatorDescriptor) {
final State state = resourceStatusEvent.getState();
LOG.log(Level.FINEST, "Resource manager state update: {0}", state);
if (!this.stateManager.isAvailable()) {
LOG.log(Level.FINE,
"Ignoring resource status update for Evaluator {0} which is already in state {1}.",
new Object[] {this.getId(), this.stateManager});
} else if (state.isCompleted() && this.stateManager.isAvailable()) {
// Something is wrong. The resource manager reports that the Evaluator is done or failed,
// but the Driver assumes it to be alive.
final StringBuilder messageBuilder = new StringBuilder("Evaluator [")
.append(this.evaluatorId)
.append("] is assumed to be in state [")
.append(this.stateManager.toString())
.append("]. But the resource manager reports it to be in state [")
.append(state)
.append("].");
if (this.stateManager.isSubmitted()) {
messageBuilder
.append(" This most likely means that the Evaluator suffered a failure before establishing " +
"a communications link to the driver.");
} else if (this.stateManager.isAllocated()) {
messageBuilder.append(" This most likely means that the Evaluator suffered a failure before being used.");
} else if (this.stateManager.isRunning()) {
messageBuilder.append(" This means that the Evaluator failed but wasn't able to send an error message " +
"back to the driver.");
}
if (this.task.isPresent()) {
messageBuilder.append(" Task [")
.append(this.task.get().getId())
.append("] was running when the Evaluator crashed.");
}
if (resourceStatusEvent.getState() == State.KILLED) {
this.onEvaluatorException(
new EvaluatorKilledByResourceManagerException(this.evaluatorId, messageBuilder.toString()));
} else {
this.onEvaluatorException(new EvaluatorException(this.evaluatorId, messageBuilder.toString()));
}
}
}
}
@Override
public String toString() {
return "EvaluatorManager:"
+ " id=" + this.evaluatorId
+ " state=" + this.stateManager
+ " task=" + this.task;
}
// Dynamic Parameters
/**
* The Evaluator Identifier.
*/
@NamedParameter(doc = "The Evaluator Identifier.")
public static final class EvaluatorIdentifier implements Name<String> {
}
/**
* The Evaluator Host.
*/
@NamedParameter(doc = "The Evaluator Host.")
public static final class EvaluatorDescriptorName implements Name<EvaluatorDescriptorImpl> {
}
}