﻿// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
// 
//   http://www.apache.org/licenses/LICENSE-2.0
// 
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

using System;
using System.IO;
using System.Net.Sockets;
using System.Threading;
using Org.Apache.REEF.Common.Exceptions;
using Org.Apache.REEF.Common.Runtime;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Common.Tasks.Events;
using Org.Apache.REEF.IMRU.OnREEF.Driver;
using Org.Apache.REEF.Network.Group.Task;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Remote.Impl;

namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
{
    /// <summary>
    /// Shared base for IMRU task hosts. It owns the group communication client,
    /// the cancellation source passed to it, and the translation of exceptions
    /// escaping <see cref="TaskBody"/> into IMRU task exceptions.
    /// It also observes <see cref="ICloseEvent"/> and forwards it to the
    /// <see cref="TaskCloseCoordinator"/>.
    /// </summary>
    internal abstract class TaskHostBase : ITask, IObserver<ICloseEvent>
    {
        private static readonly Logger Logger = Logger.GetLogger(typeof(TaskHostBase));

        /// <summary>
        /// Disposal flag: 0 until <see cref="Dispose"/> runs, 1 afterwards.
        /// Updated with Interlocked.Exchange.
        /// </summary>
        protected int _disposed;

        /// <summary>
        /// Group communication client of this task. Initialized in Call() and
        /// disposed in Dispose().
        /// </summary>
        protected readonly IGroupCommClient _groupCommunicationsClient;

        /// <summary>
        /// Coordinator that receives the close event and is signaled when Call() finishes.
        /// </summary>
        private readonly TaskCloseCoordinator _taskCloseCoordinator;

        /// <summary>
        /// Whether the garbage collector should be invoked. Only stored here;
        /// exposed to derived classes.
        /// </summary>
        protected readonly bool _invokeGc;

        /// <summary>
        /// Communication group client obtained from the group communication client
        /// under IMRUConstants.CommunicationGroupName.
        /// </summary>
        protected readonly ICommunicationGroupClient _communicationGroupClient;

        /// <summary>
        /// Cancellation source handed to the group communication client and to the
        /// close coordinator; its state decides whether exceptions are rethrown.
        /// </summary>
        protected readonly CancellationTokenSource _cancellationSource;

        /// <summary>
        /// Machine status, used only in log messages.
        /// </summary>
        private readonly MachineStatus _machineStatus = new MachineStatus();

        /// <summary>
        /// Stores the collaborators shared by the concrete task hosts and
        /// creates the cancellation source.
        /// </summary>
        /// <param name="groupCommunicationsClient">Group communication client of the task</param>
        /// <param name="taskCloseCoordinator">Handler for the task close event</param>
        /// <param name="invokeGc">Whether the garbage collector should be invoked</param>
        protected TaskHostBase(
            IGroupCommClient groupCommunicationsClient,
            TaskCloseCoordinator taskCloseCoordinator,
            bool invokeGc)
        {
            string machineStatus = _machineStatus.ToString();
            Logger.Log(Level.Info, "Entering TaskHostBase constructor with machine status {0}.", machineStatus);

            _invokeGc = invokeGc;
            _taskCloseCoordinator = taskCloseCoordinator;
            _cancellationSource = new CancellationTokenSource();

            _groupCommunicationsClient = groupCommunicationsClient;
            _communicationGroupClient = groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName);
        }

        /// <summary>
        /// Task entry point. Initializes group communication, runs
        /// <see cref="TaskBody"/> and classifies any exception it throws:
        /// IMRUTaskAppException is rethrown untouched, communication failures are
        /// routed to <see cref="HandleCommunicationException"/>, and everything
        /// else to <see cref="HandleSystemException"/>.
        /// The close coordinator is always signaled before the method exits.
        /// </summary>
        /// <param name="memento">Memento passed through to the task body</param>
        /// <returns>
        /// The result of the task body, or null when an exception was handled
        /// without being rethrown (the cancellation case).
        /// </returns>
        public byte[] Call(byte[] memento)
        {
            Logger.Log(Level.Info, "Entering {0} Call() with machine status {1}.", TaskHostName, _machineStatus.ToString());
            try
            {
                _groupCommunicationsClient.Initialize(_cancellationSource);
                return TaskBody(memento);
            }
            catch (IMRUTaskAppException)
            {
                // Application errors are already in their final form; let them propagate as they are.
                throw;
            }
            catch (Exception e)
            {
                if (IsCommunicationException(e))
                {
                    HandleCommunicationException(e);
                }
                else
                {
                    HandleSystemException(e);
                }
            }
            finally
            {
                Logger.Log(Level.Info, "TaskHostBase::Finally");
                _taskCloseCoordinator.SignalTaskStopped();
            }

            // Reached only when a handler above chose not to throw.
            Logger.Log(Level.Info, "{0} returned with cancellation token:{1}.", TaskHostName, _cancellationSource.IsCancellationRequested);
            return null;
        }

        /// <summary>
        /// Tells whether the exception is treated as a communication failure:
        /// one of the listed exception types, or an AggregateException whose
        /// InnerException is an IOException.
        /// </summary>
        private static bool IsCommunicationException(Exception e)
        {
            bool isDirectFailure =
                e is OperationCanceledException
                || e is IOException
                || e is TcpClientConnectionException
                || e is RecoverableNetworkException
                || e is SocketException;
            if (isDirectFailure)
            {
                return true;
            }

            // "is" yields false for a null InnerException, so no separate null test is needed.
            return e is AggregateException && e.InnerException is IOException;
        }

        /// <summary>
        /// The actual work of the task, supplied by the derived class and invoked by Call().
        /// </summary>
        protected abstract byte[] TaskBody(byte[] memento);

        /// <summary>
        /// Name of the task host, used in log messages.
        /// </summary>
        protected abstract string TaskHostName { get; }

        /// <summary>
        /// Close event handler. Delegates to the TaskCloseCoordinator together
        /// with the cancellation source of this task.
        /// </summary>
        public void OnNext(ICloseEvent closeEvent)
        {
            _taskCloseCoordinator.HandleEvent(closeEvent, _cancellationSource);
        }

        /// <summary>
        /// Disposes the group communication client. Only the first call has an effect.
        /// </summary>
        public virtual void Dispose()
        {
            bool alreadyDisposed = Interlocked.Exchange(ref _disposed, 1) != 0;
            if (alreadyDisposed)
            {
                return;
            }

            _groupCommunicationsClient.Dispose();
        }

        /// <summary>
        /// Not implemented; always throws NotImplementedException.
        /// </summary>
        public void OnError(Exception error)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Not implemented; always throws NotImplementedException.
        /// </summary>
        public void OnCompleted()
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Wraps the exception in an IMRUTaskGroupCommunicationException and
        /// passes both to HandleException.
        /// </summary>
        protected void HandleCommunicationException(Exception e)
        {
            Exception wrapped = new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError, e);
            HandleException(e, wrapped);
        }

        /// <summary>
        /// Wraps the exception in an IMRUTaskSystemException and
        /// passes both to HandleException.
        /// </summary>
        protected void HandleSystemException(Exception e)
        {
            Exception wrapped = new IMRUTaskSystemException(TaskManager.TaskSystemError, e);
            HandleException(e, wrapped);
        }

        /// <summary>
        /// Wraps the exception in an IMRUTaskAppException and
        /// passes both to HandleException.
        /// </summary>
        protected void HandleTaskAppException(Exception e)
        {
            Exception wrapped = new IMRUTaskAppException(TaskManager.TaskAppError, e);
            HandleException(e, wrapped);
        }

        /// <summary>
        /// Logs the original exception. If cancellation has been requested,
        /// returns after logging; otherwise logs again and throws the target exception.
        /// </summary>
        /// <param name="originalException">Exception that was caught</param>
        /// <param name="targetException">Exception to throw when the task was not cancelled</param>
        private void HandleException(Exception originalException, Exception targetException)
        {
            Logger.Log(Level.Error,
                "Received exception in {0} with cancellation token {1}: [{2}]",
                TaskHostName,
                _cancellationSource.IsCancellationRequested,
                originalException);

            if (_cancellationSource.IsCancellationRequested)
            {
                // Cancelled: the failure has been logged and is not propagated.
                return;
            }

            Logger.Log(Level.Error,
                "{0} is throwing {1} with cancellation token: {2}.",
                TaskHostName,
                targetException.GetType(),
                _cancellationSource.IsCancellationRequested);
            throw targetException;
        }
    }
}