blob: 01a80fe51796f3a3f39853e47d2d02346f3ee5ee [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.IMRU.OnREEF.Driver;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Logging;
using Xunit;
namespace Org.Apache.REEF.Tests.Functional.IMRU
{
/// <summary>
/// This is to test close event handler in IMRU tasks
/// The test provide IRunningTask, IFailedTask and ICompletedTask handlers so that to trigger close events and handle the
/// failed tasks and completed tasks
/// </summary>
[Collection("FunctionalTests")]
public class IMRUCloseTaskTest : IMRUBrodcastReduceTestBase
{
/// <summary>
/// This test is for running in local runtime
/// It sends close event for all the running tasks.
/// In the task close handler, the cancellation token will be set, and as a result tasks will return from the Call()
/// method and driver will receive ICompletedTask.
/// In the exceptional case, task might throw exception from Call() method, as a result, driver will receive IFailedTask.
/// Expect number of CompletedTask and FailedTask equals to the total number of tasks. No failed Evaluator.
/// </summary>
[Fact]
public void TestTaskCloseOnLocalRuntime()
{
const int chunkSize = 2;
const int dims = 50;
const int iterations = 1000;
const int mapperMemory = 512;
const int updateTaskMemory = 512;
const int numTasks = 4;
const int numOfRetryInRecovery = 4;
var testFolder = DefaultRuntimeFolder + TestId;
TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, numOfRetryInRecovery, testFolder);
string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 120);
var completedCount = GetMessageCount(lines, CompletedTaskMessage);
var failedCount = GetMessageCount(lines, FailedTaskMessage);
Assert.Equal(numTasks, completedCount + failedCount);
CleanUp(testFolder);
}
/// <summary>
/// Same testing for running on YARN
/// It sends close event for all the running tasks.
/// </summary>
[Fact(Skip = "Requires Yarn")]
public void TestTaskCloseOnLocalRuntimeOnYarn()
{
const int chunkSize = 2;
const int dims = 50;
const int iterations = 200;
const int mapperMemory = 512;
const int updateTaskMemory = 512;
const int numTasks = 4;
const int numOfRetryInRecovery = 4;
TestBroadCastAndReduce(true, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, numOfRetryInRecovery);
}
/// <summary>
/// This method overrides base class method and defines its own event handlers for driver.
/// It uses its own RunningTaskHandler, FailedEvaluatorHandler and CompletedTaskHandler, FailedTaskHandler so that to simulate the test scenarios
/// and verify the test result.
/// Rest of the event handlers use those from IMRUDriver. In IActiveContext handler in IMRUDriver, IMRU tasks are bound for the test.
/// </summary>
/// <typeparam name="TMapInput"></typeparam>
/// <typeparam name="TMapOutput"></typeparam>
/// <typeparam name="TResult"></typeparam>
/// <typeparam name="TPartitionType"></typeparam>
/// <returns></returns>
protected override IConfiguration DriverEventHandlerConfigurations<TMapInput, TMapOutput, TResult, TPartitionType>()
{
return REEF.Driver.DriverConfiguration.ConfigurationModule
.Set(REEF.Driver.DriverConfiguration.OnEvaluatorAllocated,
GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
.Set(REEF.Driver.DriverConfiguration.OnDriverStarted,
GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
.Set(REEF.Driver.DriverConfiguration.OnContextActive,
GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
.Set(REEF.Driver.DriverConfiguration.OnTaskCompleted,
GenericType<TestHandlers>.Class)
.Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed,
GenericType<TestHandlers>.Class)
.Set(REEF.Driver.DriverConfiguration.OnContextFailed,
GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
.Set(REEF.Driver.DriverConfiguration.OnTaskFailed,
GenericType<TestHandlers>.Class)
.Set(REEF.Driver.DriverConfiguration.OnTaskRunning,
GenericType<TestHandlers>.Class)
.Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, TraceLevel.Info.ToString())
.Build();
}
/// <summary>
/// Test handlers
/// </summary>
internal sealed class TestHandlers : IObserver<IRunningTask>, IObserver<ICompletedTask>, IObserver<IFailedTask>, IObserver<IFailedEvaluator>
{
private readonly IDictionary<string, IRunningTask> _runningTasks = new Dictionary<string, IRunningTask>();
private readonly object _lock = new object();
[Inject]
private TestHandlers()
{
}
/// <summary>
/// Add the RunningTask to _runningTasks and dispose the last received running task
/// </summary>
public void OnNext(IRunningTask value)
{
lock (_lock)
{
Logger.Log(Level.Info, "Received running task:" + value.Id);
_runningTasks.Add(value.Id, value);
if (_runningTasks.Count == 4)
{
Logger.Log(Level.Info, "Dispose running task from driver:" + value.Id);
value.Dispose(ByteUtilities.StringToByteArrays(TaskManager.CloseTaskByDriver));
_runningTasks.Remove(value.Id);
}
}
}
/// <summary>
/// Log the task id and FailTaskMessage
/// Close the rest of the running tasks, then dispose the context
/// </summary>
/// <param name="value"></param>
public void OnNext(IFailedTask value)
{
lock (_lock)
{
Logger.Log(Level.Info, FailedTaskMessage + value.Id);
_runningTasks.Remove(value.Id);
CloseRunningTasks();
value.GetActiveContext().Value.Dispose();
}
}
/// <summary>
/// No Failed Evaluator is expected
/// </summary>
/// <param name="value"></param>
public void OnNext(IFailedEvaluator value)
{
throw new Exception(FailedEvaluatorMessage);
}
/// <summary>
/// Log the task id and ICompletedTask
/// Remove the task from _runningTasks
/// Close the rest of the running tasks, then dispose the context
/// </summary>
public void OnNext(ICompletedTask value)
{
lock (_lock)
{
Logger.Log(Level.Info, CompletedTaskMessage + value.Id);
_runningTasks.Remove(value.Id);
CloseRunningTasks();
value.ActiveContext.Dispose();
}
}
private void CloseRunningTasks()
{
foreach (var task in _runningTasks)
{
Logger.Log(Level.Info, "Dispose running task from driver:" + task.Key);
task.Value.Dispose(ByteUtilities.StringToByteArrays(TaskManager.CloseTaskByDriver));
}
_runningTasks.Clear();
}
public void OnCompleted()
{
throw new NotImplementedException();
}
public void OnError(Exception error)
{
throw new NotImplementedException();
}
}
}
}