blob: a2acf5a414af8406a944313faa28fc4dad51a757 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using Org.Apache.REEF.Client.Common;
using Org.Apache.REEF.IMRU.API;
using Org.Apache.REEF.IMRU.InProcess.Parameters;
using Org.Apache.REEF.IO.PartitionedData;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Implementations.Configuration;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.StreamingCodec;
namespace Org.Apache.REEF.IMRU.InProcess
{
/// <summary>
/// Implements the IMRU client API for in-process execution
/// </summary>
/// <remarks>
/// This client assumes that all given Configurations can be merged in a conflict-free way.
/// </remarks>
public sealed class InProcessIMRUClient : IIMRUClient
{
    private static readonly Logger Logger =
        Logger.GetLogger(typeof(InProcessIMRUClient));

    // NOTE(review): this value is stored but never read inside this class. The number of map
    // functions actually created by MakeMapFunctions equals the number of partitions in the
    // configured dataset, not this parameter.
    private readonly int _numberOfMappers;

    /// <summary>
    /// Use Tang to instantiate this.
    /// </summary>
    /// <param name="numberOfMappers">The configured number of mappers; expected to be greater than zero.</param>
    /// <remarks>
    /// The positivity check uses <see cref="Debug.Assert(bool, string)"/>, so it is only evaluated
    /// in builds where the DEBUG symbol is defined.
    /// </remarks>
    [Inject]
    private InProcessIMRUClient([Parameter(typeof(NumberOfMappers))] int numberOfMappers)
    {
        Debug.Assert(numberOfMappers > 0, "Number of mappers should be greater than zero.");
        _numberOfMappers = numberOfMappers;
    }

    /// <summary>
    /// Runs the IMRU job described by <paramref name="jobDefinition"/> inside the calling process
    /// and returns its results.
    /// </summary>
    /// <typeparam name="TMapInput">The type of the side information provided to the Map function</typeparam>
    /// <typeparam name="TMapOutput">The return type of the Map function</typeparam>
    /// <typeparam name="TResult">The return type of the computation.</typeparam>
    /// <typeparam name="TPartitionType">
    /// Type of data partition (Generic type in IInputPartition). Not used by this implementation.
    /// </typeparam>
    /// <param name="jobDefinition">Job definition given by the user</param>
    /// <returns>The result of the job, as returned by the injected IMRURunner's Run method</returns>
    /// <exception cref="ArgumentNullException">If <paramref name="jobDefinition"/> is null.</exception>
    public IEnumerable<TResult> Submit<TMapInput, TMapOutput, TResult, TPartitionType>(IMRUJobDefinition jobDefinition)
    {
        if (jobDefinition == null)
        {
            throw new ArgumentNullException("jobDefinition");
        }

        // Combine all per-mapper config generator configurations so that the
        // PerMapConfigGeneratorSet can be injected from the shared injector below.
        IConfiguration overallPerMapConfig = null;
        try
        {
            overallPerMapConfig = Configurations.Merge(jobDefinition.PerMapConfigGeneratorConfig.ToArray());
        }
        catch (Exception e)
        {
            // NOTE(review): Exceptions.Throw is assumed to log via Logger and always throw;
            // otherwise execution would continue below with a null overallPerMapConfig.
            Exceptions.Throw(e, "Issues in merging PerMapConfigGenerator configurations", Logger);
        }

        // Reduce, update and per-map-generator configurations share one injector. This relies on
        // them merging without conflicts (see the class remarks).
        var mergedConfig = Configurations.Merge(
            jobDefinition.ReduceFunctionConfiguration,
            jobDefinition.UpdateFunctionConfiguration,
            jobDefinition.UpdateFunctionCodecsConfiguration,
            overallPerMapConfig);
        var injector = TangFactory.GetTang().NewInjector(mergedConfig);

        var perMapConfigGenerators =
            (ISet<IPerMapperConfigGenerator>)injector.GetNamedInstance(typeof(PerMapConfigGeneratorSet));

        // Map functions need per-partition injectors, so they are built separately and then
        // handed to the shared injector as a ready-made instance.
        injector.BindVolatileInstance(
            GenericType<MapFunctions<TMapInput, TMapOutput>>.Class,
            MakeMapFunctions<TMapInput, TMapOutput>(
                jobDefinition.MapFunctionConfiguration,
                jobDefinition.PartitionedDatasetConfiguration,
                perMapConfigGenerators));

        var runner = injector.GetInstance<IMRURunner<TMapInput, TMapOutput, TResult>>();
        return runner.Run();
    }

    /// <summary>
    /// Always null. This client does not submit a job to a REEF driver; it runs the job
    /// directly in the calling process, so there is no submission result to report.
    /// </summary>
    public IJobSubmissionResult JobSubmissionResult
    {
        get { return null; }
    }

    /// <summary>
    /// Creates one map function per partition of the configured dataset. Each map function is
    /// instantiated from its own injector, so it receives its own partition (IInputPartition)
    /// and its own per-mapper configuration.
    /// </summary>
    /// <param name="mapConfiguration">Map configuration given by user</param>
    /// <param name="partitionedDataSetConfig">Partitioned dataset configuration</param>
    /// <param name="perMapConfigGenerators">
    /// Per map configuration generators. Each is asked for the configuration of every mapper,
    /// and their outputs are merged.
    /// </param>
    /// <returns>A MapFunctions wrapper holding one map function instance per dataset partition</returns>
    private MapFunctions<TMapInput, TMapOutput> MakeMapFunctions<TMapInput, TMapOutput>(IConfiguration mapConfiguration, IConfiguration partitionedDataSetConfig, ISet<IPerMapperConfigGenerator> perMapConfigGenerators)
    {
        IPartitionedInputDataSet dataset =
            TangFactory.GetTang().NewInjector(partitionedDataSetConfig).GetInstance<IPartitionedInputDataSet>();

        // Loop-invariant values: the total partition count given to every generator, and the
        // empty seed configuration that the generators' outputs are folded into.
        int partitionCount = dataset.Count;
        IConfiguration emptyConfig = TangFactory.GetTang().NewConfigurationBuilder().Build();

        ISet<IMapFunction<TMapInput, TMapOutput>> mappers = new HashSet<IMapFunction<TMapInput, TMapOutput>>();
        int mapperIndex = 0;
        foreach (var descriptor in dataset)
        {
            // Per-iteration copy so the lambda never observes a later value of the counter.
            int currentIndex = mapperIndex;
            IConfiguration perMapConfig = perMapConfigGenerators.Aggregate(
                emptyConfig,
                (current, configGenerator) =>
                    Configurations.Merge(current, configGenerator.GetMapperConfiguration(currentIndex, partitionCount)));

            var injector = TangFactory.GetTang()
                .NewInjector(mapConfiguration, descriptor.GetPartitionConfiguration(), perMapConfig);
            mappers.Add(injector.GetInstance<IMapFunction<TMapInput, TMapOutput>>());
            mapperIndex++;
        }

        return new MapFunctions<TMapInput, TMapOutput>(mappers);
    }
}
}