blob: a3b53ed95e6ce94cd266233e08deb79618318eb9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
using System;
using System.Collections.Generic;
using System.Globalization;
using Org.Apache.REEF.Common;
using Org.Apache.REEF.Common.Api;
using Org.Apache.REEF.Common.Catalog;
using Org.Apache.REEF.Common.Evaluator;
using Org.Apache.REEF.Common.Exceptions;
using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Tang.Implementations.InjectionPlan;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Wake.Time;
using Org.Apache.REEF.Wake.Time.Runtime.Event;
namespace Org.Apache.REEF.Driver
{
[Obsolete("Driver core logic no longer needed in.NET")]
public class DriverManager :
    IEvaluatorRequestor,
    IObserver<RuntimeStatusProto>,
    IObserver<ResourceStatusProto>,
    IObserver<ResourceAllocationProto>,
    IObserver<NodeDescriptorProto>,
    IObserver<RuntimeStart>,
    IObserver<RuntimeStop>,
    IObserver<IdleClock>
{
    private static readonly Logger LOGGER = Logger.GetLogger(typeof(DriverManager));

    private readonly IInjector _injector;
    private readonly IInjectionFuture<IClock> _clockFuture;
    private readonly ResourceCatalogImpl _resourceCatalog;
    private readonly IInjectionFuture<IResourceRequestHandler> _futureResourceRequestHandler;

    /// <summary>
    /// Evaluator managers keyed by evaluator id. This dictionary also serves as the monitor that
    /// guards the evaluator table and the runtime status field.
    /// </summary>
    private readonly Dictionary<string, EvaluatorManager> _evaluators = new Dictionary<string, EvaluatorManager>();

    private readonly EvaluatorHeartBeatSanityChecker _sanityChecker = new EvaluatorHeartBeatSanityChecker();
    private readonly ClientJobStatusHandler _clientJobStatusHandler;

    // The constructor never wires these three members up (they are always null),
    // so every use below is null-guarded.
    private readonly IDisposable _heartbeatConnectionChannel;
    private readonly IDisposable _errorChannel;
    private readonly IObserver<RuntimeErrorProto> _runtimeErrorHandler;

    /// <summary>
    /// Last known runtime status. Starts in INIT. It is replaced on RuntimeStart and whenever
    /// a RUNNING status update arrives. Guarded by the _evaluators lock.
    /// </summary>
    private RuntimeStatusProto _runtimeStatusProto = CreateRuntimeStatus(State.INIT);

    /// <summary>
    /// Creates the driver manager.
    /// </summary>
    /// <param name="injector">Injector used to create EvaluatorManager instances.</param>
    /// <param name="resourceCatalog">Catalog of known nodes.</param>
    /// <param name="remoteManager">Currently unused.</param>
    /// <param name="clockFuture">Future for the clock that is disposed on shutdown.</param>
    /// <param name="futureResourceRequestHandler">Resource request handler (not yet used by Submit).</param>
    /// <param name="clientJobStatusHandler">Handler notified when the runtime stops.</param>
    /// <param name="clientRId">Currently unused.</param>
    public DriverManager(
        IInjector injector,
        ResourceCatalogImpl resourceCatalog,
        IRemoteManager<REEFMessage> remoteManager,
        IInjectionFuture<IClock> clockFuture,
        IInjectionFuture<IResourceRequestHandler> futureResourceRequestHandler,
        ClientJobStatusHandler clientJobStatusHandler,
        string clientRId)
    {
        _injector = injector;
        _clockFuture = clockFuture;
        _resourceCatalog = resourceCatalog;
        _futureResourceRequestHandler = futureResourceRequestHandler;
        _clientJobStatusHandler = clientJobStatusHandler;
        _heartbeatConnectionChannel = null;
        _errorChannel = null;
        _runtimeErrorHandler = null;
        LOGGER.Log(Level.Info, "DriverManager instantiated");
    }

    /// <summary>
    /// The resource catalog supplied at construction. The setter is intentionally a no-op:
    /// the catalog cannot be replaced after construction.
    /// </summary>
    public IResourceCatalog ResourceCatalog
    {
        get
        {
            return _resourceCatalog;
        }
        set
        {
        }
    }

    /// <summary>
    /// Builds a resource request proto from an evaluator request.
    /// Forwarding it to the resource request handler is not implemented yet.
    /// </summary>
    /// <param name="request">The evaluator request; must not be null.</param>
    public void Submit(IEvaluatorRequest request)
    {
        if (request == null)
        {
            Exceptions.Throw(new ArgumentNullException("request"), LOGGER);
        }

        LOGGER.Log(Level.Info, "Got an EvaluatorRequest");
        ResourceRequestProto proto = new ResourceRequestProto();
        proto.resource_count = request.Number;
        if (request.MemoryMegaBytes > 0)
        {
            proto.memory_size = request.MemoryMegaBytes;
        }

        // TODO: translate rack/node descriptors into the proto and forward it:
        // _futureResourceRequestHandler.Get().OnNext(proto);
    }

    /// <summary>
    /// Removes an evaluator manager from the evaluator table.
    /// </summary>
    /// <param name="evaluatorManager">The manager to remove.</param>
    /// <exception cref="InvalidOperationException">If no manager with that id is registered.</exception>
    public void Release(EvaluatorManager evaluatorManager)
    {
        // Lock the same monitor as every other reader/writer of _evaluators (not 'this').
        lock (_evaluators)
        {
            string evaluatorManagerId = evaluatorManager.Id;
            if (!_evaluators.Remove(evaluatorManagerId))
            {
                var e = new InvalidOperationException("Trying to remove an unknown evaluator manager with id " + evaluatorManagerId);
                Exceptions.Throw(e, LOGGER);
            }
        }
    }

    /// <summary>
    /// Handles a runtime error that occurred on an evaluator. The error is routed to that
    /// evaluator's manager, or logged if the evaluator is unknown.
    /// </summary>
    /// <param name="runtimeErrorProto">The runtime error report.</param>
    public void Handle(RuntimeErrorProto runtimeErrorProto)
    {
        FailedRuntime error = new FailedRuntime(runtimeErrorProto);
        LOGGER.Log(Level.Warning, "Runtime error:" + error);

        EvaluatorException evaluatorException = error.Cause != null
            ? new EvaluatorException(error.Id, error.Cause.Value)
            : new EvaluatorException(error.Id, "Runtime error");

        EvaluatorManager evaluatorManager;
        lock (_evaluators)
        {
            if (!_evaluators.TryGetValue(error.Id, out evaluatorManager))
            {
                LOGGER.Log(Level.Warning, "Unknown evaluator runtime error: " + error.Cause);
            }
        }

        // Dispatch outside the lock so the manager's handler does not run while holding _evaluators.
        if (evaluatorManager != null)
        {
            evaluatorManager.Handle(evaluatorException);
        }
    }

    /// <summary>
    /// A RuntimeStatusProto comes from the ResourceManager layer indicating its current status.
    /// </summary>
    /// <param name="runtimeStatusProto">The reported runtime status.</param>
    public void OnNext(RuntimeStatusProto runtimeStatusProto)
    {
        Handle(runtimeStatusProto);
    }

    /// <summary>
    /// A ResourceStatusProto comes from the ResourceManager layer and reports what it thinks the
    /// current state of a given resource is. Ideally, the driver agrees.
    /// </summary>
    /// <param name="resourceStatusProto">The reported resource status.</param>
    public void OnNext(ResourceStatusProto resourceStatusProto)
    {
        Handle(resourceStatusProto);
    }

    /// <summary>
    /// A ResourceAllocationProto reports a resource allocation granted by the ResourceManager layer.
    /// </summary>
    /// <param name="resourceAllocationProto">The allocation.</param>
    public void OnNext(ResourceAllocationProto resourceAllocationProto)
    {
        Handle(resourceAllocationProto);
    }

    /// <summary>
    /// A NodeDescriptorProto defines a new node in the cluster. It is added to the resource
    /// catalog so that clients can make resource requests against it.
    /// </summary>
    /// <param name="nodeDescriptorProto">The node descriptor.</param>
    public void OnNext(NodeDescriptorProto nodeDescriptorProto)
    {
        _resourceCatalog.Handle(nodeDescriptorProto);
    }

    /// <summary>
    /// This handler is subscribed statically to the Clock's StartTime event, which makes it the
    /// entry point to REEF. It marks the runtime as RUNNING.
    /// </summary>
    /// <param name="runtimeStart">The start event.</param>
    public void OnNext(RuntimeStart runtimeStart)
    {
        LOGGER.Log(Level.Info, "RuntimeStart: " + runtimeStart);
        lock (_evaluators)
        {
            _runtimeStatusProto = CreateRuntimeStatus(State.RUNNING);
        }
    }

    /// <summary>
    /// Handles RuntimeStop. In order, it:
    /// 1. forwards any stop exception as a runtime error;
    /// 2. disposes all remaining evaluator managers;
    /// 3. closes the channels and the client job status handler.
    /// </summary>
    /// <param name="runtimeStop">The stop event, possibly carrying an exception.</param>
    /// <exception cref="InvalidOperationException">If disposing the driver manager fails.</exception>
    public void OnNext(RuntimeStop runtimeStop)
    {
        LOGGER.Log(Level.Info, "RuntimeStop: " + runtimeStop);

        if (runtimeStop.Exception != null)
        {
            string exceptionMessage = runtimeStop.Exception.Message;
            LOGGER.Log(Level.Warning, "Sending runtime error:" + exceptionMessage);
            RuntimeErrorProto runtimeErrorProto = new RuntimeErrorProto();
            runtimeErrorProto.message = exceptionMessage;
            runtimeErrorProto.exception = ByteUtilities.StringToByteArrays(exceptionMessage);
            runtimeErrorProto.name = "REEF";
            ForwardRuntimeError(runtimeErrorProto);
            LOGGER.Log(Level.Warning, "DONE Sending runtime error: " + exceptionMessage);
        }

        lock (_evaluators)
        {
            // Iterate a snapshot. If Dispose() calls back into Release(), which removes entries,
            // the live dictionary would otherwise be modified during enumeration.
            foreach (EvaluatorManager evaluatorManager in new List<EvaluatorManager>(_evaluators.Values))
            {
                LOGGER.Log(Level.Warning, "Unclean shutdown of evaluator: " + evaluatorManager.Id);
                evaluatorManager.Dispose();
            }
        }

        try
        {
            if (_heartbeatConnectionChannel != null)
            {
                _heartbeatConnectionChannel.Dispose();
            }

            if (_errorChannel != null)
            {
                _errorChannel.Dispose();
            }

            Optional<Exception> stopException = runtimeStop.Exception != null
                ? Optional<Exception>.Of(runtimeStop.Exception)
                : Optional<Exception>.Empty();
            _clientJobStatusHandler.Dispose(stopException);
            LOGGER.Log(Level.Info, "driver manager closed");
        }
        catch (Exception ex)
        {
            Exceptions.Caught(ex, Level.Error, "Error disposing Driver manager", LOGGER);
            // Keep the original failure as the inner exception so the root cause is not lost.
            Exceptions.Throw(new InvalidOperationException("Cannot dispose driver manager", ex), LOGGER);
        }
    }

    /// <summary>
    /// Shuts down the clock when the runtime is RUNNING and has neither outstanding container
    /// requests nor container allocations.
    /// </summary>
    /// <param name="value">The idle clock event.</param>
    public void OnNext(IdleClock value)
    {
        lock (_evaluators)
        {
            RuntimeStatusProto status = _runtimeStatusProto;
            string message = string.Format(
                CultureInfo.InvariantCulture,
                "IdleClock: [{0}], RuntimeState [{1}], Outstanding container requests [{2}], Container allocation count[{3}]",
                value + Environment.NewLine,
                status.state + Environment.NewLine,
                status.outstanding_container_requests + Environment.NewLine,
                status.container_allocation.Count);
            LOGGER.Log(Level.Info, message);

            if (status.state == State.RUNNING
                && status.outstanding_container_requests == 0
                && status.container_allocation.Count == 0)
            {
                LOGGER.Log(Level.Info, "Idle runtime shutdown");
                _clockFuture.Get().Dispose();
            }
        }
    }

    void IObserver<IdleClock>.OnError(Exception error)
    {
        throw new NotImplementedException();
    }

    void IObserver<IdleClock>.OnCompleted()
    {
        throw new NotImplementedException();
    }

    void IObserver<RuntimeStop>.OnError(Exception error)
    {
        throw new NotImplementedException();
    }

    void IObserver<RuntimeStop>.OnCompleted()
    {
        throw new NotImplementedException();
    }

    void IObserver<RuntimeStart>.OnError(Exception error)
    {
        throw new NotImplementedException();
    }

    void IObserver<RuntimeStart>.OnCompleted()
    {
        throw new NotImplementedException();
    }

    void IObserver<NodeDescriptorProto>.OnError(Exception error)
    {
        throw new NotImplementedException();
    }

    void IObserver<NodeDescriptorProto>.OnCompleted()
    {
        throw new NotImplementedException();
    }

    void IObserver<ResourceAllocationProto>.OnError(Exception error)
    {
        throw new NotImplementedException();
    }

    void IObserver<ResourceAllocationProto>.OnCompleted()
    {
        throw new NotImplementedException();
    }

    void IObserver<ResourceStatusProto>.OnError(Exception error)
    {
        throw new NotImplementedException();
    }

    void IObserver<ResourceStatusProto>.OnCompleted()
    {
        throw new NotImplementedException();
    }

    void IObserver<RuntimeStatusProto>.OnError(Exception error)
    {
        throw new NotImplementedException();
    }

    void IObserver<RuntimeStatusProto>.OnCompleted()
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Builds a runtime status proto named "REEF" in the given state with no outstanding
    /// container requests.
    /// </summary>
    /// <param name="state">The runtime state to record.</param>
    /// <returns>A new RuntimeStatusProto.</returns>
    private static RuntimeStatusProto CreateRuntimeStatus(State state)
    {
        RuntimeStatusProto proto = new RuntimeStatusProto();
        proto.state = state;
        proto.name = "REEF";
        proto.outstanding_container_requests = 0;
        return proto;
    }

    /// <summary>
    /// Passes a runtime error to the runtime error handler. If no handler is registered, the
    /// error is logged instead of causing a NullReferenceException.
    /// </summary>
    /// <param name="runtimeErrorProto">The error to forward.</param>
    private void ForwardRuntimeError(RuntimeErrorProto runtimeErrorProto)
    {
        if (_runtimeErrorHandler != null)
        {
            _runtimeErrorHandler.OnNext(runtimeErrorProto);
        }
        else
        {
            LOGGER.Log(Level.Warning, "No runtime error handler registered; runtime error not forwarded.");
        }
    }

    /// <summary>
    /// Something went wrong at the runtime layer (either driver or evaluator). This method
    /// forwards the RuntimeErrorProto to the runtime error handler and then disposes the clock.
    /// </summary>
    /// <param name="runtimeErrorProto">The runtime error.</param>
    private void Fail(RuntimeErrorProto runtimeErrorProto)
    {
        ForwardRuntimeError(runtimeErrorProto);
        _clockFuture.Get().Dispose();
    }

    /// <summary>
    /// Helper method to create a new EvaluatorManager instance.
    /// </summary>
    /// <param name="id">Identifier of the Evaluator. Currently only logged, not bound into the instance.</param>
    /// <param name="descriptor">NodeDescriptor on which the Evaluator executes. Currently unused.</param>
    /// <returns>A new EvaluatorManager instance.</returns>
    private EvaluatorManager GetNewEvaluatorManagerInstance(string id, EvaluatorDescriptorImpl descriptor)
    {
        LOGGER.Log(Level.Info, "Creating Evaluator Manager: " + id);
        // TODO: bind volatile parameters (id, descriptor) before instantiating.
        return (EvaluatorManager)_injector.GetInstance(typeof(EvaluatorManager));
    }

    /// <summary>
    /// Receives heartbeats from Evaluators and routes each one to the matching EvaluatorManager.
    /// </summary>
    /// <param name="evaluatorHearBeatProto">The heartbeat message.</param>
    /// <exception cref="InvalidOperationException">
    /// If the heartbeat has no evaluator status or comes from an unknown evaluator.
    /// </exception>
    private void Handle(IRemoteMessage<EvaluatorHeartbeatProto> evaluatorHearBeatProto)
    {
        EvaluatorHeartbeatProto heartbeat = evaluatorHearBeatProto.Message;
        EvaluatorStatusProto status = heartbeat.evaluator_status;

        // Validate before dereferencing. The old null check ran after status had already been used.
        if (status == null)
        {
            Exceptions.Throw(new InvalidOperationException("Received an evaluator heartbeat without an evaluator status"), LOGGER);
        }

        string evaluatorId = status.evaluator_id;
        LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Heartbeat from Evaluator {0} with state {1} timestamp {2}", evaluatorId, status.state, heartbeat.timestamp));
        _sanityChecker.check(evaluatorId, heartbeat.timestamp);

        lock (_evaluators)
        {
            EvaluatorManager evaluatorManager;
            if (_evaluators.TryGetValue(evaluatorId, out evaluatorManager))
            {
                evaluatorManager.Handle(evaluatorHearBeatProto);
            }
            else
            {
                string msg = "Contact from unknown evaluator with id: " + evaluatorId + " with state " + status.state;
                LOGGER.Log(Level.Error, msg);
                Exceptions.Throw(new InvalidOperationException(msg), LOGGER);
            }
        }
    }

    /// <summary>
    /// This resource status message comes from the ResourceManager layer and reports what it thinks
    /// the state of the resource running an Evaluator is. It is handed to the matching EvaluatorManager.
    /// </summary>
    /// <param name="resourceStatusProto">The resource status.</param>
    /// <exception cref="InvalidOperationException">If the evaluator is unknown.</exception>
    private void Handle(ResourceStatusProto resourceStatusProto)
    {
        lock (_evaluators)
        {
            EvaluatorManager evaluatorManager;
            if (_evaluators.TryGetValue(resourceStatusProto.identifier, out evaluatorManager))
            {
                evaluatorManager.Handle(resourceStatusProto);
            }
            else
            {
                var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown resource status from evaluator {0} with state {1}", resourceStatusProto.identifier, resourceStatusProto.state));
                Exceptions.Throw(e, LOGGER);
            }
        }
    }

    /// <summary>
    /// Handles a resource allocation by creating a new EvaluatorManager instance and registering
    /// it under the allocation's identifier.
    /// </summary>
    /// <param name="resourceAllocationProto">The allocation.</param>
    /// <exception cref="InvalidOperationException">
    /// If anything fails. The original failure is kept as the inner exception.
    /// </exception>
    private void Handle(ResourceAllocationProto resourceAllocationProto)
    {
        lock (_evaluators)
        {
            try
            {
                INodeDescriptor nodeDescriptor = _resourceCatalog.GetNode(resourceAllocationProto.node_id);
                if (nodeDescriptor == null)
                {
                    Exceptions.Throw(new InvalidOperationException("Unknown resource: " + resourceAllocationProto.node_id), LOGGER);
                }

                EvaluatorDescriptorImpl evaluatorDescriptor = new EvaluatorDescriptorImpl(nodeDescriptor, EvaluatorType.UNDECIDED, resourceAllocationProto.resource_memory, resourceAllocationProto.virtual_cores);
                LOGGER.Log(Level.Info, "Resource allocation: new evaluator id: " + resourceAllocationProto.identifier);
                EvaluatorManager evaluatorManager = GetNewEvaluatorManagerInstance(resourceAllocationProto.identifier, evaluatorDescriptor);
                _evaluators.Add(resourceAllocationProto.identifier, evaluatorManager);
            }
            catch (Exception e)
            {
                Exceptions.Caught(e, Level.Error, LOGGER);
                Exceptions.Throw(new InvalidOperationException("Error handling resourceAllocationProto.", e), LOGGER);
            }
        }
    }

    /// <summary>
    /// Reacts to a runtime status update:
    /// - FAILED forwards the error and disposes the clock.
    /// - DONE disposes the clock.
    /// - RUNNING records the status and disposes the clock if the clock is idle and nothing is
    ///   outstanding or allocated.
    /// </summary>
    /// <param name="runtimeStatusProto">The runtime status.</param>
    private void Handle(RuntimeStatusProto runtimeStatusProto)
    {
        State runtimeState = runtimeStatusProto.state;
        LOGGER.Log(Level.Info, "Runtime status: " + runtimeStatusProto.state);
        switch (runtimeState)
        {
            case State.FAILED:
                Fail(runtimeStatusProto.error);
                break;
            case State.DONE:
                _clockFuture.Get().Dispose();
                break;
            case State.RUNNING:
                lock (_evaluators)
                {
                    _runtimeStatusProto = runtimeStatusProto;
                    if (_clockFuture.Get().IsIdle()
                        && runtimeStatusProto.outstanding_container_requests == 0
                        && runtimeStatusProto.container_allocation.Count == 0)
                    {
                        _clockFuture.Get().Dispose();
                    }
                }
                break;
        }
    }
}
}