blob: e28373dcf3ff2ec25494062ecd73325fbd70d72e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Text;
using Org.Apache.REEF.Common.Api;
using Org.Apache.REEF.Common.Catalog;
using Org.Apache.REEF.Common.Evaluator;
using Org.Apache.REEF.Common.Exceptions;
using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
using Org.Apache.REEF.Driver.Bridge.Events;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Wake.Time;
using TaskMessage = Org.Apache.REEF.Common.Tasks.TaskMessage;
namespace Org.Apache.REEF.Driver
{
    /// <summary>
    /// Manages a single Evaluator instance including all lifecycle instances:
    /// (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator).
    /// A (periodic) heartbeat channel is established EvaluatorRuntime -> EvaluatorManager.
    /// The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this heartbeat channel.
    /// A (push-based) EventHandler channel is established EvaluatorManager -> EvaluatorRuntime.
    /// The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate
    /// control information (e.g., shutdown, suspend).
    /// </summary>
    /// <remarks>
    /// The public handlers synchronize by locking on <c>_evaluatorDescriptor</c>.
    /// Dispatching events to the registered handlers has not been ported: the corresponding
    /// call sites only log and are marked TODO.
    /// </remarks>
    [Obsolete("Driver core logic no longer needed in.NET")]
    public class EvaluatorManager : IDisposable, IIdentifiable
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorManager));

        private STATE _state = STATE.ALLOCATED;

        private readonly IClock _clock;

        // TODO
        // private final RemoteManager remoteManager;

        private readonly DriverManager _driverManager;

        private readonly IResourceReleaseHandler _resourceReleaseHandler;

        private readonly IResourceLaunchHandler _resourceLaunchHandler;

        private readonly EvaluatorDescriptorImpl _evaluatorDescriptor;

        private readonly string _evaluatorId;

        // Active contexts in the order they were added; _activeContextIds mirrors their ids
        // so membership checks do not need to scan the list.
        private readonly IList<EvaluatorContext> _activeContexts = new List<EvaluatorContext>();

        private readonly HashSet<string> _activeContextIds = new HashSet<string>();

        private IRunningTask _runningTask = null;

        // NOTE(review): never assigned (obtaining it from the remote manager is still a TODO),
        // so it is always null and control messages cannot actually be delivered.
        private readonly IObserver<EvaluatorControlProto> _evaluatorControlHandler = null;

        private bool _isResourceReleased = false;

        //TODO
        //private final DispatchingEStage dispatcher;

        private EvaluatorType _type = EvaluatorType.CLR;

        /// <summary>
        /// Creates the manager for a freshly allocated evaluator (initial state ALLOCATED).
        /// </summary>
        /// <remarks>
        /// The event-handler sets are accepted for interface compatibility but are currently unused:
        /// registering them with a dispatcher is still commented out below.
        /// </remarks>
        public EvaluatorManager(
            IClock clock,
            //RemoteManager remoteManager,
            DriverManager driverManager,
            IResourceReleaseHandler resourceReleaseHandler,
            IResourceLaunchHandler resourceLaunchHandler,
            //REEFErrorHandler errorHandler,
            string evaluatorId,
            EvaluatorDescriptorImpl evaluatorDescriptor,
            ISet<IObservable<IActiveContext>> activeContextEventHandler,
            ISet<IObservable<IClosedContext>> closedContextEventHandlers,
            ISet<IObservable<FailedContext>> failedContextEventHandlers,
            ISet<IObservable<ContextMessage>> contextMessageHandlers,
            ISet<IObservable<IRunningTask>> runningTaskEventHandlers,
            ISet<IObservable<ICompletedTask>> completedTaskEventHandlers,
            ISet<IObservable<ISuspendedTask>> suspendedTaskEventHandlers,
            ISet<IObservable<TaskMessage>> taskMessageEventHandlers,
            ISet<IObservable<FailedTask>> taskExceptionEventHandlers,
            ISet<IObservable<IAllocatedEvaluator>> allocatedEvaluatorEventHandlers,
            ISet<IObservable<IFailedEvaluator>> failedEvaluatorHandlers,
            ISet<IObservable<ICompletedEvaluator>> completedEvaluatorHandlers)
        {
            _clock = clock;
            //_remoteManager = remoteManager;
            _driverManager = driverManager;
            _resourceReleaseHandler = resourceReleaseHandler;
            _resourceLaunchHandler = resourceLaunchHandler;
            _evaluatorId = evaluatorId;
            _evaluatorDescriptor = evaluatorDescriptor;

            //this.dispatcher = new DispatchingEStage(errorHandler, 16); // 16 threads
            //this.dispatcher.register(ActiveContext.class, activeContextEventHandlers);
            //this.dispatcher.register(ClosedContext.class, closedContextEventHandlers);
            //this.dispatcher.register(FailedContext.class, failedContextEventHandlers);
            //this.dispatcher.register(ContextMessage.class, contextMessageHandlers);
            //this.dispatcher.register(RunningTask.class, runningTaskEventHandlers);
            //this.dispatcher.register(CompletedTask.class, completedTaskEventHandlers);
            //this.dispatcher.register(SuspendedTask.class, suspendedTaskEventHandlers);
            //this.dispatcher.register(TaskMessage.class, taskMessageEventHandlers);
            //this.dispatcher.register(FailedTask.class, taskExceptionEventHandlers);
            //this.dispatcher.register(FailedEvaluator.class, failedEvaluatorHandlers);
            //this.dispatcher.register(CompletedEvaluator.class, completedEvaluatorHandlers);
            //this.dispatcher.register(AllocatedEvaluator.class, allocatedEvaluatorEventHandlers);
            //this.dispatcher.onNext(AllocatedEvaluator.class,
            // new AllocatedEvaluatorImpl(this, remoteManager.getMyIdentifier()));
        }

        /// <summary>
        /// Various states that the EvaluatorManager could be in. The EvaluatorManager is created when a resource has been allocated by the ResourceManager.
        /// </summary>
        /// <remarks>
        /// Declaration order matters: the handlers compare states with &lt; and &gt;=,
        /// treating everything from DONE onward as terminal.
        /// </remarks>
        public enum STATE
        {
            ALLOCATED, // initial state
            SUBMITTED, // client called AllocatedEvaluator.submitTask() and we're waiting for first contact
            RUNNING, // first contact received, all communication channels established, Evaluator sent to client.
            DONE, // clean shutdown
            FAILED, // some failure occurred.
            KILLED // unclean shutdown
        }

        /// <summary>The descriptor of the managed evaluator.</summary>
        public IEvaluatorDescriptor EvaluatorDescriptor
        {
            get
            {
                return _evaluatorDescriptor;
            }
        }

        /// <summary>The node descriptor taken from <see cref="EvaluatorDescriptor"/>.</summary>
        public INodeDescriptor NodeDescriptor
        {
            get
            {
                return EvaluatorDescriptor.NodeDescriptor;
            }
        }

        /// <summary>The task currently running on the evaluator, or null if there is none.</summary>
        public IRunningTask RunningTask
        {
            get
            {
                lock (_evaluatorDescriptor)
                {
                    return _runningTask;
                }
            }
        }

        /// <summary>
        /// The evaluator identifier. The setter is a no-op; the identifier is fixed at construction.
        /// </summary>
        public string Id
        {
            get
            {
                return _evaluatorId;
            }

            set
            {
            }
        }

        /// <summary>
        /// The evaluator type; setting it also updates the evaluator descriptor.
        /// </summary>
        public EvaluatorType Type
        {
            get
            {
                return _type;
            }

            set
            {
                _type = value;
                _evaluatorDescriptor.EvaluatorType = value;
            }
        }

        /// <summary>
        /// Shuts the evaluator down.
        /// If it is still RUNNING, a kill message is attempted and the state moves to KILLED.
        /// The manager is then released from the DriverManager, at most once.
        /// </summary>
        public void Dispose()
        {
            lock (_evaluatorDescriptor)
            {
                if (_state == STATE.RUNNING)
                {
                    LOGGER.Log(Level.Warning, "Dirty shutdown of running evaluator :" + Id);
                    try
                    {
                        // Killing the evaluator means that it doesn't need to send a confirmation; it just dies.
                        EvaluatorControlProto proto = new EvaluatorControlProto();
                        proto.timestamp = DateTime.Now.Ticks;
                        proto.identifier = Id;
                        proto.kill_evaluator = new KillEvaluatorProto();
                        Handle(proto);
                    }
                    catch (Exception e)
                    {
                        // Dispose must not throw. Without this catch, a failure to deliver the kill message
                        // would also skip the resource release and DriverManager.Release below.
                        Exceptions.Caught(e, Level.Warning, "Unable to send kill message to evaluator " + Id, LOGGER);
                    }
                    finally
                    {
                        _state = STATE.KILLED;
                    }
                }
            }

            if (!_isResourceReleased)
            {
                try
                {
                    // We need to wait awhile before returning the container to the RM in order to
                    // give the EvaluatorRuntime (and Launcher) time to cleanly exit.
                    // NOTE(review): the delayed release below is not ported, so this try body is empty,
                    // the catch branch cannot run, and no ResourceReleaseProto is sent from here.
                    // this.clock.scheduleAlarm(100, new EventHandler<Alarm>() {
                    //@Override
                    //public void onNext(final Alarm alarm) {
                    // EvaluatorManager.this.resourceReleaseHandler.onNext(
                    // DriverRuntimeProtocol.ResourceReleaseProto.newBuilder()
                    // .setIdentifier(EvaluatorManager.this.evaluatorId).build());
                }
                catch (Exception e)
                {
                    Exceptions.Caught(e, Level.Warning, "Force resource release because the client closed the clock.", LOGGER);
                    ResourceReleaseProto proto = new ResourceReleaseProto();
                    proto.identifier = _evaluatorId;
                    _resourceReleaseHandler.OnNext(proto);
                }
                finally
                {
                    _isResourceReleased = true;
                    _driverManager.Release(this);
                }
            }
        }

        /// <summary>
        /// EvaluatorException will trigger a FailedEvaluator and a state transition to FAILED.
        /// Does nothing if the evaluator is already in a terminal state (DONE, FAILED or KILLED).
        /// </summary>
        /// <param name="exception">The cause of the evaluator failure.</param>
        public void Handle(EvaluatorException exception)
        {
            lock (_evaluatorDescriptor)
            {
                if (_state >= STATE.DONE)
                {
                    return;
                }

                LOGGER.Log(Level.Warning, "Failed Evaluator: " + Id + exception.Message);
                try
                {
                    IList<FailedContext> failedContexts = new List<FailedContext>();

                    // Snapshot of the active contexts, most recently added first.
                    List<EvaluatorContext> newestFirst = Enumerable.Reverse(_activeContexts).ToList();
                    foreach (EvaluatorContext context in newestFirst)
                    {
                        Optional<IActiveContext> parentContext = context.ParentId.IsPresent()
                            ? Optional<IActiveContext>.Of(GetEvaluatorContext(context.ParentId.Value))
                            : Optional<IActiveContext>.Empty();
                        failedContexts.Add(context.GetFailedContext(parentContext, exception));
                    }

                    //Optional<FailedTask> failedTask = _runningTask != null ?
                    // Optional<FailedTask>.Of(new FailedTask(_runningTask.Id, exception)) : Optional<FailedTask>.Empty();
                    //LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + failedTask.ToString());
                    //this.dispatcher.onNext(FailedEvaluator.class, new FailedEvaluatorImpl(
                    //exception, failedContextList, failedTaskOptional, this.evaluatorId));
                }
                catch (Exception e)
                {
                    Exceptions.CaughtAndThrow(e, Level.Error, "Exception while handling FailedEvaluator.", LOGGER);
                }
                finally
                {
                    _state = STATE.FAILED;
                    Dispose();
                }
            }
        }

        /// <summary>
        /// Processes a heartbeat sent by the EvaluatorRuntime.
        /// Covers evaluator errors, the SUBMITTED to RUNNING transition, context and task status
        /// updates, and the terminal FAILED/DONE evaluator states.
        /// </summary>
        /// <param name="evaluatorHearBeatProtoMessage">The heartbeat message and its sender identifier.</param>
        public void Handle(IRemoteMessage<EvaluatorHeartbeatProto> evaluatorHearBeatProtoMessage)
        {
            lock (_evaluatorDescriptor)
            {
                EvaluatorHeartbeatProto heartbeatProto = evaluatorHearBeatProtoMessage.Message;
                EvaluatorStatusProto evaluatorStatusProto = heartbeatProto.evaluator_status;

                if (evaluatorStatusProto != null)
                {
                    if (evaluatorStatusProto.error != null)
                    {
                        Handle(new EvaluatorException(Id, ByteUtilities.ByteArrarysToString(evaluatorStatusProto.error)));
                        return;
                    }
                    else if (_state == STATE.SUBMITTED)
                    {
                        string evaluatorRId = evaluatorHearBeatProtoMessage.Identifier.ToString();
                        LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorRId);

                        // TODO
                        // _evaluatorControlHandler = _remoteManager.getHandler(evaluatorRID, EvaluatorRuntimeProtocol.EvaluatorControlProto.class);
                        _state = STATE.RUNNING;
                        LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} is running", _evaluatorId));
                    }
                }

                LOGGER.Log(Level.Info, "Evaluator heartbeat: " + heartbeatProto);

                foreach (ContextStatusProto contextStatusProto in heartbeatProto.context_status)
                {
                    Handle(contextStatusProto, heartbeatProto.task_status != null);
                }

                if (heartbeatProto.task_status != null)
                {
                    Handle(heartbeatProto.task_status);
                }

                // The evaluator status is optional in a heartbeat; only act on it when present.
                if (evaluatorStatusProto != null)
                {
                    if (evaluatorStatusProto.state == State.FAILED)
                    {
                        // Do not set _state here. Handle(EvaluatorException) is a no-op once _state >= DONE,
                        // and it performs the transition to FAILED (and the Dispose) itself.
                        EvaluatorException e = evaluatorStatusProto.error != null ?
                            new EvaluatorException(_evaluatorId, ByteUtilities.ByteArrarysToString(evaluatorStatusProto.error)) :
                            new EvaluatorException(_evaluatorId, "unknown cause");
                        LOGGER.Log(Level.Warning, "Failed evaluator: " + Id + e.Message);
                        Handle(e);
                    }
                    else if (evaluatorStatusProto.state == State.DONE)
                    {
                        LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} done", Id));
                        _state = STATE.DONE;

                        // TODO
                        // dispatcher.onNext(CompletedEvaluator.class, new CompletedEvaluator() {
                        //@Override
                        //public String getId() {
                        // return EvaluatorManager.this.evaluatorId;
                        Dispose();
                    }
                }
            }

            LOGGER.Log(Level.Info, "DONE with evaluator heartbeat");
        }

        /// <summary>
        /// Moves an ALLOCATED evaluator to SUBMITTED and forwards the launch request.
        /// </summary>
        /// <param name="resourceLaunchProto">The launch request to forward to the resource launch handler.</param>
        /// <exception cref="InvalidOperationException">If the evaluator is not in the ALLOCATED state.</exception>
        public void Handle(ResourceLaunchProto resourceLaunchProto)
        {
            lock (_evaluatorDescriptor)
            {
                if (_state == STATE.ALLOCATED)
                {
                    _state = STATE.SUBMITTED;
                    _resourceLaunchHandler.OnNext(resourceLaunchProto);
                }
                else
                {
                    var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Evaluator manager expected {0} state, but instead is in state {1}", STATE.ALLOCATED, _state));
                    Exceptions.Throw(e, LOGGER);
                }
            }
        }

        /// <summary>
        /// Packages the ContextControlProto in an EvaluatorControlProto and forwards it to the EvaluatorRuntime.
        /// </summary>
        /// <param name="contextControlProto">The context control message to wrap.</param>
        public void Handle(ContextControlProto contextControlProto)
        {
            lock (_evaluatorDescriptor)
            {
                LOGGER.Log(Level.Info, "Task control message from " + _evaluatorId);
                EvaluatorControlProto evaluatorControlProto = new EvaluatorControlProto();
                evaluatorControlProto.timestamp = DateTime.Now.Ticks;
                evaluatorControlProto.identifier = Id;
                evaluatorControlProto.context_control = contextControlProto;
                Handle(evaluatorControlProto);
            }
        }

        /// <summary>
        /// Forwards the EvaluatorControlProto to the EvaluatorRuntime.
        /// </summary>
        /// <param name="proto">The control message to send.</param>
        /// <exception cref="InvalidOperationException">
        /// If the evaluator is not RUNNING, or no control handler is available.
        /// </exception>
        public void Handle(EvaluatorControlProto proto)
        {
            lock (_evaluatorDescriptor)
            {
                if (_state != STATE.RUNNING)
                {
                    var e = new InvalidOperationException(
                        string.Format(
                            CultureInfo.InvariantCulture,
                            "Evaluator manager expects to be in {0} state, but instead is in state {1}",
                            STATE.RUNNING,
                            _state));
                    Exceptions.Throw(e, LOGGER);
                }
                else if (_evaluatorControlHandler == null)
                {
                    // Previously this surfaced as a bare NullReferenceException on OnNext.
                    var e = new InvalidOperationException(
                        "No evaluator control handler is available for evaluator " + _evaluatorId);
                    Exceptions.Throw(e, LOGGER);
                }
                else
                {
                    _evaluatorControlHandler.OnNext(proto);
                }
            }
        }

        /// <summary>
        /// Resource status information from the (actual) resource manager.
        /// </summary>
        /// <param name="resourceStatusProto">The status reported by the resource manager.</param>
        public void Handle(ResourceStatusProto resourceStatusProto)
        {
            lock (_evaluatorDescriptor)
            {
                State resourceState = resourceStatusProto.state;
                LOGGER.Log(Level.Info, "Resource manager state update: " + resourceState);
                if (resourceState == State.DONE || resourceState == State.FAILED)
                {
                    if (_state < STATE.DONE)
                    {
                        // something is wrong, I think I'm alive but the resource manager runtime says I'm dead
                        StringBuilder stringBuilder = new StringBuilder();
                        stringBuilder.Append(
                            string.Format(
                                CultureInfo.InvariantCulture,
                                "The resource manager informed me that Evaluator {0} is in state {1} but I think I am in {2} state",
                                _evaluatorId,
                                resourceState,
                                _state));
                        if (resourceStatusProto.diagnostics != null)
                        {
                            stringBuilder.Append("Cause: " + resourceStatusProto.diagnostics);
                        }

                        if (_runningTask != null)
                        {
                            stringBuilder.Append(
                                string.Format(
                                    CultureInfo.InvariantCulture,
                                    "Taskruntime {0} did not complete before this evaluator died.",
                                    _runningTask.Id));
                        }

                        // Surface the diagnostic; otherwise it is built and silently discarded
                        // while the failure handler below remains commented out.
                        LOGGER.Log(Level.Warning, stringBuilder.ToString());

                        // RM is telling me its DONE/FAILED - assuming it has already released the resources
                        _isResourceReleased = true;

                        //Handle(new EvaluatorException(_evaluatorId, stringBuilder.ToString(), _runningTask));
                        _state = STATE.KILLED;
                    }
                }
            }
        }

        /// <summary>
        /// Handle a context status update.
        /// A READY context is registered (if new) and its messages are processed.
        /// A FAIL or DONE context is looked up (registered first if it failed immediately) and then removed.
        /// </summary>
        /// <param name="contextStatusProto">The context status reported in a heartbeat.</param>
        /// <param name="notifyClientOnNewActiveContext">Whether a newly seen READY context should be announced.</param>
        private void Handle(ContextStatusProto contextStatusProto, bool notifyClientOnNewActiveContext)
        {
            string contextId = contextStatusProto.context_id;
            Optional<string> parentId = contextStatusProto.parent_id != null ?
                Optional<string>.Of(contextStatusProto.parent_id) : Optional<string>.Empty();

            if (ContextStatusProto.State.READY == contextStatusProto.context_state)
            {
                if (!_activeContextIds.Contains(contextId))
                {
                    EvaluatorContext evaluatorContext = new EvaluatorContext(this, contextId, parentId);
                    AddEvaluatorContext(evaluatorContext);
                    if (notifyClientOnNewActiveContext)
                    {
                        LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorContext.ToString());

                        //TODO
                        //dispatcher.onNext(ActiveContext.class, context);
                    }
                }

                foreach (ContextStatusProto.ContextMessageProto contextMessageProto in contextStatusProto.context_message)
                {
                    byte[] message = contextMessageProto.message;
                    string sourceId = contextMessageProto.source_id;
                    LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + sourceId + message);

                    // this.dispatcher.onNext(ContextMessage.class,
                    //new ContextMessageImpl(theMessage, contextID, sourceID));
                }
            }
            else
            {
                if (!_activeContextIds.Contains(contextId))
                {
                    if (ContextStatusProto.State.FAIL == contextStatusProto.context_state)
                    {
                        // The context failed before it was ever reported READY; register it so it can be removed below.
                        AddEvaluatorContext(new EvaluatorContext(this, contextId, parentId));
                    }
                    else
                    {
                        var e = new InvalidOperationException("unknown context signaling state " + contextStatusProto.context_state);
                        Exceptions.Throw(e, LOGGER);
                    }
                }

                // Only non-READY (terminal) contexts are removed; READY contexts must stay active.
                EvaluatorContext context = GetEvaluatorContext(contextId);
                EvaluatorContext parentContext = context.ParentId.IsPresent() ?
                    GetEvaluatorContext(context.ParentId.Value) : null;
                RemoveEvaluatorContext(context);

                if (ContextStatusProto.State.FAIL == contextStatusProto.context_state)
                {
                    // TODO
                    Exception reason = new InvalidOperationException(ByteUtilities.ByteArrarysToString(contextStatusProto.error));
                    Optional<IActiveContext> optionalParentContext = (null == parentContext) ?
                        Optional<IActiveContext>.Empty() : Optional<IActiveContext>.Of(parentContext);
                    LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + reason.ToString() + optionalParentContext);

                    // TODO
                    //this.dispatcher.onNext(FailedContext.class,
                    //context.getFailedContext(optionalParentContext, reason));
                }
                else if (ContextStatusProto.State.DONE == contextStatusProto.context_state)
                {
                    if (null != parentContext)
                    {
                        // TODO
                        //this.dispatcher.onNext(ClosedContext.class, context.getClosedContext(parentContext));
                    }
                    else
                    {
                        LOGGER.Log(Level.Info, "Root context closed. Evaluator closed will trigger final shutdown.");
                    }
                }
                else
                {
                    var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown context state {0} for context {1}", contextStatusProto.context_state, contextId));
                    Exceptions.Throw(e, LOGGER);
                }
            }
        }

        /// <summary>
        /// Handle task status messages.
        /// INIT records a running task; SUSPEND, DONE and FAILED clear it; otherwise, any task
        /// messages are processed for the currently running task.
        /// </summary>
        /// <param name="taskStatusProto">The task status reported in a heartbeat.</param>
        private void Handle(TaskStatusProto taskStatusProto)
        {
            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Received task {0} status {1}", taskStatusProto.task_id, taskStatusProto.state));
            string taskId = taskStatusProto.task_id;
            string contextId = taskStatusProto.context_id;
            State taskState = taskStatusProto.state;

            if (taskState == State.INIT)
            {
                EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId);
                _runningTask = new RunningTaskImpl(taskId, evaluatorContext);

                // this.dispatcher.onNext(RunningTask.class, this.runningTask);
            }
            else if (taskState == State.SUSPEND)
            {
                EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId);
                _runningTask = null;
                byte[] message = taskStatusProto.result;

                // String concatenation renders a null result as "" instead of throwing on message.ToString().
                LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorContext + message);

                //this.dispatcher.onNext(SuspendedTask.class, new SuspendedTaskImpl(evaluatorContext, message, taskId));
            }
            else if (taskState == State.DONE)
            {
                EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId);
                _runningTask = null;
                byte[] message = taskStatusProto.result;

                // String concatenation renders a null result as "" instead of throwing on message.ToString().
                LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorContext + message);

                //this.dispatcher.onNext(CompletedTask.class, new CompletedTaskImpl(evaluatorContext, message, taskId));
            }
            else if (taskState == State.FAILED)
            {
                _runningTask = null;

                //EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId);
                //FailedTask failedTask = taskStatusProto.result != null ?
                // new FailedTask(taskId, ByteUtilities.ByteArrarysToString(taskStatusProto.result), Optional<IActiveContext>.Of(evaluatorContext)) :
                // new FailedTask(taskId, "Failed task: " + taskState, Optional<IActiveContext>.Of(evaluatorContext));
                //LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + failedTask.ToString());
                //this.dispatcher.onNext(FailedTask.class, taskException);
            }
            else if (taskStatusProto.task_message.Count > 0)
            {
                // Task messages can only come from a task that is currently running.
                if (_runningTask == null)
                {
                    var e = new InvalidOperationException("runningTask must not be null when there are task messages");
                    Exceptions.Throw(e, LOGGER);
                }

                foreach (TaskStatusProto.TaskMessageProto taskMessageProto in taskStatusProto.task_message)
                {
                    LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + taskMessageProto.ToString());

                    // this.dispatcher.onNext(TaskMessage.class,
                    //new TaskMessageImpl(taskMessageProto.getMessage().toByteArray(),
                    // taskId, contextId, taskMessageProto.getSourceId()));
                }
            }
        }

        /// <summary>
        /// Looks up an active context by identifier.
        /// </summary>
        /// <param name="id">The context identifier.</param>
        /// <returns>The matching active context.</returns>
        /// <exception cref="InvalidOperationException">If no active context has the given identifier.</exception>
        private EvaluatorContext GetEvaluatorContext(string id)
        {
            foreach (EvaluatorContext context in _activeContexts)
            {
                if (string.Equals(context.Id, id, StringComparison.Ordinal))
                {
                    return context;
                }
            }

            // Only reached after every active context has been checked.
            var e = new InvalidOperationException("Unknown evaluator context with id " + id);
            Exceptions.Throw(e, LOGGER);
            return null;
        }

        /// <summary>Registers a context in both the ordered list and the id set.</summary>
        private void AddEvaluatorContext(EvaluatorContext context)
        {
            _activeContexts.Add(context);
            _activeContextIds.Add(context.Id);
        }

        /// <summary>Unregisters a context from both the ordered list and the id set.</summary>
        private void RemoveEvaluatorContext(EvaluatorContext context)
        {
            _activeContexts.Remove(context);
            _activeContextIds.Remove(context.Id);
        }

        [NamedParameter(documentation: "The Evaluator Identifier.")]
        public class EvaluatorIdentifier : Name<string>
        {
        }

        [NamedParameter(documentation: "The Evaluator Host.")]
        public class EvaluatorDescriptorName : Name<EvaluatorDescriptorImpl>
        {
        }
    }
}