blob: c114e7e7287942d1f5271c50af0badf56c4f82d8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Runtime.ExceptionServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
using Org.Apache.REEF.Common.Runtime.Evaluator;
using Org.Apache.REEF.Common.Runtime.Evaluator.Context;
using Org.Apache.REEF.Common.Runtime.Evaluator.Utils;
using Org.Apache.REEF.Common.Services;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Evaluator.Exceptions;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Implementations.InjectionPlan;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Wake.Remote.Impl;
using Org.Apache.REEF.Wake.Time.Runtime;
using Org.Apache.REEF.Wake.Time.Runtime.Event;
using Org.Apache.REEF.Wake.Util;
namespace Org.Apache.REEF.Evaluator
{
public sealed class Evaluator
{
    // Reassigned by SetCustomTraceListeners once the custom trace listeners are registered.
    private static Logger _logger = Logger.GetLogger(typeof(Evaluator));
    private static int _heartbeatPeriodInMs = Constants.DefaultEvaluatorHeartbeatPeriodInMs;
    private static int _heartbeatMaxRetry = Constants.DefaultEvaluatorHeartbeatMaxRetry;
    private static IInjector _injector;
    private static EvaluatorConfigurations _evaluatorConfig;

    /// <summary>
    /// Process entry point of the Evaluator.
    /// Builds the root injector, reads the evaluator configuration file passed as the single
    /// command line argument, wires heartbeat/context/runtime components together and runs
    /// the RuntimeClock until it stops. Any failure is logged (with a working-directory listing)
    /// and rethrown by <see cref="Fail"/>.
    /// </summary>
    /// <param name="args">Exactly one element: the path of the evaluator configuration file.</param>
    public static void Main(string[] args)
    {
        try
        {
            Console.WriteLine(string.Format(CultureInfo.InvariantCulture, "START: {0} Evaluator::InitInjector.",
                DateTime.Now));

            // Start timing immediately; previously the Stopwatch was never started and always reported zero.
            Stopwatch timer = Stopwatch.StartNew();
            InitInjector();
            SetCustomTraceListeners(); // _logger is reset by this.
            timer.Stop();

            Console.WriteLine(string.Format(CultureInfo.InvariantCulture,
                "EXIT: {0} Evaluator::InitInjector. Duration: [{1}].", DateTime.Now, timer.Elapsed));

            using (_logger.LogScope("Evaluator::Main"))
            {
                // Wait for the debugger, if enabled
                AttachDebuggerIfEnabled();

                // Register our exception handler
                AppDomain.CurrentDomain.UnhandledException += UnhandledExceptionHandler;

                // Fetch some settings from the ConfigurationManager
                SetHeartbeatPeriod();
                SetHeartbeatMaxRetry();

                // Parse the command line.
                // The error handler RID should now be written in the configuration file instead.
                if (args == null || args.Length != 1)
                {
                    var e = new InvalidOperationException("must supply only the evaluator config file!");
                    Utilities.Diagnostics.Exceptions.Throw(e, _logger);
                }

                // Evaluator configuration file
                string evaluatorConfigurationPath = args[0];

                // Parse the evaluator configuration.
                _evaluatorConfig = new EvaluatorConfigurations(evaluatorConfigurationPath);

                string rId = _evaluatorConfig.ErrorHandlerRID;
                ContextConfiguration rootContextConfiguration = _evaluatorConfig.RootContextConfiguration;
                Optional<TaskConfiguration> rootTaskConfig = _evaluatorConfig.TaskConfiguration;
                Optional<ServiceConfiguration> rootServiceConfig = _evaluatorConfig.RootServiceConfiguration;

                // remoteManager used as client-only in evaluator
                IRemoteManager<REEFMessage> remoteManager =
                    _injector.GetInstance<IRemoteManagerFactory>().GetInstance(new REEFMessageCodec());
                IRemoteIdentifier remoteId = new SocketRemoteIdentifier(NetUtilities.ParseIpEndpoint(rId));

                RuntimeClock clock = InstantiateClock();
                _logger.Log(Level.Info, "Application Id: " + _evaluatorConfig.ApplicationId);

                EvaluatorSettings evaluatorSettings = new EvaluatorSettings(
                    _evaluatorConfig.ApplicationId,
                    _evaluatorConfig.EvaluatorId,
                    _heartbeatPeriodInMs,
                    _heartbeatMaxRetry,
                    rootContextConfiguration,
                    clock,
                    remoteManager,
                    _injector);

                HeartBeatManager heartBeatManager = new HeartBeatManager(evaluatorSettings, remoteId);
                ContextManager contextManager = new ContextManager(heartBeatManager, rootServiceConfig,
                    rootTaskConfig);
                EvaluatorRuntime evaluatorRuntime = new EvaluatorRuntime(contextManager, heartBeatManager);

                // TODO: replace with injectionFuture
                heartBeatManager._evaluatorRuntime = evaluatorRuntime;
                heartBeatManager._contextManager = contextManager;

                SetRuntimeHandlers(evaluatorRuntime, clock);

                // Run the clock on a pool thread and block until it returns. GetResult() (unlike Wait())
                // rethrows the clock's own exception rather than an AggregateException wrapper,
                // so Fail logs the real cause.
                Task.Run(new Action(clock.Run)).GetAwaiter().GetResult();
            }
        }
        catch (Exception e)
        {
            Fail(e);
        }
    }

    /// <summary>
    /// Determines whether debugging is enabled.
    /// </summary>
    /// <returns>
    /// true if the environment variable "Org.Apache.REEF.EvaluatorDebug" equals "enabled"
    /// (case-insensitive); false otherwise, including when it is unset.
    /// </returns>
    private static bool IsDebuggingEnabled()
    {
        var debugEnabledString = Environment.GetEnvironmentVariable("Org.Apache.REEF.EvaluatorDebug");

        // string.Equals is null-safe, so an unset variable simply yields false.
        return string.Equals(debugEnabledString, "enabled", StringComparison.OrdinalIgnoreCase);
    }

    /// <summary>
    /// If debugging is enabled, blocks the calling thread until a debugger is attached,
    /// logging a message and sleeping 2 seconds between checks.
    /// </summary>
    private static void AttachDebuggerIfEnabled()
    {
        if (!IsDebuggingEnabled())
        {
            return;
        }

        while (!Debugger.IsAttached)
        {
            _logger.Log(Level.Info,
                "Evaluator in debug mode, waiting for debugger to be attached...");
            Thread.Sleep(2000);
        }
    }

    /// <summary>
    /// Sets the heartbeat period from the ConfigurationManager app setting
    /// "EvaluatorHeartbeatPeriodInMs". Missing, unparsable or non-positive values leave the
    /// default period in place.
    /// </summary>
    private static void SetHeartbeatPeriod()
    {
        int heartbeatPeriod;
        if (TryGetIntAppSetting("EvaluatorHeartbeatPeriodInMs", out heartbeatPeriod))
        {
            if (heartbeatPeriod > 0)
            {
                _heartbeatPeriodInMs = heartbeatPeriod;
            }
            else
            {
                _logger.Log(Level.Warning,
                    "Ignoring non-positive EvaluatorHeartbeatPeriodInMs setting: " + heartbeatPeriod);
            }
        }

        _logger.Log(Level.Verbose,
            "Evaluator heartbeat period set to be " + _heartbeatPeriodInMs + " milliSeconds.");
    }

    /// <summary>
    /// Sets the heartbeat max retry count from the ConfigurationManager app setting
    /// "EvaluatorHeartbeatRetryMaxTimes". Missing, unparsable or negative values leave the
    /// default retry count in place.
    /// </summary>
    private static void SetHeartbeatMaxRetry()
    {
        int maxHeartbeatRetry;
        if (TryGetIntAppSetting("EvaluatorHeartbeatRetryMaxTimes", out maxHeartbeatRetry))
        {
            if (maxHeartbeatRetry >= 0)
            {
                _heartbeatMaxRetry = maxHeartbeatRetry;
            }
            else
            {
                _logger.Log(Level.Warning,
                    "Ignoring negative EvaluatorHeartbeatRetryMaxTimes setting: " + maxHeartbeatRetry);
            }
        }

        _logger.Log(Level.Verbose,
            "Evaluator heartbeat max retry set to be " + _heartbeatMaxRetry + " times.");
    }

    /// <summary>
    /// Reads an integer from the ConfigurationManager app settings, parsing with the invariant culture.
    /// </summary>
    /// <param name="key">The app setting key.</param>
    /// <param name="value">The parsed value, or 0 if the setting is missing or not an integer.</param>
    /// <returns>true if the setting exists and parsed as an integer.</returns>
    private static bool TryGetIntAppSetting(string key, out int value)
    {
        value = 0;
        string raw = ConfigurationManager.AppSettings[key];
        return !string.IsNullOrWhiteSpace(raw) &&
               int.TryParse(raw, NumberStyles.Integer, CultureInfo.InvariantCulture, out value);
    }

    /// <summary>
    /// Instantiates the root injector of the Evaluator.
    /// </summary>
    /// <exception cref="EvaluatorInjectorInstantiationException">If the injector cannot be instantiated.</exception>
    private static void InitInjector()
    {
        try
        {
            _injector = TangFactory.GetTang().NewInjector(ReadEvaluatorConfiguration());
        }
        catch (Exception e)
        {
            throw new EvaluatorInjectorInstantiationException(e);
        }
    }

    /// <summary>
    /// Reads the Evaluator Configuration from
    /// &lt;current directory&gt;/reef/global/&lt;Common.Constants.ClrBridgeRuntimeConfiguration&gt;.
    /// </summary>
    /// <exception cref="EvaluatorConfigurationFileNotFoundException">When the configuration file cannot be found.</exception>
    /// <exception cref="EvaluatorConfigurationParseException">When the configuration file exists, but can't be deserialized.</exception>
    /// <returns>The configuration deserialized by <see cref="AvroConfigurationSerializer"/>.</returns>
    private static IConfiguration ReadEvaluatorConfiguration()
    {
        string clrRuntimeConfigurationFile = Path.Combine(Directory.GetCurrentDirectory(), "reef", "global",
            Common.Constants.ClrBridgeRuntimeConfiguration);
        if (!File.Exists(clrRuntimeConfigurationFile))
        {
            throw new EvaluatorConfigurationFileNotFoundException(clrRuntimeConfigurationFile);
        }

        try
        {
            return new AvroConfigurationSerializer().FromFile(clrRuntimeConfigurationFile);
        }
        catch (Exception e)
        {
            throw new EvaluatorConfigurationParseException(e);
        }
    }

    /// <summary>
    /// Instantiates the RuntimeClock from an empty configuration.
    /// </summary>
    /// <exception cref="ClockInstantiationException">When the clock can't be instantiated.</exception>
    /// <returns>A new <see cref="RuntimeClock"/> instance.</returns>
    private static RuntimeClock InstantiateClock()
    {
        try
        {
            // Built inside the try so configuration failures are also reported as ClockInstantiationException.
            IConfiguration clockConfiguration = new ConfigurationModuleBuilder().Build().Build();
            return TangFactory.GetTang().NewInjector(clockConfiguration).GetInstance<RuntimeClock>();
        }
        catch (Exception exception)
        {
            throw new ClockInstantiationException("Unable to instantiate the clock", exception);
        }
    }

    /// <summary>
    /// Registers the injected custom trace listeners (falling back to none if they cannot be
    /// resolved), refreshes <see cref="_logger"/> and applies the injected custom trace level.
    /// </summary>
    private static void SetCustomTraceListeners()
    {
        ISet<TraceListener> customTraceListeners;
        try
        {
            customTraceListeners = _injector.GetInstance<CustomTraceListeners>().Listeners;
        }
        catch (Exception e)
        {
            Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, _logger);

            // custom trace listener not set properly, use empty set
            customTraceListeners = new HashSet<TraceListener>();
        }

        foreach (TraceListener listener in customTraceListeners)
        {
            Logger.AddTraceListner(listener);
        }

        // Re-acquire the logger so it picks up the listeners registered above.
        _logger = Logger.GetLogger(typeof(Evaluator));
        CustomTraceLevel traceLevel = _injector.GetInstance<CustomTraceLevel>();
        Logger.SetCustomLevel(traceLevel.TraceLevel);
    }

    /// <summary>
    /// AppDomain unhandled-exception hook; forwards the exception to <see cref="Fail"/>.
    /// </summary>
    private static void UnhandledExceptionHandler(object sender, UnhandledExceptionEventArgs e)
    {
        // ExceptionObject is not guaranteed to be an Exception; a failing cast here would hide the real error.
        Exception exception = e.ExceptionObject as Exception ??
            new InvalidOperationException(string.Format(CultureInfo.InvariantCulture,
                "Unhandled non-exception object thrown: {0}", e.ExceptionObject));
        Fail(exception);
    }

    /// <summary>
    /// Lists all files under <paramref name="path"/>, recursively, as a comma-separated string.
    /// </summary>
    /// <param name="path">The directory to list.</param>
    /// <param name="resultBuilder">Optional builder to append to; a new one is created if null.</param>
    /// <returns>The complete contents of the builder after appending the listing.</returns>
    private static string GetDirectoryListing(string path, StringBuilder resultBuilder = null)
    {
        StringBuilder builder = resultBuilder ?? new StringBuilder();
        AppendDirectoryListing(path, builder);
        return builder.ToString();
    }

    /// <summary>
    /// Appends the files of <paramref name="path"/> and then, recursively, of each subdirectory,
    /// separating every entry with ", ".
    /// </summary>
    private static void AppendDirectoryListing(string path, StringBuilder builder)
    {
        // GetFiles/GetDirectories already return paths qualified with 'path'; re-combining
        // them would duplicate the prefix for relative paths.
        foreach (string file in Directory.GetFiles(path))
        {
            if (builder.Length > 0)
            {
                builder.Append(", ");
            }

            builder.Append(file);
        }

        foreach (string directory in Directory.GetDirectories(path))
        {
            AppendDirectoryListing(directory, builder);
        }
    }

    /// <summary>
    /// Sets the handlers for the RuntimeClock manually.
    /// Only the RuntimeStart and RuntimeStop handlers are needed; both route to <paramref name="evaluatorRuntime"/>.
    /// </summary>
    private static void SetRuntimeHandlers(EvaluatorRuntime evaluatorRuntime, RuntimeClock clock)
    {
        ISet<IObserver<RuntimeStart>> runtimeStarts = new HashSet<IObserver<RuntimeStart>> { evaluatorRuntime };
        clock.InjectedRuntimeStartHandler =
            new InjectionFutureImpl<ISet<IObserver<RuntimeStart>>>(runtimeStarts);

        ISet<IObserver<RuntimeStop>> runtimeStops = new HashSet<IObserver<RuntimeStop>> { evaluatorRuntime };
        clock.InjectedRuntimeStopHandler =
            new InjectionFutureImpl<ISet<IObserver<RuntimeStop>>>(runtimeStops);
    }

    /// <summary>
    /// Logs <paramref name="ex"/> together with a listing of the working directory, then
    /// rethrows it with its original stack trace preserved.
    /// </summary>
    private static void Fail(Exception ex)
    {
        string listing;
        try
        {
            listing = GetDirectoryListing(Directory.GetCurrentDirectory());
        }
        catch (Exception listingException)
        {
            // A failure to list the directory must not mask the exception being reported.
            listing = "<unavailable: " + listingException.Message + ">";
        }

        var message = "Unhandled exception caught in Evaluator. Current files in the working directory: " +
                      listing;
        _logger.Log(Level.Error, message, ex);

        // 'throw ex;' would reset the stack trace; ExceptionDispatchInfo preserves it.
        ExceptionDispatchInfo.Capture(ex).Throw();
    }
}
}