/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Collections.Generic;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.Transport;
using Apache.NMS.ActiveMQ.Util;
using System.Collections;

namespace Apache.NMS.ActiveMQ.State
{
    /// <summary>
    /// Tracks the state of a connection so a newly established transport can be
    /// re-initialized to the state that was tracked.
    /// </summary>
    public class ConnectionStateTracker : CommandVisitorAdapter
    {
        private static readonly Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);

        protected readonly Dictionary<ConnectionId, ConnectionState> connectionStates =
            new Dictionary<ConnectionId, ConnectionState>();

        private bool isTrackTransactions;
        private bool isTrackTransactionProducers = true;
        private bool isRestoreSessions = true;
        private bool isRestoreConsumers = true;
        private bool isRestoreProducers = true;
        private bool isRestoreTransaction = true;
        private bool isTrackMessages = true;
        private int maxCacheSize = 256;
        private readonly LRUCache<Object, Command> messageCache = new LRUCache<Object, Command>(256);

        private class RemoveTransactionAction : ResponseHandler
        {
            private readonly TransactionInfo info;
            private readonly ConnectionStateTracker cst;

            public RemoveTransactionAction(TransactionInfo info, ConnectionStateTracker aCst)
            {
                this.info = info;
                this.cst = aCst;
            }

            public override void OnResponse()
            {
                ConnectionState cs;

                if(cst.connectionStates.TryGetValue(info.ConnectionId, out cs))
                {
                    cs.RemoveTransactionState(info.TransactionId);
                }
            }
        }

        /// <summary>
        /// </summary>
        /// <param name="command"></param>
        /// <returns>null if the command is not state tracked.</returns>
        public Tracked Track(Command command)
        {
            try
            {
                return (Tracked) command.Visit(this);
            }
            catch(IOException)
            {
                throw;
            }
            catch(Exception e)
            {
                throw new IOException(e.Message);
            }
        }

        public void TrackBack(Command command)
        {
        }

        public void DoRestore(ITransport transport)
        {
            // Restore the connections.
            foreach(ConnectionState connectionState in connectionStates.Values)
            {
                ConnectionInfo info = connectionState.Info;
                info.FailoverReconnect = true;
                if (Tracer.IsDebugEnabled)
                {
                    Tracer.Debug("conn: " + connectionState.Info.ConnectionId);
                }
                transport.Oneway(info);

                DoRestoreTempDestinations(transport, connectionState);

                if(RestoreSessions)
                {
                    DoRestoreSessions(transport, connectionState);
                }

                if(RestoreTransaction)
                {
                    DoRestoreTransactions(transport, connectionState);
                }
            }

            // Now flush messages
            foreach(Command command in messageCache.Values)
            {
                if (Tracer.IsDebugEnabled)
                {
                    Tracer.Debug("Replaying command: " + command);
                }

                transport.Oneway(command);
            }
        }

        private void DoRestoreTransactions(ITransport transport, ConnectionState connectionState)
        {
            AtomicCollection<TransactionState> transactionStates = connectionState.TransactionStates;
            List<TransactionInfo> toRollback = new List<TransactionInfo>();

            foreach(TransactionState transactionState in transactionStates)
            {
                // rollback any completed transactions - no way to know if commit got there
                // or if reply went missing
                if (transactionState.Commands.Count != 0)
                {
                    Command lastCommand = transactionState.Commands[transactionState.Commands.Count - 1];
                    if (lastCommand.IsTransactionInfo)
                    {
                        TransactionInfo transactionInfo = lastCommand as TransactionInfo;
                        if (transactionInfo.Type == TransactionInfo.COMMIT_ONE_PHASE)
                        {
                            if (Tracer.IsDebugEnabled)
                            {
                                Tracer.Debug("rolling back potentially completed tx: " +
                                             transactionState.Id);
                            }
                            toRollback.Add(transactionInfo);
                            continue;
                        }
                    }
                }

                // replay the add and remove of short lived producers that may have been
                // involved in the transaction
                foreach (ProducerState producerState in transactionState.ProducerStates)
                {
                    if (Tracer.IsDebugEnabled)
                    {
                        Tracer.Debug("tx replay producer :" + producerState.Info);
                    }
                    transport.Oneway(producerState.Info);
                }

                foreach (Command command in transactionState.Commands)
                {
                    if (Tracer.IsDebugEnabled)
                    {
                        Tracer.Debug("tx replay: " + command);
                    }
                    transport.Oneway(command);
                }

                foreach (ProducerState producerState in transactionState.ProducerStates)
                {
                    if (Tracer.IsDebugEnabled)
                    {
                        Tracer.Debug("tx remove replayed producer :" + producerState.Info);
                    }

                    RemoveInfo producerRemove = new RemoveInfo();
                    producerRemove.ObjectId = producerState.Info.ProducerId;
                    transport.Oneway(producerRemove);
                }
            }

            foreach (TransactionInfo command in toRollback)
            {
                // respond to the outstanding commit
                ExceptionResponse response = new ExceptionResponse();
                response.Exception = new BrokerError();
                response.Exception.Message =
                    "Transaction completion in doubt due to failover. Forcing rollback of " + command.TransactionId;
                response.Exception.ExceptionClass = (new TransactionRolledBackException()).GetType().FullName;
                response.CorrelationId = command.CommandId;
                transport.Command(transport, response);
            }
        }

        /// <summary>
        /// </summary>
        /// <param name="transport"></param>
        /// <param name="connectionState"></param>
        protected void DoRestoreSessions(ITransport transport, ConnectionState connectionState)
        {
            // Restore the connection's sessions
            foreach(SessionState sessionState in connectionState.SessionStates)
            {
                if (Tracer.IsDebugEnabled)
                {
                    Tracer.Debug("Restoring session: " + sessionState.Info.SessionId);
                }
                transport.Oneway(sessionState.Info);

                if(RestoreProducers)
                {
                    DoRestoreProducers(transport, sessionState);
                }

                if(RestoreConsumers)
                {
                    DoRestoreConsumers(transport, sessionState);
                }
            }
        }

        /// <summary>
        /// </summary>
        /// <param name="transport"></param>
        /// <param name="sessionState"></param>
        protected void DoRestoreConsumers(ITransport transport, SessionState sessionState)
        {
            // Restore the session's consumers but possibly in pull only (prefetch 0 state) till
            // recovery completes.

            ConnectionState connectionState = null;
            bool connectionInterruptionProcessingComplete = false;

            if(connectionStates.TryGetValue(sessionState.Info.SessionId.ParentId, out connectionState))
            {
                connectionInterruptionProcessingComplete = connectionState.ConnectionInterruptProcessingComplete;
            }

            // Restore the session's consumers
            foreach(ConsumerState consumerState in sessionState.ConsumerStates)
            {
                ConsumerInfo infoToSend = consumerState.Info;

                if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0 && transport.WireFormat.Version > 5)
                {
                    infoToSend = consumerState.Info.Clone() as ConsumerInfo;
                    lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
                    {
                        if(!connectionState.RecoveringPullConsumers.ContainsKey(infoToSend.ConsumerId))
                        {
                            connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
                        }
                    }
                    infoToSend.PrefetchSize = 0;
                    if(Tracer.IsDebugEnabled)
                    {
                        Tracer.Debug("restore consumer: " + infoToSend.ConsumerId +
                                     " in pull mode pending recovery, overriding prefetch: " +
                                     consumerState.Info.PrefetchSize);
                    }
                }

                if(Tracer.IsDebugEnabled)
                {
                    Tracer.Debug("restore consumer: " + infoToSend.ConsumerId);
                }

                transport.Oneway(infoToSend);
            }
        }

        /// <summary>
        /// </summary>
        /// <param name="transport"></param>
        /// <param name="sessionState"></param>
        protected void DoRestoreProducers(ITransport transport, SessionState sessionState)
        {
            // Restore the session's producers
            foreach(ProducerState producerState in sessionState.ProducerStates)
            {
                if (Tracer.IsDebugEnabled)
                {
                    Tracer.Debug("Restoring producer: " + producerState.Info.ProducerId);
                }
                transport.Oneway(producerState.Info);
            }
        }

        /// <summary>
        /// </summary>
        /// <param name="transport"></param>
        /// <param name="connectionState"></param>
        protected void DoRestoreTempDestinations(ITransport transport, ConnectionState connectionState)
        {
            // Restore the connection's temp destinations.
            foreach(DestinationInfo destinationInfo in connectionState.TempDestinations)
            {
                transport.Oneway(destinationInfo);
            }
        }

        public override Response ProcessAddDestination(DestinationInfo info)
        {
            if(info != null && info.Destination.IsTemporary)
            {
                ConnectionState cs;

                if(connectionStates.TryGetValue(info.ConnectionId, out cs))
                {
                    cs.AddTempDestination(info);
                }
            }
            return TRACKED_RESPONSE_MARKER;
        }

        public override Response ProcessRemoveDestination(DestinationInfo info)
        {
            if(info != null && info.Destination.IsTemporary)
            {
                ConnectionState cs;
                if(connectionStates.TryGetValue(info.ConnectionId, out cs))
                {
                    cs.RemoveTempDestination(info.Destination);
                }
            }
            return TRACKED_RESPONSE_MARKER;
        }

        public override Response ProcessAddProducer(ProducerInfo info)
        {
            if(info != null && info.ProducerId != null)
            {
                SessionId sessionId = info.ProducerId.ParentId;
                if(sessionId != null)
                {
                    ConnectionId connectionId = sessionId.ParentId;
                    if(connectionId != null)
                    {
                        ConnectionState cs;

                        if(connectionStates.TryGetValue(connectionId, out cs))
                        {
                            SessionState ss = cs[sessionId];
                            if(ss != null)
                            {
                                ss.AddProducer(info);
                            }
                        }
                    }
                }
            }
            return TRACKED_RESPONSE_MARKER;
        }

        public override Response ProcessRemoveProducer(ProducerId id)
        {
            if(id != null)
            {
                SessionId sessionId = id.ParentId;
                if(sessionId != null)
                {
                    ConnectionId connectionId = sessionId.ParentId;
                    if(connectionId != null)
                    {
                        ConnectionState cs = null;

                        if(connectionStates.TryGetValue(connectionId, out cs))
                        {
                            SessionState ss = cs[sessionId];
                            if(ss != null)
                            {
                                ss.RemoveProducer(id);
                            }
                        }
                    }
                }
            }
            return TRACKED_RESPONSE_MARKER;
        }

        public override Response ProcessAddConsumer(ConsumerInfo info)
        {
            if(info != null)
            {
                SessionId sessionId = info.ConsumerId.ParentId;
                if(sessionId != null)
                {
                    ConnectionId connectionId = sessionId.ParentId;
                    if(connectionId != null)
                    {
                        ConnectionState cs = null;

                        if(connectionStates.TryGetValue(connectionId, out cs))
                        {
                            SessionState ss = cs[sessionId];
                            if(ss != null)
                            {
                                ss.AddConsumer(info);
                            }
                        }
                    }
                }
            }
            return TRACKED_RESPONSE_MARKER;
        }

        public override Response ProcessRemoveConsumer(ConsumerId id)
        {
            if(id != null)
            {
                SessionId sessionId = id.ParentId;
                if(sessionId != null)
                {
                    ConnectionId connectionId = sessionId.ParentId;
                    if(connectionId != null)
                    {
                        ConnectionState cs = null;

                        if(connectionStates.TryGetValue(connectionId, out cs))
                        {
                            SessionState ss = cs[sessionId];
                            if(ss != null)
                            {
                                ss.RemoveConsumer(id);
                            }

                            cs.RecoveringPullConsumers.Remove(id);
                        }
                    }
                }
            }
            return TRACKED_RESPONSE_MARKER;
        }

        public override Response ProcessAddSession(SessionInfo info)
        {
            if(info != null)
            {
                ConnectionId connectionId = info.SessionId.ParentId;
                if(connectionId != null)
                {
                    ConnectionState cs = null;

                    if(connectionStates.TryGetValue(connectionId, out cs))
                    {
                        cs.AddSession(info);
                    }
                }
            }
            return TRACKED_RESPONSE_MARKER;
        }

        public override Response ProcessRemoveSession(SessionId id)
        {
            if(id != null)
            {
                ConnectionId connectionId = id.ParentId;
                if(connectionId != null)
                {
                    ConnectionState cs = null;

                    if(connectionStates.TryGetValue(connectionId, out cs))
                    {
                        cs.RemoveSession(id);
                    }
                }
            }
            return TRACKED_RESPONSE_MARKER;
        }

        public override Response ProcessAddConnection(ConnectionInfo info)
        {
            if(info != null)
            {
                ConnectionState connState = new ConnectionState(info);

                if(connectionStates.ContainsKey(info.ConnectionId))
                {
                    connectionStates[info.ConnectionId] = connState;
                }
                else
                {
                    connectionStates.Add(info.ConnectionId, connState);
                }
            }

            return TRACKED_RESPONSE_MARKER;
        }

        public override Response ProcessRemoveConnection(ConnectionId id)
        {
            if(id != null)
            {
                connectionStates.Remove(id);
            }
            return TRACKED_RESPONSE_MARKER;
        }

        public override Response ProcessMessage(Message send)
        {
            if(send != null)
            {
                if(TrackTransactions && send.TransactionId != null)
                {
                    ProducerId producerId = send.ProducerId;
                    ConnectionId connectionId = producerId.ParentId.ParentId;
                    if(connectionId != null)
                    {
                        ConnectionState cs = null;

                        if(connectionStates.TryGetValue(connectionId, out cs))
                        {
                            TransactionState transactionState = cs[send.TransactionId];
                            if(transactionState != null)
                            {
                                transactionState.AddCommand(send);

                                if (isTrackTransactionProducers)
                                {
                                    SessionState ss = cs[producerId.ParentId];
                                    ProducerState producerState = ss[producerId];
                                    producerState.TransactionState = transactionState;
                                }
                            }
                        }
                    }
                    return TRACKED_RESPONSE_MARKER;
                }
                else if(TrackMessages)
                {
                    messageCache.Add(send.MessageId, (Message) send.Clone());
                }
            }
            return null;
        }

        public override Response ProcessMessageAck(MessageAck ack)
        {
            if(TrackTransactions && ack != null && ack.TransactionId != null)
            {
                ConnectionId connectionId = ack.ConsumerId.ParentId.ParentId;
                if(connectionId != null)
                {
                    ConnectionState cs = null;

                    if(connectionStates.TryGetValue(connectionId, out cs))
                    {
                        TransactionState transactionState = cs[ack.TransactionId];
                        if(transactionState != null)
                        {
                            transactionState.AddCommand(ack);
                        }
                    }
                }
                return TRACKED_RESPONSE_MARKER;
            }
            return null;
        }

        public override Response ProcessBeginTransaction(TransactionInfo info)
        {
            if(TrackTransactions && info != null && info.TransactionId != null)
            {
                ConnectionId connectionId = info.ConnectionId;
                if(connectionId != null)
                {
                    ConnectionState cs = null;

                    if(connectionStates.TryGetValue(connectionId, out cs))
                    {
                        cs.AddTransactionState(info.TransactionId);
                        TransactionState state = cs[info.TransactionId];
                        state.AddCommand(info);
                    }
                }
                return TRACKED_RESPONSE_MARKER;
            }
            return null;
        }

        public override Response ProcessPrepareTransaction(TransactionInfo info)
        {
            if(TrackTransactions && info != null)
            {
                ConnectionId connectionId = info.ConnectionId;
                if(connectionId != null)
                {
                    ConnectionState cs = null;

                    if(connectionStates.TryGetValue(connectionId, out cs))
                    {
                        TransactionState transactionState = cs[info.TransactionId];
                        if(transactionState != null)
                        {
                            transactionState.AddCommand(info);
                        }
                    }
                }
                return TRACKED_RESPONSE_MARKER;
            }
            return null;
        }

        public override Response ProcessCommitTransactionOnePhase(TransactionInfo info)
        {
            if(TrackTransactions && info != null)
            {
                ConnectionId connectionId = info.ConnectionId;
                if(connectionId != null)
                {
                    ConnectionState cs = null;

                    if(connectionStates.TryGetValue(connectionId, out cs))
                    {
                        TransactionState transactionState = cs[info.TransactionId];
                        if(transactionState != null)
                        {
                            transactionState.AddCommand(info);
                            return new Tracked(new RemoveTransactionAction(info, this));
                        }
                    }
                }
            }
            return null;
        }

        public override Response ProcessCommitTransactionTwoPhase(TransactionInfo info)
        {
            if(TrackTransactions && info != null)
            {
                ConnectionId connectionId = info.ConnectionId;
                if(connectionId != null)
                {
                    ConnectionState cs = null;

                    if(connectionStates.TryGetValue(connectionId, out cs))
                    {
                        TransactionState transactionState = cs[info.TransactionId];
                        if(transactionState != null)
                        {
                            transactionState.AddCommand(info);
                            return new Tracked(new RemoveTransactionAction(info, this));
                        }
                    }
                }
            }
            return null;
        }

        public override Response ProcessRollbackTransaction(TransactionInfo info)
        {
            if(TrackTransactions && info != null)
            {
                ConnectionId connectionId = info.ConnectionId;
                if(connectionId != null)
                {
                    ConnectionState cs = null;

                    if(connectionStates.TryGetValue(connectionId, out cs))
                    {
                        TransactionState transactionState = cs[info.TransactionId];
                        if(transactionState != null)
                        {
                            transactionState.AddCommand(info);
                            return new Tracked(new RemoveTransactionAction(info, this));
                        }
                    }
                }
            }
            return null;
        }

        public override Response ProcessEndTransaction(TransactionInfo info)
        {
            if(TrackTransactions && info != null)
            {
                ConnectionId connectionId = info.ConnectionId;
                if(connectionId != null)
                {
                    ConnectionState cs = null;

                    if(connectionStates.TryGetValue(connectionId, out cs))
                    {
                        TransactionState transactionState = cs[info.TransactionId];
                        if(transactionState != null)
                        {
                            transactionState.AddCommand(info);
                        }
                    }
                }
                return TRACKED_RESPONSE_MARKER;
            }
            return null;
        }

        public override Response ProcessMessagePull(MessagePull pull)
        {
            if (pull != null)
            {
                // leave a single instance in the cache
                String id = pull.Destination + "::" + pull.ConsumerId;
                messageCache[id] = pull;
            }
            return null;
        }

        public bool RestoreConsumers
        {
            get { return isRestoreConsumers; }
            set { isRestoreConsumers = value; }
        }

        public bool RestoreProducers
        {
            get { return isRestoreProducers; }
            set { isRestoreProducers = value; }
        }

        public bool RestoreSessions
        {
            get { return isRestoreSessions; }
            set { isRestoreSessions = value; }
        }

        public bool TrackTransactions
        {
            get { return isTrackTransactions; }
            set { isTrackTransactions = value; }
        }

        public bool TrackTransactionProducers
        {
            get { return isTrackTransactionProducers; }
            set { isTrackTransactionProducers = value; }
        }

        public bool RestoreTransaction
        {
            get { return isRestoreTransaction; }
            set { isRestoreTransaction = value; }
        }

        public bool TrackMessages
        {
            get { return isTrackMessages; }
            set { isTrackMessages = value; }
        }

        public int MaxCacheSize
        {
            get { return maxCacheSize; }
            set
            {
                this.maxCacheSize = value;
                this.messageCache.MaxCacheSize = maxCacheSize;
            }
        }

        public void ConnectionInterruptProcessingComplete(ITransport transport, ConnectionId connectionId)
        {
            ConnectionState connectionState = null;

            if(connectionStates.TryGetValue(connectionId, out connectionState))
            {
                connectionState.ConnectionInterruptProcessingComplete = true;

                Dictionary<ConsumerId, ConsumerInfo> consumersToRestorePrefetchOn;

                lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
                {
                    consumersToRestorePrefetchOn = new Dictionary<ConsumerId, ConsumerInfo>(connectionState.RecoveringPullConsumers);
                    connectionState.RecoveringPullConsumers.Clear();
                }

                foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in consumersToRestorePrefetchOn)
                {
                    ConsumerControl control = new ConsumerControl();
                    control.ConsumerId = entry.Key;
                    control.Prefetch = entry.Value.PrefetchSize;
                    control.Destination = entry.Value.Destination;
                    try
                    {
                        if(Tracer.IsDebugEnabled)
                        {
                            Tracer.Debug("restored recovering consumer: " + control.ConsumerId +
                                         " with: " + control.Prefetch);
                        }
                        transport.Oneway(control);
                    }
                    catch(Exception ex)
                    {
                        if(Tracer.IsDebugEnabled)
                        {
                            Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
                                         " with: " + control.Prefetch + "Error: " + ex.Message);
                        }
                    }
                }
            }
        }

        public void TransportInterrupted(ConnectionId id)
        {
            ConnectionState connection = null;

            if(connectionStates.TryGetValue(id, out connection))
            {
                connection.ConnectionInterruptProcessingComplete = false;
            }
        }
    }
}
