/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Text;
using System.Net;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.State;
using Apache.NMS.ActiveMQ.Threads;
using Apache.NMS.Util;

namespace Apache.NMS.ActiveMQ.Transport.Failover
{
    /// <summary>
    /// A Transport that is made reliable by being able to fail over to another
    /// transport when a transport failure is detected.
    /// </summary>
    public class FailoverTransport : ICompositeTransport, IComparable
    {
		// Default value for both initialReconnectDelay and reconnectDelay, and the
		// sentinel (-1) used as the default for timeout and the reconnect attempt limits.
		private static int DEFAULT_INITIAL_RECONNECT_DELAY = 10;
		private static int INFINITE = -1;

        // Source of the per-instance id assigned in the constructor.
        private static int idCounter = 0;
        private readonly int id;

        private bool disposed;
        private bool connected;
        // Configured broker URIs, and a second list that ConnectList returns in
        // preference to "uris" whenever it is non-empty.
        private readonly List<Uri> uris = new List<Uri>();
        private readonly List<Uri> updated = new List<Uri>();

        // Listener delegates; each one is assigned through its public property.
        private CommandHandler commandHandler;
        private ExceptionHandler exceptionHandler;
        private InterruptedHandler interruptedHandler;
        private ResumedHandler resumedHandler;

		// Counted down once by each of the four listener property setters.
		private readonly CountDownLatch listenerLatch = new CountDownLatch(4);
        // These Mutex instances are used as monitor objects (lock / Monitor.Wait / Monitor.PulseAll).
        private readonly Mutex reconnectMutex = new Mutex();
        private readonly Mutex backupMutex = new Mutex();
        private readonly Mutex sleepMutex = new Mutex();
        private readonly ConnectionStateTracker stateTracker = new ConnectionStateTracker();
        // Commands awaiting a response, keyed by command id (see Oneway / OnCommand).
        private readonly Dictionary<int, Command> requestMap = new Dictionary<int, Command>();

        private Uri connectedTransportURI;
        // URI of the transport that most recently failed; ConnectList moves it to the end of the list.
        private Uri failedConnectTransportURI;
        private readonly AtomicReference<ITransport> connectedTransport = new AtomicReference<ITransport>(null);
        private TaskRunner reconnectTask = null;
        private bool started;
        private bool initialized;
        private int initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
        private int maxReconnectDelay = 1000 * 30;
        private int backOffMultiplier = 2;
        // Compared against elapsed milliseconds in Oneway; values <= 0 disable the timeout there.
        private int timeout = INFINITE;
        private bool useExponentialBackOff = true;
        private bool randomize = true;
        private int maxReconnectAttempts = INFINITE;
        private int startupMaxReconnectAttempts = INFINITE;
        private int connectFailures;
        private int reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
        // When non-null, Oneway stops waiting for a reconnect and reports this error.
        private Exception connectionFailure;
        private bool firstConnection = true;
        private bool backup = false;
        private readonly List<BackupTransport> backups = new List<BackupTransport>();
        private int backupPoolSize = 1;
        // Copied into the state tracker when Start() runs.
        private bool trackMessages = false;
    	private bool trackTransactionProducers = true;
        private int maxCacheSize = 256;
        private volatile Exception failure;
        // Guards writes to "failure" (see the Failure property).
        private readonly object mutex = new object();
        private bool reconnectSupported = true;
        private bool updateURIsSupported = true;
        // Rebalance / priority-backup state consulted by FailoverTask.Iterate and DoConnect.
    	private bool doRebalance = false;
    	private bool connectedToPriority = false;
	 	private bool priorityBackup = false;
    	private List<Uri> priorityList = new List<Uri>();
    	private bool priorityBackupAvailable = false;

		// Not sure how to work these back in after all the changes.
		//private int asyncTimeout = 45000;
        //private bool asyncConnect = false;

        /// <summary>
        /// Creates a failover transport, assigns it a unique id, enables transaction
        /// tracking on the state tracker and creates the task runner that drives
        /// <see cref="FailoverTask"/>.
        /// </summary>
        public FailoverTransport()
        {
            // idCounter is shared by every instance; a plain "idCounter++" is a
            // non-atomic read-modify-write and can hand the same id to two transports
            // constructed concurrently. Subtracting one keeps the original zero-based ids.
            id = Interlocked.Increment(ref idCounter) - 1;

            stateTracker.TrackTransactions = true;
            reconnectTask = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner(
                new FailoverTask(this), "ActiveMQ Failover Worker: " + this.GetHashCode().ToString());
        }

        /// <summary>
        /// Finalizer: runs the dispose logic with the "disposing" argument set to false.
        /// </summary>
        ~FailoverTransport()
        {
            this.Dispose(false);
        }

        #region FailoverTask

        /// <summary>
        /// Work item executed by the reconnect task runner. Each iteration either
        /// attempts a connect on the owning transport or builds its backup transports.
        /// </summary>
        private class FailoverTask : Task
        {
            private readonly FailoverTransport parent;

            public FailoverTask(FailoverTransport p)
            {
                this.parent = p;
            }

            /// <summary>
            /// Runs one iteration of the failover logic.
            /// </summary>
            /// <returns>
            /// false when the transport is not started; true when the reconnect task
            /// has been cleared; otherwise the result of DoConnect, or false if no
            /// connect was attempted.
            /// </returns>
            public bool Iterate()
            {
                if (!parent.IsStarted)
                {
                    return false;
                }

                bool result = false;
                bool connectAttempted = false;

                lock (parent.backupMutex)
                {
                    bool connectNeeded = parent.connectedTransport.Value == null
                                         || parent.doRebalance
                                         || parent.priorityBackupAvailable;

                    if (connectNeeded && !parent.disposed)
                    {
                        result = parent.DoConnect();
                        connectAttempted = true;
                    }
                }

                if (!connectAttempted)
                {
                    parent.BuildBackups();

                    // Only keep cycling when priority backup is on and we are
                    // not yet connected to a priority URI.
                    if (!parent.priorityBackup || parent.connectedToPriority)
                    {
                        return result;
                    }
                }

                try
                {
                    if (!connectAttempted)
                    {
                        // Backup path: pause before scheduling the next iteration.
                        parent.DoDelay();
                    }

                    if (parent.reconnectTask == null)
                    {
                        return true;
                    }

                    parent.reconnectTask.Wakeup();
                }
                catch (ThreadInterruptedException)
                {
                    Tracer.Debug("Reconnect task has been interrupted.");
                }

                return result;
            }
        }

        #endregion

        #region Property Accessors

        /// <summary>
        /// Gets or sets the handler that OnCommand forwards commands to.
        /// Setting it also counts down the listener latch.
        /// </summary>
        public CommandHandler Command
        {
            get
            {
                return this.commandHandler;
            }
            set
            {
                this.commandHandler = value;
                this.listenerLatch.countDown();
            }
        }

        /// <summary>
        /// Gets or sets the handler that receives transport exceptions.
        /// Setting it also counts down the listener latch.
        /// </summary>
        public ExceptionHandler Exception
        {
            get
            {
                return this.exceptionHandler;
            }
            set
            {
                this.exceptionHandler = value;
                this.listenerLatch.countDown();
            }
        }

        /// <summary>
        /// Gets or sets the handler invoked by HandleTransportFailure when the
        /// connected transport is lost. Setting it also counts down the listener latch.
        /// </summary>
        public InterruptedHandler Interrupted
        {
            get
            {
                return interruptedHandler;
            }
            set
            {
                interruptedHandler = value;
                listenerLatch.countDown();
            }
        }

        /// <summary>
        /// Gets or sets the resumed handler. Setting it also counts down the listener latch.
        /// </summary>
        public ResumedHandler Resumed
        {
            get
            {
                return resumedHandler;
            }
            set
            {
                resumedHandler = value;
                listenerLatch.countDown();
            }
        }

        /// <summary>
        /// Gets or sets the recorded failure. Reads go straight to the volatile
        /// field; writes are performed while holding the internal mutex.
        /// </summary>
        internal Exception Failure
        {
            get
            {
                return this.failure;
            }
            set
            {
                lock(this.mutex)
                {
                    this.failure = value;
                }
            }
        }

        /// <summary>
        /// Gets or sets the timeout that Oneway compares against elapsed
        /// milliseconds while waiting for a connection; values of zero or less
        /// disable that check.
        /// </summary>
        public int Timeout
        {
            get
            {
                return timeout;
            }
            set
            {
                timeout = value;
            }
        }

        /// <summary>
        /// Gets or sets the initial reconnect delay.
        /// </summary>
        public int InitialReconnectDelay
        {
            get
            {
                return this.initialReconnectDelay;
            }
            set
            {
                this.initialReconnectDelay = value;
            }
        }

        /// <summary>
        /// Gets or sets the maximum reconnect delay.
        /// </summary>
        public int MaxReconnectDelay
        {
            get
            {
                return this.maxReconnectDelay;
            }
            set
            {
                this.maxReconnectDelay = value;
            }
        }

        /// <summary>
        /// Gets or sets the current reconnect delay.
        /// </summary>
        public int ReconnectDelay
        {
            get
            {
                return this.reconnectDelay;
            }
            set
            {
                this.reconnectDelay = value;
            }
        }

        /// <summary>
        /// Gets or sets the back-off multiplier (stored in backOffMultiplier).
        /// </summary>
        public int ReconnectDelayExponent
        {
            get
            {
                return this.backOffMultiplier;
            }
            set
            {
                this.backOffMultiplier = value;
            }
        }

        /// <summary>
        /// Gets or sets the currently connected transport, backed by an atomic reference.
        /// </summary>
        public ITransport ConnectedTransport
        {
            get
            {
                return this.connectedTransport.Value;
            }
            set
            {
                this.connectedTransport.Value = value;
            }
        }

        /// <summary>
        /// Gets or sets the URI recorded for the connected transport.
        /// </summary>
        public Uri ConnectedTransportURI
        {
            get
            {
                return this.connectedTransportURI;
            }
            set
            {
                this.connectedTransportURI = value;
            }
        }

        /// <summary>
        /// Gets or sets the maximum number of reconnect attempts (defaults to -1).
        /// </summary>
        public int MaxReconnectAttempts
        {
            get
            {
                return this.maxReconnectAttempts;
            }
            set
            {
                this.maxReconnectAttempts = value;
            }
        }

        /// <summary>
        /// Gets or sets the startup reconnect attempt limit (defaults to -1).
        /// </summary>
        public int StartupMaxReconnectAttempts
        {
            get
            {
                return this.startupMaxReconnectAttempts;
            }
            set
            {
                this.startupMaxReconnectAttempts = value;
            }
        }

        /// <summary>
        /// Gets or sets whether ConnectList shuffles the candidate URIs.
        /// </summary>
        public bool Randomize
        {
            get
            {
                return this.randomize;
            }
            set
            {
                this.randomize = value;
            }
        }

        /// <summary>
        /// Gets or sets the backup flag.
        /// </summary>
        public bool Backup
        {
            get
            {
                return this.backup;
            }
            set
            {
                this.backup = value;
            }
        }

		/// <summary>
		/// Gets or sets the priority backup flag read by FailoverTask.Iterate.
		/// </summary>
		public bool PriorityBackup
		{
			get
			{
				return this.priorityBackup;
			}
			set
			{
				priorityBackup = value;
			}
		}

	    /// <summary>
	    /// Gets the priority URI list rendered by PrintableUriList, or sets it by
	    /// passing a delimited string to ProcessDelimitedUriList.
	    /// </summary>
	    public String PriorityURIs
	    {
	        get
	        {
	            return PrintableUriList(this.priorityList);
	        }
	        set
	        {
	            ProcessDelimitedUriList(value, this.priorityList);
	        }
	    }

        /// <summary>
        /// Gets or sets the backup pool size (defaults to 1).
        /// </summary>
        public int BackupPoolSize
        {
            get
            {
                return this.backupPoolSize;
            }
            set
            {
                this.backupPoolSize = value;
            }
        }

        /// <summary>
        /// Gets or sets the value copied to the state tracker's TrackMessages when Start runs.
        /// </summary>
        public bool TrackMessages
        {
            get
            {
                return this.trackMessages;
            }
            set
            {
                this.trackMessages = value;
            }
        }

		/// <summary>
		/// Gets or sets the value copied to the state tracker's
		/// TrackTransactionProducers when Start runs.
		/// </summary>
		public bool TrackTransactionProducers
		{
			get
			{
				return this.trackTransactionProducers;
			}
			set
			{
				trackTransactionProducers = value;
			}
		}

        /// <summary>
        /// Gets or sets the value copied to the state tracker's MaxCacheSize when Start runs.
        /// </summary>
        public int MaxCacheSize
        {
            get
            {
                return this.maxCacheSize;
            }
            set
            {
                this.maxCacheSize = value;
            }
        }

        /// <summary>
        /// Gets or sets the exponential back-off flag.
        /// </summary>
        public bool UseExponentialBackOff
        {
            get
            {
                return this.useExponentialBackOff;
            }
            set
            {
                this.useExponentialBackOff = value;
            }
        }

        /// <summary>
        /// Gets the wire format of the connected transport, or null when no
        /// transport is connected.
        /// </summary>
        public IWireFormat WireFormat
        {
            get
            {
                // Single read of the property so the null check and the use see the same instance.
                ITransport current = ConnectedTransport;
                return current == null ? null : current.WireFormat;
            }
        }

        /// <summary>
        /// Write-only option for asynchronous socket connects. The setter is
        /// currently a no-op: the assigned value is discarded.
        /// </summary>
        /// <value>Ignored.</value>
        public bool AsyncConnect
        {
            set
            {
                // Intentionally empty.
            }
        }

        /// <summary>
        /// Option for the asynchronous connect timeout in milliseconds. The setter
        /// is currently a no-op and the getter always returns zero.
        /// </summary>
        /// <value>Always 0; assigned values are discarded.</value>
        public int AsyncTimeout
        {
            get
            {
                return 0;
            }
            set
            {
                // Intentionally empty.
            }
        }

        /// <summary>
        /// Gets the connection state tracker owned by this transport.
        /// </summary>
        public ConnectionStateTracker StateTracker
        {
            get
            {
                return stateTracker;
            }
        }

        #endregion

        /// <summary>
        /// Always true for this transport.
        /// </summary>
        public bool IsFaultTolerant
        {
            get
            {
                return true;
            }
        }

        /// <summary>
        /// Gets the disposed flag (set by Stop).
        /// </summary>
        public bool IsDisposed
        {
            get
            {
                return this.disposed;
            }
        }

        /// <summary>
        /// Gets the connected flag.
        /// </summary>
        public bool IsConnected
        {
            get
            {
                return this.connected;
            }
        }

        /// <summary>
        /// Gets the connected-to-priority flag.
        /// </summary>
        public bool IsConnectedToPriority
        {
            get
            {
                return this.connectedToPriority;
            }
        }

        /// <summary>
        /// Gets the started flag (set by Start, cleared by Stop).
        /// </summary>
        public bool IsStarted
        {
            get
            {
                return this.started;
            }
        }

        /// <summary>
        /// Gets the reconnect-supported flag.
        /// </summary>
        public bool IsReconnectSupported
        {
            get
            {
                return reconnectSupported;
            }
        }

        /// <summary>
        /// Gets the update-URIs-supported flag.
        /// </summary>
        public bool IsUpdateURIsSupported
        {
            get
            {
                return updateURIsSupported;
            }
        }

        /// <summary>
        /// Exception callback for the underlying transport. Delegates to
        /// HandleTransportFailure; if that itself throws, the problem is logged and
        /// reported to the registered exception handler, when there is one.
        /// </summary>
        /// <param name="sender">Transport that raised the error (not used here).</param>
        /// <param name="error">The error reported by the transport.</param>
        public void OnException(ITransport sender, Exception error)
        {
            try
            {
                HandleTransportFailure(error);
            }
            catch(Exception ex)
            {
                // Keep a trace of the secondary failure instead of dropping it silently.
                Tracer.ErrorFormat("Unexpected failure while handling transport error: {0}", ex.Message);

                // The handler may not have been assigned; invoking a null delegate
                // from inside this catch block would throw NullReferenceException.
                ExceptionHandler handler = this.Exception;
                if(handler != null)
                {
                    handler(this, new IOException("Unexpected Transport Failure."));
                }
            }
        }

        /// <summary>
        /// No-op command handler. Stop() installs it on backup transports that are
        /// being shut down, so anything they still deliver is ignored.
        /// </summary>
        public void DisposedOnCommand(ITransport sender, Command c)
        {
        }

        /// <summary>
        /// No-op exception handler. Stop() installs it on backup transports that are
        /// being shut down, so errors they still raise are ignored.
        /// </summary>
        public void DisposedOnException(ITransport sender, Exception e)
        {
        }

        /// <summary>
        /// Reacts to a failure of the connected transport: detaches and disposes it,
        /// resets the connection state, notifies the Interrupted handler and then
        /// either wakes the reconnect task or forwards the error to the exception
        /// listener.
        /// </summary>
        /// <param name="e">The error that caused the failure.</param>
        public void HandleTransportFailure(Exception e)
        {
            // Atomically take ownership of the current transport so that only one
            // caller proceeds to tear it down.
            ITransport transport = connectedTransport.GetAndSet(null);
	        if (transport == null) 
			{
	            // sync with possible in progress reconnect
	            lock(reconnectMutex) 
				{
	                transport = connectedTransport.GetAndSet(null);
	            }
	        }

			if(transport != null)
            {
				DisposeTransport(transport);

	            bool reconnectOk = false;
	            lock(reconnectMutex) 
				{
	                if (CanReconnect()) 
					{
                    	Tracer.WarnFormat("Transport failed to {0}, attempting to automatically reconnect due to: {1}", 
						                  ConnectedTransportURI, e.Message);
	                    reconnectOk = true;
	                }

                    // Reset connection state; the failed URI is remembered so that
                    // ConnectList can move it to the end of the candidate list.
                    initialized = false;
                    failedConnectTransportURI = ConnectedTransportURI;
                    ConnectedTransportURI = null;
					connectedToPriority = false;
                    connected = false;

                    if(this.Interrupted != null)
                    {
                        this.Interrupted(transport);
                    }

	                if (reconnectOk) 
					{
	                    updated.Remove(failedConnectTransportURI);
	                    reconnectTask.Wakeup();
	                }
					else if (!disposed) 
					{
	                    // No reconnect possible and not shutting down: report the error.
	                    PropagateFailureToExceptionListener(e);
	                }
	            }
            }
        }

	    /// <summary>
	    /// True when the transport is started and the calculated reconnect attempt
	    /// limit is not zero.
	    /// </summary>
	    private bool CanReconnect()
	    {
	        if (!started)
	        {
	            return false;
	        }

	        return CalculateReconnectAttemptLimit() != 0;
	    }

        /// <summary>
        /// Starts the transport. Copies the tracking options into the state tracker,
        /// then either restores state onto an already connected transport or
        /// triggers a reconnect. Calling it again while started is a no-op.
        /// </summary>
        public void Start()
        {
            lock(reconnectMutex)
            {
                if(started)
                {
                    Tracer.Debug("FailoverTransport Already Started.");
                    return;
                }

                Tracer.Debug("FailoverTransport Started.");
                started = true;
                stateTracker.MaxCacheSize = MaxCacheSize;
                stateTracker.TrackMessages = TrackMessages;
                stateTracker.TrackTransactionProducers = TrackTransactionProducers;

                // Read the transport once: HandleTransportFailure clears the atomic
                // reference without holding reconnectMutex, so a second read could
                // return null after the null check has passed.
                ITransport transport = ConnectedTransport;
                if(transport != null)
                {
                    Tracer.Debug("FailoverTransport already connected, start is restoring.");
                    stateTracker.DoRestore(transport);
                }
                else
                {
                    Tracer.Debug("FailoverTransport not connected, start is reconnecting.");
                    Reconnect(false);
                }
            }
        }

        /// <summary>
        /// Stops the transport: marks it stopped and disposed, wakes any thread
        /// waiting on the sleep mutex, shuts down the reconnect task, disposes all
        /// backup transports and finally stops the transport that was connected.
        /// Calling it while not started only logs and still shuts down the
        /// reconnect task (the finally block runs on the early return).
        /// </summary>
        public virtual void Stop()
        {
            ITransport transportToStop = null;
	        List<ITransport> backupsToStop = new List<ITransport>(backups.Count);

			try 
			{
	            lock(reconnectMutex)
	            {
	                if(!started)
	                {
	                    Tracer.Debug("FailoverTransport Already Stopped.");
	                    return;
	                }

	                Tracer.Debug("FailoverTransport Stopped.");
	                started = false;
	                disposed = true;
	                connected = false;
	                if(ConnectedTransport != null)
	                {
	                    // Detach now; the transport is stopped later, outside the lock.
	                    transportToStop = connectedTransport.GetAndSet(null);
	                }
	            }
				lock(sleepMutex)
				{
					// Wake any thread waiting on sleepMutex.
					Monitor.PulseAll(sleepMutex);
				}
			}
			finally
			{
            	if(reconnectTask != null)
            	{
	                reconnectTask.Shutdown();
            	}
			}

	        lock(backupMutex) 
			{
	            foreach (BackupTransport backup in backups) 
				{
	                backup.Disposed = true;
	                ITransport transport = backup.Transport;
	                if (transport != null) 
					{
	                    // Replace the handlers with no-ops so late events are ignored.
	                    transport.Command = DisposedOnCommand;
						transport.Exception = DisposedOnException;
	                    backupsToStop.Add(transport);
	                }
	            }
	            backups.Clear();
	        }
	        
			// Dispose the collected backups outside backupMutex.
			foreach (ITransport transport in backupsToStop) 
			{
	            try 
				{
	                if (Tracer.IsDebugEnabled) 
					{
	                    Tracer.Debug("Stopped backup: " + transport);
	                }
	                DisposeTransport(transport);
	            } 
				catch (Exception) 
				{
	                // Best effort: a failure disposing one backup must not stop the rest.
	            }
	        }

			if(transportToStop != null)
            {
                transportToStop.Stop();
            }
        }

        /// <summary>
        /// Not supported by this transport.
        /// </summary>
        /// <exception cref="ApplicationException">Always thrown.</exception>
        public FutureResponse AsyncRequest(Command command)
        {
            const string message = "FailoverTransport does not implement AsyncRequest(Command)";
            throw new ApplicationException(message);
        }

        /// <summary>
        /// Not supported by this transport.
        /// </summary>
        /// <exception cref="ApplicationException">Always thrown.</exception>
        public Response Request(Command command)
        {
            const string message = "FailoverTransport does not implement Request(Command)";
            throw new ApplicationException(message);
        }

        /// <summary>
        /// Not supported by this transport.
        /// </summary>
        /// <exception cref="ApplicationException">Always thrown.</exception>
        public Response Request(Command command, TimeSpan ts)
        {
            const string message = "FailoverTransport does not implement Request(Command, TimeSpan)";
            throw new ApplicationException(message);
        }

        /// <summary>
        /// Command callback for the underlying transport. For responses, removes the
        /// matching entry from the request map and notifies it if it is a tracked
        /// command. Marks the transport initialized, handles connection control
        /// commands, and finally forwards the command to the registered handler.
        /// </summary>
        /// <param name="sender">Transport the command arrived on.</param>
        /// <param name="command">The received command; may be null.</param>
        public void OnCommand(ITransport sender, Command command)
        {
            if(command == null)
            {
                // Nothing to inspect; just forward.
                this.Command(sender, command);
                return;
            }

            if(command.IsResponse)
            {
                Command pending = null;

                lock(((ICollection) requestMap).SyncRoot)
                {
                    int correlationId = ((Response) command).CorrelationId;
                    try
                    {
                        bool found = requestMap.TryGetValue(correlationId, out pending);
                        if(found)
                        {
                            requestMap.Remove(correlationId);
                        }
                    }
                    catch
                    {
                    }
                }

                Tracked trackedRequest = pending as Tracked;
                if(trackedRequest != null)
                {
                    trackedRequest.OnResponse();
                }
            }

            initialized = true;

            if(command.IsConnectionControl)
            {
                ConnectionControl control = command as ConnectionControl;
                this.HandleConnectionControl(control);
            }

            this.Command(sender, command);
        }

        /// <summary>
        /// Sends a command through the connected transport, waiting for a reconnect
        /// when none is available and retrying untracked commands after a failure.
        /// </summary>
        /// <remarks>
        /// While disconnected: ShutdownInfo is skipped; RemoveInfo and MessageAck are
        /// tracked and, if a response is required, answered with a simulated
        /// response; a MessagePull with a non-zero timeout is answered with an empty
        /// MessageDispatch. A MessageAck is never re-sent after a failed attempt.
        /// The whole operation runs while holding reconnectMutex; the wait inside the
        /// loop releases it temporarily.
        /// </remarks>
        /// <param name="command">The command to send; must not be null.</param>
        /// <exception cref="ArgumentNullException">When command is null.</exception>
        /// <exception cref="Exception">
        /// When no transport became available and this transport is not disposed:
        /// the recorded connection failure, or an IOException describing a timeout
        /// or an unexpected failure.
        /// </exception>
        public void Oneway(Command command)
        {
            // A null command used to hit a NullReferenceException inside the retry
            // loop, which the catch-all treated as a transport failure: the healthy
            // transport was torn down and the loop spun forever while holding
            // reconnectMutex. Reject it up front instead.
            if(command == null)
            {
                throw new ArgumentNullException("command");
            }

            Exception error = null;

            lock(reconnectMutex)
            {
                if(ConnectedTransport == null)
                {
                    if(command.IsShutdownInfo)
                    {
                        // Skipping send of ShutdownInfo command when not connected.
                        return;
                    }
                    else if(command.IsRemoveInfo || command.IsMessageAck)
                    {
                        stateTracker.Track(command);
                        // Simulate response to RemoveInfo command or a MessageAck
                        // since it would be stale at this point.
                        if(command.ResponseRequired)
                        {
                            OnCommand(this, new Response() { CorrelationId = command.CommandId });
                        }
                        return;
                    }
                    else if(command.IsMessagePull)
                    {
                        // Simulate response to MessagePull if timed as we can't honor that now.
                        MessagePull pullRequest = command as MessagePull;
                        if (pullRequest.Timeout != 0)
                        {
                            MessageDispatch dispatch = new MessageDispatch();
                            dispatch.ConsumerId = pullRequest.ConsumerId;
                            dispatch.Destination = pullRequest.Destination;
                            OnCommand(this, dispatch);
                        }
                        return;
                    }
                }

                // Keep trying until the message is sent.
                for(int i = 0; !disposed; i++)
                {
                    try
                    {
                        // Any Ack that was being sent when the connection dropped is now
                        // stale so we don't send it here as it would cause an unmatched ack
                        // on the broker side and probably prevent a consumer from getting
                        // any new messages.
                        if(command.IsMessageAck && i > 0)
                        {
                            Tracer.Debug("Inflight MessageAck being dropped as stale.");
                            if(command.ResponseRequired)
                            {
                                OnCommand(this, new Response() { CorrelationId = command.CommandId });
                            }
                            return;
                        }

                        // Wait for transport to be connected.
                        ITransport transport = ConnectedTransport;
                        // UTC is immune to local clock shifts (e.g. daylight saving),
                        // which would otherwise distort the elapsed-time calculation.
                        DateTime start = DateTime.UtcNow;
                        bool timedout = false;
                        while(transport == null && !disposed && connectionFailure == null)
                        {
                            Tracer.Info("Waiting for transport to reconnect.");

                            int elapsed = (int) (DateTime.UtcNow - start).TotalMilliseconds;
                            if(this.timeout > 0 && elapsed > this.timeout)
                            {
                                timedout = true;
                                Tracer.DebugFormat("FailoverTransport.oneway - timed out after {0} mills", elapsed);
                                break;
                            }

                            // Release so that the reconnect task can run
                            try
                            {
                                // Wait for something.  The mutex will be pulsed if we connect.
                                Monitor.Wait(reconnectMutex, 100);
                            }
                            catch(ThreadInterruptedException e)
                            {
                                Tracer.DebugFormat("Interrupted: {0}", e.Message);
                            }

                            transport = ConnectedTransport;
                        }

                        if(transport == null)
                        {
                            // Previous loop may have exited due to use being disposed.
                            if(disposed)
                            {
                                error = new IOException("Transport disposed.");
                            }
                            else if(connectionFailure != null)
                            {
                                error = connectionFailure;
                            }
                            else if(timedout)
                            {
                                error = new IOException("Failover oneway timed out after " + timeout + " milliseconds.");
                            }
                            else
                            {
                                error = new IOException("Unexpected failure.");
                            }
                            break;
                        }

                        // If it was a request and it was not being tracked by
                        // the state tracker, then hold it in the requestMap so
                        // that we can replay it later.
                        Tracked tracked = stateTracker.Track(command);
                        lock(((ICollection) requestMap).SyncRoot)
                        {
                            if(tracked != null && tracked.WaitingForResponse)
                            {
                                requestMap.Add(command.CommandId, tracked);
                            }
                            else if(tracked == null && command.ResponseRequired)
                            {
                                requestMap.Add(command.CommandId, command);
                            }
                        }

                        // Send the message.
                        try
                        {
                            transport.Oneway(command);
                            stateTracker.TrackBack(command);
                        }
                        catch(Exception e)
                        {
                            // If the command was not tracked.. we will retry in this method
                            // otherwise we need to trigger a reconnect before returning as
                            // the transport is failed.
                            if (tracked == null)
                            {
                                // since we will retry in this method.. take it
                                // out of the request map so that it is not
                                // sent 2 times on recovery
                                if(command.ResponseRequired)
                                {
                                    lock(((ICollection) requestMap).SyncRoot)
                                    {
                                        requestMap.Remove(command.CommandId);
                                    }
                                }

                                // Rethrow the exception so it will handled by
                                // the outer catch
                                throw;
                            }
                            else
                            {
                                if (Tracer.IsDebugEnabled)
                                {
                                    Tracer.DebugFormat("Send Oneway attempt: {0} failed: Message = {1}",
                                                       i, e.Message);
                                    Tracer.DebugFormat("Failed Message Was: {0}", command);
                                }
                                HandleTransportFailure(e);
                            }
                        }

                        return;
                    }
                    catch(Exception e)
                    {
                        if (Tracer.IsDebugEnabled)
                        {
                            Tracer.DebugFormat("Send Oneway attempt: {0} failed: Message = {1}",
                                               i, e.Message);
                            Tracer.DebugFormat("Failed Message Was: {0}", command);
                        }
                        HandleTransportFailure(e);
                    }
                }
            }

            // Errors are only surfaced while the transport is still live; after
            // disposal the method returns silently.
            if(!disposed)
            {
                if(error != null)
                {
                    throw error;
                }
            }
        }

        /// <summary>
        /// Adds every URI that is not already known and, if at least one was added,
        /// triggers a reconnect.
        /// </summary>
        /// <param name="rebalance">Passed through to Reconnect.</param>
        /// <param name="urisToAdd">Candidate URIs.</param>
        public void Add(bool rebalance, Uri[] urisToAdd)
        {
            int added = 0;

            lock(uris)
            {
                for(int i = 0; i < urisToAdd.Length; i++)
                {
                    Uri candidate = urisToAdd[i];
                    if(Contains(candidate))
                    {
                        continue;
                    }

                    uris.Add(candidate);
                    added++;
                }
            }

            if(added == 0)
            {
                return;
            }

            Reconnect(rebalance);
        }

        /// <summary>
        /// Parses the string as a URI and adds it. Any exception raised while
        /// parsing or adding is logged and not rethrown.
        /// </summary>
        /// <param name="rebalance">Passed through to the array overload.</param>
        /// <param name="u">URI text.</param>
        public void Add(bool rebalance, String u)
        {
            try
            {
                Uri parsed = new Uri(u);
                Uri[] single = new Uri[] { parsed };
                Add(rebalance, single);
            }
            catch(Exception ex)
            {
                Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, ex.Message);
            }
        }

        /// <summary>
        /// Removes the given URIs from the configured list and then triggers a
        /// reconnect, whether or not anything was removed.
        /// </summary>
        /// <param name="rebalance">Passed through to Reconnect.</param>
        /// <param name="u">URIs to remove.</param>
        public void Remove(bool rebalance, Uri[] u)
        {
            lock(uris)
            {
                foreach(Uri target in u)
                {
                    uris.Remove(target);
                }
            }

            Reconnect(rebalance);
        }

        /// <summary>
        /// Parses the string as a URI and removes it. Any exception raised while
        /// parsing or removing is logged and not rethrown.
        /// </summary>
        /// <param name="rebalance">Passed through to the array overload.</param>
        /// <param name="u">URI text.</param>
        public void Remove(bool rebalance, String u)
        {
            try
            {
                Uri parsed = new Uri(u);
                Uri[] single = new Uri[] { parsed };
                Remove(rebalance, single);
            }
            catch(Exception ex)
            {
                Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, ex.Message);
            }
        }

        /// <summary>
        /// Adds the given URI with rebalancing requested.
        /// </summary>
        /// <param name="uri">URI to add.</param>
        public void Reconnect(Uri uri)
        {
            Uri[] single = new Uri[] { uri };
            Add(true, single);
        }

	    /// <summary>
	    /// Wakes the reconnect task if the transport is started, optionally flagging
	    /// a rebalance first. When not started, only a debug message is logged.
	    /// </summary>
	    /// <param name="rebalance">True to set the rebalance flag before waking the task.</param>
	    public void Reconnect(bool rebalance)
	    {
	        lock(reconnectMutex)
	        {
	            if(!started)
	            {
	                Tracer.Debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
	                return;
	            }

	            if(rebalance)
	            {
	                doRebalance = true;
	            }

	            Tracer.Debug("Waking up reconnect task");
	            try
	            {
	                reconnectTask.Wakeup();
	            }
	            catch(ThreadInterruptedException)
	            {
	            }
	        }
	    }

        /// <summary>
        /// Gets the list of URIs to try connecting to. Returns the "updated" list
        /// itself when it is non-empty; otherwise a copy of the configured URIs,
        /// optionally shuffled, with the most recently failed URI moved to the end.
        /// </summary>
        private List<Uri> ConnectList
        {
            get
            {
                if(updated.Count != 0)
                {
                    return updated;
                }

                List<Uri> candidates = new List<Uri>(uris);

                // Pull the last failed URI out before shuffling so that it can be
                // appended after the shuffle.
                bool demoteFailed = failedConnectTransportURI != null
                                    && candidates.Remove(failedConnectTransportURI);

                if(Randomize)
                {
                    Shuffle(candidates);
                }

                if(demoteFailed)
                {
                    candidates.Add(failedConnectTransportURI);
                }

                if(Tracer.IsDebugEnabled)
                {
                    Tracer.DebugFormat("Uri connection list: {0} from: {1}",
                                       PrintableUriList(candidates), PrintableUriList(uris));
                }

                return candidates;
            }
        }

        /// <summary>
        /// Starts the given transport, announces this client as fault tolerant,
        /// restores tracked state and then re-sends the commands held in the request
        /// map. Stored MessageAcks are not re-sent; a simulated response is
        /// delivered for them instead.
        /// </summary>
        /// <param name="t">The transport to restore onto.</param>
        protected void RestoreTransport(ITransport t)
        {
            Tracer.Info("Restoring previous transport connection.");
            t.Start();

            // Send information to the broker - informing it we are a fault tolerant client
            ConnectionControl announce = new ConnectionControl();
            announce.FaultTolerant = true;
            t.Oneway(announce);
            stateTracker.DoRestore(t);

            Tracer.Info("Sending queued commands...");

            // Work from a snapshot so the map is not enumerated outside its lock.
            Dictionary<int, Command> snapshot;
            lock(((ICollection) requestMap).SyncRoot)
            {
                snapshot = new Dictionary<int, Command>(requestMap);
            }

            foreach(KeyValuePair<int, Command> entry in snapshot)
            {
                Command queued = entry.Value;

                if(!queued.IsMessageAck)
                {
                    t.Oneway(queued);
                    continue;
                }

                Tracer.Debug("Stored MessageAck being dropped as stale.");
                Response simulated = new Response();
                simulated.CorrelationId = queued.CommandId;
                OnCommand(this, simulated);
            }
        }

        /// <summary>
        /// Gets the remote address of the connected transport, or null when no
        /// transport is connected.
        /// </summary>
        public Uri RemoteAddress
        {
            get
            {
                // Read the atomic reference once: another thread may clear it
                // between a null check and a second read, which would throw
                // NullReferenceException.
                ITransport transport = ConnectedTransport;
                if(transport != null)
                {
                    return transport.RemoteAddress;
                }
                return null;
            }
        }

        /// <summary>
        /// Returns this instance when its runtime type equals the requested type;
        /// otherwise delegates to the connected transport, if any.
        /// </summary>
        /// <param name="type">The type being looked for.</param>
        /// <returns>The matching object, or null when nothing matches or no transport is connected.</returns>
        public Object Narrow(Type type)
        {
            if(this.GetType().Equals(type))
            {
                return this;
            }

            // Read the atomic reference once: another thread may clear it between
            // a null check and a second read, which would throw NullReferenceException.
            ITransport transport = ConnectedTransport;
            if(transport != null)
            {
                return transport.Narrow(type);
            }

            return null;
        }

        /// <summary>
        /// Runs a single connection pass while holding reconnectMutex: optionally tears down
        /// the current transport for a rebalance, tries a waiting backup transport first and
        /// then each URI from ConnectList until one of them starts successfully.
        /// </summary>
        /// <returns>
        /// false when there is nothing to do (already connected without a pending rebalance or
        /// priority backup, disposed, or a connection failure is recorded), when a rebalance
        /// turns out to be unnecessary, when a connection was established, or when the
        /// reconnect attempt limit was reached; otherwise the negation of the disposed flag
        /// after the reconnect delay. Presumably true asks the caller to run another pass -
        /// TODO confirm against the code that schedules this method.
        /// </returns>
        private bool DoConnect()
        {
            lock(reconnectMutex)
            {
				// Wake any threads waiting on reconnectMutex so they can observe the
				// disposed / failed state.
				if (disposed || connectionFailure != null)
				{
					Monitor.PulseAll(reconnectMutex);
				}

            	if ((connectedTransport.Value != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null)
				{
                    return false;
                }
                else
                {
                    List<Uri> connectList = ConnectList;
                    if(connectList.Count == 0)
                    {
                        Failure = new NMSConnectionException("No URIs available for connection.");
                    }
                    else
                    {
	                    if (doRebalance)
						{
	                        if (connectedToPriority || CompareUris(connectList[0], connectedTransportURI))
							{
	                            // already connected to first in the list, no need to rebalance
	                            doRebalance = false;
	                            return false;
	                        } 
							else
							{
	                            if (Tracer.IsDebugEnabled)
								{
									Tracer.DebugFormat("Doing rebalance from: {0} to {1}", 
									                   connectedTransportURI, PrintableUriList(connectList));
	                            }
	                            try 
								{
	                                // Detach the current transport before disposing it so the
	                                // connect loop below sees no connected transport.
	                                ITransport current = this.connectedTransport.GetAndSet(null);
	                                if (current != null) 
									{
	                                    DisposeTransport(current);
	                                }
	                            } 
								catch (Exception e) 
								{
	                            	if (Tracer.IsDebugEnabled)
									{
										Tracer.DebugFormat("Caught an exception stopping existing " + 
										                   "transport for rebalance {0}", e.Message);
	                                }
	                            }
	                        }

	                        doRebalance = false;
	                    }

	                    ResetReconnectDelay();

	                    ITransport transport = null;
	                    Uri uri = null;

	                    // If we have a backup already waiting lets try it.
	                    lock(backupMutex) 
						{
	                        if ((priorityBackup || backup) && backups.Count > 0)
							{
                            	List<BackupTransport> l = new List<BackupTransport>(backups);
	                            if (randomize) 
								{
									Shuffle(l);
	                            }
								BackupTransport bt = l[0];
								l.RemoveAt(0);
	                            backups.Remove(bt);
	                            transport = bt.Transport;
	                            uri = bt.Uri;
	                            // Switching over to a priority backup: drop the transport
	                            // that is currently connected, if any.
	                            if (priorityBackup && priorityBackupAvailable) 
								{
	                                ITransport old = this.connectedTransport.GetAndSet(null);
	                                if (old != null) 
									{
	                                    DisposeTransport(old);
	                                }
	                                priorityBackupAvailable = false;
	                            }
	                        }
	                    }

	                    // Sleep for the reconnectDelay if there's no backup and we aren't trying
	                    // for the first time, or we were disposed for some reason.
	                    if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) 
						{
	                        lock(sleepMutex) 
							{
	                            if (Tracer.IsDebugEnabled)
								{
									Tracer.DebugFormat("Waiting {0} ms before attempting connection.", reconnectDelay);
	                            }
	                            try 
								{
	                                Monitor.Wait(sleepMutex, reconnectDelay);
	                            }
								catch (ThreadInterruptedException)
								{
	                            }
	                        }
	                    }

						IEnumerator<Uri> iter = connectList.GetEnumerator();
	                    // Keep going while there is a candidate (a backup transport in hand or
	                    // another URI in the list) and nothing is connected yet.
	                    while ((transport != null || iter.MoveNext()) && (connectedTransport.Value == null && !disposed))
						{
	                        try 
							{
	                            // NOTE(review): uri is logged before it is assigned below, so when no
	                            // backup was picked this prints null or the previous iteration's URI.
	                            if (Tracer.IsDebugEnabled)
								{
									Tracer.DebugFormat("Attempting {0}th connect to: {1}",
									                   connectFailures, uri);
	                            }

								// We could be starting with a backup and if so we wait to grab a
	                            // URI from the pool until next time around.
	                            if (transport == null) 
								{
	                                uri = iter.Current;
	                                transport = TransportFactory.CompositeConnect(uri);
	                            }

                                transport.Command = OnCommand;
                                transport.Exception = OnException;
	                            transport.Start();

	                            if (started && !firstConnection) 
								{
	                                RestoreTransport(transport);
	                            }

	                            if (Tracer.IsDebugEnabled)
								{
	                                Tracer.Debug("Connection established");
	                            }
	                            reconnectDelay = initialReconnectDelay;
	                            connectedTransportURI = uri;
	                            connectedTransport.Value = transport;
								connectedToPriority = IsPriority(connectedTransportURI);
	                            Monitor.PulseAll(reconnectMutex);
	                            connectFailures = 0;

								// Try to wait long enough for client to init the event callbacks.
								listenerLatch.await(TimeSpan.FromSeconds(2));

	                            if (Resumed != null) 
								{
	                                Resumed(transport);
	                            }
								else 
								{
	                                if (Tracer.IsDebugEnabled) 
									{
	                                    Tracer.Debug("transport resumed by transport listener not set");
	                                }
	                            }

	                            if (firstConnection) 
								{
	                                firstConnection = false;
	                                Tracer.Info("Successfully connected to " + uri);
	                            }
								else 
								{
	                                Tracer.Info("Successfully reconnected to " + uri);
	                            }

	                            connected = true;
	                            return false;
	                        }
							catch (Exception e) 
							{
	                            // Remember the most recent failure; it is reported if the
	                            // reconnect limit is reached below.
	                            failure = e;
                                if (Tracer.IsDebugEnabled) 
								{
	                                Tracer.Debug("Connect fail to: " + uri + ", reason: " + e.Message);
	                            }
	                            if (transport != null) 
								{
	                                try 
									{
	                                    transport.Stop();
	                                    transport = null;
	                                }
									catch (Exception ee) 
									{
	                                    // NOTE(review): when Stop() throws, transport is not cleared, so
	                                    // the loop will retry this same transport instance - confirm intended.
	                                	if (Tracer.IsDebugEnabled) 
										{
	                                        Tracer.Debug("Stop of failed transport: " + transport +
	                                                     " failed with reason: " + ee.Message);
	                                    }
	                                }
	                            }
	                        }
	                    }
					}
				}
            
	            // Reaching this point means no connection was established in this pass.
	            int reconnectLimit = CalculateReconnectAttemptLimit();

	            connectFailures++;
	            if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) 
				{
					Tracer.ErrorFormat("Failed to connect to {0} after: {1} attempt(s)", 
					                   PrintableUriList(uris), connectFailures);
	                connectionFailure = failure;

	                // Make sure on initial startup, that the transportListener has been
	                // initialized for this instance.
					listenerLatch.await(TimeSpan.FromSeconds(2));
	                PropagateFailureToExceptionListener(connectionFailure);
	                return false;
	            }
	        }

	        // The delay runs after reconnectMutex has been released.
	        if(!disposed)
			{
	            DoDelay();
	        }

	        return !disposed;
        }

        /// <summary>
        /// Tops up the pool of pre-connected backup transports while holding backupMutex.
        /// Disposed backups are dropped first, then a transport is connected and started for
        /// each URI in ConnectList that differs from the currently connected URI, until the
        /// pool holds BackupPoolSize entries. Nothing happens when the transport is disposed,
        /// when neither backup nor priorityBackup is enabled, or when the pool is already full.
        /// </summary>
        /// <returns>Always false.</returns>
        private bool BuildBackups()
        {
            lock(backupMutex)
            {
                if (!disposed && (backup || priorityBackup) && backups.Count < backupPoolSize)
                {
                    // NOTE(review): backupList (priority URIs first, then the connect list) is
                    // assembled here but never read; the connect loop below walks connectList.
                    // Confirm which ordering is intended before removing or using it.
                    List<Uri> backupList = new List<Uri>(priorityList);
                    List<Uri> connectList = ConnectList;
                    foreach(Uri uri in connectList)
                    {
                        if (!backupList.Contains(uri))
                        {
                            backupList.Add(uri);
                        }
                    }

                    // Drop disposed backups. Enumerate a snapshot: removing from the list
                    // that is being enumerated throws InvalidOperationException.
                    foreach(BackupTransport stale in new List<BackupTransport>(backups))
                    {
                        if(stale.Disposed)
                        {
                            backups.Remove(stale);
                        }
                    }

                    foreach(Uri uri in connectList)
                    {
                        if (disposed)
                        {
                            break;
                        }

                        // Only URIs other than the one currently connected are candidates.
                        if(ConnectedTransportURI != null && !ConnectedTransportURI.Equals(uri))
                        {
                            try
                            {
                                BackupTransport bt = new BackupTransport(this)
                                {
                                    Uri = uri
                                };

                                if(!backups.Contains(bt))
                                {
                                    ITransport t = TransportFactory.CompositeConnect(uri);
                                    t.Command = bt.OnCommand;
                                    t.Exception = bt.OnException;
                                    t.Start();
                                    bt.Transport = t;
                                    if (priorityBackup && IsPriority(uri))
                                    {
                                        // Priority backups go to the front of the pool and
                                        // flag that a priority connection is available.
                                        priorityBackupAvailable = true;
                                        backups.Insert(0, bt);
                                    }
                                    else
                                    {
                                        backups.Add(bt);
                                    }
                                }
                            }
                            catch(Exception e)
                            {
                                Tracer.DebugFormat("Failed to build backup: {0}", e.Message);
                            }
                        }

                        if(backups.Count == BackupPoolSize)
                        {
                            break;
                        }
                    }
                }
            }

            return false;
        }

        /// <summary>
        /// Forwards the "interrupt processing complete" notification for a connection to the
        /// state tracker while holding reconnectMutex.
        /// </summary>
        /// <param name="connectionId">Id of the connection whose interrupt processing finished.</param>
        public void ConnectionInterruptProcessingComplete(ConnectionId connectionId)
        {
            lock(reconnectMutex)
            {
                string notice = "Connection Interrupt Processing is complete for ConnectionId: " + connectionId;
                Tracer.Debug(notice);

                stateTracker.ConnectionInterruptProcessingComplete(this, connectionId);
            }
        }

        /// <summary>
        /// Replaces the list of updated broker URIs and, when the set of URIs actually
        /// changed, rebuilds the backup pool and triggers a reconnect.
        /// </summary>
        /// <remarks>
        /// Does nothing when IsUpdateURIsSupported is false. When <paramref name="updatedURIs"/>
        /// is null or empty the stored list is cleared and no reconnect is triggered.
        /// </remarks>
        /// <param name="rebalance">Passed through to Reconnect when the URI set changed.</param>
        /// <param name="updatedURIs">The new broker URIs; null entries and duplicates are ignored.</param>
        public void UpdateURIs(bool rebalance, Uri[] updatedURIs)
        {
            if(!IsUpdateURIsSupported)
            {
                return;
            }

            // Remember the previous contents (used as a set) so a real change can be detected.
            Dictionary<Uri, bool> previous = new Dictionary<Uri, bool>();
            foreach(Uri uri in updated)
            {
                if(uri != null)
                {
                    previous[uri] = true;
                }
            }

            updated.Clear();

            if(updatedURIs == null || updatedURIs.Length == 0)
            {
                return;
            }

            // Copy the new URIs, skipping nulls and duplicates and keeping the given order.
            foreach(Uri uri in updatedURIs)
            {
                if(uri != null && !updated.Contains(uri))
                {
                    updated.Add(uri);
                }
            }

            if (Tracer.IsDebugEnabled)
            {
                Tracer.DebugFormat("Updated URIs list {0}", PrintableUriList(updated));
            }

            // Compare the old and new URI sets element by element. The previous check,
            // copy.Keys.Equals(updated), was a reference comparison between two different
            // collection objects and therefore always reported a change.
            bool changed = previous.Count != updated.Count;
            if(!changed)
            {
                foreach(Uri uri in updated)
                {
                    if(!previous.ContainsKey(uri))
                    {
                        changed = true;
                        break;
                    }
                }
            }

            if(changed)
            {
                BuildBackups();
                lock(reconnectMutex)
                {
                    Reconnect(rebalance);
                }
            }
        }

        /// <summary>
        /// Applies a ConnectionControl command: reconnects to the address in ReconnectTo
        /// (when one is given and IsReconnectSupported is true) and then processes the
        /// broker list in ConnectedBrokers.
        /// </summary>
        /// <param name="control">The control command to act on.</param>
        public void HandleConnectionControl(ConnectionControl control)
        {
            string target = control.ReconnectTo;
            if(target != null)
            {
                target = target.Trim();
            }

            if(!String.IsNullOrEmpty(target))
            {
                try
                {
                    Uri destination = new Uri(target);
                    if(IsReconnectSupported)
                    {
                        Tracer.Info("Reconnecting to: " + destination.OriginalString);
                        Reconnect(destination);
                    }
                }
                catch(Exception ex)
                {
                    // A bad address or failed reconnect is logged; the broker list below
                    // is still processed.
                    Tracer.ErrorFormat("Failed to handle ConnectionControl reconnect to {0}: {1}", target, ex);
                }
            }

            ProcessNewTransports(control.RebalanceConnection, control.ConnectedBrokers);
        }

        /// <summary>
        /// Parses a comma separated list of broker URIs and hands the result to UpdateURIs.
        /// Null or blank input, unsupported URI updates and lists that yield no parsable URI
        /// are ignored.
        /// </summary>
        /// <param name="rebalance">Forwarded to UpdateURIs.</param>
        /// <param name="newTransports">Comma separated broker URIs; may be null.</param>
        private void ProcessNewTransports(bool rebalance, String newTransports)
        {
            if(newTransports == null)
            {
                return;
            }

            newTransports = newTransports.Trim();
            if(newTransports.Length == 0 || !IsUpdateURIsSupported)
            {
                return;
            }

            List<Uri> parsed = new List<Uri>();
            ProcessDelimitedUriList(newTransports, parsed);
            if(parsed.Count == 0)
            {
                return;
            }

            try
            {
                UpdateURIs(rebalance, parsed.ToArray());
            }
            catch
            {
                Tracer.Error("Failed to update transport URI's from: " + newTransports);
            }
        }

		/// <summary>
		/// Splits a comma separated string into URIs and appends each one that parses to
		/// <paramref name="target"/>. Tokens that fail are logged as errors and skipped.
		/// </summary>
		/// <param name="priorityUris">Comma separated URI strings.</param>
		/// <param name="target">List that receives the parsed URIs.</param>
		private void ProcessDelimitedUriList(String priorityUris, List<Uri> target)
		{
			foreach(String token in priorityUris.Split(','))
			{
				try
				{
					Uri parsed = new Uri(token);
					target.Add(parsed);

					if (Tracer.IsDebugEnabled)
					{
						Tracer.DebugFormat("Adding new Uri[{0}] to list,", parsed);
					}
				}
				catch (Exception ex)
				{
					Tracer.ErrorFormat("Failed to parse broker address: {0} because of: {1}",
					                   token, ex.Message);
				}
			}
		}

        /// <summary>
        /// Disposes this transport through Dispose(true) and tells the garbage collector
        /// that finalization is no longer required for this instance.
        /// </summary>
        public void Dispose()
        {
            this.Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Stops the transport and then marks it as disposed.
        /// </summary>
        /// <param name="disposing">Not consulted by this implementation.</param>
        public void Dispose(bool disposing)
        {
            Stop();
            this.disposed = true;
        }

        /// <summary>
        /// Orders failover transports by their numeric id.
        /// </summary>
        /// <param name="o">The object to compare against; must be a FailoverTransport.</param>
        /// <returns>The difference between this instance's id and the other instance's id.</returns>
        /// <exception cref="ArgumentException">
        /// Thrown when <paramref name="o"/> is not a FailoverTransport (including null).
        /// </exception>
        public int CompareTo(Object o)
        {
            if(!(o is FailoverTransport))
            {
                throw new ArgumentException();
            }

            FailoverTransport other = (FailoverTransport) o;
            return this.id - other.id;
        }

        /// <summary>
        /// Returns the connected transport URI as text, or "unconnected" when
        /// ConnectedTransportURI is null.
        /// </summary>
        public override String ToString()
        {
            if(ConnectedTransportURI == null)
            {
                return "unconnected";
            }

            return ConnectedTransportURI.ToString();
        }

	    /// <summary>
	    /// Tells whether a URI counts as a priority broker. This is only ever true when
	    /// priorityBackup is enabled: the URI must be in priorityList when that list has
	    /// entries, otherwise it must equal the first configured URI.
	    /// </summary>
	    /// <param name="uri">The broker URI to check.</param>
	    /// <returns>true when the URI is a priority broker; otherwise false.</returns>
	    internal bool IsPriority(Uri uri)
	    {
	        if (!priorityBackup)
	        {
	            return false;
	        }

	        if (priorityList.Count > 0)
	        {
	            return priorityList.Contains(uri);
	        }

	        return this.uris.Count > 0 && uris[0].Equals(uri);
	    }

		/// <summary>
		/// Detaches a transport from this failover transport and stops it. The transport's
		/// command and exception callbacks are redirected to the "disposed" handlers before
		/// Stop is called; a failure to stop is logged at debug level and not rethrown.
		/// </summary>
		/// <param name="transport">The transport to shut down.</param>
		public void DisposeTransport(ITransport transport)
		{
			transport.Command = DisposedOnCommand;
			transport.Exception = DisposedOnException;

			try
			{
				transport.Stop();
			}
			catch (Exception e)
			{
				// The first placeholder used to read "{0]", a malformed format item that
				// fails during formatting instead of logging the stop failure.
				Tracer.DebugFormat("Could not stop transport: {0}. Reason: {1}", transport, e.Message);
			}
		}

	    /// <summary>
	    /// Puts reconnectDelay back to initialReconnectDelay, unless exponential back-off is
	    /// enabled and the delay has moved away from DEFAULT_INITIAL_RECONNECT_DELAY.
	    /// </summary>
	    private void ResetReconnectDelay()
	    {
	        bool keepCurrentDelay = useExponentialBackOff
	                                && reconnectDelay != DEFAULT_INITIAL_RECONNECT_DELAY;
	        if (keepCurrentDelay)
	        {
	            return;
	        }

	        reconnectDelay = initialReconnectDelay;
	    }

	    /// <summary>
	    /// Waits on sleepMutex for up to reconnectDelay milliseconds (when the delay is
	    /// positive) and then, if exponential back-off is enabled, multiplies the delay by
	    /// backOffMultiplier, capping it at maxReconnectDelay.
	    /// </summary>
	    private void DoDelay()
	    {
	        if (reconnectDelay > 0)
	        {
	            lock(sleepMutex)
	            {
	                if (Tracer.IsDebugEnabled)
	                {
	                    Tracer.DebugFormat("Waiting {0} ms before attempting connection", reconnectDelay);
	                }

	                try
	                {
	                    Monitor.Wait(sleepMutex, reconnectDelay);
	                }
	                catch (ThreadInterruptedException)
	                {
	                    // An interrupt just ends the wait early.
	                }
	            }
	        }

	        if (!useExponentialBackOff)
	        {
	            return;
	        }

	        // Grow the delay for the next attempt, never beyond the configured maximum.
	        reconnectDelay *= backOffMultiplier;
	        if (reconnectDelay > maxReconnectDelay)
	        {
	            reconnectDelay = maxReconnectDelay;
	        }
	    }

	    /// <summary>
	    /// Reports a connection failure to the registered exception handler, if there is one,
	    /// and then pulses every thread waiting on reconnectMutex.
	    /// </summary>
	    /// <remarks>
	    /// Monitor.PulseAll requires the calling thread to hold the lock on reconnectMutex.
	    /// </remarks>
	    /// <param name="exception">The failure to hand to the exception handler.</param>
	    private void PropagateFailureToExceptionListener(Exception exception)
	    {
	        if (Exception != null)
	        {
	            Exception(this, exception);
	        }
	        else if (Tracer.IsDebugEnabled)
	        {
	            // With no handler registered there is nobody to notify. Invoking the null
	            // delegate here (as the code used to) raised a NullReferenceException and
	            // skipped the PulseAll below.
	            Tracer.Debug("No exception listener set; connection failure was not propagated.");
	        }

	        Monitor.PulseAll(reconnectMutex);
	    }

	    /// <summary>
	    /// Picks the reconnect attempt limit that currently applies: startupMaxReconnectAttempts
	    /// during the first connection when it is not INFINITE, otherwise maxReconnectAttempts.
	    /// </summary>
	    /// <returns>The attempt limit to use.</returns>
	    private int CalculateReconnectAttemptLimit()
	    {
	        if (firstConnection && this.startupMaxReconnectAttempts != INFINITE)
	        {
	            return this.startupMaxReconnectAttempts;
	        }

	        return this.maxReconnectAttempts;
	    }

		/// <summary>
		/// Randomly reorders the elements of a list in place by walking from the last
		/// position down and swapping each element with a randomly chosen one at or
		/// before it.
		/// </summary>
		/// <typeparam name="T">Element type of the list.</typeparam>
		/// <param name="list">The list to reorder; lists with fewer than two items are left untouched.</param>
		public void Shuffle<T>(List<T> list)
		{
			// Seed from a fresh Guid rather than DateTime.Now.Millisecond: the millisecond
			// value offers only 1000 distinct seeds and repeats for calls made within the
			// same millisecond, which yields identical orderings.
			Random random = new Random(Guid.NewGuid().GetHashCode());
			int index = list.Count;
			while (index > 1)
			{
				index--;
				int k = random.Next(index + 1);
				T value = list[k];
				list[k] = list[index];
				list[index] = value;
			}
		}

		/// <summary>
		/// Formats a list of URIs as a single comma separated string for log output.
		/// </summary>
		/// <param name="uriList">The URIs to format.</param>
		/// <returns>The URIs joined with ","; an empty string for an empty list.</returns>
		private String PrintableUriList(List<Uri> uriList)
		{
			StringBuilder text = new StringBuilder();
			bool needsSeparator = false;

			foreach (Uri entry in uriList)
			{
				if (needsSeparator)
				{
					text.Append(",");
				}

				text.Append(entry);
				needsSeparator = true;
			}

			return text.ToString();
		}

		/// <summary>
		/// Decides whether two URIs point at the same broker endpoint. The ports must match,
		/// and the hosts must either be the same name (ignoring case) or resolve to at least
		/// one common IP address.
		/// </summary>
		/// <param name="first">The first URI; null never matches.</param>
		/// <param name="second">The second URI; null never matches.</param>
		/// <returns>true when both URIs refer to the same host and port; otherwise false.</returns>
		private bool CompareUris(Uri first, Uri second)
		{
			if (first == null || second == null || first.Port != second.Port)
			{
				return false;
			}

			// Identical host names match without needing a DNS lookup.
			if (String.Equals(first.Host, second.Host, StringComparison.OrdinalIgnoreCase))
			{
				return true;
			}

			IPHostEntry firstAddr = null;
			try
			{
				firstAddr = Dns.GetHostEntry(first.Host);
				IPHostEntry secondAddr = Dns.GetHostEntry(second.Host);

				// IPHostEntry does not override Equals, so comparing the two entries
				// directly is a reference check that never succeeds. Compare the
				// resolved addresses instead.
				foreach (IPAddress left in firstAddr.AddressList)
				{
					foreach (IPAddress right in secondAddr.AddressList)
					{
						if (left.Equals(right))
						{
							return true;
						}
					}
				}
			}
			catch (Exception e)
			{
				// firstAddr is still null when the first lookup is the one that failed.
				Tracer.WarnFormat("Failed to Lookup IPHostEntry for URI[{0}] : {1}",
				                  firstAddr == null ? first : second, e);
			}

			// The host names differ (checked above), so a failed or non-matching
			// lookup means the URIs are treated as different endpoints.
			return false;
		}

	    /// <summary>
	    /// Checks whether any configured URI matches the given one according to CompareUris.
	    /// </summary>
	    /// <param name="newURI">The URI to look for.</param>
	    /// <returns>true as soon as a matching URI is found; false when none matches.</returns>
	    private bool Contains(Uri newURI)
	    {
	        foreach (Uri known in uris)
	        {
	            if (CompareUris(newURI, known))
	            {
	                return true;
	            }
	        }

	        return false;
	    }
    }
}
