// blob: 07e7a5304e23a68ba6c7a32317daae1cd7a6c74c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Threading;
using Org.Apache.REEF.Common.Files;
using Org.Apache.REEF.Common.Runtime.Evaluator;
using Org.Apache.REEF.Common.Runtime.Evaluator.Utils;
using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Evaluator.Exceptions;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Implementations.InjectionPlan;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Time.Runtime;
using Org.Apache.REEF.Wake.Time.Runtime.Event;
namespace Org.Apache.REEF.Evaluator
{
public sealed class Evaluator
{
// Class-wide logger. Not readonly: SetCustomTraceListeners re-acquires it
// after the custom trace listeners have been registered.
private static Logger logger = Logger.GetLogger(typeof(Evaluator));
// Clock that Run() drives; supplied through the injected constructor.
private readonly RuntimeClock _clock;
/// <summary>
/// Injection constructor (Main obtains the instance from a Tang injector).
/// Stores the clock, registers the custom trace listeners and trace level,
/// and installs <paramref name="evaluatorRuntime"/> as the clock's runtime
/// start and stop handler.
/// </summary>
/// <param name="clock">Clock that is stored and later run by Run().</param>
/// <param name="evaluatorRuntime">Runtime registered as the clock's start/stop observer.</param>
/// <param name="customTraceListeners">Holder of the trace listeners to add to the logger.</param>
/// <param name="customTraceLevel">Holder of the trace level to apply to the logger.</param>
[Inject]
private Evaluator(
RuntimeClock clock,
EvaluatorRuntime evaluatorRuntime,
CustomTraceListeners customTraceListeners,
CustomTraceLevel customTraceLevel)
{
_clock = clock;
// Logging is configured before the handlers are wired; this also replaces the static logger.
SetCustomTraceListeners(customTraceListeners, customTraceLevel);
SetRuntimeHandlers(evaluatorRuntime, clock);
}
/// <summary>
/// Runs the injected clock by calling <c>_clock.Run()</c>; returns when that call returns.
/// </summary>
private void Run()
{
_clock.Run();
}
/// <summary>
/// Process entry point. The expected command line is "evaluator.exe evaluator.config".
/// Any exception raised while starting or running the evaluator is handed to Fail.
/// </summary>
/// <param name="args">Command-line arguments; exactly one is expected, the evaluator configuration file.</param>
public static void Main(string[] args)
{
    try
    {
        var argumentCount = args.Count();
        if (argumentCount != 1)
        {
            Utilities.Diagnostics.Exceptions.Throw(
                new InvalidOperationException("Must supply only the evaluator.config file!"),
                logger);
        }

        // Optionally block here until a debugger is attached.
        if (IsDebuggingEnabled())
        {
            AttachDebugger();
        }

        AppDomain.CurrentDomain.UnhandledException += UnhandledExceptionHandler;

        // Build the injector from both configurations, then resolve the evaluator.
        var tang = TangFactory.GetTang();
        var bridgeConfiguration = ReadClrBridgeConfiguration();
        var evaluatorConfiguration = ReadEvaluatorConfiguration(args[0]);
        var injector = tang.NewInjector(bridgeConfiguration, evaluatorConfiguration);

        Evaluator evaluator = injector.GetInstance<Evaluator>();
        evaluator.Run();
        logger.Log(Level.Info, "Evaluator is returned from Run()");
    }
    catch (Exception failure)
    {
        Fail(failure);
    }
}
/// <summary>
/// Checks the "Org.Apache.REEF.EvaluatorDebug" environment variable.
/// </summary>
/// <returns>true only when the variable is set to "enabled" (compared case-insensitively)</returns>
private static bool IsDebuggingEnabled()
{
    string flag = Environment.GetEnvironmentVariable("Org.Apache.REEF.EvaluatorDebug");

    // Unset, empty, or blank values never enable debugging.
    if (string.IsNullOrWhiteSpace(flag))
    {
        return false;
    }

    return string.Equals(flag, "enabled", StringComparison.OrdinalIgnoreCase);
}
/// <summary>
/// Blocks the calling thread until a debugger is attached, logging a
/// reminder and sleeping two seconds between checks.
/// </summary>
private static void AttachDebugger()
{
    const int pollIntervalMilliseconds = 2000;

    while (!Debugger.IsAttached)
    {
        logger.Log(Level.Info, "Evaluator in debug mode, waiting for debugger to be attached...");
        Thread.Sleep(pollIntervalMilliseconds);
    }
}
/// <summary>
/// Reads the CLR bridge configuration file located in "reef/global" under
/// the current working directory.
/// </summary>
/// <exception cref="EvaluatorConfigurationFileNotFoundException">When the configuration file cannot be found.</exception>
/// <exception cref="EvaluatorConfigurationParseException">When the configuration file exists, but can't be deserialized.</exception>
/// <returns>The configuration deserialized from the CLR bridge configuration file.</returns>
private static IConfiguration ReadClrBridgeConfiguration()
{
    var workingDirectory = Directory.GetCurrentDirectory();
    var bridgeFileName = new REEFFileNames().GetClrBridgeConfigurationName();
    var bridgeConfigurationPath = Path.Combine(workingDirectory, "reef", "global", bridgeFileName);

    if (!File.Exists(bridgeConfigurationPath))
    {
        throw new EvaluatorConfigurationFileNotFoundException(bridgeConfigurationPath);
    }

    try
    {
        var serializer = new AvroConfigurationSerializer();
        var bridgeConfiguration = serializer.FromFile(bridgeConfigurationPath);

        var logLine = string.Format(
            CultureInfo.CurrentCulture,
            "Clr Driver Configuration is deserialized from file {0}:",
            bridgeConfigurationPath);
        logger.Log(Level.Info, logLine);

        return bridgeConfiguration;
    }
    catch (Exception cause)
    {
        // Any failure while reading or logging is surfaced as a parse failure.
        throw new EvaluatorConfigurationParseException(cause);
    }
}
// TODO[JIRA REEF-217]: Remove this method.
/// <summary>
/// Deserializes the evaluator configuration from the given Avro file, using
/// the class hierarchy of the assembly that declares ApplicationIdentifier.
/// </summary>
/// <param name="evaluatorConfigFile">Path of the evaluator configuration file.</param>
/// <returns>The deserialized evaluator configuration.</returns>
/// <remarks>
/// A null/blank path is reported as an ArgumentNullException and a missing
/// file as a FileNotFoundException, both via Utilities.Diagnostics.Exceptions.Throw.
/// </remarks>
private static IConfiguration ReadEvaluatorConfiguration(string evaluatorConfigFile)
{
    if (string.IsNullOrWhiteSpace(evaluatorConfigFile))
    {
        // The reported parameter name must match the actual parameter (it used to say "configFile").
        Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("evaluatorConfigFile"), logger);
    }

    if (!File.Exists(evaluatorConfigFile))
    {
        Utilities.Diagnostics.Exceptions.Throw(new FileNotFoundException("cannot find file " + evaluatorConfigFile), logger);
    }

    AvroConfigurationSerializer serializer = new AvroConfigurationSerializer();

    // The class hierarchy is built from the assembly that contains ApplicationIdentifier.
    var classHierarchy = TangFactory.GetTang()
        .GetClassHierarchy(new string[] { typeof(ApplicationIdentifier).Assembly.GetName().Name });
    var evaluatorConfiguration = serializer.FromFile(evaluatorConfigFile, classHierarchy);

    logger.Log(Level.Info,
        string.Format(CultureInfo.CurrentCulture, "Evaluator Configuration is deserialized from file {0}:", evaluatorConfigFile));
    return evaluatorConfiguration;
}
/// <summary>
/// Adds every listener held by <paramref name="customTraceListener"/> to the
/// Logger, re-acquires the class logger, and applies the custom trace level.
/// </summary>
/// <param name="customTraceListener">Holder of the trace listeners to register.</param>
/// <param name="traceLevel">Holder of the trace level to apply.</param>
private static void SetCustomTraceListeners(CustomTraceListeners customTraceListener, CustomTraceLevel traceLevel)
{
    foreach (TraceListener traceListener in customTraceListener.Listeners)
    {
        Logger.AddTraceListener(traceListener);
    }

    // Fetch the logger again now that the listeners are registered, then set the level.
    logger = Logger.GetLogger(typeof(Evaluator));
    Logger.SetCustomLevel(traceLevel.TraceLevel);
}
/// <summary>
/// AppDomain unhandled-exception callback; forwards the failure to Fail.
/// </summary>
/// <param name="sender">Event source (unused).</param>
/// <param name="e">Event data carrying the unhandled exception object.</param>
private static void UnhandledExceptionHandler(object sender, UnhandledExceptionEventArgs e)
{
    // ExceptionObject is typed as object and is not guaranteed to derive from
    // Exception; a hard cast would throw InvalidCastException inside the handler
    // and hide the real failure, so wrap anything that is not an Exception.
    var unhandled = e.ExceptionObject as Exception;
    if (unhandled == null)
    {
        unhandled = new InvalidOperationException(
            "Unhandled non-Exception object was thrown: " + e.ExceptionObject);
    }

    Fail(unhandled);
}
// Wires the RuntimeClock handlers by hand.
// Only the runtime start and runtime stop handlers are needed for now.
private static void SetRuntimeHandlers(EvaluatorRuntime evaluatorRuntime, RuntimeClock clock)
{
    ISet<IObserver<RuntimeStart>> startObservers = new HashSet<IObserver<RuntimeStart>>();
    startObservers.Add(evaluatorRuntime);
    clock.InjectedRuntimeStartHandler = new InjectionFutureImpl<ISet<IObserver<RuntimeStart>>>(startObservers);

    ISet<IObserver<RuntimeStop>> stopObservers = new HashSet<IObserver<RuntimeStop>>();
    stopObservers.Add(evaluatorRuntime);
    clock.InjectedRuntimeStopHandler = new InjectionFutureImpl<ISet<IObserver<RuntimeStop>>>(stopObservers);
}
/// <summary>
/// Reports a fatal evaluator failure through Utilities.Diagnostics.Exceptions.Throw,
/// attaching a listing of all files under the current working directory.
/// </summary>
/// <param name="ex">The exception that caused the failure.</param>
private static void Fail(Exception ex)
{
    string fileListing;
    try
    {
        fileListing = string.Join(", ",
            Directory.EnumerateFiles(Directory.GetCurrentDirectory(), "*.*", SearchOption.AllDirectories));
    }
    catch (Exception listingFailure)
    {
        // The listing is diagnostic only. A failure while walking the directory
        // tree (access denied, directory removed, path too long, ...) must not
        // replace the original exception being reported.
        fileListing = "<unable to enumerate files: " + listingFailure.Message + ">";
    }

    var message = "Unhandled exception caught in Evaluator. Current files in the working directory: " + fileListing;
    Utilities.Diagnostics.Exceptions.Throw(ex, message, logger);
}
}
}