﻿/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Apache.NMS.AMQP.Message;
using Apache.NMS.AMQP.Meta;
using Apache.NMS.AMQP.Util;
using Apache.NMS.Util;

namespace Apache.NMS.AMQP.Provider.Failover
{
    public class FailoverProvider : IProvider, IProviderListener
    {
        private readonly object SyncRoot = new object();

        private static int UNDEFINED = -1;

        public static int DEFAULT_INITIAL_RECONNECT_DELAY = 0;
        public static long DEFAULT_RECONNECT_DELAY = 10;
        public static double DEFAULT_RECONNECT_BACKOFF_MULTIPLIER = 2.0d;
        public static long DEFAULT_MAX_RECONNECT_DELAY = (long) Math.Round(TimeSpan.FromSeconds(30).TotalMilliseconds);
        public static int DEFAULT_STARTUP_MAX_RECONNECT_ATTEMPTS = UNDEFINED;
        public static int DEFAULT_MAX_RECONNECT_ATTEMPTS = UNDEFINED;
        public static bool DEFAULT_USE_RECONNECT_BACKOFF = true;
        public static int DEFAULT_WARN_AFTER_RECONNECT_ATTEMPTS = 10;
        public static bool DEFAULT_RANDOMIZE_ENABLED = false;

        private readonly ReconnectControls reconnectControl;
        private readonly FailoverUriPool uris;

        private readonly AtomicBool closed = new AtomicBool();
        private readonly Atomic<bool> failed = new Atomic<bool>();

        private long requestTimeout;
        private long sendTimeout;

        private Uri connectedUri;
        private ConnectionInfo connectionInfo;
        private IProvider provider;
        private IProviderListener listener;

        private List<FailoverRequest> requests = new List<FailoverRequest>();

        internal IProvider ActiveProvider => provider;

        public FailoverProvider(IEnumerable<Uri> uris)
        {
            this.uris = new FailoverUriPool(uris);
            reconnectControl = new ReconnectControls(this);
        }

        public long InitialReconnectDelay { get; set; } = DEFAULT_INITIAL_RECONNECT_DELAY;
        public long ReconnectDelay { get; set; } = DEFAULT_RECONNECT_DELAY;
        public bool UseReconnectBackOff { get; set; } = DEFAULT_USE_RECONNECT_BACKOFF;
        public double ReconnectBackOffMultiplier { get; set; } = DEFAULT_RECONNECT_BACKOFF_MULTIPLIER;
        public long MaxReconnectDelay { get; set; } = DEFAULT_MAX_RECONNECT_DELAY;
        public int StartupMaxReconnectAttempts { get; set; } = DEFAULT_STARTUP_MAX_RECONNECT_ATTEMPTS;
        public int MaxReconnectAttempts { get; set; } = DEFAULT_MAX_RECONNECT_ATTEMPTS;
        public int WarnAfterReconnectAttempts { get; set; } = DEFAULT_WARN_AFTER_RECONNECT_ATTEMPTS;
        public Uri RemoteUri => connectedUri;

        public void Start()
        {
            CheckClosed();

            if (listener == null)
            {
                throw new IllegalStateException("No ProviderListener registered.");
            }
        }

        public Task Connect(ConnectionInfo info)
        {
            CheckClosed();

            requestTimeout = info.requestTimeout;
            sendTimeout = info.SendTimeout;

            connectionInfo = info;
            Tracer.Debug("Initiating initial connection attempt task");
            return TriggerReconnectionAttempt();
        }

        private Task TriggerReconnectionAttempt()
        {
            if (closed)
                return Task.CompletedTask;

            return reconnectControl.ScheduleReconnect(Reconnect);

            async Task Reconnect()
            {
                IProvider provider = null;
                Exception failure = null;
                long reconnectAttempts = reconnectControl.RecordNextAttempt();

                try
                {
                    if (uris.Any())
                    {
                        for (int i = 0; i < uris.Size(); i++)
                        {
                            var target = uris.GetNext();
                            if (target == null)
                            {
                                Tracer.Debug("Failover URI collection unexpectedly modified during connection attempt.");
                                continue;
                            }

                            try
                            {
                                Tracer.Debug($"Connection attempt:[{reconnectAttempts}] to: {target.Scheme}://{target.Host}:{target.Port} in-progress");
                                provider = ProviderFactory.Create(target);
                                await provider.Connect(connectionInfo).ConfigureAwait(false);
                                await InitializeNewConnection(provider).ConfigureAwait(false);
                                return;
                            }
                            catch (Exception e)
                            {
                                Tracer.Info($"Connection attempt:[{reconnectAttempts}] to: {target.Scheme}://{target.Host}:{target.Port} failed");
                                failure = e;
                                try
                                {
                                    provider?.Close();
                                }
                                catch
                                {
                                }
                                finally
                                {
                                    provider = null;
                                }
                            }
                        }
                    }
                    else
                    {
                        Tracer.Debug("No remote URI available to connect to in failover list");
                        // TODO Handle this one.
                        failure = new IOException("No remote URI available for reconnection during connection attempt: " + reconnectAttempts);
                    }
                }
                catch (Exception unknownFailure)
                {
                    Tracer.Warn($"Connection attempt:[{reconnectAttempts}] failed abnormally.");
                    failure = failure ?? unknownFailure;
                }
                finally
                {
                    if (provider == null)
                    {
                        Tracer.Debug($"Connection attempt:[{reconnectControl.ReconnectAttempts}] failed error: {failure?.Message}");
                        if (!reconnectControl.IsReconnectAllowed(failure))
                        {
                            ReportReconnectFailure(failure);
                        }
                        else
                        {
                            await reconnectControl.ScheduleReconnect(Reconnect).ConfigureAwait(false);
                        }
                    }
                }
            }
        }

        private void ReportReconnectFailure(Exception lastFailure)
        {
            Tracer.Error($"Failed to connect after: {reconnectControl.ReconnectAttempts} attempt(s)");
            if (failed.CompareAndSet(false, true))
            {
                if (lastFailure == null)
                {
                    lastFailure = new IOException($"Failed to connect after: {reconnectControl.ReconnectAttempts} attempt(s)");
                }

                var exception = NMSExceptionSupport.Create(lastFailure);
                listener.OnConnectionFailure(exception);
                throw exception;
            }
        }

        private async Task InitializeNewConnection(IProvider provider)
        {
            if (closed)
            {
                try
                {
                    provider.Close();
                }
                catch (Exception e)
                {
                    Tracer.Debug($"Ignoring failure to close failed provider: {provider} {e.Message}");
                }
                return;
            }

            this.provider = provider;
            this.provider.SetProviderListener(this);
            this.connectedUri = provider.RemoteUri;

            if (reconnectControl.IsRecoveryRequired())
            {
                Tracer.Debug($"Signalling connection recovery: {provider}");

                // Allow listener to recover its resources
                await listener.OnConnectionRecovery(provider).ConfigureAwait(false);

                // Restart consumers, send pull commands, etc.
                await listener.OnConnectionRecovered(provider).ConfigureAwait(false);

                // Let the client know that connection has restored.
                listener.OnConnectionRestored(connectedUri);

                // If we try to run pending requests right after the connection is reestablished 
                // it will result in timeout on the first send request
                await Task.Delay(50).ConfigureAwait(false);

                foreach (FailoverRequest request in GetPendingRequests())
                {
                    await request.Run().ConfigureAwait(false);
                }

                reconnectControl.ConnectionEstablished();
            }
            else
            {
                listener.OnConnectionEstablished(connectedUri);
                reconnectControl.ConnectionEstablished();
            }
        }

        public override string ToString()
        {
            return "FailoverProvider: " + (connectedUri == null ? "unconnected" : connectedUri.ToString());
        }

        public void Close()
        {
            if (closed.CompareAndSet(false, true))
            {
                try
                {
                    provider.Close();
                }
                catch (Exception e)
                {
                    Tracer.Warn("Error caught while closing Provider: " + e.Message);
                }
            }
        }

        public void SetProviderListener(IProviderListener providerListener)
        {
            CheckClosed();

            listener = providerListener;
        }

        public Task CreateResource(ResourceInfo resourceInfo)
        {
            CheckClosed();

            FailoverRequest request = new FailoverRequest(this, requestTimeout)
            {
                DoTask = activeProvider => provider.CreateResource(resourceInfo),
                Name = nameof(CreateResource)
            };

            request.Run();

            return request.Task;
        }

        public Task DestroyResource(ResourceInfo resourceInfo)
        {
            CheckClosed();

            FailoverRequest request = new FailoverRequest(this, requestTimeout)
            {
                DoTask = activeProvider => activeProvider.DestroyResource(resourceInfo),
                Name = nameof(DestroyResource),

                // Allow this to succeed, resource won't get recreated on reconnect.
                SucceedsWhenOffline = true
            };

            request.Run();

            return request.Task;
        }

        public Task StartResource(ResourceInfo resourceInfo)
        {
            CheckClosed();

            FailoverRequest request = new FailoverRequest(this, requestTimeout)
            {
                DoTask = activeProvider => activeProvider.StartResource(resourceInfo),
                Name = nameof(StartResource)
            };

            request.Run();

            return request.Task;
        }

        public Task StopResource(ResourceInfo resourceInfo)
        {
            CheckClosed();

            FailoverRequest request = new FailoverRequest(this, requestTimeout)
            {
                DoTask = activeProvider => activeProvider.StopResource(resourceInfo),
                Name = nameof(StopResource)
            };

            request.Run();

            return request.Task;
        }

        public Task Recover(Id sessionId)
        {
            CheckClosed();

            FailoverRequest request = new FailoverRequest(this, requestTimeout)
            {
                DoTask = activeProvider => activeProvider.Recover(sessionId),
                SucceedsWhenOffline = true,
                Name = nameof(Recover)
            };

            request.Run();

            return request.Task;
        }

        public Task Acknowledge(Id sessionId, AckType ackType)
        {
            CheckClosed();

            FailoverRequest request = new FailoverRequest(this, requestTimeout)
            {
                DoTask = activeProvider => activeProvider.Acknowledge(sessionId, ackType),
                FailureWhenOffline = true,
                Name = nameof(Acknowledge)
            };

            request.Run();

            return request.Task;
        }

        public Task Acknowledge(InboundMessageDispatch envelope, AckType ackType)
        {
            CheckClosed();

            FailoverRequest request = new FailoverRequest(this, requestTimeout)
            {
                DoTask = activeProvider => activeProvider.Acknowledge(envelope, ackType),
                FailureWhenOffline = true,
                Name = nameof(Acknowledge)
            };

            request.Run();

            return request.Task;
        }

        public INmsMessageFactory MessageFactory => provider.MessageFactory;

        public long SendTimeout => sendTimeout;

        public Task Send(OutboundMessageDispatch envelope)
        {
            CheckClosed();

            FailoverRequest request = new FailoverRequest(this, SendTimeout)
            {
                DoTask = activeProvider => activeProvider.Send(envelope),
                Name = nameof(Send)
            };

            request.Run();

            return request.Task;
        }

        public Task Unsubscribe(string name)
        {
            CheckClosed();

            FailoverRequest request = new FailoverRequest(this, SendTimeout)
            {
                DoTask = activeProvider => activeProvider.Unsubscribe(name),
                Name = nameof(Unsubscribe)
            };

            request.Run();

            return request.Task;
        }

        public Task Rollback(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
        {
            CheckClosed();

            FailoverRequest request = new FailoverRequest(this, SendTimeout)
            {
                DoTask = activeProvider => activeProvider.Rollback(transactionInfo, nextTransactionInfo),
                Name = nameof(Rollback),
                SucceedsWhenOffline = true
            };

            request.Run();

            return request.Task;
        }

        public Task Commit(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
        {
            CheckClosed();

            FailoverRequest request = new FailoverRequest(this, SendTimeout)
            {
                DoTask = activeProvider => activeProvider.Commit(transactionInfo, nextTransactionInfo),
                Name = nameof(Commit),
                FailureWhenOffline = true
            };

            request.Run();

            return request.Task;
        }

        public void OnInboundMessage(InboundMessageDispatch envelope)
        {
            if (closed)
                return;

            listener.OnInboundMessage(envelope);
        }

        public void OnConnectionFailure(NMSException exception)
        {
            Tracer.Debug($"Failover: the provider reports failure: {exception.Message}");
            HandleProviderError(provider, exception);
        }

        public Task OnConnectionRecovery(IProvider provider)
        {
            return Task.CompletedTask;
        }

        public void OnConnectionEstablished(Uri remoteUri)
        {
        }

        public Task OnConnectionRecovered(IProvider provider)
        {
            return Task.CompletedTask;
        }

        public void OnConnectionRestored(Uri remoteUri)
        {
        }

        public void OnConnectionInterrupted(Uri failedUri)
        {
        }

        public void OnResourceClosed(ResourceInfo resourceInfo, Exception error)
        {
            if (closed)
                return;

            listener.OnResourceClosed(resourceInfo, error);
        }

        internal void HandleProviderError(IProvider provider, NMSException cause)
        {
            if (closed)
                return;

            lock (SyncRoot)
            {
                // It is possible that another failed request signaled an error for the same provider
                // and we already cleaned up the old failed provider and scheduled a reconnect that
                // has already succeeded, so we need to ensure that we don't kill a valid provider.
                if (provider == this.provider)
                {
                    Tracer.Debug($"handling Provider failure: {cause.ToString()}");
                    this.provider = null;
                    provider.SetProviderListener(null);
                    Uri failedUri = provider.RemoteUri;
                    try
                    {
                        provider.Close();
                    }
                    catch (Exception exception)
                    {
                        Tracer.Debug($"Caught exception while closing failed provider: {exception.Message}");
                    }

                    if (reconnectControl.IsReconnectAllowed(cause))
                    {
                        // Start watching for request timeouts while we are offline, unless we already are.
                        foreach (FailoverRequest failoverRequest in GetPendingRequests())
                        {
                            failoverRequest.ScheduleTimeout();
                        }

                        Task.Run(TriggerReconnectionAttempt);

                        listener?.OnConnectionInterrupted(failedUri);
                    }
                    else
                    {
                        listener?.OnConnectionFailure(cause);
                    }
                }
                else
                {
                    Tracer.Debug($"Ignoring duplicate provider failed event for provider: {provider}");
                }
            }
        }

        private void CheckClosed()
        {
            if (closed)
                throw new IOException("The Provider is already closed");
        }

        internal void AddFailoverRequest(FailoverRequest request)
        {
            lock (requests)
            {
                requests.Add(request);
            }
        }

        internal void RemoveFailoverRequest(FailoverRequest request)
        {
            lock (requests)
            {
                requests.Remove(request);
            }
        }

        internal IEnumerable<FailoverRequest> GetPendingRequests()
        {
            lock (requests)
            {
                return requests.ToArray();
            }
        }

        private class ReconnectControls
        {
            private readonly FailoverProvider failoverProvider;

            // Reconnection state tracking
            private volatile bool recoveryRequired;
            private long reconnectAttempts;
            private long nextReconnectDelay = -1;

            public ReconnectControls(FailoverProvider failoverProvider)
            {
                this.failoverProvider = failoverProvider;
            }

            public long ReconnectAttempts => reconnectAttempts;

            public async Task ScheduleReconnect(Func<Task> action)
            {
                // If no connection recovery required then we have never fully connected to a remote
                // so we proceed down the connect with one immediate connection attempt and then follow
                // on delayed attempts based on configuration.
                if (!recoveryRequired)
                {
                    if (ReconnectAttempts == 0)
                    {
                        Tracer.Debug("Initial connect attempt will be performed immediately");
                        await action();
                    }
                    else if (ReconnectAttempts == 1 && failoverProvider.InitialReconnectDelay > 0)
                    {
                        Tracer.Debug($"Delayed initial reconnect attempt will be in {failoverProvider.InitialReconnectDelay} milliseconds");
                        await Task.Delay(TimeSpan.FromMilliseconds(failoverProvider.InitialReconnectDelay));
                        await action();
                    }
                    else
                    {
                        double delay = NextReconnectDelay();
                        Tracer.Debug($"Next reconnect attempt will be in {delay} milliseconds");
                        await Task.Delay(TimeSpan.FromMilliseconds(delay));
                        await action();
                    }
                }
                else if (ReconnectAttempts == 0)
                {
                    if (failoverProvider.InitialReconnectDelay > 0)
                    {
                        Tracer.Debug($"Delayed initial reconnect attempt will be in {failoverProvider.InitialReconnectDelay} milliseconds");
                        await Task.Delay(TimeSpan.FromMilliseconds(failoverProvider.InitialReconnectDelay));
                        await action();
                    }
                    else
                    {
                        Tracer.Debug("Initial Reconnect attempt will be performed immediately");
                        await action();
                    }
                }
                else
                {
                    double delay = NextReconnectDelay();
                    Tracer.Debug($"Next reconnect attempt will be in {delay} milliseconds");
                    await Task.Delay(TimeSpan.FromMilliseconds(delay));
                    await action();
                }
            }

            private long NextReconnectDelay()
            {
                if (nextReconnectDelay == -1)
                {
                    nextReconnectDelay = failoverProvider.ReconnectDelay;
                }

                if (failoverProvider.UseReconnectBackOff && ReconnectAttempts > 1)
                {
                    // Exponential increment of reconnect delay.
                    nextReconnectDelay = (long) Math.Round(nextReconnectDelay * failoverProvider.ReconnectBackOffMultiplier);
                    if (nextReconnectDelay > failoverProvider.MaxReconnectDelay)
                    {
                        nextReconnectDelay = failoverProvider.MaxReconnectDelay;
                    }
                }

                return nextReconnectDelay;
            }

            public long RecordNextAttempt()
            {
                return ++reconnectAttempts;
            }

            public void ConnectionEstablished()
            {
                recoveryRequired = true;
                nextReconnectDelay = -1;
                reconnectAttempts = 0;
            }

            public bool IsReconnectAllowed(Exception cause)
            {
                // If a connection attempts fail due to Security errors than we abort
                // reconnection as there is a configuration issue and we want to avoid
                // a spinning reconnect cycle that can never complete.
                if (IsStoppageCause(cause))
                {
                    return false;
                }

                return !IsLimitExceeded();
            }

            private bool IsStoppageCause(Exception cause)
            {
                // TODO: Check if fail is due to Security errors
                return false;
            }

            public bool IsLimitExceeded()
            {
                int reconnectLimit = ReconnectAttemptLimit();
                if (reconnectLimit != UNDEFINED && reconnectAttempts >= reconnectLimit)
                {
                    return true;
                }

                return false;
            }

            private int ReconnectAttemptLimit()
            {
                int maxReconnectValue = failoverProvider.MaxReconnectAttempts;
                if (!recoveryRequired && failoverProvider.StartupMaxReconnectAttempts != UNDEFINED)
                {
                    // If this is the first connection attempt and a specific startup retry limit
                    // is configured then use it, otherwise use the main reconnect limit
                    maxReconnectValue = failoverProvider.StartupMaxReconnectAttempts;
                }

                return maxReconnectValue;
            }

            public bool IsRecoveryRequired()
            {
                return recoveryRequired;
            }
        }
    }
}