| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| using System; |
| using System.Collections.Concurrent; |
| using System.Collections.Generic; |
| using System.Linq; |
| using System.Threading; |
| using Org.Apache.REEF.Common.Tasks; |
| using Org.Apache.REEF.Driver; |
| using Org.Apache.REEF.Driver.Context; |
| using Org.Apache.REEF.Driver.Evaluator; |
| using Org.Apache.REEF.Driver.Task; |
| using Org.Apache.REEF.IMRU.API; |
| using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks; |
| using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage; |
| using Org.Apache.REEF.IMRU.OnREEF.Parameters; |
| using Org.Apache.REEF.IMRU.OnREEF.ResultHandler; |
| using Org.Apache.REEF.IO.PartitionedData; |
| using Org.Apache.REEF.Network.Group.Config; |
| using Org.Apache.REEF.Network.Group.Driver; |
| using Org.Apache.REEF.Network.Group.Driver.Impl; |
| using Org.Apache.REEF.Network.Group.Pipelining; |
| using Org.Apache.REEF.Network.Group.Pipelining.Impl; |
| using Org.Apache.REEF.Network.Group.Topology; |
| using Org.Apache.REEF.Tang.Annotations; |
| using Org.Apache.REEF.Tang.Exceptions; |
| using Org.Apache.REEF.Tang.Implementations.Configuration; |
| using Org.Apache.REEF.Tang.Implementations.Tang; |
| using Org.Apache.REEF.Tang.Interface; |
| using Org.Apache.REEF.Tang.Util; |
| using Org.Apache.REEF.Utilities.Diagnostics; |
| using Org.Apache.REEF.Utilities.Logging; |
| |
| namespace Org.Apache.REEF.IMRU.OnREEF.Driver |
| { |
| /// <summary> |
| /// Implements the IMRU driver on REEF |
| /// </summary> |
| /// <typeparam name="TMapInput">Map Input</typeparam> |
| /// <typeparam name="TMapOutput">Map output</typeparam> |
| /// <typeparam name="TResult">Result</typeparam> |
| /// <typeparam name="TPartitionType">Type of data partition (Generic type in IInputPartition)</typeparam> |
| internal sealed class IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType> |
| : IObserver<IDriverStarted>, |
| IObserver<IAllocatedEvaluator>, |
| IObserver<IActiveContext>, |
| IObserver<ICompletedTask>, |
| IObserver<IFailedEvaluator>, |
| IObserver<IFailedContext>, |
| IObserver<IFailedTask> |
| { |
| private static readonly Logger Logger = |
| Logger.GetLogger(typeof(IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>)); |
| |
| private readonly ConfigurationManager _configurationManager; |
| private readonly int _totalMappers; |
| private readonly IEvaluatorRequestor _evaluatorRequestor; |
| private ICommunicationGroupDriver _commGroup; |
| private readonly IGroupCommDriver _groupCommDriver; |
| private readonly TaskStarter _groupCommTaskStarter; |
| private readonly ConcurrentStack<IConfiguration> _perMapperConfiguration; |
| private readonly int _coresPerMapper; |
| private readonly int _coresForUpdateTask; |
| private readonly int _memoryPerMapper; |
| private readonly int _memoryForUpdateTask; |
| private readonly ISet<IPerMapperConfigGenerator> _perMapperConfigs; |
| private readonly ISet<ICompletedTask> _completedTasks = new HashSet<ICompletedTask>(); |
| private readonly int _allowedFailedEvaluators; |
| private int _currentFailedEvaluators = 0; |
| private readonly bool _invokeGC; |
| private int _numberOfReadyTasks = 0; |
| |
| private readonly ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType> |
| _serviceAndContextConfigurationProvider; |
| |
/// <summary>
/// Builds the IMRU driver. The constructor is marked for injection, so its
/// arguments are supplied by Tang. Besides storing its arguments it registers the
/// broadcast and reduce operators on the default communication group, creates the
/// task starter for (number of mappers + 1) tasks and pre-computes one
/// configuration per mapper.
/// </summary>
/// <param name="dataSet">Partitioned input data set; its Count is used as the number of mappers</param>
/// <param name="perMapperConfigs">Generators that contribute mapper specific configuration</param>
/// <param name="configurationManager">Holder of the IMRU job configurations</param>
/// <param name="evaluatorRequestor">Used to submit evaluator requests</param>
/// <param name="coresPerMapper">Cores requested for each map evaluator</param>
/// <param name="coresForUpdateTask">Cores requested for the update evaluator</param>
/// <param name="memoryPerMapper">Megabytes requested for each map evaluator</param>
/// <param name="memoryForUpdateTask">Megabytes requested for the update evaluator</param>
/// <param name="failedEvaluatorsFraction">Fraction of the partition count that is allowed to fail;
/// the product is truncated to an integer</param>
/// <param name="invokeGC">Value bound to the InvokeGC named parameter of the task configurations</param>
/// <param name="groupCommDriver">Group communication driver</param>
[Inject]
private IMRUDriver(
    IPartitionedInputDataSet dataSet,
    [Parameter(typeof(PerMapConfigGeneratorSet))] ISet<IPerMapperConfigGenerator> perMapperConfigs,
    ConfigurationManager configurationManager,
    IEvaluatorRequestor evaluatorRequestor,
    [Parameter(typeof(CoresPerMapper))] int coresPerMapper,
    [Parameter(typeof(CoresForUpdateTask))] int coresForUpdateTask,
    [Parameter(typeof(MemoryPerMapper))] int memoryPerMapper,
    [Parameter(typeof(MemoryForUpdateTask))] int memoryForUpdateTask,
    [Parameter(typeof(AllowedFailedEvaluatorsFraction))] double failedEvaluatorsFraction,
    [Parameter(typeof(InvokeGC))] bool invokeGC,
    IGroupCommDriver groupCommDriver)
{
    // Injected collaborators
    _configurationManager = configurationManager;
    _evaluatorRequestor = evaluatorRequestor;
    _groupCommDriver = groupCommDriver;
    _perMapperConfigs = perMapperConfigs;

    // Resource settings for the two kinds of evaluators
    _coresPerMapper = coresPerMapper;
    _memoryPerMapper = memoryPerMapper;
    _coresForUpdateTask = coresForUpdateTask;
    _memoryForUpdateTask = memoryForUpdateTask;

    // One mapper per data partition; the failure budget is a truncated fraction of that count
    _totalMappers = dataSet.Count;
    _allowedFailedEvaluators = (int)(failedEvaluatorsFraction * dataSet.Count);
    _invokeGC = invokeGC;

    // The steps below read the fields assigned above, so they must come last
    AddGroupCommunicationOperators();
    _groupCommTaskStarter = new TaskStarter(_groupCommDriver, _totalMappers + 1);
    _perMapperConfiguration = ConstructPerMapperConfigStack(_totalMappers);
    _serviceAndContextConfigurationProvider =
        new ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>(dataSet);

    Logger.Log(
        Level.Info,
        string.Format(
            "map task memory:{0}, update task memory:{1}, map task cores:{2}, update task cores:{3}",
            _memoryPerMapper,
            _memoryForUpdateTask,
            _coresPerMapper,
            _coresForUpdateTask));
}
| |
/// <summary>
/// Reacts to the driver start event by requesting the evaluator
/// that will host the update task.
/// </summary>
/// <param name="value">The driver started event (not inspected)</param>
public void OnNext(IDriverStarted value)
{
    //// TODO[REEF-598]: Set a timeout for this request to be satisfied. If it is not within that time, exit the Driver.
    RequestUpdateEvaluator();
}
| |
/// <summary>
/// Looks up the context and service configuration that belong to the
/// newly allocated evaluator and submits both to it.
/// </summary>
/// <param name="allocatedEvaluator">The evaluator that was just allocated</param>
public void OnNext(IAllocatedEvaluator allocatedEvaluator)
{
    var evaluatorConfigs = _serviceAndContextConfigurationProvider
        .GetContextConfigurationForEvaluatorById(allocatedEvaluator.Id);

    allocatedEvaluator.SubmitContextAndService(evaluatorConfigs.Context, evaluatorConfigs.Service);
}
| |
/// <summary>
/// Queues the task that belongs on the context that just became active.
/// For the master evaluator the update task is registered with the communication
/// group and queued, and evaluators for all mappers are requested. For any other
/// evaluator a map task is registered and queued, and the ready-task counter is
/// incremented.
/// </summary>
/// <param name="activeContext">The context that became active</param>
public void OnNext(IActiveContext activeContext)
{
    Logger.Log(Level.Verbose, string.Format("Received Active Context {0}", activeContext.Id));

    if (_serviceAndContextConfigurationProvider.IsMasterEvaluatorId(activeContext.EvaluatorId))
    {
        Logger.Log(Level.Verbose, "Submitting master task");
        _commGroup.AddTask(IMRUConstants.UpdateTaskName);
        _groupCommTaskStarter.QueueTask(GetUpdateTaskConfiguration(), activeContext);

        // Map evaluators are only requested once the master context is up
        RequestMapEvaluators(_totalMappers);
        return;
    }

    Logger.Log(Level.Verbose, "Submitting map task");
    _serviceAndContextConfigurationProvider.RecordActiveContextPerEvaluatorId(activeContext.EvaluatorId);
    string taskId = GetTaskIdByEvaluatorId(activeContext.EvaluatorId);
    _commGroup.AddTask(taskId);
    _groupCommTaskStarter.QueueTask(GetMapTaskConfiguration(activeContext, taskId), activeContext);

    // Log the value returned by the atomic increment. Re-reading the field could
    // report a count already changed by another handler running concurrently.
    int readyTasks = Interlocked.Increment(ref _numberOfReadyTasks);
    Logger.Log(Level.Verbose, string.Format("{0} Tasks are ready for submission", readyTasks));
}
| |
/// <summary>
/// Records a completed task. Once a completion message has been received for
/// every IMRU task, all evaluators are shut down. The whole step runs while
/// holding the lock on the completed task set.
/// </summary>
/// <param name="completedTask">The link to the completed task</param>
public void OnNext(ICompletedTask completedTask)
{
    lock (_completedTasks)
    {
        var message = string.Format("Received completed task message from task Id: {0}", completedTask.Id);
        Logger.Log(Level.Info, message);

        _completedTasks.Add(completedTask);

        // Still waiting for more tasks to finish
        if (!AreIMRUTasksCompleted())
        {
            return;
        }

        ShutDownAllEvaluators();
    }
}
| |
/// <summary>
/// Handles an evaluator failure.
/// When all IMRU tasks have already completed the failure is only logged.
/// Otherwise the failure counter is increased; exceeding the allowed number of
/// failures raises MaximumNumberOfEvaluatorFailuresExceededException. Within the
/// budget, the failure is recorded and a replacement evaluator of the same kind
/// (master or mapper) is requested.
/// </summary>
/// <param name="value">The failed evaluator</param>
public void OnNext(IFailedEvaluator value)
{
    if (AreIMRUTasksCompleted())
    {
        var ignored = string.Format(
            "Evaluator with Id: {0} failed but IMRU task is completed. So ignoring.", value.Id);
        Logger.Log(Level.Info, ignored);
        return;
    }

    var failure = string.Format(
        "Evaluator with Id: {0} failed with Exception: {1}", value.Id, value.EvaluatorException);
    Logger.Log(Level.Info, failure);

    // Use the value returned by the atomic increment for the budget check
    int failuresSoFar = Interlocked.Increment(ref _currentFailedEvaluators);
    if (failuresSoFar > _allowedFailedEvaluators)
    {
        var budgetExceeded = new MaximumNumberOfEvaluatorFailuresExceededException(_allowedFailedEvaluators);
        Exceptions.Throw(budgetExceeded, Logger);
    }

    _serviceAndContextConfigurationProvider.RecordEvaluatorFailureById(value.Id);

    // Replace the failed evaluator with one of the same kind
    if (_serviceAndContextConfigurationProvider.IsMasterEvaluatorId(value.Id))
    {
        Logger.Log(Level.Info, string.Format("Requesting a replacement master Evaluator for {0}", value.Id));
        RequestUpdateEvaluator();
    }
    else
    {
        Logger.Log(Level.Info, string.Format("Requesting a replacement map Evaluator for {0}", value.Id));
        RequestMapEvaluators(1);
    }
}
| |
/// <summary>
/// Handles a failed context. While IMRU tasks are still outstanding the failure
/// is raised as an exception; after all tasks completed it is only logged.
/// </summary>
/// <param name="value">The failed context</param>
public void OnNext(IFailedContext value)
{
    if (!AreIMRUTasksCompleted())
    {
        var failure = new Exception(string.Format("Data Loading Context with Id: {0} failed", value.Id));
        Exceptions.Throw(failure, Logger);
    }
    else
    {
        var ignored = string.Format(
            "Context with Id: {0} failed but IMRU task is completed. So ignoring.", value.Id);
        Logger.Log(Level.Info, ignored);
    }
}
| |
/// <summary>
/// Handles a failed task. While IMRU tasks are still outstanding the failure
/// is raised as an exception; after all tasks completed it is only logged.
/// </summary>
/// <param name="value">The failed task</param>
public void OnNext(IFailedTask value)
{
    if (!AreIMRUTasksCompleted())
    {
        var failure = new Exception(string.Format("Task with Id: {0} failed", value.Id));
        Exceptions.Throw(failure, Logger);
    }
    else
    {
        var ignored = string.Format(
            "Task with Id: {0} failed but IMRU task is completed. So ignoring.", value.Id);
        Logger.Log(Level.Info, ignored);
    }
}
| |
/// <summary>
/// Called when an observed event stream reports an error.
/// The error is written to the log so that it does not disappear silently;
/// it is not rethrown.
/// </summary>
/// <param name="error">The reported error</param>
public void OnError(Exception error)
{
    Logger.Log(Level.Error, string.Format("IMRUDriver received an error: {0}", error));
}

/// <summary>
/// Called when an observed event stream completes. Nothing needs to be done.
/// </summary>
public void OnCompleted()
{
}
| |
/// <summary>
/// Tells whether a completed-task message has been recorded for every IMRU task,
/// i.e. for all mappers plus the update task.
/// </summary>
/// <returns>true once at least (_totalMappers + 1) completed tasks are recorded</returns>
private bool AreIMRUTasksCompleted()
{
    // _completedTasks is a plain HashSet that the completed-task handler mutates
    // while holding lock (_completedTasks); read it under the same lock so the
    // failure handlers never observe it mid-update. The monitor is re-entrant,
    // so the call made from inside that handler does not block itself.
    lock (_completedTasks)
    {
        return _completedTasks.Count >= _totalMappers + 1;
    }
}
| |
/// <summary>
/// Builds the map task id for an evaluator from the map task prefix and the
/// partition id that the configuration provider associates with the evaluator.
/// </summary>
/// <param name="evaluatorId">Id of the evaluator that will run the map task</param>
/// <returns>Task id of the form "prefix-partitionId-Version0"</returns>
private string GetTaskIdByEvaluatorId(string evaluatorId)
{
    var partitionId = _serviceAndContextConfigurationProvider.GetPartitionIdByEvaluatorId(evaluatorId);
    return string.Format("{0}-{1}-Version0", IMRUConstants.MapTaskPrefix, partitionId);
}
| |
/// <summary>
/// Disposes the active context of every recorded completed task,
/// logging each task id before its context is disposed.
/// </summary>
private void ShutDownAllEvaluators()
{
    foreach (var completed in _completedTasks)
    {
        var message = string.Format("Disposing task: {0}", completed.Id);
        Logger.Log(Level.Info, message);

        completed.ActiveContext.Dispose();
    }
}
| |
/// <summary>
/// Assembles the configuration of a map task. One pre-built mapper specific
/// configuration is popped from the stack and merged with the task identity,
/// the map function configuration and the group communication configuration.
/// The InvokeGC named parameter is bound on top of that.
/// </summary>
/// <param name="activeContext">Active context to which the task will be submitted;
/// only its id is used, in the error message</param>
/// <param name="taskId">Task Id</param>
/// <returns>Map task configuration</returns>
private IConfiguration GetMapTaskConfiguration(IActiveContext activeContext, string taskId)
{
    IConfiguration mapperConfig;
    bool popped = _perMapperConfiguration.TryPop(out mapperConfig);

    if (!popped)
    {
        // The stack holds one entry per mapper; running out of entries is a system error
        var message = string.Format(
            "No per map configuration exist for the active context {0}", activeContext.Id);
        Exceptions.Throw(new IMRUSystemException(message), Logger);
    }

    // Identity of the task: its id and the host class that runs the map function
    var taskIdentity = TaskConfiguration.ConfigurationModule
        .Set(TaskConfiguration.Identifier, taskId)
        .Set(TaskConfiguration.Task, GenericType<MapTaskHost<TMapInput, TMapOutput>>.Class)
        .Build();

    var builder = TangFactory.GetTang().NewConfigurationBuilder(
        taskIdentity,
        _configurationManager.MapFunctionConfiguration,
        mapperConfig,
        GetGroupCommConfiguration());

    return builder
        .BindNamedParameter<InvokeGC, bool>(GenericType<InvokeGC>.Class, _invokeGC.ToString())
        .Build();
}
| |
/// <summary>
/// Assembles the configuration of the update task from the task identity, the
/// update function configuration, the result handler configuration and the group
/// communication configuration, with the InvokeGC named parameter bound on top.
/// When no IIMRUResultHandler can be injected from that configuration, the
/// default result handler is bound.
/// </summary>
/// <returns>Update task configuration</returns>
private IConfiguration GetUpdateTaskConfiguration()
{
    // Identity of the task: the fixed update task name and its host class
    var taskIdentity = TaskConfiguration.ConfigurationModule
        .Set(TaskConfiguration.Identifier, IMRUConstants.UpdateTaskName)
        .Set(TaskConfiguration.Task, GenericType<UpdateTaskHost<TMapInput, TMapOutput, TResult>>.Class)
        .Build();

    var updateTaskConf = TangFactory.GetTang()
        .NewConfigurationBuilder(
            taskIdentity,
            _configurationManager.UpdateFunctionConfiguration,
            _configurationManager.ResultHandlerConfiguration,
            GetGroupCommConfiguration())
        .BindNamedParameter<InvokeGC, bool>(GenericType<InvokeGC>.Class, _invokeGC.ToString())
        .Build();

    // An interface with a generic type cannot be given a default implementation,
    // so probe for a user supplied IIMRUResultHandler by trying to inject one.
    // If that fails, bind the default handler explicitly.
    try
    {
        var probe = TangFactory.GetTang().NewInjector(updateTaskConf);
        probe.GetInstance<IIMRUResultHandler<TResult>>();
    }
    catch (InjectionException)
    {
        updateTaskConf = TangFactory.GetTang()
            .NewConfigurationBuilder(updateTaskConf)
            .BindImplementation(
                GenericType<IIMRUResultHandler<TResult>>.Class,
                GenericType<DefaultResultHandler<TResult>>.Class)
            .Build();

        Logger.Log(Level.Info, "User has not given any way to handle IMRU result, defaulting to ignoring it");
    }

    return updateTaskConf;
}
| |
/// <summary>
/// Generates the group communication configuration that is added to the tasks:
/// the group communication service configuration merged with the streaming codec
/// configurations for the map input, the map output and the update function.
/// </summary>
/// <returns>The group communication configuration</returns>
private IConfiguration GetGroupCommConfiguration()
{
    // Codec for the map input wrapped together with its control message
    var mapInputCodecConf =
        StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Conf
            .Set(
                StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Codec,
                GenericType<MapInputWithControlMessageCodec<TMapInput>>.Class)
            .Build();

    var allCodecsConf = TangFactory.GetTang()
        .NewConfigurationBuilder(
            mapInputCodecConf,
            StreamingCodecConfigurationMinusMessage<TMapOutput>.Conf.Build(),
            _configurationManager.UpdateFunctionCodecsConfiguration)
        .Build();

    var serviceConf = _groupCommDriver.GetServiceConfiguration();
    return Configurations.Merge(serviceConf, allCodecsConf);
}
| |
/// <summary>
/// Adds the broadcast and reduce operators to the default communication group
/// and stores the built group in _commGroup. Both operators use the update task
/// as root and a tree topology. For each pipeline data converter the configured
/// one is probed by injection; when the probe fails for any reason the default
/// converter is bound instead.
/// </summary>
private void AddGroupCommunicationOperators()
{
    var reduceConf = _configurationManager.ReduceFunctionConfiguration;
    var outputConverterConf = _configurationManager.MapOutputPipelineDataConverterConfiguration;
    var inputConverterConf = _configurationManager.MapInputPipelineDataConverterConfiguration;

    // Map input: if a converter for TMapInput can be injected, wrap it so that it also
    // handles the control message; otherwise use the default converter for the wrapped type.
    try
    {
        var inputProbe = TangFactory.GetTang().NewInjector(inputConverterConf);
        inputProbe.GetInstance<IPipelineDataConverter<TMapInput>>();

        inputConverterConf = TangFactory.GetTang()
            .NewConfigurationBuilder(inputConverterConf)
            .BindImplementation(
                GenericType<IPipelineDataConverter<MapInputWithControlMessage<TMapInput>>>.Class,
                GenericType<MapInputwithControlMessagePipelineDataConverter<TMapInput>>.Class)
            .Build();
    }
    catch (Exception)
    {
        inputConverterConf = TangFactory.GetTang()
            .NewConfigurationBuilder()
            .BindImplementation(
                GenericType<IPipelineDataConverter<MapInputWithControlMessage<TMapInput>>>.Class,
                GenericType<DefaultPipelineDataConverter<MapInputWithControlMessage<TMapInput>>>.Class)
            .Build();
    }

    // Map output: keep the configured converter if it can be injected, else use the default one.
    try
    {
        var outputProbe = TangFactory.GetTang().NewInjector(outputConverterConf);
        outputProbe.GetInstance<IPipelineDataConverter<TMapOutput>>();
    }
    catch (Exception)
    {
        outputConverterConf = TangFactory.GetTang()
            .NewConfigurationBuilder()
            .BindImplementation(
                GenericType<IPipelineDataConverter<TMapOutput>>.Class,
                GenericType<DefaultPipelineDataConverter<TMapOutput>>.Class)
            .Build();
    }

    var groupWithBroadcast = _groupCommDriver.DefaultGroup
        .AddBroadcast<MapInputWithControlMessage<TMapInput>>(
            IMRUConstants.BroadcastOperatorName,
            IMRUConstants.UpdateTaskName,
            TopologyTypes.Tree,
            inputConverterConf);

    var groupWithReduce = groupWithBroadcast
        .AddReduce<TMapOutput>(
            IMRUConstants.ReduceOperatorName,
            IMRUConstants.UpdateTaskName,
            TopologyTypes.Tree,
            reduceConf,
            outputConverterConf);

    _commGroup = groupWithReduce.Build();
}
| |
/// <summary>
/// Constructs the stack of configurations that are specific to each mapper.
/// For every mapper index the configurations produced by all registered
/// generators are merged; when there are no generators the entry is an
/// empty configuration.
/// </summary>
/// <param name="totalMappers">Total mappers</param>
/// <returns>Stack holding one configuration per mapper</returns>
private ConcurrentStack<IConfiguration> ConstructPerMapperConfigStack(int totalMappers)
{
    var configStack = new ConcurrentStack<IConfiguration>();

    for (int mapperIndex = 0; mapperIndex < totalMappers; mapperIndex++)
    {
        // Start from an empty configuration and fold in what each generator contributes
        IConfiguration merged = TangFactory.GetTang().NewConfigurationBuilder().Build();

        foreach (var generator in _perMapperConfigs)
        {
            merged = Configurations.Merge(merged, generator.GetMapperConfiguration(mapperIndex, totalMappers));
        }

        configStack.Push(merged);
    }

    return configStack;
}
| |
/// <summary>
/// Submits a request for map evaluators, each with the memory
/// and the number of cores configured for mappers.
/// </summary>
/// <param name="numEvaluators">Number of evaluators to request</param>
private void RequestMapEvaluators(int numEvaluators)
{
    var mapRequest = _evaluatorRequestor.NewBuilder()
        .SetMegabytes(_memoryPerMapper)
        .SetNumber(numEvaluators)
        .SetCores(_coresPerMapper)
        .Build();

    _evaluatorRequestor.Submit(mapRequest);
}
| |
/// <summary>
/// Submits a request for a single evaluator with the cores and
/// the memory configured for the update (master) task.
/// </summary>
private void RequestUpdateEvaluator()
{
    var updateRequest = _evaluatorRequestor.NewBuilder()
        .SetCores(_coresForUpdateTask)
        .SetMegabytes(_memoryForUpdateTask)
        .SetNumber(1)
        .Build();

    _evaluatorRequestor.Submit(updateRequest);
}
| } |
| } |