blob: 36916db3af0afc9d57b2e41a283c7c6337a8646f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Generic;
using Org.Apache.REEF.Common.Context;
using Org.Apache.REEF.Common.Events;
using Org.Apache.REEF.Common.Services;
using Org.Apache.REEF.IO.PartitionedData;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
namespace Org.Apache.REEF.IMRU.OnREEF.Driver
{
/// <summary>
/// Class that handles failed evaluators and also provides Service
/// and Context configuration
/// </summary>
/// <typeparam name="TMapInput"></typeparam>
/// <typeparam name="TMapOutput"></typeparam>
/// <typeparam name="TPartitionType"></typeparam>
internal sealed class ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>
{
private static readonly Logger Logger = Logger.GetLogger(typeof(ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>));
private readonly Dictionary<string, string> _partitionIdProvider = new Dictionary<string, string>();
private readonly ISet<string> _submittedEvaluators = new HashSet<string>();
private readonly ISet<string> _contextLoadedEvaluators = new HashSet<string>();
private readonly object _lock = new object();
private readonly Stack<string> _partitionDescriptorIds = new Stack<string>();
private readonly IPartitionedInputDataSet _dataset;
private string _masterEvaluatorId;
internal ServiceAndContextConfigurationProvider(IPartitionedInputDataSet dataset)
{
_dataset = dataset;
foreach (var descriptor in _dataset)
{
_partitionDescriptorIds.Push(descriptor.Id);
}
}
/// <summary>
/// Handles failed evaluator. Moves the id from
/// submitted evaluator to failed evaluator
/// </summary>
/// <param name="evaluatorId"></param>
/// <returns>Whether failed evaluator is master or not</returns>
internal bool RecordEvaluatorFailureById(string evaluatorId)
{
lock (_lock)
{
string msg;
bool isMaster = IsMasterEvaluatorId(evaluatorId);
if (_contextLoadedEvaluators.Contains(evaluatorId))
{
msg =
string.Format(
"Failed evaluator:{0} already had context loaded. Cannot handle failure at this stage",
evaluatorId);
Exceptions.Throw(new Exception(msg), Logger);
}
if (!_submittedEvaluators.Contains(evaluatorId))
{
msg = string.Format("Failed evaluator:{0} was never submitted", evaluatorId);
Exceptions.Throw(new Exception(msg), Logger);
}
if (!_partitionIdProvider.ContainsKey(evaluatorId) && !isMaster)
{
msg = string.Format("Partition descriptor for Failed evaluator:{0} not present", evaluatorId);
Exceptions.Throw(new Exception(msg), Logger);
}
_submittedEvaluators.Remove(evaluatorId);
if (isMaster)
{
Logger.Log(Level.Info, "Failed Evaluator is Master");
_masterEvaluatorId = null;
return true;
}
Logger.Log(Level.Info, "Failed Evaluator is a Mapper");
_partitionDescriptorIds.Push(_partitionIdProvider[evaluatorId]);
_partitionIdProvider.Remove(evaluatorId);
return false;
}
}
/// <summary>
/// Notifies that active context state has been reached
/// </summary>
/// <param name="evaluatorId"></param>
internal void RecordActiveContextPerEvaluatorId(string evaluatorId)
{
lock (_lock)
{
if (!_submittedEvaluators.Contains(evaluatorId))
{
var msg = string.Format("Evaluator:{0} never loaded data but still reached active context stage",
evaluatorId);
Exceptions.Throw(new Exception(msg), Logger);
}
if (_contextLoadedEvaluators.Contains(evaluatorId))
{
var msg = string.Format("Evaluator:{0} already reached the active context stage", evaluatorId);
Exceptions.Throw(new Exception(msg), Logger);
}
_contextLoadedEvaluators.Add(evaluatorId);
_submittedEvaluators.Remove(evaluatorId);
}
}
/// <summary>
/// Gets next context configuration. Either master or mapper.
/// </summary>
/// <param name="evaluatorId">Evaluator Id</param>
/// <returns>The context and service configuration</returns>
internal ContextAndServiceConfiguration GetContextConfigurationForEvaluatorById(string evaluatorId)
{
lock (_lock)
{
if (_submittedEvaluators.Contains(evaluatorId))
{
string msg = string.Format("The context is already submitted to evaluator:{0}", evaluatorId);
Exceptions.Throw(new Exception(msg), Logger);
}
if (_masterEvaluatorId == null)
{
Logger.Log(Level.Info, "Submitting root context and service for master");
_masterEvaluatorId = evaluatorId;
_submittedEvaluators.Add(evaluatorId);
return new ContextAndServiceConfiguration(
ContextConfiguration.ConfigurationModule.Set(ContextConfiguration.Identifier,
IMRUConstants.MasterContextId).Build(),
TangFactory.GetTang().NewConfigurationBuilder().Build());
}
Logger.Log(Level.Info, "Submitting root context and service for a map task");
return GetDataLoadingConfigurationForEvaluatorById(evaluatorId);
}
}
/// <summary>
/// Checks whether evaluator id is that of master
/// </summary>
/// <param name="evaluatorId">Id of evaluator</param>
/// <returns>true if id is that of master</returns>
internal bool IsMasterEvaluatorId(string evaluatorId)
{
return evaluatorId.Equals(_masterEvaluatorId);
}
/// <summary>
/// Gets partition Id for the evaluator
/// </summary>
/// <param name="evaluatorId"></param>
/// <returns></returns>
internal string GetPartitionIdByEvaluatorId(string evaluatorId)
{
lock (_lock)
{
string msg;
if (!_submittedEvaluators.Contains(evaluatorId) && !_contextLoadedEvaluators.Contains(evaluatorId))
{
msg = string.Format("Context for Evaluator:{0} has never been submitted", evaluatorId);
Exceptions.Throw(new IMRUSystemException(msg), Logger);
}
if (IsMasterEvaluatorId(evaluatorId))
{
msg = string.Format("Evaluator:{0} is master and does not get partition", evaluatorId);
Exceptions.Throw(new IMRUSystemException(msg), Logger);
}
if (!_partitionIdProvider.ContainsKey(evaluatorId))
{
msg = string.Format("Partition descriptor for evaluator:{0} is not present in the mapping", evaluatorId);
Exceptions.Throw(new IMRUSystemException(msg), Logger);
}
return _partitionIdProvider[evaluatorId];
}
}
/// <summary>
/// Gives context and service configuration for next evaluator either from failed
/// evaluator or new configuration
/// </summary>
/// <param name="evaluatorId"></param>
/// <returns></returns>
private ContextAndServiceConfiguration GetDataLoadingConfigurationForEvaluatorById(string evaluatorId)
{
string msg;
if (_contextLoadedEvaluators.Contains(evaluatorId))
{
msg = string.Format("Evaluator:{0} already has the data loaded", evaluatorId);
Exceptions.Throw(new IMRUSystemException(msg), Logger);
}
if (_partitionDescriptorIds.Count == 0)
{
Exceptions.Throw(new IMRUSystemException("No more data configuration can be provided"), Logger);
}
if (_partitionIdProvider.ContainsKey(evaluatorId))
{
msg =
string.Format(
"Evaluator Id:{0} already present in configuration cache, they have to be unique",
evaluatorId);
Exceptions.Throw(new IMRUSystemException(msg), Logger);
}
Logger.Log(Level.Info, "Getting a new data loading configuration");
_partitionIdProvider[evaluatorId] = _partitionDescriptorIds.Pop();
_submittedEvaluators.Add(evaluatorId);
msg = string.Format(
"Current status: Submitted Evaluators-{0}, Data Loaded Evaluators-{1}, Unused data partitions-{2}",
_submittedEvaluators.Count,
_contextLoadedEvaluators.Count,
_partitionDescriptorIds.Count);
Logger.Log(Level.Info, msg);
try
{
IPartitionDescriptor partitionDescriptor =
_dataset.GetPartitionDescriptorForId(_partitionIdProvider[evaluatorId]);
return GetDataLoadingContextAndServiceConfiguration(partitionDescriptor, evaluatorId);
}
catch (Exception e)
{
msg = string.Format("Error while trying to access partition descriptor:{0} from dataset",
_partitionIdProvider[evaluatorId]);
Exceptions.Throw(e, msg, Logger);
return null;
}
}
private ContextAndServiceConfiguration GetDataLoadingContextAndServiceConfiguration(
IPartitionDescriptor partitionDescriptor,
string evaluatorId)
{
var dataLoadingContextConf =
TangFactory.GetTang()
.NewConfigurationBuilder()
.BindSetEntry<ContextConfigurationOptions.StartHandlers, DataLoadingContext<TPartitionType>, IObserver<IContextStart>>(
GenericType<ContextConfigurationOptions.StartHandlers>.Class,
GenericType<DataLoadingContext<TPartitionType>>.Class)
.Build();
var serviceConf =
TangFactory.GetTang()
.NewConfigurationBuilder(ServiceConfiguration.ConfigurationModule.Build(),
dataLoadingContextConf,
partitionDescriptor.GetPartitionConfiguration())
.Build();
var contextConf = ContextConfiguration.ConfigurationModule
.Set(ContextConfiguration.Identifier, string.Format("DataLoading-{0}", evaluatorId))
.Build();
return new ContextAndServiceConfiguration(contextConf, serviceConf);
}
}
}