blob: 0d4e27b9ef5fe8d780152dd134d5aaad81756e70 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Generic;
using System.Globalization;
using Org.Apache.REEF.Common.Context;
using Org.Apache.REEF.Common.Events;
using Org.Apache.REEF.Common.Services;
using Org.Apache.REEF.IO.PartitionedData;
using Org.Apache.REEF.Tang.Implementations.Configuration;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities.Attributes;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
namespace Org.Apache.REEF.IMRU.OnREEF.Driver
{
/// <summary>
/// Provides the context and service configurations handed to IMRU evaluators and
/// tracks which partition descriptor id is currently assigned to which evaluator.
/// </summary>
/// <typeparam name="TMapInput">Map input type; part of the class identity only, not referenced by the members of this class</typeparam>
/// <typeparam name="TMapOutput">Map output type; part of the class identity only, not referenced by the members of this class</typeparam>
/// <typeparam name="TPartitionType">Type argument used for the DataLoadingContext that is bound as a context start handler</typeparam>
[NotThreadSafe]
internal sealed class ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>
{
    private static readonly Logger Logger = Logger.GetLogger(typeof(ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>));

    /// <summary>
    /// Maps an evaluator id to the id of the partition descriptor currently assigned to it
    /// </summary>
    private readonly Dictionary<string, string> _partitionIdProvider = new Dictionary<string, string>();

    /// <summary>
    /// Partition descriptor ids that are not currently assigned to any evaluator
    /// </summary>
    private readonly Stack<string> _partitionDescriptorIds = new Stack<string>();

    private readonly IPartitionedInputDataSet _dataset;
    private readonly ConfigurationManager _configurationManager;

    /// <summary>
    /// Constructs the object which maintains partition descriptor ids so that it can provide
    /// the proper data loading configuration. The id of every descriptor in the dataset
    /// is pushed onto the stack of unassigned ids.
    /// </summary>
    /// <param name="dataset">partition input dataset</param>
    /// <param name="configurationManager">Configuration manager that holds configurations for context and tasks</param>
    internal ServiceAndContextConfigurationProvider(IPartitionedInputDataSet dataset, ConfigurationManager configurationManager)
    {
        _dataset = dataset;
        foreach (var descriptor in _dataset)
        {
            _partitionDescriptorIds.Push(descriptor.Id);
        }

        _configurationManager = configurationManager;
    }

    /// <summary>
    /// Handles a failed evaluator. Pushes the partition id assigned to it back onto the stack of
    /// unassigned partition descriptor ids and removes the evaluator id from the mapping.
    /// </summary>
    /// <param name="evaluatorId">Id of the evaluator whose partition assignment is released</param>
    /// <exception cref="IMRUSystemException">Raised through Exceptions.Throw when the evaluator id has no partition assigned</exception>
    internal void RemoveEvaluatorIdFromPartitionIdProvider(string evaluatorId)
    {
        string partitionId;
        if (!_partitionIdProvider.TryGetValue(evaluatorId, out partitionId))
        {
            var msg = string.Format(CultureInfo.InvariantCulture, "Partition descriptor for Failed evaluator:{0} not present", evaluatorId);

            // Same exception type as every other failure path of this class.
            Exceptions.Throw(new IMRUSystemException(msg), Logger);
        }

        _partitionDescriptorIds.Push(partitionId);
        _partitionIdProvider.Remove(evaluatorId);
    }

    /// <summary>
    /// Gets Context and Service configuration for Master. The context is identified by
    /// IMRUConstants.MasterContextId; the service configuration registers TaskStateService
    /// and is merged with the configuration manager's UpdateTaskStateConfiguration.
    /// </summary>
    /// <param name="evaluatorId">Id of the master evaluator; not read by this method</param>
    /// <returns>Context and service configuration for the master evaluator</returns>
    internal ContextAndServiceConfiguration GetContextConfigurationForMasterEvaluatorById(string evaluatorId)
    {
        Logger.Log(Level.Info, "Getting root context and service configuration for master");

        var serviceConf = ServiceConfiguration.ConfigurationModule
            .Set(ServiceConfiguration.Services, GenericType<TaskStateService>.Class)
            .Build();

        var contextConf = ContextConfiguration.ConfigurationModule
            .Set(ContextConfiguration.Identifier, IMRUConstants.MasterContextId)
            .Build();

        return new ContextAndServiceConfiguration(
            contextConf,
            Configurations.Merge(serviceConf, _configurationManager.UpdateTaskStateConfiguration));
    }

    /// <summary>
    /// Gets the partition id currently assigned to the evaluator
    /// </summary>
    /// <param name="evaluatorId">Id of the evaluator to look up</param>
    /// <returns>Id of the partition descriptor assigned to the evaluator</returns>
    /// <exception cref="IMRUSystemException">Raised through Exceptions.Throw when the evaluator id is not in the mapping</exception>
    internal string GetPartitionIdByEvaluatorId(string evaluatorId)
    {
        string partitionId;
        if (!_partitionIdProvider.TryGetValue(evaluatorId, out partitionId))
        {
            var msg = string.Format(CultureInfo.InvariantCulture, "Partition descriptor for evaluator:{0} is not present in the mapping", evaluatorId);
            Exceptions.Throw(new IMRUSystemException(msg), Logger);
        }

        return partitionId;
    }

    /// <summary>
    /// Gives context and service configuration for the next evaluator. The partition id is taken
    /// from the stack of unassigned ids, which holds both never-assigned ids and ids released
    /// by failed evaluators, and is recorded against the evaluator id.
    /// </summary>
    /// <param name="evaluatorId">Id of the evaluator that receives a partition; must not already have one</param>
    /// <returns>Data loading context and service configuration for the evaluator</returns>
    /// <exception cref="IMRUSystemException">Raised through Exceptions.Throw when no unassigned partition id is left
    /// or when the evaluator id already has a partition assigned</exception>
    internal ContextAndServiceConfiguration GetDataLoadingConfigurationForEvaluatorById(string evaluatorId)
    {
        if (_partitionDescriptorIds.Count == 0)
        {
            Exceptions.Throw(new IMRUSystemException("No more data configuration can be provided"), Logger);
        }

        if (_partitionIdProvider.ContainsKey(evaluatorId))
        {
            var msg =
                string.Format(
                    CultureInfo.InvariantCulture,
                    "Evaluator Id:{0} already present in configuration cache, they have to be unique",
                    evaluatorId);
            Exceptions.Throw(new IMRUSystemException(msg), Logger);
        }

        Logger.Log(Level.Info, "Getting a new data loading configuration");

        // Keep the popped id in a local so the mapping is not looked up again below.
        var partitionId = _partitionDescriptorIds.Pop();
        _partitionIdProvider[evaluatorId] = partitionId;

        try
        {
            IPartitionDescriptor partitionDescriptor = _dataset.GetPartitionDescriptorForId(partitionId);
            return GetDataLoadingContextAndServiceConfiguration(partitionDescriptor, evaluatorId);
        }
        catch (Exception e)
        {
            // NOTE(review): the partition id stays assigned to the evaluator on this path;
            // presumably the caller releases it via RemoveEvaluatorIdFromPartitionIdProvider - confirm.
            var msg = string.Format(CultureInfo.InvariantCulture, "Error while trying to access partition descriptor:{0} from dataset",
                partitionId);
            Exceptions.Throw(e, msg, Logger);

            // Only here to satisfy the compiler; Exceptions.Throw is expected not to return.
            return null;
        }
    }

    /// <summary>
    /// Creates service and data loading context configuration for the given evaluator id.
    /// The service configuration combines TaskStateService, the DataLoadingContext start handler,
    /// the partition's own configuration and the configuration manager's MapTaskStateConfiguration.
    /// The context is identified as "DataLoading-" followed by the evaluator id.
    /// </summary>
    /// <param name="partitionDescriptor">Descriptor whose partition configuration is included in the service configuration</param>
    /// <param name="evaluatorId">Evaluator id used to build the context identifier</param>
    /// <returns>Context and service configuration for data loading on the evaluator</returns>
    private ContextAndServiceConfiguration GetDataLoadingContextAndServiceConfiguration(
        IPartitionDescriptor partitionDescriptor,
        string evaluatorId)
    {
        var dataLoadingContextConf =
            TangFactory.GetTang()
                .NewConfigurationBuilder()
                .BindSetEntry<ContextConfigurationOptions.StartHandlers, DataLoadingContext<TPartitionType>, IObserver<IContextStart>>(
                    GenericType<ContextConfigurationOptions.StartHandlers>.Class,
                    GenericType<DataLoadingContext<TPartitionType>>.Class)
                .Build();

        var serviceConf =
            TangFactory.GetTang()
                .NewConfigurationBuilder(
                    ServiceConfiguration.ConfigurationModule
                        .Set(ServiceConfiguration.Services, GenericType<TaskStateService>.Class)
                        .Build(),
                    dataLoadingContextConf,
                    partitionDescriptor.GetPartitionConfiguration(),
                    _configurationManager.MapTaskStateConfiguration)
                .Build();

        // Invariant culture, consistent with every other string.Format in this class.
        var contextConf = ContextConfiguration.ConfigurationModule
            .Set(ContextConfiguration.Identifier, string.Format(CultureInfo.InvariantCulture, "DataLoading-{0}", evaluatorId))
            .Build();

        return new ContextAndServiceConfiguration(contextConf, serviceConf);
    }
}
}