blob: 9e8cd3878315d2941a4785a70eae9585efabda3e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Org.Apache.REEF.Common.Context;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Common.Tasks.Events;
using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Logging;
using Xunit;
namespace Org.Apache.REEF.Tests.Functional.FaultTolerant
{
/// <summary>
/// This is scenario testing. It is to test the following scenario to make sure the events, messages we receive are what expected.
/// * Submit a task on an active context
/// * After the task is running, driver sends an event to evaluator to close the task
/// * Task throws exception with a message telling the driver that the task fails as instructed by driver
/// * Driver receives the FailedTask event and resubmit a task on the existing context
/// * In task IDriverMessage, verify the message send from drive is the same as what is expected
/// * In task ICloseEvent, verify the message in the close event is the same as what is expected.
/// The test can submit two evaluators/Contexts/Tasks and let both to close, and verify:
/// * In IFailedTask, the task and context mappings are the same as the assignment before the task was submitted.
/// * In IFailedTask, the exception message in IFailedTask is the same as the one thrown in the Task
/// * In ICompletedTask, verify the task and context mapping are still remain the same as the assignment before the task was submitted.
/// Test Verification:
/// * numberOfContextsToClose == 2
/// * numberOfTasksToFail == 2
/// * numberOfEvaluatorsToFail == 0
/// If any of above verification fails, the test fails.
/// </summary>
[Collection("FunctionalTests")]
public sealed class TestResubimitTask : ReefFunctionalTest
{
private static readonly Logger Logger = Logger.GetLogger(typeof(TestResubimitTask));
private const string KillTaskCommandFromDriver = "KillTaskCommandFromDriver";
private const string CompleteTaskCommandFromDriver = "CompleteTaskCommandFromDriver";
private const string TaskKilledByDriver = "TaskKilledByDriver";
private const string UnExpectedCloseMessage = "UnExpectedCloseMessage";
private const string UnExpectedCompleteMessage = "UnExpectedCompleteMessage";
/// <summary>
/// This test submits two evaluators/contexts/tasks, then close the two running tasks and resubmit two new tasks
/// on the existing active contexts. It is to verify events and messages received are the same as what we expected.
/// It is to verify we can submit tasks on existing contexts if previous tasks fail.
/// </summary>
[Fact]
public void TestStopAndResubmitTaskOnLocalRuntime()
{
string testFolder = DefaultRuntimeFolder + TestId;
TestRun(DriverConfigurations(), typeof(ResubmitTaskTestDriver), 2, "TestResubimitTask", "local", testFolder);
ValidateSuccessForLocalRuntime(2, 1, 0, testFolder);
CleanUp(testFolder);
}
/// <summary>
/// Driver configuration for the test driver
/// </summary>
/// <returns></returns>
public IConfiguration DriverConfigurations()
{
return DriverConfiguration.ConfigurationModule
.Set(DriverConfiguration.OnDriverStarted, GenericType<ResubmitTaskTestDriver>.Class)
.Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<ResubmitTaskTestDriver>.Class)
.Set(DriverConfiguration.OnContextActive, GenericType<ResubmitTaskTestDriver>.Class)
.Set(DriverConfiguration.OnTaskRunning, GenericType<ResubmitTaskTestDriver>.Class)
.Set(DriverConfiguration.OnTaskCompleted, GenericType<ResubmitTaskTestDriver>.Class)
.Set(DriverConfiguration.OnTaskFailed, GenericType<ResubmitTaskTestDriver>.Class)
.Build();
}
/// <summary>
/// Test driver
/// </summary>
private sealed class ResubmitTaskTestDriver :
IObserver<IDriverStarted>,
IObserver<IAllocatedEvaluator>,
IObserver<IActiveContext>,
IObserver<ICompletedTask>,
IObserver<IFailedTask>,
IObserver<IRunningTask>
{
private readonly IEvaluatorRequestor _requestor;
private const string TaskId = "TaskId";
private int _taskNumber = 1;
private const string ContextId = "ContextId";
private readonly IDictionary<string, string> _taskContextMapping = new Dictionary<string, string>();
private readonly object _lock = new object();
[Inject]
private ResubmitTaskTestDriver(IEvaluatorRequestor evaluatorRequestor)
{
_requestor = evaluatorRequestor;
}
public void OnNext(IDriverStarted value)
{
_requestor.Submit(_requestor.NewBuilder().SetNumber(2).Build());
}
public void OnNext(IAllocatedEvaluator value)
{
value.SubmitContext(
ContextConfiguration.ConfigurationModule
.Set(ContextConfiguration.Identifier, ContextId)
.Build());
}
public void OnNext(IActiveContext value)
{
value.SubmitTask(GetTaskConfigurationForCloseTask(TaskId + _taskNumber));
_taskContextMapping.Add(TaskId + _taskNumber, value.Id);
_taskNumber++;
}
public void OnNext(ICompletedTask value)
{
Logger.Log(Level.Info, "Task completed: " + value.Id);
value.ActiveContext.Dispose();
}
/// <summary>
/// Verify when exception is shown in TaskCloseHandler, IFailedEvaluator will be received here with the message set in the task
/// </summary>
public void OnNext(IFailedTask value)
{
value.GetActiveContext().Value.SubmitTask(GetTaskConfigurationForCloseTask(TaskId + _taskNumber));
_taskContextMapping.Add(TaskId + _taskNumber, value.Id);
_taskNumber++;
}
/// <summary>
/// Close the first two tasks and send message to the 3rd and 4th tasks
/// </summary>
/// <param name="value"></param>
public void OnNext(IRunningTask value)
{
Logger.Log(Level.Info, "Task running: " + value.Id);
switch (value.Id)
{
case TaskId + "1":
value.Dispose(Encoding.UTF8.GetBytes(KillTaskCommandFromDriver));
break;
case TaskId + "2":
case TaskId + "3":
value.Send(Encoding.UTF8.GetBytes(CompleteTaskCommandFromDriver));
break;
default:
throw new Exception("It should not be reached.");
}
}
public void OnCompleted()
{
throw new NotImplementedException();
}
public void OnError(Exception error)
{
throw new NotImplementedException();
}
private static IConfiguration GetTaskConfigurationForCloseTask(string taskId)
{
return TaskConfiguration.ConfigurationModule
.Set(TaskConfiguration.Identifier, taskId)
.Set(TaskConfiguration.Task, GenericType<ResubmitTask>.Class)
.Set(TaskConfiguration.OnClose, GenericType<ResubmitTask>.Class)
.Set(TaskConfiguration.OnMessage, GenericType<ResubmitTask>.Class)
.Build();
}
}
private sealed class ResubmitTask : ITask, IDriverMessageHandler, IObserver<ICloseEvent>
{
private readonly CountdownEvent _suspendSignal = new CountdownEvent(1);
[Inject]
private ResubmitTask()
{
}
public byte[] Call(byte[] memento)
{
Logger.Log(Level.Info, "Hello in ResubmitTask");
_suspendSignal.Wait();
return null;
}
public void Dispose()
{
Logger.Log(Level.Info, "Task is disposed.");
}
/// <summary>
/// When receiving closed task event, verify the command from the driver. If it matches expected message,
/// throw exception to close the task. Otherwise, signal the task to return, that would result in test failure
/// as the test expect two failed tasks.
/// </summary>
/// <param name="value"></param>
public void OnNext(ICloseEvent value)
{
if (value.Value != null && value.Value.Value != null)
{
Logger.Log(Level.Info,
"Closed event received in task:" + Encoding.UTF8.GetString(value.Value.Value));
if (Encoding.UTF8.GetString(value.Value.Value).Equals(KillTaskCommandFromDriver))
{
throw new Exception(TaskKilledByDriver);
}
Logger.Log(Level.Error, UnExpectedCloseMessage);
_suspendSignal.Signal();
}
}
/// <summary>
/// Expect the message from driver. If the message is the same as what is sent from driver, signal the task to properly return
/// Otherwise, throw exception which would cause an unexpected failed task therefore failed test verification.
/// </summary>
/// <param name="value"></param>
public void Handle(IDriverMessage value)
{
var message = ByteUtilities.ByteArraysToString(value.Message.Value);
Logger.Log(Level.Info, "Complete task message received in task:" + message);
if (message.Equals(CompleteTaskCommandFromDriver))
{
_suspendSignal.Signal();
}
else
{
Logger.Log(Level.Error, UnExpectedCompleteMessage);
throw new Exception(UnExpectedCompleteMessage);
}
}
public void OnCompleted()
{
throw new NotImplementedException();
}
public void OnError(Exception error)
{
throw new NotImplementedException();
}
}
}
}