blob: 5800081817a69abc3758c98af574ebd95f469eb8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Net;
using System.Threading;
using Org.Apache.REEF.Common.Evaluator;
using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
using Org.Apache.REEF.Common.Runtime.Evaluator.Context;
using Org.Apache.REEF.Common.Runtime.Evaluator.Utils;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Exceptions;
using Org.Apache.REEF.Tang.Implementations.InjectionPlan;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Attributes;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Wake.Remote.Impl;
using Org.Apache.REEF.Wake.Time;
using Org.Apache.REEF.Wake.Time.Event;
namespace Org.Apache.REEF.Common.Runtime.Evaluator
{
[ThreadSafe]
internal sealed class HeartBeatManager : IHeartBeatManager
{
private static readonly Logger LOGGER = Logger.GetLogger(typeof(HeartBeatManager));
private static readonly MachineStatus MachineStatus = new MachineStatus();
private readonly IRemoteManager<REEFMessage> _remoteManager;
private readonly IClock _clock;
private readonly int _heartBeatPeriodInMillSeconds;
private readonly int _maxHeartbeatRetries = 0;
private readonly int _maxHeartbeatRetriesForNonRecoveryMode = 0;
private IRemoteIdentifier _remoteId;
private IObserver<REEFMessage> _observer;
private int _heartbeatFailures = 0;
private readonly IInjectionFuture<IDriverConnection> _driverConnection;
private readonly EvaluatorSettings _evaluatorSettings;
private readonly IInjectionFuture<EvaluatorRuntime> _evaluatorRuntime;
private readonly IInjectionFuture<ContextManager> _contextManager;
private bool _isCompletedHeartbeatQueued = false;
// the queue can only contains the following:
// 1. all failed heartbeats (regular and event-based) before entering RECOVERY state
// 2. event-based heartbeats generated in RECOVERY state (since there will be no attempt to send regular heartbeat)
private readonly Queue<EvaluatorHeartbeatProto> _queuedHeartbeats = new Queue<EvaluatorHeartbeatProto>();
[Inject]
private HeartBeatManager(
EvaluatorSettings settings,
IInjectionFuture<EvaluatorRuntime> evaluatorRuntime,
IInjectionFuture<ContextManager> contextManager,
[Parameter(typeof(ErrorHandlerRid))] string errorHandlerRid,
IInjectionFuture<IDriverConnection> driverConnection)
{
using (LOGGER.LogFunction("HeartBeatManager::HeartBeatManager"))
{
_evaluatorSettings = settings;
_evaluatorRuntime = evaluatorRuntime;
_contextManager = contextManager;
_remoteManager = settings.RemoteManager;
_remoteId = new SocketRemoteIdentifier(NetUtilities.ParseIpEndpoint(errorHandlerRid));
_observer = _remoteManager.GetRemoteObserver(new RemoteEventEndPoint<REEFMessage>(_remoteId));
_clock = settings.RuntimeClock;
_heartBeatPeriodInMillSeconds = settings.HeartBeatPeriodInMs;
_maxHeartbeatRetries = settings.MaxHeartbeatRetries;
_maxHeartbeatRetriesForNonRecoveryMode = settings.MaxHeartbeatRetriesForNonRecoveryMode;
_driverConnection = driverConnection;
MachineStatus.ToString(); // kick start the CPU perf counter
}
}
/// <summary>
/// Return EvaluatorRuntime referenced from HeartBeatManager
/// </summary>
public EvaluatorRuntime EvaluatorRuntime
{
get { return _evaluatorRuntime.Get(); }
}
/// <summary>
/// Return ContextManager referenced from HeartBeatManager
/// </summary>
public ContextManager ContextManager
{
get { return _contextManager.Get(); }
}
/// <summary>
/// EvaluatorSettings contains the configuration data of the evaluators
/// </summary>
public EvaluatorSettings EvaluatorSettings
{
get { return _evaluatorSettings; }
}
public void Send(EvaluatorHeartbeatProto evaluatorHeartbeatProto)
{
lock (_queuedHeartbeats)
{
// Do not send a heartbeat if Evaluator has already signaled that it was done.
if (_isCompletedHeartbeatQueued)
{
LOGGER.Log(Level.Warning, "Evaluator trying to schedule a heartbeat after a completed heartbeat has already been scheduled or sent.");
return;
}
if (IsEvaluatorStateCompleted(evaluatorHeartbeatProto.evaluator_status.state))
{
_isCompletedHeartbeatQueued = true;
}
if (_evaluatorSettings.OperationState == EvaluatorOperationState.RECOVERY)
{
LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "In RECOVERY mode, heartbeat queued as [{0}]. ", evaluatorHeartbeatProto));
_queuedHeartbeats.Enqueue(evaluatorHeartbeatProto);
return;
}
// NOT during recovery, try to send
REEFMessage payload = new REEFMessage(evaluatorHeartbeatProto);
try
{
_observer.OnNext(payload);
_heartbeatFailures = 0; // reset failure counts if we are having intermittent (not continuous) failures
}
catch (Exception e)
{
if (evaluatorHeartbeatProto.task_status == null || evaluatorHeartbeatProto.task_status.state != State.RUNNING)
{
Utilities.Diagnostics.Exceptions.Throw(e, "Lost communications to driver when no task is running, recovery NOT supported for such scenario", LOGGER);
}
_heartbeatFailures++;
_queuedHeartbeats.Enqueue(evaluatorHeartbeatProto);
LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending heartbeat to driver experienced #{0} failure. Hearbeat queued as: [{1}]. ", _heartbeatFailures, evaluatorHeartbeatProto), e);
if (_driverConnection.Get() is MissingDriverConnection)
{
if (_heartbeatFailures >= _maxHeartbeatRetriesForNonRecoveryMode)
{
var msg =
string.Format(CultureInfo.InvariantCulture,
"Have encountered {0} heartbeat failures. Limit of heartbeat sending failures exceeded. Driver reconnect logic is not implemented, failing evaluator.",
_heartbeatFailures);
LOGGER.Log(Level.Error, msg);
throw new ReefRuntimeException(msg, e);
}
}
else
{
if (_heartbeatFailures >= _maxHeartbeatRetries)
{
LOGGER.Log(Level.Warning,
"Heartbeat communications to driver reached max of {0} failures. Driver is considered dead/unreachable",
_heartbeatFailures);
LOGGER.Log(Level.Info, "Entering RECOVERY mode!!!");
ContextManager.HandleDriverConnectionMessage(
new DriverConnectionMessageImpl(DriverConnectionState.Disconnected));
LOGGER.Log(Level.Info, "instantiate driver reconnect implementation: " + _driverConnection);
_evaluatorSettings.OperationState = EvaluatorOperationState.RECOVERY;
// clean heartbeat failure
_heartbeatFailures = 0;
}
}
}
}
}
/// <summary>
/// Assemble a complete new heartbeat and send it out.
/// </summary>
public void OnNext()
{
LOGGER.Log(Level.Verbose, "Before acquiring lock: HeartbeatManager::OnNext()");
lock (this)
{
LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext()");
EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto();
LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto));
Send(heartbeatProto);
}
}
/// <summary>
/// Called with a specific TaskStatus that must be delivered to the driver
/// </summary>
/// <param name="taskStatusProto"></param>
public void OnNext(TaskStatusProto taskStatusProto)
{
LOGGER.Log(Level.Verbose, "Before acquiring lock: HeartbeatManager::OnNext(TaskStatusProto)");
lock (this)
{
LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(TaskStatusProto)");
EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto(
EvaluatorRuntime.GetEvaluatorStatus(),
ContextManager.GetContextStatusCollection(),
Optional<TaskStatusProto>.Of(taskStatusProto));
LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto));
Send(heartbeatProto);
}
}
/// <summary>
/// Called with a specific ContextStatusProto that must be delivered to the driver
/// </summary>
/// <param name="contextStatusProto"></param>
public void OnNext(ContextStatusProto contextStatusProto)
{
LOGGER.Log(Level.Verbose, "Before acquiring lock: HeartbeatManager::OnNext(ContextStatusProto)");
lock (this)
{
LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(ContextStatusProto)");
List<ContextStatusProto> contextStatusProtos = new List<ContextStatusProto>();
contextStatusProtos.Add(contextStatusProto);
contextStatusProtos.AddRange(ContextManager.GetContextStatusCollection());
EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto(
EvaluatorRuntime.GetEvaluatorStatus(),
contextStatusProtos,
Optional<TaskStatusProto>.Empty());
LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto));
Send(heartbeatProto);
}
}
/// <summary>
/// Called with a specific EvaluatorStatus that must be delivered to the driver
/// </summary>
/// <param name="evaluatorStatusProto"></param>
public void OnNext(EvaluatorStatusProto evaluatorStatusProto)
{
LOGGER.Log(Level.Verbose, "Before acquiring lock: HeartbeatManager::OnNext(EvaluatorStatusProto)");
lock (this)
{
LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(EvaluatorStatusProto)");
EvaluatorHeartbeatProto heartbeatProto = new EvaluatorHeartbeatProto()
{
timestamp = CurrentTimeMilliSeconds(),
evaluator_status = evaluatorStatusProto
};
LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto));
Send(heartbeatProto);
}
}
public void OnNext(Alarm value)
{
LOGGER.Log(Level.Verbose, "Before acquiring lock: HeartbeatManager::OnNext(Alarm)");
lock (this)
{
LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(Alarm)");
if (_evaluatorSettings.OperationState == EvaluatorOperationState.OPERATIONAL && EvaluatorRuntime.State == State.RUNNING)
{
try
{
EvaluatorHeartbeatProto evaluatorHeartbeatProto = GetEvaluatorHeartbeatProto();
LOGGER.Log(Level.Verbose,
string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}. {1}Node Health: {2}", evaluatorHeartbeatProto, Environment.NewLine, MachineStatus));
Send(evaluatorHeartbeatProto);
}
catch (Exception e)
{
Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
EvaluatorRuntime.OnException(e);
}
}
else
{
LOGGER.Log(Level.Verbose, "Ignoring regular heartbeat since Evaluator operation state is [{0}] and runtime state is [{1}]. ", EvaluatorSettings.OperationState, EvaluatorRuntime.State);
// Do not try to recover if Evaluator is done.
if (IsEvaluatorStateCompleted(EvaluatorRuntime.State))
{
return;
}
if (_evaluatorSettings.OperationState == EvaluatorOperationState.RECOVERY)
{
var driverConnection = _driverConnection.Get();
try
{
var driverInformation = driverConnection.GetDriverInformation();
if (driverInformation == null)
{
LOGGER.Log(Level.Verbose,
"In RECOVERY mode, cannot retrieve driver information, will try again later.");
}
else
{
var msg = string.Format(CultureInfo.InvariantCulture,
"Detect driver restarted at {0} and is running on endpoint {1} with services {2}. Now trying to re-establish connection",
driverInformation.DriverStartTime,
driverInformation.DriverRemoteIdentifier,
driverInformation.NameServerId);
LOGGER.Log(Level.Info, msg);
Recover(driverInformation);
}
}
catch (NotImplementedException)
{
LOGGER.Log(Level.Error, "Reaching EvaluatorOperation RECOVERY mode, however, there is no IDriverConnection implemented for HA.");
throw;
}
catch (Exception e)
{
// we do not want any exception to stop the query for driver status
Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, LOGGER);
}
}
}
_clock.ScheduleAlarm(_heartBeatPeriodInMillSeconds, this);
}
}
public void OnError(Exception error)
{
throw new NotImplementedException();
}
public void OnCompleted()
{
throw new NotImplementedException();
}
private static bool IsEvaluatorStateCompleted(State state)
{
return state == State.DONE || state == State.FAILED || state == State.KILLED;
}
private static long CurrentTimeMilliSeconds()
{
// this is an implementation to get current time in milliseconds counted from Jan 1st, 1970
// it is chosen as such to be compatible with Java implementation
DateTime jan1St1970 = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
return (long)(DateTime.UtcNow - jan1St1970).TotalMilliseconds;
}
private void Recover(DriverInformation driverInformation)
{
IPEndPoint driverEndpoint = NetUtilities.ParseIpEndpoint(driverInformation.DriverRemoteIdentifier);
_remoteId = new SocketRemoteIdentifier(driverEndpoint);
_observer = _remoteManager.GetRemoteObserver(new RemoteEventEndPoint<REEFMessage>(_remoteId));
lock (_evaluatorSettings)
{
if (_evaluatorSettings.NameClient != null)
{
try
{
LOGGER.Log(Level.Verbose, "Trying to reset and reconnect to name server" + driverInformation.NameServerId);
_evaluatorSettings.NameClient.Restart(NetUtilities.ParseIpEndpoint(driverInformation.NameServerId));
LOGGER.Log(Level.Info, "Reconnected to name server: " + driverInformation.NameServerId);
}
catch (Exception e)
{
Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
}
}
}
lock (_queuedHeartbeats)
{
bool firstHeartbeatInQueue = true;
while (_queuedHeartbeats.Any())
{
LOGGER.Log(Level.Info, "Sending cached recovery heartbeats to " + _remoteId);
try
{
if (firstHeartbeatInQueue)
{
// first heartbeat is specially constructed to include the recovery flag
EvaluatorHeartbeatProto recoveryHeartbeat = ConstructRecoveryHeartBeat(_queuedHeartbeats.Dequeue());
LOGGER.Log(Level.Info, "Recovery heartbeat to be sent:" + recoveryHeartbeat);
_observer.OnNext(new REEFMessage(recoveryHeartbeat));
firstHeartbeatInQueue = false;
}
else
{
_observer.OnNext(new REEFMessage(_queuedHeartbeats.Dequeue()));
}
}
catch (Exception e)
{
// we do not handle failures during RECOVERY
Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(
e,
Level.Error,
string.Format(CultureInfo.InvariantCulture, "Hearbeat attempt failed in RECOVERY mode to Driver {0} , giving up...", _remoteId),
LOGGER);
}
Thread.Sleep(500);
}
}
_evaluatorSettings.OperationState = EvaluatorOperationState.OPERATIONAL;
ContextManager.HandleDriverConnectionMessage(new DriverConnectionMessageImpl(DriverConnectionState.Reconnected));
LOGGER.Log(Level.Info, "Exiting RECOVERY mode!!!");
}
private EvaluatorHeartbeatProto ConstructRecoveryHeartBeat(EvaluatorHeartbeatProto heartbeat)
{
heartbeat.recovery = true;
heartbeat.context_status.ForEach(c => c.recovery = true);
heartbeat.task_status.recovery = true;
return heartbeat;
}
private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto()
{
return GetEvaluatorHeartbeatProto(
EvaluatorRuntime.GetEvaluatorStatus(),
ContextManager.GetContextStatusCollection(),
ContextManager.GetTaskStatus());
}
private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto(
EvaluatorStatusProto evaluatorStatusProto,
ICollection<ContextStatusProto> contextStatusProtos,
Optional<TaskStatusProto> taskStatusProto)
{
EvaluatorHeartbeatProto evaluatorHeartbeatProto = new EvaluatorHeartbeatProto()
{
timestamp = CurrentTimeMilliSeconds(),
evaluator_status = evaluatorStatusProto
};
foreach (ContextStatusProto contextStatusProto in contextStatusProtos)
{
evaluatorHeartbeatProto.context_status.Add(contextStatusProto);
}
if (taskStatusProto.IsPresent())
{
evaluatorHeartbeatProto.task_status = taskStatusProto.Value;
}
return evaluatorHeartbeatProto;
}
}
}