/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Collections;
using System.Threading;
using Apache.NMS.Stomp.Commands;
using Apache.NMS.Stomp.Util;
using Apache.NMS.Util;

namespace Apache.NMS.Stomp
{
    /// <summary>
    /// Default provider of ISession
    /// </summary>
    public class Session : ISession, IDispatcher
    {
        /// <summary>
        /// Private object used for synchronization, instead of public "this"
        /// </summary>
        private readonly object myLock = new object();

        private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable());
        private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());

        private SessionExecutor executor;
        private TransactionContext transactionContext;
        private Connection connection;

        private bool dispatchAsync;
        private bool exclusive;
        private bool retroactive;
        private byte priority;

        private readonly SessionInfo info;
        private int consumerCounter;
        private int producerCounter;
        private int nextDeliveryId;
        private bool disposed = false;
        private bool closed = false;
        private bool closing = false;
        private TimeSpan requestTimeout = Apache.NMS.NMSConstants.defaultRequestTimeout;
        private AcknowledgementMode acknowledgementMode;

        public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
        {
            this.connection = connection;
            this.info = info;
            this.acknowledgementMode = acknowledgementMode;
            this.requestTimeout = connection.RequestTimeout;

            if(acknowledgementMode == AcknowledgementMode.Transactional)
            {
                this.transactionContext = new TransactionContext(this);
            }
            else if(acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge)
            {
                this.acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
            }

            this.executor = new SessionExecutor(this, this.consumers);
        }

        ~Session()
        {
            Dispose(false);
        }

        #region Property Accessors

        /// <summary>
        /// Sets the prefetch size, the maximum number of messages a broker will dispatch to consumers
        /// until acknowledgements are received.
        /// </summary>
        public int PrefetchSize
        {
            set{ this.connection.PrefetchPolicy.SetAll(value); }
        }

        /// <summary>
        /// Sets the maximum number of messages to keep around per consumer
        /// in addition to the prefetch window for non-durable topics until messages
        /// will start to be evicted for slow consumers.
        /// Must be > 0 to enable this feature
        /// </summary>
        public int MaximumPendingMessageLimit
        {
            set{ this.connection.PrefetchPolicy.MaximumPendingMessageLimit = value; }
        }

        /// <summary>
        /// Enables or disables whether asynchronous dispatch should be used by the broker
        /// </summary>
        public bool DispatchAsync
        {
            get{ return this.dispatchAsync; }
            set{ this.dispatchAsync = value; }
        }

        /// <summary>
        /// Enables or disables exclusive consumers when using queues. An exclusive consumer means
        /// only one instance of a consumer is allowed to process messages on a queue to preserve order
        /// </summary>
        public bool Exclusive
        {
            get{ return this.exclusive; }
            set{ this.exclusive = value; }
        }

        /// <summary>
        /// Enables or disables retroactive mode for consumers; i.e. do they go back in time or not?
        /// </summary>
        public bool Retroactive
        {
            get{ return this.retroactive; }
            set{ this.retroactive = value; }
        }

        /// <summary>
        /// Sets the default consumer priority for consumers
        /// </summary>
        public byte Priority
        {
            get{ return this.priority; }
            set{ this.priority = value; }
        }

        public Connection Connection
        {
            get { return this.connection; }
        }

        public SessionId SessionId
        {
            get { return info.SessionId; }
        }

        public TransactionContext TransactionContext
        {
            get { return this.transactionContext; }
        }

        public TimeSpan RequestTimeout
        {
            get { return this.requestTimeout; }
            set { this.requestTimeout = value; }
        }

        public bool Transacted
        {
            get { return this.AcknowledgementMode == AcknowledgementMode.Transactional; }
        }

        public AcknowledgementMode AcknowledgementMode
        {
            get { return this.acknowledgementMode; }
        }

        public bool IsClientAcknowledge
        {
            get { return this.acknowledgementMode == AcknowledgementMode.ClientAcknowledge; }
        }

        public bool IsAutoAcknowledge
        {
            get { return this.acknowledgementMode == AcknowledgementMode.AutoAcknowledge; }
        }

        public bool IsDupsOkAcknowledge
        {
            get { return this.acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge; }
        }

        public bool IsIndividualAcknowledge
        {
            get { return this.acknowledgementMode == AcknowledgementMode.IndividualAcknowledge; }
        }

        public bool IsTransacted
        {
            get { return this.acknowledgementMode == AcknowledgementMode.Transactional; }
        }

        public SessionExecutor Executor
        {
            get { return this.executor; }
        }

        public long NextDeliveryId
        {
            get { return Interlocked.Increment(ref this.nextDeliveryId); }
        }

        #endregion

        #region ISession Members

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected void Dispose(bool disposing)
        {
            if(this.disposed)
            {
                return;
            }

            if(disposing)
            {
                // Dispose managed code here.
            }

            try
            {
                Close();
            }
            catch
            {
                // Ignore network errors.
            }

            this.disposed = true;
        }

        public void Close()
        {
            lock(myLock)
            {
                if(this.closed)
                {
                    return;
                }

                try
                {
                    Tracer.InfoFormat("Closing The Session with Id {0}", this.info.SessionId.ToString());
                    DoClose();
                    Tracer.InfoFormat("Closed The Session with Id {0}", this.info.SessionId.ToString());
                }
                catch(Exception ex)
                {
                    Tracer.ErrorFormat("Error during session close: {0}", ex);
                }
                finally
                {
                    this.connection = null;
                    this.closed = true;
                    this.closing = false;
                }
            }
        }

        internal void DoClose()
        {
            lock(myLock)
            {
                if(this.closed)
                {
                    return;
                }

                try
                {
                    this.closing = true;

                    // Stop all message deliveries from this Session
                    Stop();

                    lock(consumers.SyncRoot)
                    {
                        foreach(MessageConsumer consumer in consumers.Values)
                        {
                            consumer.DoClose();
                        }
                    }
                    consumers.Clear();

                    lock(producers.SyncRoot)
                    {
                        foreach(MessageProducer producer in producers.Values)
                        {
                            producer.DoClose();
                        }
                    }
                    producers.Clear();

                    // If in a transaction roll it back
                    if(this.IsTransacted && this.transactionContext.InTransaction)
                    {
                        try
                        {
                            this.transactionContext.Rollback();
                        }
                        catch
                        {
                        }
                    }

                    Connection.RemoveSession(this);
                }
                catch(Exception ex)
                {
                    Tracer.ErrorFormat("Error during session close: {0}", ex);
                }
                finally
                {
                    this.closed = true;
                    this.closing = false;
                }
            }
        }

        public IMessageProducer CreateProducer()
        {
            return CreateProducer(null);
        }

        public IMessageProducer CreateProducer(IDestination destination)
        {
            ProducerInfo command = CreateProducerInfo(destination);
            ProducerId producerId = command.ProducerId;
            MessageProducer producer = null;

            try
            {
                producer = new MessageProducer(this, command);
                producers[producerId] = producer;
            }
            catch(Exception)
            {
                if(producer != null)
                {
                    producer.Close();
                }

                throw;
            }

            return producer;
        }

        public IMessageConsumer CreateConsumer(IDestination destination)
        {
            return CreateConsumer(destination, null, false);
        }

        public IMessageConsumer CreateConsumer(IDestination destination, string selector)
        {
            return CreateConsumer(destination, selector, false);
        }

        public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
        {
            if (destination == null)
            {
                throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
            }

            ConsumerInfo command = CreateConsumerInfo(destination, selector);
            command.NoLocal = noLocal;
            ConsumerId consumerId = command.ConsumerId;
            MessageConsumer consumer = null;

            // Registered with Connection before we register at the broker.
            connection.addDispatcher(consumerId, this);

            try
            {
                consumer = new MessageConsumer(this, command);
                // lets register the consumer first in case we start dispatching messages immediately
                consumers[consumerId] = consumer;
                this.Connection.SyncRequest(command);

                if(this.Started)
                {
                    consumer.Start();
                }

                return consumer;
            }
            catch(Exception)
            {
                if(consumer != null)
                {
                    consumer.Close();
                }

                throw;
            }
        }

        public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
        {
            if (destination == null)
            {
                throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
            }

            ConsumerInfo command = CreateConsumerInfo(destination, selector);
            ConsumerId consumerId = command.ConsumerId;
            command.SubscriptionName = name;
            command.NoLocal = noLocal;
            command.PrefetchSize = this.connection.PrefetchPolicy.DurableTopicPrefetch;
            MessageConsumer consumer = null;

            // Registered with Connection before we register at the broker.
            connection.addDispatcher(consumerId, this);

            try
            {
                consumer = new MessageConsumer(this, command);
                // lets register the consumer first in case we start dispatching messages immediately
                consumers[consumerId] = consumer;

                if(this.Started)
                {
                    consumer.Start();
                }

                this.connection.SyncRequest(command);
            }
            catch(Exception)
            {
                if(consumer != null)
                {
                    consumer.Close();
                }

                throw;
            }

            return consumer;
        }

        public void DeleteDurableConsumer(string name)
        {
            RemoveSubscriptionInfo command = new RemoveSubscriptionInfo();
            command.ConnectionId = Connection.ConnectionId;
            command.ClientId = Connection.ClientId;
            command.SubscriptionName = name;
            this.connection.SyncRequest(command);
        }

        public IQueueBrowser CreateBrowser(IQueue queue)
        {
            throw new NotSupportedException("Not Yet Implemented");
        }

        public IQueueBrowser CreateBrowser(IQueue queue, string selector)
        {
            throw new NotSupportedException("Not Yet Implemented");
        }

        public IQueue GetQueue(string name)
        {
            return new Commands.Queue(name);
        }

        public ITopic GetTopic(string name)
        {
            return new Commands.Topic(name);
        }

        public ITemporaryQueue CreateTemporaryQueue()
        {
            TempQueue answer = new TempQueue(Connection.CreateTemporaryDestinationName());
            return answer;
        }

        public ITemporaryTopic CreateTemporaryTopic()
        {
            TempTopic answer = new TempTopic(Connection.CreateTemporaryDestinationName());
            return answer;
        }

        /// <summary>
        /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
        /// </summary>
        public void DeleteDestination(IDestination destination)
        {
            // Not Possible with Stomp
        }

        public IMessage CreateMessage()
        {
            Message answer = new Message();
            return ConfigureMessage(answer) as IMessage;
        }

        public ITextMessage CreateTextMessage()
        {
            TextMessage answer = new TextMessage();
            return ConfigureMessage(answer) as ITextMessage;
        }

        public ITextMessage CreateTextMessage(string text)
        {
            TextMessage answer = new TextMessage(text);
            return ConfigureMessage(answer) as ITextMessage;
        }

        public IMapMessage CreateMapMessage()
        {
            throw new NotSupportedException("No Object Message in Stomp");
        }

        public IBytesMessage CreateBytesMessage()
        {
            return ConfigureMessage(new BytesMessage()) as IBytesMessage;
        }

        public IBytesMessage CreateBytesMessage(byte[] body)
        {
            BytesMessage answer = new BytesMessage();
            answer.Content = body;
            return ConfigureMessage(answer) as IBytesMessage;
        }

        public IStreamMessage CreateStreamMessage()
        {
            throw new NotSupportedException("No Object Message in Stomp");
        }

        public IObjectMessage CreateObjectMessage(object body)
        {
            throw new NotSupportedException("No Object Message in Stomp");
        }

        public void Commit()
        {
            if(!Transacted)
            {
                throw new InvalidOperationException(
                        "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
                        + this.AcknowledgementMode);
            }

            this.TransactionContext.Commit();
        }

        public void Rollback()
        {
            if(!Transacted)
            {
                throw new InvalidOperationException(
                        "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
                        + this.AcknowledgementMode);
            }

            this.TransactionContext.Rollback();
        }

        #endregion

        public void DoSend( Message message, MessageProducer producer, TimeSpan sendTimeout )
        {
            Message msg = message;

            if(Transacted)
            {
                DoStartTransaction();
                msg.TransactionId = TransactionContext.TransactionId;
            }

            msg.RedeliveryCounter = 0;

            if(this.connection.CopyMessageOnSend)
            {
                msg = (Message)msg.Clone();
            }

            msg.OnSend();
            msg.ProducerId = msg.MessageId.ProducerId;

            if(sendTimeout.TotalMilliseconds <= 0 && !msg.ResponseRequired && !connection.AlwaysSyncSend &&
               (!msg.Persistent || connection.AsyncSend || msg.TransactionId != null))
            {
                this.connection.Oneway(msg);
            }
            else
            {
                if(sendTimeout.TotalMilliseconds > 0)
                {
                    this.connection.SyncRequest(msg, sendTimeout);
                }
                else
                {
                    this.connection.SyncRequest(msg);
                }
            }
        }

        /// <summary>
        /// Ensures that a transaction is started
        /// </summary>
        public void DoStartTransaction()
        {
            if(Transacted)
            {
                this.TransactionContext.Begin();
            }
        }

        public void DisposeOf(ConsumerId objectId)
        {
            connection.removeDispatcher(objectId);
            if(!this.closing)
            {
                consumers.Remove(objectId);
            }
        }

        public void DisposeOf(ProducerId objectId)
        {
            if(!this.closing)
            {
                producers.Remove(objectId);
            }
        }

        protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector)
        {
            ConsumerInfo answer = new ConsumerInfo();
            ConsumerId id = new ConsumerId();
            id.ConnectionId = info.SessionId.ConnectionId;
            id.SessionId = info.SessionId.Value;
            id.Value = Interlocked.Increment(ref consumerCounter);
            answer.ConsumerId = id;
            answer.Destination = Destination.Transform(destination);
            answer.Selector = selector;
            answer.Priority = this.Priority;
            answer.Exclusive = this.Exclusive;
            answer.DispatchAsync = this.DispatchAsync;
            answer.Retroactive = this.Retroactive;
            answer.MaximumPendingMessageLimit = this.connection.PrefetchPolicy.MaximumPendingMessageLimit;
            answer.AckMode = this.AcknowledgementMode;

            if(destination is ITopic || destination is ITemporaryTopic)
            {
                answer.PrefetchSize = this.connection.PrefetchPolicy.TopicPrefetch;
            }
            else if(destination is IQueue || destination is ITemporaryQueue)
            {
                answer.PrefetchSize = this.connection.PrefetchPolicy.QueuePrefetch;
            }

            // If the destination contained a URI query, then use it to set public properties
            // on the ConsumerInfo
            Destination amqDestination = destination as Destination;
            if(amqDestination != null && amqDestination.Options != null)
            {
                URISupport.SetProperties(answer, amqDestination.Options, "consumer.");
            }

            return answer;
        }

        protected virtual ProducerInfo CreateProducerInfo(IDestination destination)
        {
            ProducerInfo answer = new ProducerInfo();
            ProducerId id = new ProducerId();
            id.ConnectionId = info.SessionId.ConnectionId;
            id.SessionId = info.SessionId.Value;
            id.Value = Interlocked.Increment(ref producerCounter);
            answer.ProducerId = id;
            answer.Destination = Destination.Transform(destination);

            // If the destination contained a URI query, then use it to set public
            // properties on the ProducerInfo
            Destination amqDestination = destination as Destination;
            if(amqDestination != null && amqDestination.Options != null)
            {
                URISupport.SetProperties(answer, amqDestination.Options, "producer.");
            }

            return answer;
        }

        public void Stop()
        {
            if(this.executor != null)
            {
                this.executor.Stop();
            }
        }

        public void Start()
        {
            foreach(MessageConsumer consumer in this.consumers.Values)
            {
                consumer.Start();
            }

            if(this.executor != null)
            {
                this.executor.Start();
            }
        }

        public bool Started
        {
            get
            {
                return this.executor != null ? this.executor.Running : false;
            }
        }

        internal void Redispatch(MessageDispatchChannel channel)
        {
            MessageDispatch[] messages = channel.RemoveAll();
            System.Array.Reverse(messages);

            foreach(MessageDispatch message in messages)
            {
                if(Tracer.IsDebugEnabled)
                {
                    Tracer.DebugFormat("Resending Message Dispatch: ", message.ToString());
                }
                this.executor.ExecuteFirst(message);
            }
        }

        public void Dispatch(MessageDispatch dispatch)
        {
            if(this.executor != null)
            {
                if(Tracer.IsDebugEnabled)
                {
                    Tracer.DebugFormat("Send Message Dispatch: ", dispatch.ToString());
                }
                this.executor.Execute(dispatch);
            }
        }

        internal void ClearMessagesInProgress()
        {
            if( this.executor != null ) {
                this.executor.ClearMessagesInProgress();
            }

            lock(this.consumers.SyncRoot)
            {
                foreach(MessageConsumer consumer in this.consumers)
                {
                    consumer.ClearMessagesInProgress();
                }
            }
        }

        internal void Acknowledge()
        {
            lock(this.consumers.SyncRoot)
            {
                foreach(MessageConsumer consumer in this.consumers.Values)
                {
                    consumer.Acknowledge();
                }
            }
        }

        private Message ConfigureMessage(Message message)
        {
            message.Connection = this.connection;

            if(this.IsTransacted)
            {
                // Allows Acknowledge to be called in a transaction with no effect per JMS Spec.
                message.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge);
            }

            return message;
        }

        internal void SendAck(MessageAck ack)
        {
            this.SendAck(ack, false);
        }

        internal void SendAck(MessageAck ack, bool lazy)
        {
            if(lazy || connection.SendAcksAsync || this.IsTransacted )
            {
                this.connection.Oneway(ack);
            }
            else
            {
                this.connection.SyncRequest(ack);
            }
        }

        /// <summary>
        /// Prevents message from throwing an exception if a client calls Acknoweldge on
        /// a message that is part of a transaction either being produced or consumed.  The
        /// JMS Spec indicates that users should be able to call Acknowledge with no effect
        /// if the message is in a transaction.
        /// </summary>
        /// <param name="message">
        /// A <see cref="Message"/>
        /// </param>
        private void DoNothingAcknowledge(Message message)
        {
        }

    }
}
