blob: c91abe2887481d3eab981a06847151b45968ed0a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using Org.Apache.REEF.IMRU.API;
using Org.Apache.REEF.Network.Group.Operators;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Wake.Remote.Impl;
using Org.Apache.REEF.Wake.StreamingCodec;
namespace Org.Apache.REEF.IMRU.InProcess
{
/// <summary>
/// Simple, single-threaded executor for IMRU Jobs.
/// </summary>
/// <typeparam name="TMapInput">Input to map function</typeparam>
/// <typeparam name="TMapOutput">Output of map function</typeparam>
/// <typeparam name="TResult">Final result</typeparam>
internal sealed class IMRURunner<TMapInput, TMapOutput, TResult>
{
private readonly ISet<IMapFunction<TMapInput, TMapOutput>> _mapfunctions;
private readonly IReduceFunction<TMapOutput> _reduceTask;
private readonly IUpdateFunction<TMapInput, TMapOutput, TResult> _updateTask;
private readonly IStreamingCodec<TMapInput> _mapInputCodec;
private readonly IStreamingCodec<TMapOutput> _mapOutputCodec;
[Inject]
private IMRURunner(MapFunctions<TMapInput, TMapOutput> mapfunctions,
IReduceFunction<TMapOutput> reduceTask,
IUpdateFunction<TMapInput, TMapOutput, TResult> updateTask,
InputCodecWrapper<TMapInput> mapInputCodec,
OutputCodecWrapper<TMapOutput> mapOutputCodec)
{
_mapfunctions = mapfunctions.Mappers;
_reduceTask = reduceTask;
_updateTask = updateTask;
_mapInputCodec = mapInputCodec.Codec;
_mapOutputCodec = mapOutputCodec.Codec;
}
internal IList<TResult> Run()
{
var results = new List<TResult>();
var updateResult = _updateTask.Initialize();
while (updateResult.IsNotDone)
{
if (updateResult.HasResult)
{
results.Add(updateResult.Result);
}
Debug.Assert(updateResult.HasMapInput, "MapInput is needed.");
var mapinput = updateResult.MapInput;
var mapOutputs = new List<TMapOutput>();
foreach (var mapfunc in _mapfunctions)
{
// We create a copy by doing coding and decoding since the map task might
// reuse the fields in next iteration and meanwhile update task might update it.
using (MemoryStream mapInputStream = new MemoryStream(), mapOutputStream = new MemoryStream())
{
var mapInputWriter = new StreamDataWriter(mapInputStream);
_mapInputCodec.Write(mapinput, mapInputWriter);
mapInputStream.Position = 0;
var mapInputReader = new StreamDataReader(mapInputStream);
var output = mapfunc.Map(_mapInputCodec.Read(mapInputReader));
var mapOutputWriter = new StreamDataWriter(mapOutputStream);
_mapOutputCodec.Write(output, mapOutputWriter);
mapOutputStream.Position = 0;
var mapOutputReader = new StreamDataReader(mapOutputStream);
output = _mapOutputCodec.Read(mapOutputReader);
mapOutputs.Add(output);
}
}
var mapOutput = _reduceTask.Reduce(mapOutputs);
updateResult = _updateTask.Update(mapOutput);
}
if (updateResult.HasResult)
{
results.Add(updateResult.Result);
}
return results;
}
}
}