﻿// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

using Org.Apache.REEF.Bridge.Core.Common.Client.Config;
using Org.Apache.REEF.Common.Context;
using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;

namespace Org.Apache.REEF.Bridge.Core.Common.Driver
{
    /// <summary>
    /// DriverBridge is responsible for running application handlers and keeping
    /// track of how many are currently active. It exposes a method <see cref="IsIdle"/>
    /// that indicates if there are any active handlers, which is used to determine
    /// (among other things) whether the driver is currently idle.
    /// </summary>
    internal sealed class DriverBridge
    {
        private static readonly Logger Log = Logger.GetLogger(typeof(DriverBridge));

        // Control event dispatchers

        private readonly DispatchEventHandler<IDriverStarted> _driverStartedDispatcher;

        private readonly DispatchEventHandler<IDriverStopped> _driverStoppedDispatcher;

        // Evaluator event dispatchers

        private readonly DispatchEventHandler<IAllocatedEvaluator> _allocatedEvaluatorDispatcher;

        private readonly DispatchEventHandler<IFailedEvaluator> _failedEvaluatorDispatcher;

        private readonly DispatchEventHandler<ICompletedEvaluator> _completedEvaluatorDispatcher;

        // Context event dispatchers

        private readonly DispatchEventHandler<IActiveContext> _activeContextDispatcher;

        private readonly DispatchEventHandler<IClosedContext> _closedContextDispatcher;

        private readonly DispatchEventHandler<IFailedContext> _failedContextDispatcher;

        private readonly DispatchEventHandler<IContextMessage> _contextMessageDispatcher;

        // Task event dispatchers

        private readonly DispatchEventHandler<ITaskMessage> _taskMessageDispatcher;

        private readonly DispatchEventHandler<IFailedTask> _failedTaskDispatcher;

        private readonly DispatchEventHandler<IRunningTask> _runningTaskDispatcher;

        private readonly DispatchEventHandler<ICompletedTask> _completedTaskDispatcher;

        private readonly DispatchEventHandler<ISuspendedTask> _suspendedTaskDispatcher;

        // Driver restart event dispatchers

        private readonly DispatchEventHandler<IDriverRestarted> _driverRestartedDispatcher;

        private readonly DispatchEventHandler<IActiveContext> _driverRestartActiveContextDispatcher;

        private readonly DispatchEventHandler<IRunningTask> _driverRestartRunningTaskDispatcher;

        private readonly DispatchEventHandler<IDriverRestartCompleted> _driverRestartCompletedDispatcher;

        private readonly DispatchEventHandler<IFailedEvaluator> _driverRestartFailedEvaluatorDispatcher;

        // Client event handlers

        private readonly DispatchEventHandler<byte[]> _clientCloseDispatcher;

        private readonly DispatchEventHandler<byte[]> _clientCloseWithMessageDispatcher;

        private readonly DispatchEventHandler<byte[]> _clientMessageDispatcher;

        private static int s_activeDispatchCounter;

        public static bool IsIdle => s_activeDispatchCounter == 0;

        [Inject]
        private DriverBridge(
            // Runtime events
            [Parameter(Value = typeof(DriverApplicationParameters.DriverStartedHandlers))]
            ISet<IObserver<IDriverStarted>> driverStartHandlers,
            [Parameter(Value = typeof(DriverApplicationParameters.DriverStopHandlers))]
            ISet<IObserver<IDriverStopped>> driverStopHandlers,
            // Evaluator events
            [Parameter(Value = typeof(DriverApplicationParameters.AllocatedEvaluatorHandlers))]
            ISet<IObserver<IAllocatedEvaluator>> allocatedEvaluatorHandlers,
            [Parameter(Value = typeof(DriverApplicationParameters.FailedEvaluatorHandlers))]
            ISet<IObserver<IFailedEvaluator>> failedEvaluatorHandlers,
            [Parameter(Value = typeof(DriverApplicationParameters.CompletedEvaluatorHandlers))]
            ISet<IObserver<ICompletedEvaluator>> completedEvaluatorHandlers,
            // Context events
            [Parameter(Value = typeof(DriverApplicationParameters.ActiveContextHandlers))]
            ISet<IObserver<IActiveContext>> activeContextHandlers,
            [Parameter(Value = typeof(DriverApplicationParameters.ClosedContextHandlers))]
            ISet<IObserver<IClosedContext>> closedContextHandlers,
            [Parameter(Value = typeof(DriverApplicationParameters.FailedContextHandlers))]
            ISet<IObserver<IFailedContext>> failedContextHandlers,
            [Parameter(Value = typeof(DriverApplicationParameters.ContextMessageHandlers))]
            ISet<IObserver<IContextMessage>> contextMessageHandlers,
            // Task events
            [Parameter(Value = typeof(DriverApplicationParameters.TaskMessageHandlers))]
            ISet<IObserver<ITaskMessage>> taskMessageHandlers,
            [Parameter(Value = typeof(DriverApplicationParameters.FailedTaskHandlers))]
            ISet<IObserver<IFailedTask>> failedTaskHandlers,
            [Parameter(Value = typeof(DriverApplicationParameters.RunningTaskHandlers))]
            ISet<IObserver<IRunningTask>> runningTaskHandlers,
            [Parameter(Value = typeof(DriverApplicationParameters.CompletedTaskHandlers))]
            ISet<IObserver<ICompletedTask>> completedTaskHandlers,
            [Parameter(Value = typeof(DriverApplicationParameters.SuspendedTaskHandlers))]
            ISet<IObserver<ISuspendedTask>> suspendedTaskHandlers,
            // Driver restart events
            [Parameter(Value = typeof(DriverApplicationParameters.DriverRestartedHandlers))]
            ISet<IObserver<IDriverRestarted>> driverRestartedHandlers,
            [Parameter(Value = typeof(DriverApplicationParameters.DriverRestartActiveContextHandlers))]
            ISet<IObserver<IActiveContext>> driverRestartActiveContextHandlers,
            [Parameter(Value = typeof(DriverApplicationParameters.DriverRestartRunningTaskHandlers))]
            ISet<IObserver<IRunningTask>> driverRestartRunningTaskHandlers,
            [Parameter(Value = typeof(DriverApplicationParameters.DriverRestartCompletedHandlers))]
            ISet<IObserver<IDriverRestartCompleted>> driverRestartCompletedHandlers,
            [Parameter(Value = typeof(DriverApplicationParameters.DriverRestartFailedEvaluatorHandlers))]
            ISet<IObserver<IFailedEvaluator>> driverRestartFailedEvaluatorHandlers,
            // Client event
            [Parameter(Value = typeof(DriverApplicationParameters.ClientCloseWithMessageHandlers))]
            ISet<IObserver<byte[]>> clientCloseWithMessageHandlers,
            [Parameter(Value = typeof(DriverApplicationParameters.ClientCloseHandlers))]
            ISet<IObserver<byte[]>> clientCloseHandlers,
            [Parameter(Value = typeof(DriverApplicationParameters.ClientMessageHandlers))]
            ISet<IObserver<byte[]>> clientMessageHandlers,
            // Misc.
            [Parameter(Value = typeof(DriverApplicationParameters.TraceListeners))]
            ISet<TraceListener> traceListeners,
            [Parameter(Value = typeof(DriverApplicationParameters.TraceLevel))]
            string traceLevel)
        {
            _driverStartedDispatcher = new DispatchEventHandler<IDriverStarted>(driverStartHandlers);
            _driverStoppedDispatcher = new DispatchEventHandler<IDriverStopped>(driverStopHandlers);
            _allocatedEvaluatorDispatcher = new DispatchEventHandler<IAllocatedEvaluator>(allocatedEvaluatorHandlers);
            _failedEvaluatorDispatcher = new DispatchEventHandler<IFailedEvaluator>(failedEvaluatorHandlers);
            _completedEvaluatorDispatcher = new DispatchEventHandler<ICompletedEvaluator>(completedEvaluatorHandlers);
            _activeContextDispatcher = new DispatchEventHandler<IActiveContext>(activeContextHandlers);
            _closedContextDispatcher = new DispatchEventHandler<IClosedContext>(closedContextHandlers);
            _failedContextDispatcher = new DispatchEventHandler<IFailedContext>(failedContextHandlers);
            _contextMessageDispatcher = new DispatchEventHandler<IContextMessage>(contextMessageHandlers);
            _taskMessageDispatcher = new DispatchEventHandler<ITaskMessage>(taskMessageHandlers);
            _failedTaskDispatcher = new DispatchEventHandler<IFailedTask>(failedTaskHandlers);
            _runningTaskDispatcher = new DispatchEventHandler<IRunningTask>(runningTaskHandlers);
            _completedTaskDispatcher = new DispatchEventHandler<ICompletedTask>(completedTaskHandlers);
            _suspendedTaskDispatcher = new DispatchEventHandler<ISuspendedTask>(suspendedTaskHandlers);
            _driverRestartedDispatcher = new DispatchEventHandler<IDriverRestarted>(driverRestartedHandlers);
            _driverRestartActiveContextDispatcher = new DispatchEventHandler<IActiveContext>(driverRestartActiveContextHandlers);
            _driverRestartRunningTaskDispatcher = new DispatchEventHandler<IRunningTask>(driverRestartRunningTaskHandlers);
            _driverRestartCompletedDispatcher = new DispatchEventHandler<IDriverRestartCompleted>(driverRestartCompletedHandlers);
            _driverRestartFailedEvaluatorDispatcher = new DispatchEventHandler<IFailedEvaluator>(driverRestartFailedEvaluatorHandlers);
            _clientCloseDispatcher = new DispatchEventHandler<byte[]>(clientCloseHandlers);
            _clientCloseWithMessageDispatcher = new DispatchEventHandler<byte[]>(clientCloseWithMessageHandlers);
            _clientMessageDispatcher = new DispatchEventHandler<byte[]>(clientMessageHandlers);

            foreach (var listener in traceListeners)
            {
                Logger.AddTraceListener(listener);
            }
            Log.Log(Level.Info, "Constructing DriverBridge");

            if (Enum.TryParse(traceLevel.ToString(CultureInfo.InvariantCulture), out Level level))
            {
                Logger.SetCustomLevel(level);
            }
            else
            {
                Log.Log(Level.Warning, "Invalid trace level {0} provided, will by default use verbose level", traceLevel);
            }
            s_activeDispatchCounter = 0;
        }

        public async Task DispatchDriverRestartFailedEvaluatorEvent(IFailedEvaluator failedEvaluatorEvent)
        {
            await DispatchAsync(_driverRestartFailedEvaluatorDispatcher, failedEvaluatorEvent);
        }

        public async Task DispatchDriverRestartCompletedEvent(IDriverRestartCompleted driverRestartCompletedEvent)
        {
            await DispatchAsync(_driverRestartCompletedDispatcher, driverRestartCompletedEvent);
        }

        public async Task DispatchDriverRestartRunningTaskEvent(IRunningTask runningTaskEvent)
        {
            await DispatchAsync(_driverRestartRunningTaskDispatcher, runningTaskEvent);
        }

        public async Task DispatchDriverRestartActiveContextEvent(IActiveContext activeContextEvent)
        {
            await DispatchAsync(_driverRestartActiveContextDispatcher, activeContextEvent);
        }

        public async Task DispatchDriverRestartedEvent(IDriverRestarted driverRestartedEvent)
        {
            await DispatchAsync(_driverRestartedDispatcher, driverRestartedEvent);
        }

        public async Task DispatchCompletedTaskEvent(ICompletedTask completedTaskEvent)
        {
            await DispatchAsync(_completedTaskDispatcher, completedTaskEvent);
        }

        public async Task DispatchRunningTaskEvent(IRunningTask runningTaskEvent)
        {
            await DispatchAsync(_runningTaskDispatcher, runningTaskEvent);
        }

        public async Task DispatchFailedTaskEvent(IFailedTask failedTaskEvent)
        {
            await DispatchAsync(_failedTaskDispatcher, failedTaskEvent);
        }

        public async Task DispatchTaskMessageEvent(ITaskMessage taskMessageEvent)
        {
            await DispatchAsync(_taskMessageDispatcher, taskMessageEvent);
        }

        public async Task DispatchSuspendedTaskEvent(ISuspendedTask suspendedTask)
        {
            await DispatchAsync(_suspendedTaskDispatcher, suspendedTask);
        }

        public async Task DispatchContextMessageEvent(IContextMessage contextMessageEvent)
        {
            await DispatchAsync(_contextMessageDispatcher, contextMessageEvent);
        }

        public async Task DispatchFailedContextEvent(IFailedContext failedContextEvent)
        {
            await DispatchAsync(_failedContextDispatcher, failedContextEvent);
        }

        public async Task DispatchClosedContextEvent(IClosedContext closedContextEvent)
        {
            await DispatchAsync(_closedContextDispatcher, closedContextEvent);
        }

        public async Task DispatchActiveContextEvent(IActiveContext activeContextEvent)
        {
            await DispatchAsync(_activeContextDispatcher, activeContextEvent);
        }

        public async Task DispatchCompletedEvaluatorEvent(ICompletedEvaluator completedEvaluatorEvent)
        {
            await DispatchAsync(_completedEvaluatorDispatcher, completedEvaluatorEvent);
        }

        public async Task DispatchFailedEvaluatorEvent(IFailedEvaluator failedEvaluatorEvent)
        {
            await DispatchAsync(_failedEvaluatorDispatcher, failedEvaluatorEvent);
        }

        public async Task DispatchAllocatedEvaluatorEventAsync(IAllocatedEvaluator allocatedEvaluatorEvent)
        {
            await DispatchAsync(_allocatedEvaluatorDispatcher, allocatedEvaluatorEvent);
        }

        public async Task DispatchStartEventAsync(IDriverStarted startEvent)
        {
            await DispatchAsync(_driverStartedDispatcher, startEvent);
        }

        public async Task DispatchStopEvent(IDriverStopped stopEvent)
        {
            await DispatchAsync(_driverStoppedDispatcher, stopEvent);
        }

        public async Task DispatchClientCloseEvent()
        {
            await DispatchAsync(_clientCloseDispatcher, null);
        }

        public async Task DispatchClientCloseWithMessageEvent(byte[] message)
        {
            await DispatchAsync(_clientCloseWithMessageDispatcher, message);
        }

        public async Task DispatchClientMessageEvent(byte[] message)
        {
            await DispatchAsync(_clientMessageDispatcher, message);
        }

        private static async Task DispatchAsync<T>(DispatchEventHandler<T> handler, T message)
        {
            using (var operation = new DisposableOperation(() => handler.OnNext(message)))
            {
                await operation.Run();
            }
        }

        private sealed class DisposableOperation : IDisposable
        {
            private readonly Action _operation;

            public DisposableOperation(Action operation)
            {
                _operation = operation;
            }

            public async Task Run()
            {
                try
                {
                    Interlocked.Increment(ref s_activeDispatchCounter);
                    await Task.Run(_operation);
                }
                catch (Exception ex)
                {
                    Log.Log(Level.Error, "Operation error", ex);
                    throw;
                }
                finally
                {
                    Interlocked.Decrement(ref s_activeDispatchCounter);
                }
            }

            public void Dispose()
            {
            }
        }
    }
}