blob: 319f541a5b697da2ec4e215c0d86c29d64beb918 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on anAssert.Equal
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using NSubstitute;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.IMRU.OnREEF.Driver;
using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Utilities;
using Xunit;
namespace Org.Apache.REEF.IMRU.Tests
{
/// <summary>
/// Test cases for TaskManager
/// </summary>
public sealed class TestTaskManager
{
private const string MapperTaskIdPrefix = "MapperTaskIdPrefix";
private const string MasterTaskId = "MasterTaskId";
private const string EvaluatorIdPrefix = "EvaluatorId";
private const string ContextIdPrefix = "ContextId";
private const int TotalNumberOfTasks = 3;
/// <summary>
/// Tests valid Add task cases
/// </summary>
[Fact]
public void TestValidAddAndReset()
{
var taskManager = TaskManagerWithTasksAdded();
Assert.True(taskManager.AreAllTasksInState(TaskState.TaskNew));
Assert.Equal(TotalNumberOfTasks, taskManager.NumberOfTasks);
taskManager.Reset();
Assert.Equal(0, taskManager.NumberOfTasks);
Assert.Equal(0, taskManager.NumberOfAppErrors());
}
/// <summary>
/// Tests SubmitTasks after adding all the tasks to the TaskManager
/// </summary>
[Fact]
public void TestSubmitTasks()
{
var taskManager = TaskManagerWithTasksSubmitted();
Assert.True(taskManager.AreAllTasksInState(TaskState.TaskSubmitted));
}
/// <summary>
/// Tests SubmitTask with a missing mapper task
/// </summary>
[Fact]
public void TestMissingMapperTasksSubmit()
{
var taskManager = CreateTaskManager();
taskManager.AddTask(MasterTaskId, MockConfig(), CreateMockActiveContext(0));
taskManager.AddTask(MapperTaskIdPrefix + 1, MockConfig(), CreateMockActiveContext(1));
Action submit = () => taskManager.SubmitTasks();
Assert.Throws<IMRUSystemException>(submit);
}
/// <summary>
/// Tests SubmitTask with missing master task
/// </summary>
[Fact]
public void TestMissingMasterTaskSubmit()
{
var taskManager = CreateTaskManager();
taskManager.AddTask(MapperTaskIdPrefix + 1, MockConfig(), CreateMockActiveContext(1));
taskManager.AddTask(MapperTaskIdPrefix + 2, MockConfig(), CreateMockActiveContext(2));
Action submit = () => taskManager.SubmitTasks();
Assert.Throws<IMRUSystemException>(submit);
}
/// <summary>
/// Tests adding all mapper tasks without master task
/// </summary>
[Fact]
public void NoMasterTask()
{
var taskManager = CreateTaskManager();
taskManager.AddTask(MapperTaskIdPrefix + 1, MockConfig(), CreateMockActiveContext(1));
taskManager.AddTask(MapperTaskIdPrefix + 2, MockConfig(), CreateMockActiveContext(2));
Action add = () => taskManager.AddTask(MapperTaskIdPrefix + 3, MockConfig(), CreateMockActiveContext(3));
Assert.Throws<IMRUSystemException>(add);
}
/// <summary>
/// Tests adding more than expected tasks
/// </summary>
[Fact]
public void ExceededTotalNumber()
{
var taskManager = TaskManagerWithTasksAdded();
Action add = () => taskManager.AddTask(MapperTaskIdPrefix + 4, MockConfig(), CreateMockActiveContext(4));
Assert.Throws<IMRUSystemException>(add);
}
/// <summary>
/// Tests adding a task with duplicated task id and duplicated master id
/// </summary>
[Fact]
public void DuplicatedTaskIdInAdd()
{
var taskManager = CreateTaskManager();
taskManager.AddTask(MasterTaskId, MockConfig(), CreateMockActiveContext(0));
taskManager.AddTask(MapperTaskIdPrefix + 1, MockConfig(), CreateMockActiveContext(1));
Action add = () => taskManager.AddTask(MapperTaskIdPrefix + 1, MockConfig(), CreateMockActiveContext(1));
Assert.Throws<IMRUSystemException>(add);
add = () => taskManager.AddTask(MasterTaskId, MockConfig(), CreateMockActiveContext(1));
Assert.Throws<IMRUSystemException>(add);
}
/// <summary>
/// Tests invalid arguments when adding tasks
/// </summary>
[Fact]
public void NullArguments()
{
var taskManager = CreateTaskManager();
taskManager.AddTask(MasterTaskId, MockConfig(), CreateMockActiveContext(0));
Action add = () => taskManager.AddTask(MapperTaskIdPrefix + 1, null, CreateMockActiveContext(1));
Assert.Throws<IMRUSystemException>(add);
add = () => taskManager.AddTask(MapperTaskIdPrefix + 2, MockConfig(), null);
Assert.Throws<IMRUSystemException>(add);
}
/// <summary>
/// Tests passing invalid arguments in creating TaskManager
/// </summary>
[Fact]
public void InvalidArgumentsInCreatingTaskManger()
{
Action taskManager = () => CreateTaskManager(0, MasterTaskId);
Assert.Throws<IMRUSystemException>(taskManager);
taskManager = () => CreateTaskManager(1, null);
Assert.Throws<IMRUSystemException>(taskManager);
}
/// <summary>
/// Tests whether all tasks rightly reach Running and Completed states
/// </summary>
[Fact]
public void TestCompletingTasks()
{
var taskManager = TaskManagerWithTasksRunning();
Assert.True(taskManager.AreAllTasksInState(TaskState.TaskRunning));
taskManager.RecordCompletedTask(CreateMockCompletedTask(MapperTaskIdPrefix + 1));
taskManager.RecordCompletedTask(CreateMockCompletedTask(MapperTaskIdPrefix + 2));
taskManager.RecordCompletedTask(CreateMockCompletedTask(MasterTaskId));
Assert.True(taskManager.AreAllTasksInState(TaskState.TaskCompleted));
}
/// <summary>
/// Tests closing running tasks
/// </summary>
[Fact]
public void TestClosingRunningTasks()
{
var taskManager = TaskManagerWithTasksSubmitted();
taskManager.RecordRunningTask(CreateMockRunningTask(MasterTaskId));
taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1));
var runningTask2 = CreateMockRunningTask(MapperTaskIdPrefix + 2);
taskManager.RecordRunningTaskDuringSystemFailure(runningTask2, TaskManager.CloseTaskByDriver);
taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
Assert.True(taskManager.AreAllTasksInState(TaskState.TaskWaitingForClose));
}
/// <summary>
/// Tests record failed tasks after all the tasks are running
/// </summary>
[Fact]
public void TestFailedRunningTasks()
{
var taskManager = TaskManagerWithTasksRunning();
taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskAppError));
taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskGroupCommunicationError));
taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MasterTaskId, TaskManager.TaskSystemError));
Assert.True(taskManager.AllInFinalState());
}
/// <summary>
/// Tests number of application errors
/// </summary>
[Fact]
public void TestAppError()
{
var taskManager = TaskManagerWithTasksSubmitted();
taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskAppError));
taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskSystemError));
Assert.Equal(1, taskManager.NumberOfAppErrors());
}
/// <summary>
/// Tests failed tasks in various event sequences
/// </summary>
[Fact]
public void TestFailedTasks()
{
var taskManager = TaskManagerWithTasksSubmitted();
taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1));
taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 2));
// This task failed by evaluator then failed by itself
var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError);
taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId1", failedTask1));
Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 1));
// no state change should happen in this case
taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask1);
Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 1));
// This task failed by itself first, then failed by Evaluator failure
var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskGroupCommunicationError);
taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask2);
Assert.Equal(TaskState.TaskFailedByGroupCommunication, taskManager.GetTaskState(MapperTaskIdPrefix + 2));
taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId2", failedTask2));
Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 2));
// close the running task during shutting down
var masterRuningTask = CreateMockRunningTask(MasterTaskId);
taskManager.RecordRunningTaskDuringSystemFailure(masterRuningTask, TaskManager.CloseTaskByDriver);
taskManager.RecordFailedTaskDuringSystemShuttingDownState(CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver));
Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MasterTaskId));
Assert.True(taskManager.AllInFinalState());
}
/// <summary>
/// Tests after all the tasks are running, a task fails first, then close all running tasks
/// </summary>
[Fact]
public void TestFailedTasksAfterAllTasksAreRunnigScenario()
{
var taskManager = TaskManagerWithTasksRunning();
// A task fail first
var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError);
taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask1);
Assert.Equal(TaskState.TaskFailedBySystemError, taskManager.GetTaskState(MapperTaskIdPrefix + 1));
// system is in shutting down, close all other tasks
taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
// task 2 is killed by driver
var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskKilledByDriver);
taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask2);
// master task is killed by driver
var masterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
taskManager.RecordFailedTaskDuringSystemShuttingDownState(masterTask);
Assert.True(taskManager.AllInFinalState());
}
/// <summary>
/// Tests after all the tasks are running, an evaluator fails first, then a task fails with communication error
/// </summary>
[Fact]
public void TestFailedEvaluatorThenFailedTaskAfterTasksAreRunningScenario()
{
var taskManager = TaskManagerWithTasksRunning();
// Evaluator error
var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError);
taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId1", failedTask1));
Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 1));
// system is in shutting down, close all other tasks
taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MapperTaskIdPrefix + 2));
Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MasterTaskId));
// Another task may get failed by communication during the shutting down
var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskGroupCommunicationError);
taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask2);
Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MapperTaskIdPrefix + 2));
// The task that receives the close from driver now send failed event back to driver
var masterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
taskManager.RecordFailedTaskDuringSystemShuttingDownState(masterTask);
Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MasterTaskId));
Assert.True(taskManager.AllInFinalState());
}
/// <summary>
/// Tests after all the tasks are running, a task fails first, then an evaluator fails
/// </summary>
[Fact]
public void TestFailedTasksThenFailedEvaluatorAfterAllTasksAreRunningScenario()
{
var taskManager = TaskManagerWithTasksRunning();
// A task fails first
var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError);
taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask1);
Assert.Equal(TaskState.TaskFailedBySystemError, taskManager.GetTaskState(MapperTaskIdPrefix + 1));
// system is in shutting down, close all other tasks
taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
// An Evaluator fails during shut down, as the task is already in waiting for close state, its state will be changed to TaskClosedByDriver
var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskSystemError);
taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId2", failedTask2));
Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MapperTaskIdPrefix + 2));
// master task gets communication error before it receives close event, as the task is already in waiting for close state, its state will be changed to TaskClosedByDriver
var masterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskGroupCommunicationError);
taskManager.RecordFailedTaskDuringSystemShuttingDownState(masterTask);
Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MasterTaskId));
Assert.True(taskManager.AllInFinalState());
}
/// <summary>
/// Test evaluator fails before any task is running after all the tasks are submitted
/// </summary>
[Fact]
public void TestFailedEvaluatorBeforeAnyTaskIsRunningScenario()
{
var taskManager = TaskManagerWithTasksSubmitted();
// Evaluator error
var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError);
taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId1", failedTask1));
Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 1));
// there is no any running task yet
taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
// task2 is running , close it
var runingTask2 = CreateMockRunningTask(MapperTaskIdPrefix + 2);
taskManager.RecordRunningTaskDuringSystemFailure(runingTask2, TaskManager.CloseTaskByDriver);
Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MapperTaskIdPrefix + 2));
// master task is running, close it
var masterTask = CreateMockRunningTask(MasterTaskId);
taskManager.RecordRunningTaskDuringSystemFailure(masterTask, TaskManager.CloseTaskByDriver);
Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MasterTaskId));
// received task failure because of the closing
var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskKilledByDriver);
taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask2);
// received task failure because of the closing
var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
Assert.True(taskManager.AllInFinalState());
}
/// <summary>
/// Tests an evaluator fails for a running task before all the tasks are running
/// </summary>
[Fact]
public void TestFailedEvaluatorOnRunningTaskBeforeAllTasksAreRunningScenario()
{
var taskManager = TaskManagerWithTasksSubmitted();
taskManager.RecordRunningTask(CreateMockRunningTask(MasterTaskId));
taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1));
// Evaluator error
var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError);
taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId1", failedTask1));
Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 1));
// the master task should be closed
taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
// task 2 is now running, close it
var runingTask2 = CreateMockRunningTask(MapperTaskIdPrefix + 2);
taskManager.RecordRunningTaskDuringSystemFailure(runingTask2, TaskManager.CloseTaskByDriver);
Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MapperTaskIdPrefix + 2));
var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskKilledByDriver);
taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask2);
var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
Assert.True(taskManager.AllInFinalState());
}
/// <summary>
/// Tests an evaluator fails for a non running task before all the tasks are running
/// </summary>
[Fact]
public void TestFailedEvaluatorOnNoRunningTaskBeforeAllTasksAreRunningScenario()
{
var taskManager = TaskManagerWithTasksSubmitted();
taskManager.RecordRunningTask(CreateMockRunningTask(MasterTaskId));
taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1));
// Evaluator error
var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskSystemError);
taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId2", failedTask2));
Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 2));
// Send event to close master task and task1
taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskKilledByDriver);
taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask1);
var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
Assert.True(taskManager.AllInFinalState());
}
/// <summary>
/// Tests a task fails before any task is running after all the tasks are submitted.
/// </summary>
[Fact]
public void TestFailedTaskBeforeAnyTaskIsRunningScenario()
{
var taskManager = TaskManagerWithTasksSubmitted();
// Evaluator error
var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError);
taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask1);
Assert.Equal(TaskState.TaskFailedBySystemError, taskManager.GetTaskState(MapperTaskIdPrefix + 1));
// there is no any running task yet
taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
// task 2 is running, now close it
var runingTask2 = CreateMockRunningTask(MapperTaskIdPrefix + 2);
taskManager.RecordRunningTaskDuringSystemFailure(runingTask2, TaskManager.CloseTaskByDriver);
Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MapperTaskIdPrefix + 2));
// master task is running, close it
var masterTask = CreateMockRunningTask(MasterTaskId);
taskManager.RecordRunningTaskDuringSystemFailure(masterTask, TaskManager.CloseTaskByDriver);
Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MasterTaskId));
// The task 2 could be failed by communication before receiving close event
var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskGroupCommunicationError);
taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask2);
// master task failed because receiving close event
var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
Assert.True(taskManager.AllInFinalState());
}
/// <summary>
/// Tests a running task fails before all the tasks are running
/// </summary>
[Fact]
public void TestFailedRunningTaskBeforeAllTasksAreRunningScenario()
{
var taskManager = TaskManagerWithTasksSubmitted();
taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1));
// Evaluator error
var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError);
taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask1);
Assert.Equal(TaskState.TaskFailedBySystemError, taskManager.GetTaskState(MapperTaskIdPrefix + 1));
// there is no any running task yet
taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
// task 2 is running, now close it
var runingTask2 = CreateMockRunningTask(MapperTaskIdPrefix + 2);
taskManager.RecordRunningTaskDuringSystemFailure(runingTask2, TaskManager.CloseTaskByDriver);
Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MapperTaskIdPrefix + 2));
// master task is running, close it
var masterTask = CreateMockRunningTask(MasterTaskId);
taskManager.RecordRunningTaskDuringSystemFailure(masterTask, TaskManager.CloseTaskByDriver);
Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MasterTaskId));
// The task 2 could be failed by communication before receiving close event
var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskGroupCommunicationError);
taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask2);
// master task failed because receiving close event
var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
Assert.True(taskManager.AllInFinalState());
}
/// <summary>
/// Tests a non running task fails before all the tasks are running
/// </summary>
[Fact]
public void TestFailedNoRunningTaskBeforeAllTasksAreRunningScenario()
{
var taskManager = TaskManagerWithTasksSubmitted();
taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1));
// Evaluator error
var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskSystemError);
taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask2);
Assert.Equal(TaskState.TaskFailedBySystemError, taskManager.GetTaskState(MapperTaskIdPrefix + 2));
// there is no any running task yet
taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
// master task is running, close it
var masterTask = CreateMockRunningTask(MasterTaskId);
taskManager.RecordRunningTaskDuringSystemFailure(masterTask, TaskManager.CloseTaskByDriver);
Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MasterTaskId));
// The task 1 could be failed by communication before receiving close event
var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskGroupCommunicationError);
taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask1);
// master task failed could be failed by communication error as well
var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskGroupCommunicationError);
taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
Assert.True(taskManager.AllInFinalState());
}
/// <summary>
/// Creates a TaskManager with specified numTasks, masterTaskId and IGroupCommDriver
/// </summary>
/// <param name="numTasks"></param>
/// <param name="masterTaskId"></param>
/// <returns></returns>
private static TaskManager CreateTaskManager(int numTasks = TotalNumberOfTasks, string masterTaskId = MasterTaskId)
{
var taskManager = new TaskManager(numTasks, masterTaskId);
return taskManager;
}
/// <summary>
/// Creates a TaskManager and add one master task and two mapping tasks
/// </summary>
/// <returns></returns>
private static TaskManager TaskManagerWithTasksAdded()
{
var taskManager = CreateTaskManager();
taskManager.AddTask(MasterTaskId, MockConfig(), CreateMockActiveContext(0));
taskManager.AddTask(MapperTaskIdPrefix + 1, MockConfig(), CreateMockActiveContext(1));
taskManager.AddTask(MapperTaskIdPrefix + 2, MockConfig(), CreateMockActiveContext(2));
return taskManager;
}
/// <summary>
/// Create a TaskManager with all the tasks submitted
/// </summary>
/// <returns></returns>
private static TaskManager TaskManagerWithTasksSubmitted()
{
var taskManager = TaskManagerWithTasksAdded();
taskManager.SubmitTasks();
return taskManager;
}
/// <summary>
/// Create a TaskManager with all the tasks running
/// </summary>
/// <returns></returns>
private static TaskManager TaskManagerWithTasksRunning()
{
var taskManager = TaskManagerWithTasksSubmitted();
taskManager.RecordRunningTask(CreateMockRunningTask(MasterTaskId));
taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1));
taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 2));
return taskManager;
}
/// <summary>
/// Creates a mock IActiveContext
/// </summary>
/// <param name="id"></param>
/// <returns></returns>
private static IActiveContext CreateMockActiveContext(int id)
{
var mockActiveContext = Substitute.For<IActiveContext>();
mockActiveContext.Id.Returns(ContextIdPrefix + id);
mockActiveContext.EvaluatorId.Returns(EvaluatorIdPrefix + ContextIdPrefix + id);
return mockActiveContext;
}
/// <summary>
/// Creates a mock FailedTask with specified taskId and error message
/// </summary>
/// <param name="taskId"></param>
/// <param name="errorMsg"></param>
/// <returns></returns>
private static IFailedTask CreateMockFailedTask(string taskId, string errorMsg)
{
IFailedTask failedtask = Substitute.For<IFailedTask>();
failedtask.Id.Returns(taskId);
failedtask.Message.Returns(errorMsg);
return failedtask;
}
/// <summary>
/// Creates a mock running task with the taskId specified
/// </summary>
/// <param name="taskId"></param>
/// <returns></returns>
private static IRunningTask CreateMockRunningTask(string taskId)
{
var runningTask = Substitute.For<IRunningTask>();
runningTask.Id.Returns(taskId);
return runningTask;
}
/// <summary>
/// Creates a mock running task with the taskId specified
/// </summary>
/// <param name="taskId"></param>
/// <returns></returns>
private static ICompletedTask CreateMockCompletedTask(string taskId)
{
var completedTask = Substitute.For<ICompletedTask>();
completedTask.Id.Returns(taskId);
return completedTask;
}
/// <summary>
/// Creates a mock IFailedEvaluator with the specified IFailedTask associated
/// </summary>
/// <param name="evaluatorId"></param>
/// <param name="failedTask"></param>
/// <returns></returns>
private static IFailedEvaluator CreateMockFailedEvaluator(string evaluatorId, IFailedTask failedTask)
{
var failedEvalutor = Substitute.For<IFailedEvaluator>();
failedEvalutor.Id.Returns(evaluatorId);
failedEvalutor.FailedTask.Returns(Optional<IFailedTask>.Of(failedTask));
return failedEvalutor;
}
/// <summary>
/// Creates a mock IConfiguration
/// </summary>
/// <returns></returns>
private static IConfiguration MockConfig()
{
return TangFactory.GetTang().NewConfigurationBuilder().Build();
}
}
}