﻿// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Org.Apache.REEF.Common.Context;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Common.Tasks.Events;
using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Logging;
using Xunit;

namespace Org.Apache.REEF.Tests.Functional.FaultTolerant
{
    /// <summary>
    /// This is scenario testing. It is to test the following scenario to make sure the events, messages we receive are what expected.
    /// * Submit a task on an active context
    /// * After the task is running, driver sends an event to evaluator to close the task
    /// * Task throws exception with a message telling the driver that the task fails as instructed by driver
    /// * Driver receives the FailedTask event and resubmit a task on the existing context
    /// * In task IDriverMessage, verify the message send from drive is the same as what is expected
    /// * In task ICloseEvent, verify the message in the close event is the same as what is expected.
    /// The test can submit two evaluators/Contexts/Tasks and let both to close, and verify:
    /// * In IFailedTask, the task and context mappings are the same as the assignment before the task was submitted. 
    /// * In IFailedTask, the exception message in IFailedTask is the same as the one thrown in the Task 
    /// * In ICompletedTask, verify the task and context mapping are still remain the same as the assignment before the task was submitted. 
    /// Test Verification:
    /// * numberOfContextsToClose == 2
    /// * numberOfTasksToFail == 2
    /// * numberOfEvaluatorsToFail == 0
    /// If any of above verification fails, the test fails. 
    /// </summary>
    [Collection("FunctionalTests")]
    public sealed class TestResubimitTask : ReefFunctionalTest
    {
        private static readonly Logger Logger = Logger.GetLogger(typeof(TestResubimitTask));

        private const string KillTaskCommandFromDriver = "KillTaskCommandFromDriver";
        private const string CompleteTaskCommandFromDriver = "CompleteTaskCommandFromDriver";
        private const string TaskKilledByDriver = "TaskKilledByDriver";
        private const string UnExpectedCloseMessage = "UnExpectedCloseMessage";
        private const string UnExpectedCompleteMessage = "UnExpectedCompleteMessage";

        /// <summary>
        /// This test submits two evaluators/contexts/tasks, then close the two running tasks and resubmit two new tasks 
        /// on the existing active contexts. It is to verify events and messages received are the same as what we expected. 
        /// It is to verify we can submit tasks on existing contexts if previous tasks fail.
        /// </summary>
        [Fact]
        public void TestStopAndResubmitTaskOnLocalRuntime()
        {
            string testFolder = DefaultRuntimeFolder + TestId;
            TestRun(DriverConfigurations(), typeof(ResubmitTaskTestDriver), 2, "TestResubimitTask", "local", testFolder);
            ValidateSuccessForLocalRuntime(2, 1, 0, testFolder);
            CleanUp(testFolder);
        }

        /// <summary>
        /// Driver configuration for the test driver
        /// </summary>
        /// <returns></returns>
        public IConfiguration DriverConfigurations()
        {
            return DriverConfiguration.ConfigurationModule
                .Set(DriverConfiguration.OnDriverStarted, GenericType<ResubmitTaskTestDriver>.Class)
                .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<ResubmitTaskTestDriver>.Class)
                .Set(DriverConfiguration.OnContextActive, GenericType<ResubmitTaskTestDriver>.Class)
                .Set(DriverConfiguration.OnTaskRunning, GenericType<ResubmitTaskTestDriver>.Class)
                .Set(DriverConfiguration.OnTaskCompleted, GenericType<ResubmitTaskTestDriver>.Class)
                .Set(DriverConfiguration.OnTaskFailed, GenericType<ResubmitTaskTestDriver>.Class)
                .Build();
        }

        /// <summary>
        /// Test driver
        /// </summary>
        private sealed class ResubmitTaskTestDriver :
            IObserver<IDriverStarted>,
            IObserver<IAllocatedEvaluator>,
            IObserver<IActiveContext>,
            IObserver<ICompletedTask>,
            IObserver<IFailedTask>,
            IObserver<IRunningTask>
        {
            private readonly IEvaluatorRequestor _requestor;
            private const string TaskId = "TaskId";
            private int _taskNumber = 1;
            private const string ContextId = "ContextId";
            private readonly IDictionary<string, string> _taskContextMapping = new Dictionary<string, string>();
            private readonly object _lock = new object();

            [Inject]
            private ResubmitTaskTestDriver(IEvaluatorRequestor evaluatorRequestor)
            {
                _requestor = evaluatorRequestor;
            }

            public void OnNext(IDriverStarted value)
            {
                _requestor.Submit(_requestor.NewBuilder().SetNumber(2).Build());
            }

            public void OnNext(IAllocatedEvaluator value)
            {
                value.SubmitContext(
                    ContextConfiguration.ConfigurationModule
                        .Set(ContextConfiguration.Identifier, ContextId)
                        .Build());
            }

            public void OnNext(IActiveContext value)
            {
                value.SubmitTask(GetTaskConfigurationForCloseTask(TaskId + _taskNumber));
                _taskContextMapping.Add(TaskId + _taskNumber, value.Id);
                _taskNumber++;
            }

            public void OnNext(ICompletedTask value)
            {
                Logger.Log(Level.Info, "Task completed: " + value.Id);
                value.ActiveContext.Dispose();
            }

            /// <summary>
            /// Verify when exception is shown in TaskCloseHandler, IFailedEvaluator will be received here with the message set in the task
            /// </summary>
            public void OnNext(IFailedTask value)
            {
                value.GetActiveContext().Value.SubmitTask(GetTaskConfigurationForCloseTask(TaskId + _taskNumber));
                _taskContextMapping.Add(TaskId + _taskNumber, value.Id);
                _taskNumber++;
            }

            /// <summary>
            /// Close the first two tasks and send message to the 3rd and 4th tasks
            /// </summary>
            /// <param name="value"></param>
            public void OnNext(IRunningTask value)
            {
                Logger.Log(Level.Info, "Task running: " + value.Id);
                switch (value.Id)
                {
                    case TaskId + "1":
                        value.Dispose(Encoding.UTF8.GetBytes(KillTaskCommandFromDriver));
                        break;
                    case TaskId + "2":
                    case TaskId + "3":
                        value.Send(Encoding.UTF8.GetBytes(CompleteTaskCommandFromDriver));
                        break;
                    default: 
                        throw new Exception("It should not be reached.");
                }
            }

            public void OnCompleted()
            {
                throw new NotImplementedException();
            }

            public void OnError(Exception error)
            {
                throw new NotImplementedException();
            }

            private static IConfiguration GetTaskConfigurationForCloseTask(string taskId)
            {
                return TaskConfiguration.ConfigurationModule
                    .Set(TaskConfiguration.Identifier, taskId)
                    .Set(TaskConfiguration.Task, GenericType<ResubmitTask>.Class)
                    .Set(TaskConfiguration.OnClose, GenericType<ResubmitTask>.Class)
                    .Set(TaskConfiguration.OnMessage, GenericType<ResubmitTask>.Class)
                    .Build();
            }
        }

        private sealed class ResubmitTask : ITask, IDriverMessageHandler, IObserver<ICloseEvent>
        {
            private readonly CountdownEvent _suspendSignal = new CountdownEvent(1);

            [Inject]
            private ResubmitTask()
            {
            }

            public byte[] Call(byte[] memento)
            {
                Logger.Log(Level.Info, "Hello in ResubmitTask");
                _suspendSignal.Wait();
                return null;
            }

            public void Dispose()
            {
                Logger.Log(Level.Info, "Task is disposed.");
            }

            /// <summary>
            /// When receiving closed task event, verify the command from the driver. If it matches expected message, 
            /// throw exception to close the task. Otherwise, signal the task to return, that would result in test failure 
            /// as the test expect two failed tasks. 
            /// </summary>
            /// <param name="value"></param>
            public void OnNext(ICloseEvent value)
            {
                if (value.Value != null && value.Value.Value != null)
                {
                    Logger.Log(Level.Info,
                        "Closed event received in task:" + Encoding.UTF8.GetString(value.Value.Value));
                    if (Encoding.UTF8.GetString(value.Value.Value).Equals(KillTaskCommandFromDriver))
                    {
                        throw new Exception(TaskKilledByDriver);
                    }
                    Logger.Log(Level.Error, UnExpectedCloseMessage);
                    _suspendSignal.Signal();
                }
            }

            /// <summary>
            /// Expect the message from driver. If the message is the same as what is sent from driver, signal the task to properly return
            /// Otherwise, throw exception which would cause an unexpected failed task therefore failed test verification. 
            /// </summary>
            /// <param name="value"></param>
            public void Handle(IDriverMessage value)
            {
                var message = ByteUtilities.ByteArraysToString(value.Message.Value);
                Logger.Log(Level.Info, "Complete task message received in task:" + message);
                if (message.Equals(CompleteTaskCommandFromDriver))
                {
                    _suspendSignal.Signal();
                }
                else
                {
                    Logger.Log(Level.Error, UnExpectedCompleteMessage);
                    throw new Exception(UnExpectedCompleteMessage);                    
                }
            }

            public void OnCompleted()
            {
                throw new NotImplementedException();
            }

            public void OnError(Exception error)
            {
                throw new NotImplementedException();
            }
        }
    }
}