| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.reef.runtime.common.driver.evaluator; |
| |
| import org.apache.reef.annotations.audience.DriverSide; |
| import org.apache.reef.annotations.audience.Private; |
| import org.apache.reef.driver.evaluator.FailedEvaluator; |
| import org.apache.reef.driver.parameters.EvaluatorConfigurationProviders; |
| import org.apache.reef.driver.restart.DriverRestartManager; |
| import org.apache.reef.driver.restart.EvaluatorRestartState; |
| import org.apache.reef.exception.NonSerializableException; |
| import org.apache.reef.runtime.common.driver.api.*; |
| import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO; |
| import org.apache.reef.runtime.common.driver.evaluator.pojos.EvaluatorStatusPOJO; |
| import org.apache.reef.runtime.common.driver.evaluator.pojos.State; |
| import org.apache.reef.runtime.common.driver.evaluator.pojos.TaskStatusPOJO; |
| import org.apache.reef.tang.ConfigurationProvider; |
| import org.apache.reef.driver.context.ActiveContext; |
| import org.apache.reef.driver.context.FailedContext; |
| import org.apache.reef.driver.evaluator.AllocatedEvaluator; |
| import org.apache.reef.driver.evaluator.EvaluatorDescriptor; |
| import org.apache.reef.driver.task.FailedTask; |
| import org.apache.reef.exception.EvaluatorException; |
| import org.apache.reef.exception.EvaluatorKilledByResourceManagerException; |
| import org.apache.reef.io.naming.Identifiable; |
| import org.apache.reef.proto.EvaluatorRuntimeProtocol; |
| import org.apache.reef.proto.ReefServiceProtos; |
| import org.apache.reef.driver.evaluator.EvaluatorProcess; |
| import org.apache.reef.runtime.common.driver.context.ContextControlHandler; |
| import org.apache.reef.runtime.common.driver.context.ContextRepresenters; |
| import org.apache.reef.runtime.common.driver.idle.EventHandlerIdlenessSource; |
| import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent; |
| import org.apache.reef.runtime.common.driver.task.TaskRepresenter; |
| import org.apache.reef.runtime.common.utils.ExceptionCodec; |
| import org.apache.reef.runtime.common.utils.RemoteManager; |
| import org.apache.reef.tang.annotations.Name; |
| import org.apache.reef.tang.annotations.NamedParameter; |
| import org.apache.reef.tang.annotations.Parameter; |
| import org.apache.reef.tang.formats.ConfigurationSerializer; |
| import org.apache.reef.util.Optional; |
| import org.apache.reef.util.logging.LoggingScopeFactory; |
| import org.apache.reef.wake.EventHandler; |
| import org.apache.reef.wake.remote.RemoteMessage; |
| import org.apache.reef.wake.time.Clock; |
| import org.apache.reef.wake.time.event.Alarm; |
| |
| import javax.inject.Inject; |
| import java.io.File; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| /** |
| * Manages a single Evaluator instance including all lifecycle instances: |
| * (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator). |
| * <p> |
| * A (periodic) heartbeat channel is established from EvaluatorRuntime to EvaluatorManager. |
| * The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this |
| * heartbeat channel. |
| * <p> |
| * A (push-based) EventHandler channel is established from EvaluatorManager to EvaluatorRuntime. |
| * The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate |
| * control information (e.g., shutdown, suspend). |
| */ |
| @Private |
| @DriverSide |
| public final class EvaluatorManager implements Identifiable, AutoCloseable { |
| |
| private static final Logger LOG = Logger.getLogger(EvaluatorManager.class.getName()); |
| |
| private final EvaluatorHeartBeatSanityChecker sanityChecker = new EvaluatorHeartBeatSanityChecker(); |
| private final Clock clock; |
| private final ResourceReleaseHandler resourceReleaseHandler; |
| private final ResourceLaunchHandler resourceLaunchHandler; |
| private final String evaluatorId; |
| private final EvaluatorDescriptorImpl evaluatorDescriptor; |
| private final ContextRepresenters contextRepresenters; |
| private final EvaluatorMessageDispatcher messageDispatcher; |
| private final EvaluatorControlHandler evaluatorControlHandler; |
| private final ContextControlHandler contextControlHandler; |
| private final EvaluatorStatusManager stateManager; |
| private final ExceptionCodec exceptionCodec; |
| private final EventHandlerIdlenessSource idlenessSource; |
| private final RemoteManager remoteManager; |
| private final ConfigurationSerializer configurationSerializer; |
| private final LoggingScopeFactory loggingScopeFactory; |
| private final Set<ConfigurationProvider> evaluatorConfigurationProviders; |
| private final DriverRestartManager driverRestartManager; |
| private final EvaluatorIdlenessThreadPool idlenessThreadPool; |
| |
| // Mutable fields |
| private Optional<TaskRepresenter> task = Optional.empty(); |
| private boolean resourceNotReleased = true; |
| private boolean allocationNotFired = true; |
| |
/**
 * Injectable constructor: stores the evaluator identity, its descriptor and all
 * collaborators handed in by the injector. No other work is done besides logging.
 */
@Inject
private EvaluatorManager(
    @Parameter(EvaluatorIdentifier.class) final String evaluatorId,
    @Parameter(EvaluatorDescriptorName.class) final EvaluatorDescriptorImpl evaluatorDescriptor,
    @Parameter(EvaluatorConfigurationProviders.class)
    final Set<ConfigurationProvider> evaluatorConfigurationProviders,
    final Clock clock,
    final RemoteManager remoteManager,
    final ResourceReleaseHandler resourceReleaseHandler,
    final ResourceLaunchHandler resourceLaunchHandler,
    final ContextRepresenters contextRepresenters,
    final ConfigurationSerializer configurationSerializer,
    final EvaluatorMessageDispatcher messageDispatcher,
    final EvaluatorControlHandler evaluatorControlHandler,
    final ContextControlHandler contextControlHandler,
    final EvaluatorStatusManager stateManager,
    final ExceptionCodec exceptionCodec,
    final EventHandlerIdlenessSource idlenessSource,
    final LoggingScopeFactory loggingScopeFactory,
    final DriverRestartManager driverRestartManager,
    final EvaluatorIdlenessThreadPool idlenessThreadPool) {

  LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator: {0}", evaluatorId);

  // Assignments follow the order of the parameter list.
  this.evaluatorId = evaluatorId;
  this.evaluatorDescriptor = evaluatorDescriptor;
  this.evaluatorConfigurationProviders = evaluatorConfigurationProviders;
  this.clock = clock;
  this.remoteManager = remoteManager;
  this.resourceReleaseHandler = resourceReleaseHandler;
  this.resourceLaunchHandler = resourceLaunchHandler;
  this.contextRepresenters = contextRepresenters;
  this.configurationSerializer = configurationSerializer;
  this.messageDispatcher = messageDispatcher;
  this.evaluatorControlHandler = evaluatorControlHandler;
  this.contextControlHandler = contextControlHandler;
  this.stateManager = stateManager;
  this.exceptionCodec = exceptionCodec;
  this.idlenessSource = idlenessSource;
  this.loggingScopeFactory = loggingScopeFactory;
  this.driverRestartManager = driverRestartManager;
  this.idlenessThreadPool = idlenessThreadPool;

  LOG.log(Level.FINEST, "Instantiated 'EvaluatorManager' for evaluator: [{0}]", this.getId());
}
| |
| /** |
| * Get the id of current job/application. |
| */ |
| public static String getJobIdentifier() { |
| // TODO[JIRA REEF-818]: currently we obtain the job id directly by parsing execution (container) directory path |
| // #845 is open to get the id from RM properly |
| for (File directory = new File(System.getProperty("user.dir")); |
| directory != null; directory = directory.getParentFile()) { |
| final String currentDirectoryName = directory.getName(); |
| if (currentDirectoryName.toLowerCase().contains("application_")) { |
| return currentDirectoryName; |
| } |
| } |
| // cannot find a directory that contains application_, presumably we are on local runtime |
| // again, this is a hack for now, we need #845 as a proper solution |
| return "REEF_LOCAL_RUNTIME"; |
| } |
| |
| /** |
| * Fires the EvaluatorAllocatedEvent to the handlers. Can only be done once. |
| */ |
| public synchronized void fireEvaluatorAllocatedEvent() { |
| |
| if (this.stateManager.isAllocated() && this.allocationNotFired) { |
| |
| final AllocatedEvaluator allocatedEvaluator = |
| new AllocatedEvaluatorImpl(this, |
| this.remoteManager.getMyIdentifier(), |
| this.configurationSerializer, |
| getJobIdentifier(), |
| this.loggingScopeFactory, |
| this.evaluatorConfigurationProviders); |
| |
| LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator with ID [{0}]", this.evaluatorId); |
| |
| this.messageDispatcher.onEvaluatorAllocated(allocatedEvaluator); |
| this.allocationNotFired = false; |
| |
| } else { |
| LOG.log(Level.WARNING, "AllocatedEvaluator event fired more than once."); |
| } |
| } |
| |
| @Override |
| public String getId() { |
| return this.evaluatorId; |
| } |
| |
| public void setProcess(final EvaluatorProcess process) { |
| this.evaluatorDescriptor.setProcess(process); |
| } |
| |
| public EvaluatorDescriptor getEvaluatorDescriptor() { |
| return this.evaluatorDescriptor; |
| } |
| |
| @Override |
| public void close() { |
| |
| LOG.log(Level.FINER, "Close EvaluatorManager {0} - begin", this.evaluatorId); |
| |
| synchronized (this.evaluatorDescriptor) { |
| |
| if (this.stateManager.isAvailable()) { |
| |
| LOG.log(Level.WARNING, "Dirty shutdown of running evaluator id[{0}]", getId()); |
| |
| try { |
| |
| if (this.stateManager.isRunning()) { |
| |
| // Killing the evaluator means that it doesn't need to send a confirmation; it just dies. |
| this.sendEvaluatorControlMessage( |
| EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder() |
| .setTimestamp(System.currentTimeMillis()) |
| .setIdentifier(getId()) |
| .setKillEvaluator(EvaluatorRuntimeProtocol.KillEvaluatorProto.newBuilder().build()) |
| .build()); |
| |
| this.stateManager.setClosing(); |
| |
| } else { |
| this.stateManager.setKilled(); |
| } |
| |
| } catch (Exception e) { |
| LOG.log(Level.WARNING, "Exception occurred when manager sends killing message to task.", e); |
| this.stateManager.setKilled(); |
| } |
| } |
| |
| if (this.resourceNotReleased) { |
| |
| this.resourceNotReleased = false; |
| |
| final ResourceReleaseEvent releaseEvent = ResourceReleaseEventImpl.newBuilder() |
| .setIdentifier(this.evaluatorId) |
| .setRuntimeName(this.getEvaluatorDescriptor().getRuntimeName()) |
| .build(); |
| |
| try { |
| // We need to wait awhile before returning the container to the RM |
| // in order to give the EvaluatorRuntime (and Launcher) time to cleanly exit. |
| this.clock.scheduleAlarm(100, new EventHandler<Alarm>() { |
| @Override |
| public void onNext(final Alarm alarm) { |
| resourceReleaseHandler.onNext(releaseEvent); |
| } |
| }); |
| } catch (final IllegalStateException e) { |
| LOG.log(Level.WARNING, "Force resource release because the client closed the clock.", e); |
| this.resourceReleaseHandler.onNext(releaseEvent); |
| } |
| } |
| } |
| |
| this.idlenessThreadPool.runCheckAsync(this); |
| |
| LOG.log(Level.FINER, "Close EvaluatorManager {0} - end", this.evaluatorId); |
| } |
| |
| /** |
| * Close message dispatcher for the evaluator. |
| */ |
| public void shutdown() { |
| LOG.log(Level.FINEST, "Shutdown EvaluatorManager: {0}", this.evaluatorId); |
| this.messageDispatcher.close(); |
| } |
| |
| /** |
| * Return true if the state is DONE, FAILED, or KILLED, |
| * <em>and</em> there are no messages queued or in processing. |
| */ |
| public boolean isClosed() { |
| return this.messageDispatcher.isEmpty() && this.stateManager.isCompleted(); |
| } |
| |
| /** |
| * Return true if the state is CLOSING. |
| */ |
| public boolean isClosing() { |
| return this.stateManager.isClosing(); |
| } |
| |
| /** |
| * Return true if the state is DONE, FAILED, KILLED, or CLOSING. |
| */ |
| public boolean isClosedOrClosing() { |
| return isClosed() || isClosing(); |
| } |
| |
| /** |
| * Triggers a call to check the idleness of the Evaluator. |
| */ |
| void checkIdlenessSource() { |
| this.idlenessSource.check(); |
| } |
| |
| /** |
| * EvaluatorException will trigger is FailedEvaluator and state transition to FAILED. |
| * |
| * @param exception on the EvaluatorRuntime |
| */ |
| public void onEvaluatorException(final EvaluatorException exception) { |
| synchronized (this.evaluatorDescriptor) { |
| |
| if (this.stateManager.isCompleted()) { |
| LOG.log(Level.FINE, |
| "Ignoring an exception received for Evaluator {0} which is already in state {1}.", |
| new Object[] {this.getId(), this.stateManager}); |
| return; |
| } |
| |
| LOG.log(Level.WARNING, "Failed evaluator: " + getId(), exception); |
| |
| try { |
| |
| final List<FailedContext> failedContextList = this.contextRepresenters.getFailedContextsForEvaluatorFailure(); |
| |
| final Optional<FailedTask> failedTaskOptional; |
| if (this.task.isPresent()) { |
| |
| final String taskId = this.task.get().getId(); |
| final Optional<ActiveContext> evaluatorContext = Optional.empty(); |
| final Optional<byte[]> bytes = Optional.empty(); |
| final Optional<Throwable> taskException = Optional.<Throwable>of(new Exception("Evaluator crash")); |
| final String message = "Evaluator crash"; |
| final Optional<String> description = Optional.empty(); |
| final FailedTask failedTask = |
| new FailedTask(taskId, message, description, taskException, bytes, evaluatorContext); |
| |
| failedTaskOptional = Optional.of(failedTask); |
| |
| } else { |
| failedTaskOptional = Optional.empty(); |
| } |
| |
| final FailedEvaluator failedEvaluator = new FailedEvaluatorImpl( |
| exception, failedContextList, failedTaskOptional, this.evaluatorId); |
| |
| if (driverRestartManager.getEvaluatorRestartState(evaluatorId).isFailedOrExpired()) { |
| this.messageDispatcher.onDriverRestartEvaluatorFailed(failedEvaluator); |
| } else { |
| this.messageDispatcher.onEvaluatorFailed(failedEvaluator); |
| } |
| } catch (final Exception e) { |
| LOG.log(Level.SEVERE, "Exception while handling FailedEvaluator", e); |
| } finally { |
| this.stateManager.setFailed(); |
| this.close(); |
| } |
| } |
| } |
| |
| /** |
| * Process an evaluator heartbeat message. |
| */ |
| public void onEvaluatorHeartbeatMessage( |
| final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatProtoRemoteMessage) { |
| |
| final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto evaluatorHeartbeatProto = |
| evaluatorHeartbeatProtoRemoteMessage.getMessage(); |
| |
| LOG.log(Level.FINEST, "Evaluator heartbeat: {0}", evaluatorHeartbeatProto); |
| |
| synchronized (this.evaluatorDescriptor) { |
| |
| if (this.stateManager.isCompleted()) { |
| |
| LOG.log(Level.FINE, |
| "Ignoring a heartbeat received for Evaluator {0} which is already in state {1}.", |
| new Object[] {this.getId(), this.stateManager}); |
| |
| return; |
| |
| } else if (this.stateManager.isAvailable()) { |
| |
| this.sanityChecker.check(this.evaluatorId, evaluatorHeartbeatProto.getTimestamp()); |
| final String evaluatorRID = evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString(); |
| |
| final EvaluatorRestartState evaluatorRestartState = |
| this.driverRestartManager.getEvaluatorRestartState(this.evaluatorId); |
| |
| /* |
| * First message from a running evaluator. The evaluator can be a new evaluator or be a previous evaluator |
| * from a separate application attempt. In the case of a previous evaluator, if the restart period has not |
| * yet expired, we should register it and trigger context active and task events. If the restart period has |
| * expired, we should return immediately after setting its remote ID in order to close it. |
| */ |
| if (this.stateManager.isSubmitted() || |
| evaluatorRestartState == EvaluatorRestartState.REPORTED || |
| evaluatorRestartState == EvaluatorRestartState.EXPIRED) { |
| |
| this.evaluatorControlHandler.setRemoteID(evaluatorRID); |
| |
| if (evaluatorRestartState == EvaluatorRestartState.EXPIRED) { |
| // Don't do anything if evaluator has expired. Close it immediately upon exit of this method. |
| return; |
| } |
| |
| this.stateManager.setRunning(); |
| LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId); |
| |
| if (evaluatorRestartState == EvaluatorRestartState.REPORTED) { |
| this.driverRestartManager.setEvaluatorReregistered(this.evaluatorId); |
| } |
| } |
| } |
| |
| // All messages from a heartbeat receive the heartbeat timestamp as a sequence number. See REEF-806. |
| final long messageSequenceNumber = evaluatorHeartbeatProto.getTimestamp(); |
| |
| // Process the Evaluator status message |
| if (evaluatorHeartbeatProto.hasEvaluatorStatus()) { |
| this.onEvaluatorStatusMessage(new EvaluatorStatusPOJO(evaluatorHeartbeatProto.getEvaluatorStatus())); |
| } |
| |
| // Process the Context status message(s) |
| final boolean informClientOfNewContexts = !evaluatorHeartbeatProto.hasTaskStatus(); |
| final List<ContextStatusPOJO> contextStatusList = new ArrayList<>(); |
| for (ReefServiceProtos.ContextStatusProto proto : evaluatorHeartbeatProto.getContextStatusList()) { |
| contextStatusList.add(new ContextStatusPOJO(proto, messageSequenceNumber)); |
| } |
| |
| this.contextRepresenters.onContextStatusMessages(contextStatusList, informClientOfNewContexts); |
| |
| // Process the Task status message |
| if (evaluatorHeartbeatProto.hasTaskStatus()) { |
| this.onTaskStatusMessage(new TaskStatusPOJO(evaluatorHeartbeatProto.getTaskStatus(), messageSequenceNumber)); |
| } |
| |
| LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", this.getId()); |
| } |
| } |
| |
| /** |
| * Process a evaluator status message. |
| * |
| * @param message |
| */ |
| private synchronized void onEvaluatorStatusMessage(final EvaluatorStatusPOJO message) { |
| |
| switch (message.getState()) { |
| case DONE: |
| this.onEvaluatorDone(message); |
| break; |
| case FAILED: |
| this.onEvaluatorFailed(message); |
| break; |
| case KILLED: |
| this.onEvaluatorKilled(message); |
| break; |
| case INIT: |
| case RUNNING: |
| case SUSPEND: |
| break; |
| default: |
| throw new RuntimeException("Unknown state: " + message.getState()); |
| } |
| } |
| |
| /** |
| * Process an evaluator message that indicates that the evaluator shut down cleanly. |
| * |
| * @param message |
| */ |
| private synchronized void onEvaluatorDone(final EvaluatorStatusPOJO message) { |
| |
| assert message.getState() == State.DONE; |
| |
| LOG.log(Level.FINEST, "Evaluator {0} done.", getId()); |
| |
| // Send an ACK to the Evaluator. |
| sendEvaluatorControlMessage( |
| EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder() |
| .setTimestamp(System.currentTimeMillis()) |
| .setIdentifier(getId()) |
| .setDoneEvaluator(EvaluatorRuntimeProtocol.DoneEvaluatorProto.newBuilder().build()) |
| .build()); |
| |
| this.stateManager.setDone(); |
| this.messageDispatcher.onEvaluatorCompleted(new CompletedEvaluatorImpl(this.evaluatorId)); |
| |
| this.close(); |
| } |
| |
| /** |
| * Process an evaluator message that indicates a crash. |
| * |
| * @param evaluatorStatus |
| */ |
| private synchronized void onEvaluatorFailed(final EvaluatorStatusPOJO evaluatorStatus) { |
| |
| assert evaluatorStatus.getState() == State.FAILED; |
| |
| final EvaluatorException evaluatorException; |
| |
| if (evaluatorStatus.hasError()) { |
| |
| final Optional<Throwable> exception = |
| this.exceptionCodec.fromBytes(evaluatorStatus.getError()); |
| |
| evaluatorException = new EvaluatorException(getId(), exception.isPresent() ? exception.get() : |
| new NonSerializableException("Exception sent, but can't be deserialized", evaluatorStatus.getError())); |
| |
| } else { |
| evaluatorException = new EvaluatorException(getId(), new Exception("No exception sent")); |
| } |
| |
| this.onEvaluatorException(evaluatorException); |
| } |
| |
| /** |
| * Process an evaluator message that indicates that the evaluator completed the unclean shut down request. |
| * |
| * @param message |
| */ |
| private synchronized void onEvaluatorKilled(final EvaluatorStatusPOJO message) { |
| |
| assert message.getState() == State.KILLED; |
| assert this.stateManager.isClosing(); |
| |
| LOG.log(Level.WARNING, "Evaluator {0} killed completely.", getId()); |
| |
| this.stateManager.setKilled(); |
| } |
| |
| public void onResourceLaunch(final ResourceLaunchEvent resourceLaunchEvent) { |
| synchronized (this.evaluatorDescriptor) { |
| if (this.stateManager.isAllocated()) { |
| this.stateManager.setSubmitted(); |
| this.resourceLaunchHandler.onNext(resourceLaunchEvent); |
| } else if (this.stateManager.isCompletedAbnormally()) { |
| LOG.log(Level.WARNING, "Evaluator manager expected {0} state but instead is in state {1}", |
| new Object[] {EvaluatorState.ALLOCATED, this.stateManager}); |
| } else { |
| throw new RuntimeException("Evaluator manager expected " + EvaluatorState.ALLOCATED + |
| " state but instead is in state " + this.stateManager); |
| } |
| } |
| } |
| |
| /** |
| * Packages the ContextControlProto in an EvaluatorControlProto and forward it to the EvaluatorRuntime. |
| * |
| * @param contextControlProto message contains context control info. |
| */ |
| public void sendContextControlMessage(final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto) { |
| synchronized (this.evaluatorDescriptor) { |
| LOG.log(Level.FINEST, "Context control message to {0}", this.evaluatorId); |
| this.contextControlHandler.send(contextControlProto); |
| } |
| } |
| |
| /** |
| * Forward the EvaluatorControlProto to the EvaluatorRuntime. |
| * |
| * @param evaluatorControlProto message contains evaluator control information. |
| */ |
| void sendEvaluatorControlMessage(final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto) { |
| synchronized (this.evaluatorDescriptor) { |
| this.evaluatorControlHandler.send(evaluatorControlProto); |
| } |
| } |
| |
| /** |
| * Handle task status messages. |
| * |
| * @param taskStatus message contains the current task status. |
| */ |
| private void onTaskStatusMessage(final TaskStatusPOJO taskStatus) { |
| |
| if (!(this.task.isPresent() && this.task.get().getId().equals(taskStatus.getTaskId()))) { |
| |
| final State state = taskStatus.getState(); |
| if (state.isRestartable() || |
| this.driverRestartManager.getEvaluatorRestartState(this.evaluatorId).isReregistered()) { |
| |
| // [REEF-308] exposes a bug where the .NET evaluator does not send its states in the right order |
| // [REEF-289] is a related item which may fix the issue |
| if (state.isRunning()) { |
| LOG.log(Level.WARNING, |
| "Received a message of state {0} for Task {1} before receiving its {2} state", |
| new Object[] {State.RUNNING, taskStatus.getTaskId(), State.INIT}); |
| } |
| |
| // FAILED is a legal first state of a Task as it could have failed during construction. |
| this.task = Optional.of( |
| new TaskRepresenter(taskStatus.getTaskId(), |
| this.contextRepresenters.getContext(taskStatus.getContextId()), |
| this.messageDispatcher, |
| this, |
| this.exceptionCodec, |
| this.driverRestartManager)); |
| } else { |
| throw new RuntimeException("Received a message of state " + state + |
| ", not INIT, RUNNING, or FAILED for Task " + taskStatus.getTaskId() + |
| " which we haven't heard from before."); |
| } |
| } |
| |
| this.task.get().onTaskStatusMessage(taskStatus); |
| |
| if (this.task.get().isNotRunning()) { |
| LOG.log(Level.FINEST, "Task no longer running. De-registering it."); |
| this.task = Optional.empty(); |
| } |
| } |
| |
| /** |
| * Resource status information from the (actual) resource manager. |
| */ |
| public void onResourceStatusMessage(final ResourceStatusEvent resourceStatusEvent) { |
| |
| synchronized (this.evaluatorDescriptor) { |
| |
| final State state = resourceStatusEvent.getState(); |
| LOG.log(Level.FINEST, "Resource manager state update: {0}", state); |
| |
| if (!this.stateManager.isAvailable()) { |
| |
| LOG.log(Level.FINE, |
| "Ignoring resource status update for Evaluator {0} which is already in state {1}.", |
| new Object[] {this.getId(), this.stateManager}); |
| |
| } else if (state.isCompleted() && this.stateManager.isAvailable()) { |
| |
| // Something is wrong. The resource manager reports that the Evaluator is done or failed, |
| // but the Driver assumes it to be alive. |
| final StringBuilder messageBuilder = new StringBuilder("Evaluator [") |
| .append(this.evaluatorId) |
| .append("] is assumed to be in state [") |
| .append(this.stateManager.toString()) |
| .append("]. But the resource manager reports it to be in state [") |
| .append(state) |
| .append("]."); |
| |
| if (this.stateManager.isSubmitted()) { |
| messageBuilder |
| .append(" This most likely means that the Evaluator suffered a failure before establishing " + |
| "a communications link to the driver."); |
| } else if (this.stateManager.isAllocated()) { |
| messageBuilder.append(" This most likely means that the Evaluator suffered a failure before being used."); |
| } else if (this.stateManager.isRunning()) { |
| messageBuilder.append(" This means that the Evaluator failed but wasn't able to send an error message " + |
| "back to the driver."); |
| } |
| |
| if (this.task.isPresent()) { |
| messageBuilder.append(" Task [") |
| .append(this.task.get().getId()) |
| .append("] was running when the Evaluator crashed."); |
| } |
| |
| if (resourceStatusEvent.getState() == State.KILLED) { |
| this.onEvaluatorException( |
| new EvaluatorKilledByResourceManagerException(this.evaluatorId, messageBuilder.toString())); |
| } else { |
| this.onEvaluatorException(new EvaluatorException(this.evaluatorId, messageBuilder.toString())); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "EvaluatorManager:" |
| + " id=" + this.evaluatorId |
| + " state=" + this.stateManager |
| + " task=" + this.task; |
| } |
| |
| // Dynamic Parameters |
| |
| /** |
| * The Evaluator Identifier. |
| */ |
| @NamedParameter(doc = "The Evaluator Identifier.") |
| public static final class EvaluatorIdentifier implements Name<String> { |
| } |
| |
| /** |
| * The Evaluator Host. |
| */ |
| @NamedParameter(doc = "The Evaluator Host.") |
| public static final class EvaluatorDescriptorName implements Name<EvaluatorDescriptorImpl> { |
| } |
| } |