﻿/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Amqp;
using Amqp.Framing;
using Amqp.Types;
using Apache.NMS.AMQP.Message;
using Apache.NMS.AMQP.Meta;
using Apache.NMS.AMQP.Provider.Amqp.Filters;
using Apache.NMS.AMQP.Provider.Amqp.Message;
using Apache.NMS.AMQP.Util;
using Apache.NMS.AMQP.Util.Synchronization;

namespace Apache.NMS.AMQP.Provider.Amqp
{
    public interface IAmqpConsumer
    {
        IDestination Destination { get; }
        IAmqpConnection Connection { get; }
    }

    public class AmqpConsumer : IAmqpConsumer
    {
        private readonly NmsConsumerInfo info;
        private ReceiverLink receiverLink;
        private readonly LinkedList<InboundMessageDispatch> messages;
        private readonly object syncRoot = new object();
        
        private bool validateSharedSubsLinkCapability;
        private bool sharedSubsNotSupported;

        private readonly AmqpSession session;
        public IDestination Destination => info.Destination;
        public IAmqpConnection Connection => session.Connection;

        public AmqpConsumer(AmqpSession amqpSession, NmsConsumerInfo info)
        {
            session = amqpSession;
            this.info = info;

            messages = new LinkedList<InboundMessageDispatch>();
        }

        public NmsConsumerId ConsumerId => this.info.Id;
        

        public Task Attach()
        {
            Target target = new Target();
            Source source = CreateSource();

            Attach attach = new Attach
            {
                Target = target,
                Source = source,
                RcvSettleMode = ReceiverSettleMode.First,
                SndSettleMode = (info.IsBrowser) ? SenderSettleMode.Settled : SenderSettleMode.Unsettled,
            };

            string receiverLinkName = null;

            string subscriptionName = info.SubscriptionName;
            if (!string.IsNullOrEmpty(subscriptionName))
            {
                AmqpConnection connection = session.Connection;

                if (info.IsShared && !connection.Info.SharedSubsSupported) {
                    validateSharedSubsLinkCapability = true;
                }

                AmqpSubscriptionTracker subTracker = connection.SubscriptionTracker;

                // Validate subscriber type allowed given existing active subscriber types.
                if (info.IsShared && info.IsDurable) {
                    if(subTracker.IsActiveExclusiveDurableSub(subscriptionName)) {
                        // Don't allow shared sub if there is already an active exclusive durable sub
                        throw new NMSException("A non-shared durable subscription is already active with name '" + subscriptionName + "'");
                    }
                } else if (!info.IsShared && info.IsDurable) {
                    if (subTracker.IsActiveExclusiveDurableSub(subscriptionName)) {
                        // Exclusive durable sub is already active
                        throw new NMSException("A non-shared durable subscription is already active with name '" + subscriptionName + "'");
                    } else if (subTracker.IsActiveSharedDurableSub(subscriptionName)) {
                        // Don't allow exclusive durable sub if there is already an active shared durable sub
                        throw new NMSException("A shared durable subscription is already active with name '" + subscriptionName + "'");
                    }
                }

                // Get the link name for the subscription. Throws if certain further validations fail.
                receiverLinkName = subTracker.ReserveNextSubscriptionLinkName(subscriptionName, info);
            }

            
            if (receiverLinkName == null) {
                string destinationAddress = source.Address ?? "";
                receiverLinkName = "nms:receiver:" + info.Id
                                                   + (destinationAddress.Length == 0 ? "" : (":" + destinationAddress));
            }

            // TODO: Add timeout
            var tsc = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
            receiverLink = new ReceiverLink(session.UnderlyingSession, receiverLinkName, attach, HandleOpened(tsc));
            receiverLink.AddClosedCallback(HandleClosed(tsc));
            return tsc.Task;
        }

        private OnAttached HandleOpened(TaskCompletionSource<bool> tsc) => (link, attach) =>
        {
            if (validateSharedSubsLinkCapability)
            {
                Symbol[] remoteOfferedCapabilities = attach.OfferedCapabilities;

                bool supported = false;
                if (remoteOfferedCapabilities != null)
                {
                    if (Array.Exists(remoteOfferedCapabilities, symbol => SymbolUtil.OPEN_CAPABILITY_SHARED_SUBS.Equals(symbol)))
                    {
                        supported = true;
                    }
                }

                if (!supported)
                {
                    sharedSubsNotSupported = true;

                    if (info.IsDurable)
                    {
                        link.Detach(null);
                    }
                    else
                    {
                        link.Close();
                    }
                }
            }

            if (IsClosePending(attach))
                return;

            tsc.SetResult(true);
        };

        private bool IsClosePending(Attach attach)
        {
            // When no link terminus was created, the peer will now detach/close us otherwise
            // we need to validate the returned remote source prior to open completion.
            return sharedSubsNotSupported || attach.Source == null;
        }

        private ClosedCallback HandleClosed(TaskCompletionSource<bool> tsc) => (sender, error) =>
        {
            NMSException exception = ExceptionSupport.GetException(error, "Received Amqp link detach with Error for link {0}", info.Id);
            if (!tsc.TrySetException(exception))
            {
                session.Connection.SubscriptionTracker.ConsumerRemoved(info);
                session.RemoveConsumer(info.Id);

                // If session is not closed it means that the link was remotely detached 
                if (!receiverLink.Session.IsClosed)
                {
                    session.Connection.Provider.FireResourceClosed(info, exception);
                }
            }
        };

        private Source CreateSource()
        {
            Source source = new Source();
            source.Address = AmqpDestinationHelper.GetDestinationAddress(info.Destination, session.Connection);
            source.Outcomes = new[]
            {
                SymbolUtil.ATTACH_OUTCOME_ACCEPTED,
                SymbolUtil.ATTACH_OUTCOME_RELEASED,
                SymbolUtil.ATTACH_OUTCOME_REJECTED,
                SymbolUtil.ATTACH_OUTCOME_MODIFIED
            };
            source.DefaultOutcome = MessageSupport.MODIFIED_FAILED_INSTANCE;

            if (info.IsDurable)
            {
                source.ExpiryPolicy = SymbolUtil.ATTACH_EXPIRY_POLICY_NEVER;
                source.Durable = (int) TerminusDurability.UNSETTLED_STATE;
                source.DistributionMode = SymbolUtil.ATTACH_DISTRIBUTION_MODE_COPY;
            }
            else
            {
                source.ExpiryPolicy = SymbolUtil.ATTACH_EXPIRY_POLICY_SESSION_END;
                source.Durable = (int) TerminusDurability.NONE;
            }
            
            

            if (info.IsBrowser)
            {
                source.DistributionMode = SymbolUtil.ATTACH_DISTRIBUTION_MODE_COPY;
            }

            
            IList<Symbol> capabilities = new List<Symbol>();
            Symbol typeCapability = SymbolUtil.GetTerminusCapabilitiesForDestination(info.Destination);
            if (typeCapability != null)
            {
                capabilities.Add(typeCapability);
            }
            
            if (info.IsShared) {
                capabilities.Add(SymbolUtil.SHARED);

                if(!info.IsExplicitClientId) {
                    capabilities.Add(SymbolUtil.GLOBAL);
                }
            }

            if (capabilities.Any()) {
                source.Capabilities = capabilities.ToArray();
            }

            Map filters = new Map();
            
            if (info.NoLocal)
            {
                filters.Add(SymbolUtil.ATTACH_FILTER_NO_LOCAL, AmqpNmsNoLocalType.NO_LOCAL);
            }

            if (info.HasSelector())
            {
                filters.Add(SymbolUtil.ATTACH_FILTER_SELECTOR, new AmqpNmsSelectorType(info.Selector));
            }

            if (filters.Count > 0)
            {
                source.FilterSet = filters;
            }

            return source;
        }

        public void Start()
        {
            receiverLink.Start(info.LinkCredit, OnMessage);
        }

        public void Stop()
        {
            receiverLink.SetCredit(0, CreditMode.Drain);
        }

        private void OnMessage(IReceiverLink receiver, global::Amqp.Message amqpMessage)
        {
            if (Tracer.IsDebugEnabled)
            {
                Tracer.Debug($"Received message {amqpMessage?.Properties?.MessageId}");
            }
            
            NmsMessage message;
            try
            {
                message = AmqpCodec.DecodeMessage(this, amqpMessage).AsMessage();
            }
            catch (Exception e)
            {
                Tracer.Error($"Error on transform: {e.Message}");

                // Mark message as undeliverable
                receiverLink.Modify(amqpMessage, true, true);
                return;
            }

            // Let the message do any final processing before sending it onto a consumer.
            // We could defer this to a later stage such as the NmsConnection or even in
            // the NmsMessageConsumer dispatch method if we needed to.
            message.OnDispatch();

            InboundMessageDispatch inboundMessageDispatch = new InboundMessageDispatch
            {
                Message = message,
                ConsumerId = info.Id,
                ConsumerInfo = info,
            };

            AddMessage(inboundMessageDispatch);

            IProviderListener providerListener = session.Connection.Provider.Listener;
            providerListener.OnInboundMessage(inboundMessageDispatch);
        }

        public void Acknowledge(AckType ackType)
        {
            foreach (InboundMessageDispatch envelope in GetMessages().Where(x => x.IsDelivered))
            {
                Acknowledge(envelope, ackType);
            }
        }

        public void Acknowledge(InboundMessageDispatch envelope, AckType ackType)
        {
            if (envelope.Message.Facade is AmqpNmsMessageFacade facade)
            {
                global::Amqp.Message message = facade.Message;
                switch (ackType)
                {
                    case AckType.DELIVERED:
                        envelope.IsDelivered = true;
                        break;
                    case AckType.ACCEPTED:
                        HandleAccepted(envelope, message);
                        break;
                    case AckType.RELEASED:
                        receiverLink.Release(message);
                        RemoveMessage(envelope);
                        break;
                    case AckType.MODIFIED_FAILED_UNDELIVERABLE:
                        receiverLink.Modify(message, true, true);
                        RemoveMessage(envelope);
                        break;
                    default:
                        Tracer.ErrorFormat("Unsupported Ack Type for message: {0}", envelope);
                        throw new ArgumentException($"Unsupported Ack Type for message: {envelope}");
                }
            }
            else
            {
                Tracer.ErrorFormat($"Received Ack for unknown message: {envelope}");
            }
        }

        private void HandleAccepted(InboundMessageDispatch envelope, global::Amqp.Message message)
        {
            Tracer.DebugFormat("Accepted Ack of message: {0}", envelope);
            
            if (session.IsTransacted)
            {
                if (!session.IsTransactionFailed)
                {
                    var transactionalState = session.TransactionContext;
                    receiverLink.Complete(message, transactionalState.GetTxnAcceptState());
                    transactionalState.RegisterTxConsumer(this);
                }
                else
                {
                    Tracer.DebugFormat("Skipping ack of message {0} in failed transaction.", envelope);
                }
                
            }
            else
            {
                receiverLink.Accept(message);
            }

            RemoveMessage(envelope);
        }

        private void AddMessage(InboundMessageDispatch envelope)
        {
            lock (syncRoot)
            {
                messages.AddLast(envelope);
            }
        }

        private void RemoveMessage(InboundMessageDispatch envelope)
        {
            lock (syncRoot)
            {
                messages.Remove(envelope);
            }
        }

        private InboundMessageDispatch[] GetMessages()
        {
            lock (syncRoot)
            {
                return messages.ToArray();
            }
        }

        public async Task CloseAsync()
        {
            if (info.IsDurable)
            {
                if (receiverLink != null) await receiverLink.DetachAsync().AwaitRunContinuationAsync();
            }
            else
            {
                if (receiverLink != null) await receiverLink.CloseAsync().AwaitRunContinuationAsync();
            }
        }

        public void Recover()
        {
            Tracer.Debug($"Session Recover for consumer: {info.Id}");

            List<InboundMessageDispatch> redispatchList = new List<InboundMessageDispatch>();
            foreach (var envelope in GetMessages().Where(x => x.IsDelivered))
            {
                envelope.IsDelivered = false;
                envelope.Message.Facade.RedeliveryCount++;
                envelope.EnqueueFirst = true;
                redispatchList.Add(envelope);
            }

            IProviderListener providerListener = session.Connection.Provider.Listener;

            for (int i = redispatchList.Count - 1; i >= 0; i--)
            {
                providerListener.OnInboundMessage(redispatchList[i]);
            }
        }

        public bool HasSubscription(string subscriptionName)
        {
            return (info.IsDurable || info.IsShared) && info.SubscriptionName.Equals(subscriptionName);
        }

        public void PostRollback()
        {
            var pendingMessages = GetMessages().Where(x => !x.IsDelivered);
            foreach (InboundMessageDispatch message in pendingMessages)
            {
                Acknowledge(message, AckType.RELEASED);
            }
        }
    }
}