Fix for: https://issues.apache.org/jira/browse/AMQNET-344
diff --git a/src/main/csharp/Transport/Failover/FailoverTransport.cs b/src/main/csharp/Transport/Failover/FailoverTransport.cs
index ee21769..7e96035 100644
--- a/src/main/csharp/Transport/Failover/FailoverTransport.cs
+++ b/src/main/csharp/Transport/Failover/FailoverTransport.cs
@@ -1,1385 +1,1384 @@
-/*

- * Licensed to the Apache Software Foundation (ASF) under one or more

- * contributor license agreements.  See the NOTICE file distributed with

- * this work for additional information regarding copyright ownership.

- * The ASF licenses this file to You under the Apache License, Version 2.0

- * (the "License"); you may not use this file except in compliance with

- * the License.  You may obtain a copy of the License at

- *

- *      http://www.apache.org/licenses/LICENSE-2.0

- *

- * Unless required by applicable law or agreed to in writing, software

- * distributed under the License is distributed on an "AS IS" BASIS,

- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

- * See the License for the specific language governing permissions and

- * limitations under the License.

- */

-

-using System;

-using System.Collections;

-using System.Collections.Generic;

-using System.Threading;

-using Apache.NMS.ActiveMQ.Commands;

-using Apache.NMS.ActiveMQ.State;

-using Apache.NMS.ActiveMQ.Threads;

-using Apache.NMS.Util;

-

-namespace Apache.NMS.ActiveMQ.Transport.Failover

-{

-	/// <summary>

-	/// A Transport that is made reliable by being able to fail over to another

-	/// transport when a transport failure is detected.

-	/// </summary>

-	public class FailoverTransport : ICompositeTransport, IComparable

-	{

-		private static int idCounter = 0;

-		private readonly int id;

-

-		private bool disposed;

-		private bool connected;

-		private readonly List<Uri> uris = new List<Uri>();

-		private readonly List<Uri> updated = new List<Uri>();

-		private CommandHandler commandHandler;

-		private ExceptionHandler exceptionHandler;

-		private InterruptedHandler interruptedHandler;

-		private ResumedHandler resumedHandler;

-

-		private readonly Mutex reconnectMutex = new Mutex();

-		private readonly Mutex backupMutex = new Mutex();

-		private readonly Mutex sleepMutex = new Mutex();

-		private readonly ConnectionStateTracker stateTracker = new ConnectionStateTracker();

-		private readonly Dictionary<int, Command> requestMap = new Dictionary<int, Command>();

-

-		private Uri connectedTransportURI;

-		private Uri failedConnectTransportURI;

-		private readonly AtomicReference<ITransport> connectedTransport = new AtomicReference<ITransport>(null);

-		private TaskRunner reconnectTask = null;

-		private bool started;

-

-		private int timeout = -1;

-		private int initialReconnectDelay = 10;

-		private int maxReconnectDelay = 1000 * 30;

-		private int backOffMultiplier = 2;

-		private bool useExponentialBackOff = true;

-		private bool randomize = true;

-		private bool initialized;

-        private int maxReconnectAttempts;

-        private int startupMaxReconnectAttempts;

-		private int connectFailures;

-		private int reconnectDelay = 10;

-		private int asyncTimeout = 45000;

-		private bool asyncConnect = false;

-		private Exception connectionFailure;

-		private bool firstConnection = true;

-		private bool backup = false;

-		private readonly List<BackupTransport> backups = new List<BackupTransport>();

-		private int backupPoolSize = 1;

-		private bool trackMessages = false;

-		private int maxCacheSize = 256;

-		private volatile Exception failure;

-		private readonly object mutex = new object();

-		private bool reconnectSupported = true;

-		private bool updateURIsSupported = true;

-

-		public FailoverTransport()

-		{

-			id = idCounter++;

-

-			stateTracker.TrackTransactions = true;

-		}

-

-		~FailoverTransport()

-		{

-			Dispose(false);

-		}

-

-		#region FailoverTask

-

-		private class FailoverTask : Task

-		{

-			private readonly FailoverTransport parent;

-

-			public FailoverTask(FailoverTransport p)

-			{

-				parent = p;

-			}

-

-			public bool Iterate()

-			{

-				bool result = false;

-				bool buildBackup = true;

-				bool doReconnect = !parent.disposed && parent.connectionFailure == null;

-				try

-				{

-					parent.backupMutex.WaitOne();

-					if(parent.ConnectedTransport == null && doReconnect)

-					{

-						result = parent.DoConnect();

-						buildBackup = false;

-					}

-				}

-				finally

-				{

-					parent.backupMutex.ReleaseMutex();

-				}

-

-				if(buildBackup)

-				{

-					parent.BuildBackups();

-				}

-				else

-				{

-					//build backups on the next iteration

-					result = true;

-					try

-					{

-						parent.reconnectTask.Wakeup();

-					}

-					catch(ThreadInterruptedException)

-					{

-						Tracer.Debug("Reconnect task has been interrupted.");

-					}

-				}

-				return result;

-			}

-		}

-

-		#endregion

-

-		#region Property Accessors

-

-		public CommandHandler Command

-		{

-			get { return commandHandler; }

-			set { commandHandler = value; }

-		}

-

-		public ExceptionHandler Exception

-		{

-			get { return exceptionHandler; }

-			set { exceptionHandler = value; }

-		}

-

-		public InterruptedHandler Interrupted

-		{

-			get { return interruptedHandler; }

-			set { this.interruptedHandler = value; }

-		}

-

-		public ResumedHandler Resumed

-		{

-			get { return resumedHandler; }

-			set { this.resumedHandler = value; }

-		}

-

-		internal Exception Failure

-		{

-			get { return failure; }

-			set

-			{

-				lock(mutex)

-				{

-					failure = value;

-				}

-			}

-		}

-

-		public int Timeout

-		{

-			get { return this.timeout; }

-			set { this.timeout = value; }

-		}

-

-		public int InitialReconnectDelay

-		{

-			get { return initialReconnectDelay; }

-			set { initialReconnectDelay = value; }

-		}

-

-		public int MaxReconnectDelay

-		{

-			get { return maxReconnectDelay; }

-			set { maxReconnectDelay = value; }

-		}

-

-		public int ReconnectDelay

-		{

-			get { return reconnectDelay; }

-			set { reconnectDelay = value; }

-		}

-

-		public int ReconnectDelayExponent

-		{

-			get { return backOffMultiplier; }

-			set { backOffMultiplier = value; }

-		}

-

-		public ITransport ConnectedTransport

-		{

-			get { return connectedTransport.Value; }

-			set { connectedTransport.Value = value; }

-		}

-

-		public Uri ConnectedTransportURI

-		{

-			get { return connectedTransportURI; }

-			set { connectedTransportURI = value; }

-		}

-

-		public int MaxReconnectAttempts

-		{

-			get { return maxReconnectAttempts; }

-			set { maxReconnectAttempts = value; }

-		}

-

-        public int StartupMaxReconnectAttempts

-        {

-            get { return startupMaxReconnectAttempts; }

-            set { startupMaxReconnectAttempts = value; }

-        }

-

-		public bool Randomize

-		{

-			get { return randomize; }

-			set { randomize = value; }

-		}

-

-		public bool Backup

-		{

-			get { return backup; }

-			set { backup = value; }

-		}

-

-		public int BackupPoolSize

-		{

-			get { return backupPoolSize; }

-			set { backupPoolSize = value; }

-		}

-

-		public bool TrackMessages

-		{

-			get { return trackMessages; }

-			set { trackMessages = value; }

-		}

-

-		public int MaxCacheSize

-		{

-			get { return maxCacheSize; }

-			set { maxCacheSize = value; }

-		}

-

-		public bool UseExponentialBackOff

-		{

-			get { return useExponentialBackOff; }

-			set { useExponentialBackOff = value; }

-		}

-

-        public IWireFormat WireFormat

-        {

-            get

-            {

-                ITransport transport = ConnectedTransport;

-                if(transport != null)

-                {

-                    return transport.WireFormat;

-                }

-

-                return null;

-            }

-        }

-

-		/// <summary>

-		/// Gets or sets a value indicating whether to asynchronously connect to sockets

-		/// </summary>

-		/// <value><c>true</c> if [async connect]; otherwise, <c>false</c>.</value>

-		public bool AsyncConnect

-		{

-			set { asyncConnect = value; }

-		}

-

-		/// <summary>

-		/// If doing an asynchronous connect, the milliseconds before timing out if no connection can be made

-		/// </summary>

-		/// <value>The async timeout.</value>

-		public int AsyncTimeout

-		{

-			get { return asyncTimeout; }

-			set { asyncTimeout = value; }

-		}

-

-        public ConnectionStateTracker StateTracker

-        {

-            get { return this.stateTracker; }

-        }

-

-		#endregion

-

-		public bool IsFaultTolerant

-		{

-			get { return true; }

-		}

-

-		public bool IsDisposed

-		{

-			get { return disposed; }

-		}

-

-		public bool IsConnected

-		{

-			get { return connected; }

-		}

-

-		public bool IsStarted

-		{

-			get { return started; }

-		}

-

-		public bool IsReconnectSupported

-		{

-			get { return this.reconnectSupported; }

-		}

-

-		public bool IsUpdateURIsSupported

-		{

-			get { return this.updateURIsSupported; }

-		}

-

-		public void OnException(ITransport sender, Exception error)

-		{

-			try

-			{

-				HandleTransportFailure(error);

-			}

-			catch(Exception e)

-			{

-				e.GetType();

-				// What to do here?

-			}

-		}

-

-		public void disposedOnCommand(ITransport sender, Command c)

-		{

-		}

-

-		public void disposedOnException(ITransport sender, Exception e)

-		{

-		}

-

-		public void HandleTransportFailure(Exception e)

-		{

-			ITransport transport = connectedTransport.GetAndSet(null);

-			if(transport != null)

-			{

-				transport.Command = disposedOnCommand;

-				transport.Exception = disposedOnException;

-				try

-				{

-					transport.Stop();

-				}

-				catch(Exception ex)

-				{

-					ex.GetType();	// Ignore errors but this lets us see the error during debugging

-				}

-

-				lock(reconnectMutex)

-				{

-					bool reconnectOk = false;

-					if(started)

-					{

-						Tracer.WarnFormat("Transport failed to {0}, attempting to automatically reconnect due to: {1}", ConnectedTransportURI.ToString(), e.Message);

-						reconnectOk = true;

-					}

-

-					initialized = false;

-					failedConnectTransportURI = ConnectedTransportURI;

-					ConnectedTransportURI = null;

-					connected = false;

-

-					if(this.Interrupted != null)

-					{

-						this.Interrupted(transport);

-					}

-

-					if(reconnectOk)

-					{

-						reconnectTask.Wakeup();

-					}

-				}

-			}

-		}

-

-		public void Start()

-		{

-			lock(reconnectMutex)

-			{

-				if(started)

-				{

-					Tracer.Debug("FailoverTransport Already Started.");

-					return;

-				}

-

-				Tracer.Debug("FailoverTransport Started.");

-				started = true;

-				stateTracker.MaxCacheSize = MaxCacheSize;

-				stateTracker.TrackMessages = TrackMessages;

-				if(ConnectedTransport != null)

-				{

-					stateTracker.DoRestore(ConnectedTransport);

-				}

-				else

-				{

-					Reconnect(false);

-				}

-			}

-		}

-

-		public virtual void Stop()

-		{

-			ITransport transportToStop = null;

-

-			lock(reconnectMutex)

-			{

-				if(!started)

-				{

-					Tracer.Debug("FailoverTransport Already Stopped.");

-					return;

-				}

-

-				Tracer.Debug("FailoverTransport Stopped.");

-				started = false;

-				disposed = true;

-				connected = false;

-				foreach(BackupTransport t in backups)

-				{

-					t.Disposed = true;

-				}

-				backups.Clear();

-

-				if(ConnectedTransport != null)

-				{

-					transportToStop = connectedTransport.GetAndSet(null);

-				}

-			}

-

-			try

-			{

-				sleepMutex.WaitOne();

-			}

-			finally

-			{

-				sleepMutex.ReleaseMutex();

-			}

-

-			if(reconnectTask != null)

-			{

-				reconnectTask.Shutdown();

-			}

-

-			if(transportToStop != null)

-			{

-				transportToStop.Stop();

-			}

-		}

-

-		public FutureResponse AsyncRequest(Command command)

-		{

-			throw new ApplicationException("FailoverTransport does not implement AsyncRequest(Command)");

-		}

-

-		public Response Request(Command command)

-		{

-			throw new ApplicationException("FailoverTransport does not implement Request(Command)");

-		}

-

-		public Response Request(Command command, TimeSpan ts)

-		{

-			throw new ApplicationException("FailoverTransport does not implement Request(Command, TimeSpan)");

-		}

-

-		public void OnCommand(ITransport sender, Command command)

-		{

-			if(command != null)

-			{

-				if(command.IsResponse)

-				{

-					Object oo = null;

-					lock(((ICollection) requestMap).SyncRoot)

-					{

-						int v = ((Response) command).CorrelationId;

-						try

-						{

-							if(requestMap.ContainsKey(v))

-							{

-								oo = requestMap[v];

-								requestMap.Remove(v);

-							}

-						}

-						catch

-						{

-						}

-					}

-

-					Tracked t = oo as Tracked;

-					if(t != null)

-					{

-						t.onResponses();

-					}

-				}

-

-				if(!initialized)

-				{

-					initialized = true;

-				}

-

-				if(command.IsConnectionControl)

-				{

-					this.HandleConnectionControl(command as ConnectionControl);

-				}

-			}

-

-			this.Command(sender, command);

-		}

-

-		public void Oneway(Command command)

-		{

-			Exception error = null;

-

-			lock(reconnectMutex)

-			{

-				if(command != null && ConnectedTransport == null)

-				{

-					if(command.IsShutdownInfo)

-					{

-						// Skipping send of ShutdownInfo command when not connected.

-						return;

-					}

-

-					if(command.IsRemoveInfo || command.IsMessageAck)

-					{

-						// Simulate response to RemoveInfo command or a MessageAck

-                        // since it would be stale at this point.

-                        if(command.ResponseRequired)

-                        {

-						    OnCommand(this, new Response() { CorrelationId = command.CommandId });

-                        }

-						return;

-					}

-				}

-

-				// Keep trying until the message is sent.

-				for(int i = 0; !disposed; i++)

-				{

-					try

-					{

-                        // Any Ack that was being sent when the connection dropped is now

-                        // stale so we don't send it here as it would cause an unmatched ack

-                        // on the broker side and probably prevent a consumer from getting

-                        // any new messages.

-                        if(command.IsMessageAck && i > 0)

-                        {

-                            Tracer.Debug("Inflight MessageAck being dropped as stale.");

-                            if(command.ResponseRequired)

-                            {

-                                OnCommand(this, new Response() { CorrelationId = command.CommandId });

-                            }

-                            return;

-                        }

-

-						// Wait for transport to be connected.

-						ITransport transport = ConnectedTransport;

-						DateTime start = DateTime.Now;

-						bool timedout = false;

-						while(transport == null && !disposed && connectionFailure == null)

-						{

-							Tracer.Info("Waiting for transport to reconnect.");

-

-							int elapsed = (int) (DateTime.Now - start).TotalMilliseconds;

-							if(this.timeout > 0 && elapsed > timeout)

-							{

-								timedout = true;

-								Tracer.DebugFormat("FailoverTransport.oneway - timed out after {0} mills", elapsed);

-								break;

-							}

-

-							// Release so that the reconnect task can run

-							try

-							{

-								// Wait for something

-								Monitor.Wait(reconnectMutex, 1000);

-							}

-							catch(ThreadInterruptedException e)

-							{

-								Tracer.DebugFormat("Interrupted: {0}", e.Message);

-							}

-

-							transport = ConnectedTransport;

-						}

-

-						if(transport == null)

-						{

-							// Previous loop may have exited due to use being disposed.

-							if(disposed)

-							{

-								error = new IOException("Transport disposed.");

-							}

-							else if(connectionFailure != null)

-							{

-								error = connectionFailure;

-							}

-							else if(timedout)

-							{

-								error = new IOException("Failover oneway timed out after " + timeout + " milliseconds.");

-							}

-							else

-							{

-								error = new IOException("Unexpected failure.");

-							}

-							break;

-						}

-

-						// If it was a request and it was not being tracked by

-						// the state tracker, then hold it in the requestMap so

-						// that we can replay it later.

-						Tracked tracked = stateTracker.Track(command);

-						lock(((ICollection) requestMap).SyncRoot)

-						{

-							if(tracked != null && tracked.WaitingForResponse)

-							{

-								requestMap.Add(command.CommandId, tracked);

-							}

-							else if(tracked == null && command.ResponseRequired)

-							{

-								requestMap.Add(command.CommandId, command);

-							}

-						}

-

-						// Send the message.

-						try

-						{

-							transport.Oneway(command);

-							stateTracker.TrackBack(command);

-						}

-						catch(Exception e)

-						{

-							// If the command was not tracked.. we will retry in

-							// this method

-							if(tracked == null)

-							{

-

-								// since we will retry in this method.. take it

-								// out of the request map so that it is not

-								// sent 2 times on recovery

-								if(command.ResponseRequired)

-								{

-									lock(((ICollection) requestMap).SyncRoot)

-									{

-										requestMap.Remove(command.CommandId);

-									}

-								}

-

-								// Rethrow the exception so it will handled by

-								// the outer catch

-								throw e;

-							}

-

-						}

-

-						return;

-

-					}

-					catch(Exception e)

-					{

-						Tracer.DebugFormat("Send Oneway attempt: {0} failed: Message = {1}", i, e.Message);

-						Tracer.DebugFormat("Failed Message Was: {0}", command);

-						HandleTransportFailure(e);

-					}

-				}

-			}

-

-			if(!disposed)

-			{

-				if(error != null)

-				{

-					throw error;

-				}

-			}

-		}

-

-		public void Add(bool rebalance, Uri[] u)

-		{

-			lock(uris)

-			{

-				for(int i = 0; i < u.Length; i++)

-				{

-					if(!uris.Contains(u[i]))

-					{

-						uris.Add(u[i]);

-					}

-				}

-			}

-

-			Reconnect(rebalance);

-		}

-

-		public void Add(bool rebalance, String u)

-		{

-			try

-			{

-				Add(rebalance, new Uri[] { new Uri(u) });

-			}

-			catch(Exception e)

-			{

-				Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, e.Message);

-			}

-		}

-

-		public void Remove(bool rebalance, Uri[] u)

-		{

-			lock(uris)

-			{

-				for(int i = 0; i < u.Length; i++)

-				{

-					uris.Remove(u[i]);

-				}

-			}

-

-			Reconnect(rebalance);

-		}

-

-		public void Remove(bool rebalance, String u)

-		{

-			try

-			{

-				Remove(rebalance, new Uri[] { new Uri(u) });

-			}

-			catch(Exception e)

-			{

-				Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, e.Message);

-			}

-		}

-

-		public void Reconnect(Uri uri)

-		{

-			Add(true, new Uri[] { uri });

-		}

-

-		public void Reconnect(bool rebalance)

-		{

-			lock(reconnectMutex)

-			{

-				if(started)

-				{

-					if(reconnectTask == null)

-					{

-						Tracer.Debug("Creating reconnect task");

-						reconnectTask = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner(new FailoverTask(this),

-											"ActiveMQ Failover Worker: " + this.GetHashCode().ToString());

-					}

-

-					if(rebalance)

-					{

-						ITransport transport = connectedTransport.GetAndSet(null);

-						if(transport != null)

-						{

-							transport.Command = disposedOnCommand;

-							transport.Exception = disposedOnException;

-							try

-							{

-								transport.Stop();

-							}

-							catch(Exception ex)

-							{

-								ex.GetType();   // Ignore errors but this lets us see the error during debugging

-							}

-						}

-					}

-

-					Tracer.Debug("Waking up reconnect task");

-					try

-					{

-						reconnectTask.Wakeup();

-					}

-					catch(ThreadInterruptedException)

-					{

-					}

-				}

-				else

-				{

-					Tracer.Debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");

-				}

-			}

-		}

-

-		private List<Uri> ConnectList

-		{

-			get

-			{

-				List<Uri> l = new List<Uri>(uris);

-				bool removed = false;

-				if(failedConnectTransportURI != null)

-				{

-					removed = l.Remove(failedConnectTransportURI);

-				}

-

-				if(Randomize)

-				{

-					// Randomly, reorder the list by random swapping

-					Random r = new Random(DateTime.Now.Millisecond);

-					for(int i = 0; i < l.Count; i++)

-					{

-						int p = r.Next(l.Count);

-						Uri t = l[p];

-						l[p] = l[i];

-						l[i] = t;

-					}

-				}

-

-				if(removed)

-				{

-					l.Add(failedConnectTransportURI);

-				}

-

-				return l;

-			}

-		}

-

-		protected void RestoreTransport(ITransport t)

-		{

-			Tracer.Info("Restoring previous transport connection.");

-			t.Start();

-

-			// Send information to the broker - informing it we are a fault tolerant client

-			t.Oneway(new ConnectionControl() { FaultTolerant = true });

-			stateTracker.DoRestore(t);

-

-			Tracer.Info("Sending queued commands...");

-			Dictionary<int, Command> tmpMap = null;

-			lock(((ICollection) requestMap).SyncRoot)

-			{

-				tmpMap = new Dictionary<int, Command>(requestMap);

-			}

-

-			foreach(Command command in tmpMap.Values)

-			{

-                if(command.IsMessageAck)

-                {

-                    Tracer.Debug("Stored MessageAck being dropped as stale.");

-                    OnCommand(this, new Response() { CorrelationId = command.CommandId });

-                    continue;

-                }

-

-                t.Oneway(command);

-			}

-		}

-

-		public Uri RemoteAddress

-		{

-			get

-			{

-				if(ConnectedTransport != null)

-				{

-					return ConnectedTransport.RemoteAddress;

-				}

-				return null;

-			}

-		}

-

-		public Object Narrow(Type type)

-		{

-			if(this.GetType().Equals(type))

-			{

-				return this;

-			}

-			else if(ConnectedTransport != null)

-			{

-				return ConnectedTransport.Narrow(type);

-			}

-

-			return null;

-		}

-

-		private bool DoConnect()

-		{

-			lock(reconnectMutex)

-			{

-				if(ConnectedTransport != null || disposed || connectionFailure != null)

-				{

-					return false;

-				}

-				else

-				{

-					List<Uri> connectList = ConnectList;

-					if(connectList.Count == 0)

-					{

-						Failure = new NMSConnectionException("No URIs available for connection.");

-					}

-					else

-					{

-						if(!UseExponentialBackOff)

-						{

-							ReconnectDelay = InitialReconnectDelay;

-						}

-

-						try

-						{

-							backupMutex.WaitOne();

-							if(Backup && backups.Count != 0)

-							{

-								BackupTransport bt = backups[0];

-								backups.RemoveAt(0);

-								ITransport t = bt.Transport;

-								Uri uri = bt.Uri;

-								t.Command = OnCommand;

-								t.Exception = OnException;

-								try

-								{

-									if(started)

-									{

-										RestoreTransport(t);

-									}

-									ReconnectDelay = InitialReconnectDelay;

-									failedConnectTransportURI = null;

-									ConnectedTransportURI = uri;

-									ConnectedTransport = t;

-									connectFailures = 0;

-									connected = true;

-									if(this.Resumed != null)

-									{

-										this.Resumed(t);

-									}

-									Tracer.InfoFormat("Successfully reconnected to backup {0}", uri.ToString());

-									return false;

-								}

-								catch(Exception e)

-								{

-									Tracer.DebugFormat("Backup transport failed: {0}", e.Message);

-								}

-							}

-						}

-						finally

-						{

-							backupMutex.ReleaseMutex();

-						}

-

-						ManualResetEvent allDone = new ManualResetEvent(false);

-						ITransport transport = null;

-						Uri chosenUri = null;

-						object syncLock = new object();

-

-						try

-						{

-							foreach(Uri uri in connectList)

-							{

-								if(ConnectedTransport != null || disposed)

-								{

-									break;

-								}

-

-								if(asyncConnect)

-								{

-									Tracer.DebugFormat("Attempting async connect to: {0}", uri);

-									// set connector up

-									Connector connector = new Connector(

-										delegate(ITransport transportToUse, Uri uriToUse)

-										{

-											if(transport == null)

-											{

-												lock(syncLock)

-												{

-													if(transport == null)

-													{

-														//the transport has not yet been set asynchronously so set it

-														transport = transportToUse;

-														chosenUri = uriToUse;

-													}

-													//notify issuing thread to move on

-													allDone.Set();

-												}

-											}

-										}, uri, this);

-

-									// initiate a thread to try connecting to broker

-									Thread thread = new Thread(connector.DoConnect) {Name = uri.ToString()};

-								    thread.Start();

-								}

-								else

-								{

-									// synchronous connect

-									try

-									{

-										Tracer.DebugFormat("Attempting sync connect to: {0}", uri);

-										transport = TransportFactory.CompositeConnect(uri);

-										chosenUri = transport.RemoteAddress;

-										break;

-									}

-									catch(Exception e)

-									{

-										Failure = e;

-										Tracer.DebugFormat("Connect fail to: {0}, reason: {1}", uri, e.Message);

-									}

-								}

-							}

-

-							if(asyncConnect)

-							{

-								// now wait for transport to be populated, but timeout eventually

-								allDone.WaitOne(asyncTimeout, false);

-							}

-

-							if(transport != null)

-							{

-								transport.Command = OnCommand;

-								transport.Exception = OnException;

-								transport.Start();

-

-								if(started)

-								{

-									RestoreTransport(transport);

-								}

-

-								if(this.Resumed != null)

-								{

-									this.Resumed(transport);

-								}

-

-								Tracer.Debug("Connection established");

-								ReconnectDelay = InitialReconnectDelay;

-								ConnectedTransportURI = chosenUri;

-								ConnectedTransport = transport;

-								connectFailures = 0;

-								connected = true;

-

-								if(firstConnection)

-								{

-									firstConnection = false;

-									Tracer.InfoFormat("Successfully connected to: {0}", chosenUri.ToString());

-								}

-								else

-								{

-									Tracer.InfoFormat("Successfully reconnected to: {0}", chosenUri.ToString());

-								}

-

-								return false;

-							}

-

-							if(asyncConnect)

-							{

-								Tracer.DebugFormat("Connect failed after waiting for asynchronous callback.");

-							}

-

-						}

-						catch(Exception e)

-						{

-							Failure = e;

-							Tracer.DebugFormat("Connect attempt failed.  Reason: {0}", e.Message);

-						}

-					}

-

-                    int reconnectAttempts = 0;

-                    if( firstConnection ) {

-                        if( StartupMaxReconnectAttempts != 0 ) {

-                            reconnectAttempts = StartupMaxReconnectAttempts;

-                        }

-                    }

-                    if( reconnectAttempts == 0 ) {

-                        reconnectAttempts = MaxReconnectAttempts;

-                    }

-        

-					if(reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts)

-					{

-						Tracer.ErrorFormat("Failed to connect to transport after {0} attempt(s)", connectFailures);

-						connectionFailure = Failure;

-						this.Exception(this, connectionFailure);

-						return false;

-					}

-				}

-			}

-

-			if(!disposed)

-			{

-				Tracer.DebugFormat("Waiting {0}ms before attempting connection.", ReconnectDelay);

-				lock(sleepMutex)

-				{

-					try

-					{

-						Thread.Sleep(ReconnectDelay);

-					}

-					catch(ThreadInterruptedException)

-					{

-					}

-				}

-

-				if(UseExponentialBackOff)

-				{

-					// Exponential increment of reconnect delay.

-					ReconnectDelay *= ReconnectDelayExponent;

-					if(ReconnectDelay > MaxReconnectDelay)

-					{

-						ReconnectDelay = MaxReconnectDelay;

-					}

-				}

-			}

-			return !disposed;

-		}

-

-		/// <summary>

-		/// This class is a helper for the asynchronous connect option

-		/// </summary>

-		public class Connector

-		{

-			/// <summary>

-			/// callback to properly set chosen transport

-			/// </summary>

-			readonly SetTransport _setTransport;

-

-			/// <summary>

-			/// Uri to try connecting to

-			/// </summary>

-			readonly Uri _uri;

-

-			/// <summary>

-			/// Failover transport issuing the connection attempt

-			/// </summary>

-			private readonly FailoverTransport _transport;

-

-			/// <summary>

-			/// Initializes a new instance of the <see cref="Connector"/> class.

-			/// </summary>

-			/// <param name="setTransport">The set transport.</param>

-			/// <param name="uri">The URI.</param>

-			/// <param name="transport">The transport.</param>

-			public Connector(SetTransport setTransport, Uri uri, FailoverTransport transport)

-			{

-				_uri = uri;

-				_setTransport = setTransport;

-				_transport = transport;

-			}

-

-			/// <summary>

-			/// Does the connect.

-			/// </summary>

-			public void DoConnect()

-			{

-				try

-				{

-					TransportFactory.AsyncCompositeConnect(_uri, _setTransport);

-				}

-				catch(Exception e)

-				{

-					_transport.Failure = e;

-					Tracer.DebugFormat("Connect fail to: {0}, reason: {1}", _uri, e.Message);

-				}

-

-			}

-		}

-

-		private bool BuildBackups()

-		{

-			lock(backupMutex)

-			{

-				if(!disposed && Backup && backups.Count < BackupPoolSize)

-				{

-					List<Uri> connectList = ConnectList;

-					foreach(BackupTransport bt in backups)

-					{

-						if(bt.Disposed)

-						{

-							backups.Remove(bt);

-						}

-					}

-

-					foreach(Uri uri in connectList)

-					{

-						if(ConnectedTransportURI != null && !ConnectedTransportURI.Equals(uri))

-						{

-							try

-							{

-								BackupTransport bt = new BackupTransport(this)

-								{

-									Uri = uri

-								};

-

-								if(!backups.Contains(bt))

-								{

-									ITransport t = TransportFactory.CompositeConnect(uri);

-									t.Command = bt.OnCommand;

-									t.Exception = bt.OnException;

-									t.Start();

-									bt.Transport = t;

-									backups.Add(bt);

-								}

-							}

-							catch(Exception e)

-							{

-								Tracer.DebugFormat("Failed to build backup: {0}", e.Message);

-							}

-						}

-

-						if(backups.Count == BackupPoolSize)

-						{

-							break;

-						}

-					}

-				}

-			}

-

-			return false;

-		}

-

-		public void ConnectionInterruptProcessingComplete(ConnectionId connectionId)

-		{

-			lock(reconnectMutex)

-			{

-				Tracer.Debug("Connection Interrupt Processing is complete for ConnectionId: " + connectionId);

-				stateTracker.ConnectionInterruptProcessingComplete(this, connectionId);

-			}

-		}

-

-		public void UpdateURIs(bool rebalance, Uri[] updatedURIs)

-		{

-			if(IsUpdateURIsSupported)

-			{

-				List<Uri> copy = new List<Uri>(this.updated);

-				List<Uri> added = new List<Uri>();

-

-				if(updatedURIs != null && updatedURIs.Length > 0)

-				{

-					Dictionary<Uri, bool> uriSet = new Dictionary<Uri, bool>();

-					for(int i = 0; i < updatedURIs.Length; i++)

-					{

-						Uri uri = updatedURIs[i];

-						if(uri != null)

-						{

-							uriSet[uri] = true;

-						}

-					}

-

-					foreach(Uri uri in uriSet.Keys)

-					{

-						if(copy.Remove(uri) == false)

-						{

-							added.Add(uri);

-						}

-					}

-

-					lock(reconnectMutex)

-					{

-						this.updated.Clear();

-						this.updated.AddRange(added);

-

-						foreach(Uri uri in copy)

-						{

-							this.uris.Remove(uri);

-						}

-

-						this.Add(rebalance, added.ToArray());

-					}

-				}

-			}

-		}

-

-		public void HandleConnectionControl(ConnectionControl control)

-		{

-			string reconnectStr = control.ReconnectTo;

-

-			if(reconnectStr != null)

-			{

-				reconnectStr = reconnectStr.Trim();

-				if(reconnectStr.Length > 0)

-				{

-					try

-					{

-						Uri uri = new Uri(reconnectStr);

-						if(IsReconnectSupported)

-						{

-							Reconnect(uri);

-							Tracer.Info("Reconnected to: " + uri.OriginalString);

-						}

-					}

-					catch(Exception e)

-					{

-						Tracer.ErrorFormat("Failed to handle ConnectionControl reconnect to {0}: {1}", reconnectStr, e);

-					}

-				}

-			}

-

-			ProcessNewTransports(control.RebalanceConnection, control.ConnectedBrokers);

-		}

-

-		private void ProcessNewTransports(bool rebalance, String newTransports)

-		{

-			if(newTransports != null)

-			{

-				newTransports = newTransports.Trim();

-

-				if(newTransports.Length > 0 && IsUpdateURIsSupported)

-				{

-					List<Uri> list = new List<Uri>();

-					String[] tokens = newTransports.Split(new Char[] { ',' });

-

-					foreach(String str in tokens)

-					{

-						try

-						{

-							Uri uri = new Uri(str);

-							list.Add(uri);

-						}

-						catch

-						{

-							Tracer.Error("Failed to parse broker address: " + str);

-						}

-					}

-

-					if(list.Count != 0)

-					{

-						try

-						{

-							UpdateURIs(rebalance, list.ToArray());

-						}

-						catch

-						{

-							Tracer.Error("Failed to update transport URI's from: " + newTransports);

-						}

-					}

-				}

-			}

-		}

-

-		public void Dispose()

-		{

-			Dispose(true);

-			GC.SuppressFinalize(this);

-		}

-

-		public void Dispose(bool disposing)

-		{

-			if(disposing)

-			{

-				// get rid of unmanaged stuff

-			}

-

-            this.Stop();

-

-			disposed = true;

-		}

-

-		public int CompareTo(Object o)

-		{

-			if(o is FailoverTransport)

-			{

-				FailoverTransport oo = o as FailoverTransport;

-

-				return this.id - oo.id;

-			}

-			else

-			{

-				throw new ArgumentException();

-			}

-		}

-

-		public override String ToString()

-		{

-			return ConnectedTransportURI == null ? "unconnected" : ConnectedTransportURI.ToString();

-		}

-	}

-}

+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Threading;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.State;
+using Apache.NMS.ActiveMQ.Threads;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Failover
+{
+	/// <summary>
+	/// A Transport that is made reliable by being able to fail over to another
+	/// transport when a transport failure is detected.
+	/// </summary>
+	public class FailoverTransport : ICompositeTransport, IComparable
+	{
+		private static int idCounter = 0;
+		private readonly int id;
+
+		private bool disposed;
+		private bool connected;
+		private readonly List<Uri> uris = new List<Uri>();
+		private readonly List<Uri> updated = new List<Uri>();
+		private CommandHandler commandHandler;
+		private ExceptionHandler exceptionHandler;
+		private InterruptedHandler interruptedHandler;
+		private ResumedHandler resumedHandler;
+
+		private readonly Mutex reconnectMutex = new Mutex();
+		private readonly Mutex backupMutex = new Mutex();
+		private readonly Mutex sleepMutex = new Mutex();
+		private readonly ConnectionStateTracker stateTracker = new ConnectionStateTracker();
+		private readonly Dictionary<int, Command> requestMap = new Dictionary<int, Command>();
+
+		private Uri connectedTransportURI;
+		private Uri failedConnectTransportURI;
+		private readonly AtomicReference<ITransport> connectedTransport = new AtomicReference<ITransport>(null);
+		private TaskRunner reconnectTask = null;
+		private bool started;
+
+		private int timeout = -1;
+		private int initialReconnectDelay = 10;
+		private int maxReconnectDelay = 1000 * 30;
+		private int backOffMultiplier = 2;
+		private bool useExponentialBackOff = true;
+		private bool randomize = true;
+		private bool initialized;
+        private int maxReconnectAttempts;
+        private int startupMaxReconnectAttempts;
+		private int connectFailures;
+		private int reconnectDelay = 10;
+		private int asyncTimeout = 45000;
+		private bool asyncConnect = false;
+		private Exception connectionFailure;
+		private bool firstConnection = true;
+		private bool backup = false;
+		private readonly List<BackupTransport> backups = new List<BackupTransport>();
+		private int backupPoolSize = 1;
+		private bool trackMessages = false;
+		private int maxCacheSize = 256;
+		private volatile Exception failure;
+		private readonly object mutex = new object();
+		private bool reconnectSupported = true;
+		private bool updateURIsSupported = true;
+
+		public FailoverTransport()
+		{
+			id = idCounter++;
+
+			stateTracker.TrackTransactions = true;
+            reconnectTask = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner(
+                new FailoverTask(this), "ActiveMQ Failover Worker: " + this.GetHashCode().ToString());
+		}
+
+		~FailoverTransport()
+		{
+			Dispose(false);
+		}
+
+		#region FailoverTask
+
+		private class FailoverTask : Task
+		{
+			private readonly FailoverTransport parent;
+
+			public FailoverTask(FailoverTransport p)
+			{
+				parent = p;
+			}
+
+			public bool Iterate()
+			{
+				bool result = false;
+                if (!parent.IsStarted)
+                {
+                    return false;
+                }
+
+				bool buildBackup = true;
+				bool doReconnect = !parent.disposed && parent.connectionFailure == null;
+				try
+				{
+					parent.backupMutex.WaitOne();
+					if(parent.ConnectedTransport == null && doReconnect)
+					{
+						result = parent.DoConnect();
+						buildBackup = false;
+					}
+				}
+				finally
+				{
+					parent.backupMutex.ReleaseMutex();
+				}
+
+				if(buildBackup)
+				{
+					parent.BuildBackups();
+				}
+				else
+				{
+					//build backups on the next iteration
+					result = true;
+					try
+					{
+						parent.reconnectTask.Wakeup();
+					}
+					catch(ThreadInterruptedException)
+					{
+						Tracer.Debug("Reconnect task has been interrupted.");
+					}
+				}
+				return result;
+			}
+		}
+
+		#endregion
+
+		#region Property Accessors
+
+		public CommandHandler Command
+		{
+			get { return commandHandler; }
+			set { commandHandler = value; }
+		}
+
+		public ExceptionHandler Exception
+		{
+			get { return exceptionHandler; }
+			set { exceptionHandler = value; }
+		}
+
+		public InterruptedHandler Interrupted
+		{
+			get { return interruptedHandler; }
+			set { this.interruptedHandler = value; }
+		}
+
+		public ResumedHandler Resumed
+		{
+			get { return resumedHandler; }
+			set { this.resumedHandler = value; }
+		}
+
+		internal Exception Failure
+		{
+			get { return failure; }
+			set
+			{
+				lock(mutex)
+				{
+					failure = value;
+				}
+			}
+		}
+
+		public int Timeout
+		{
+			get { return this.timeout; }
+			set { this.timeout = value; }
+		}
+
+		public int InitialReconnectDelay
+		{
+			get { return initialReconnectDelay; }
+			set { initialReconnectDelay = value; }
+		}
+
+		public int MaxReconnectDelay
+		{
+			get { return maxReconnectDelay; }
+			set { maxReconnectDelay = value; }
+		}
+
+		public int ReconnectDelay
+		{
+			get { return reconnectDelay; }
+			set { reconnectDelay = value; }
+		}
+
+		public int ReconnectDelayExponent
+		{
+			get { return backOffMultiplier; }
+			set { backOffMultiplier = value; }
+		}
+
+		public ITransport ConnectedTransport
+		{
+			get { return connectedTransport.Value; }
+			set { connectedTransport.Value = value; }
+		}
+
+		public Uri ConnectedTransportURI
+		{
+			get { return connectedTransportURI; }
+			set { connectedTransportURI = value; }
+		}
+
+		public int MaxReconnectAttempts
+		{
+			get { return maxReconnectAttempts; }
+			set { maxReconnectAttempts = value; }
+		}
+
+        public int StartupMaxReconnectAttempts
+        {
+            get { return startupMaxReconnectAttempts; }
+            set { startupMaxReconnectAttempts = value; }
+        }
+
+		public bool Randomize
+		{
+			get { return randomize; }
+			set { randomize = value; }
+		}
+
+		public bool Backup
+		{
+			get { return backup; }
+			set { backup = value; }
+		}
+
+		public int BackupPoolSize
+		{
+			get { return backupPoolSize; }
+			set { backupPoolSize = value; }
+		}
+
+		public bool TrackMessages
+		{
+			get { return trackMessages; }
+			set { trackMessages = value; }
+		}
+
+		public int MaxCacheSize
+		{
+			get { return maxCacheSize; }
+			set { maxCacheSize = value; }
+		}
+
+		public bool UseExponentialBackOff
+		{
+			get { return useExponentialBackOff; }
+			set { useExponentialBackOff = value; }
+		}
+
+        public IWireFormat WireFormat
+        {
+            get
+            {
+                ITransport transport = ConnectedTransport;
+                if(transport != null)
+                {
+                    return transport.WireFormat;
+                }
+
+                return null;
+            }
+        }
+
+		/// <summary>
+		/// Gets or sets a value indicating whether to asynchronously connect to sockets
+		/// </summary>
+		/// <value><c>true</c> if [async connect]; otherwise, <c>false</c>.</value>
+		public bool AsyncConnect
+		{
+			set { asyncConnect = value; }
+		}
+
+		/// <summary>
+		/// If doing an asynchronous connect, the milliseconds before timing out if no connection can be made
+		/// </summary>
+		/// <value>The async timeout.</value>
+		public int AsyncTimeout
+		{
+			get { return asyncTimeout; }
+			set { asyncTimeout = value; }
+		}
+
+        public ConnectionStateTracker StateTracker
+        {
+            get { return this.stateTracker; }
+        }
+
+		#endregion
+
+		public bool IsFaultTolerant
+		{
+			get { return true; }
+		}
+
+		public bool IsDisposed
+		{
+			get { return disposed; }
+		}
+
+		public bool IsConnected
+		{
+			get { return connected; }
+		}
+
+		public bool IsStarted
+		{
+			get { return started; }
+		}
+
+		public bool IsReconnectSupported
+		{
+			get { return this.reconnectSupported; }
+		}
+
+		public bool IsUpdateURIsSupported
+		{
+			get { return this.updateURIsSupported; }
+		}
+
+		public void OnException(ITransport sender, Exception error)
+		{
+			try
+			{
+				HandleTransportFailure(error);
+			}
+			catch(Exception e)
+			{
+				e.GetType();
+				// What to do here?
+			}
+		}
+
+		public void disposedOnCommand(ITransport sender, Command c)
+		{
+		}
+
+		public void disposedOnException(ITransport sender, Exception e)
+		{
+		}
+
+		public void HandleTransportFailure(Exception e)
+		{
+			ITransport transport = connectedTransport.GetAndSet(null);
+			if(transport != null)
+			{
+				transport.Command = disposedOnCommand;
+				transport.Exception = disposedOnException;
+				try
+				{
+					transport.Stop();
+				}
+				catch(Exception ex)
+				{
+					ex.GetType();	// Ignore errors but this lets us see the error during debugging
+				}
+
+				lock(reconnectMutex)
+				{
+					bool reconnectOk = false;
+					if(started)
+					{
+						Tracer.WarnFormat("Transport failed to {0}, attempting to automatically reconnect due to: {1}", ConnectedTransportURI.ToString(), e.Message);
+						reconnectOk = true;
+					}
+
+					initialized = false;
+					failedConnectTransportURI = ConnectedTransportURI;
+					ConnectedTransportURI = null;
+					connected = false;
+
+					if(this.Interrupted != null)
+					{
+						this.Interrupted(transport);
+					}
+
+					if(reconnectOk)
+					{
+						reconnectTask.Wakeup();
+					}
+				}
+			}
+		}
+
+		public void Start()
+		{
+			lock(reconnectMutex)
+			{
+				if(started)
+				{
+					Tracer.Debug("FailoverTransport Already Started.");
+					return;
+				}
+
+				Tracer.Debug("FailoverTransport Started.");
+				started = true;
+				stateTracker.MaxCacheSize = MaxCacheSize;
+				stateTracker.TrackMessages = TrackMessages;
+				if(ConnectedTransport != null)
+				{
+                    Tracer.Debug("FailoverTransport already connected, start is restoring.");
+					stateTracker.DoRestore(ConnectedTransport);
+				}
+				else
+				{
+                    Tracer.Debug("FailoverTransport not connected, start is reconnecting.");
+					Reconnect(false);
+				}
+			}
+		}
+
+		public virtual void Stop()
+		{
+			ITransport transportToStop = null;
+
+			lock(reconnectMutex)
+			{
+				if(!started)
+				{
+					Tracer.Debug("FailoverTransport Already Stopped.");
+					return;
+				}
+
+				Tracer.Debug("FailoverTransport Stopped.");
+				started = false;
+				disposed = true;
+				connected = false;
+				foreach(BackupTransport t in backups)
+				{
+					t.Disposed = true;
+				}
+				backups.Clear();
+
+				if(ConnectedTransport != null)
+				{
+					transportToStop = connectedTransport.GetAndSet(null);
+				}
+			}
+
+			try
+			{
+				sleepMutex.WaitOne();
+			}
+			finally
+			{
+				sleepMutex.ReleaseMutex();
+			}
+
+			if(reconnectTask != null)
+			{
+				reconnectTask.Shutdown();
+			}
+
+			if(transportToStop != null)
+			{
+				transportToStop.Stop();
+			}
+		}
+
+		public FutureResponse AsyncRequest(Command command)
+		{
+			throw new ApplicationException("FailoverTransport does not implement AsyncRequest(Command)");
+		}
+
+		public Response Request(Command command)
+		{
+			throw new ApplicationException("FailoverTransport does not implement Request(Command)");
+		}
+
+		public Response Request(Command command, TimeSpan ts)
+		{
+			throw new ApplicationException("FailoverTransport does not implement Request(Command, TimeSpan)");
+		}
+
+		public void OnCommand(ITransport sender, Command command)
+		{
+			if(command != null)
+			{
+				if(command.IsResponse)
+				{
+					Object oo = null;
+					lock(((ICollection) requestMap).SyncRoot)
+					{
+						int v = ((Response) command).CorrelationId;
+						try
+						{
+							if(requestMap.ContainsKey(v))
+							{
+								oo = requestMap[v];
+								requestMap.Remove(v);
+							}
+						}
+						catch
+						{
+						}
+					}
+
+					Tracked t = oo as Tracked;
+					if(t != null)
+					{
+						t.onResponses();
+					}
+				}
+
+				if(!initialized)
+				{
+					initialized = true;
+				}
+
+				if(command.IsConnectionControl)
+				{
+					this.HandleConnectionControl(command as ConnectionControl);
+				}
+			}
+
+			this.Command(sender, command);
+		}
+
+		public void Oneway(Command command)
+		{
+			Exception error = null;
+
+			lock(reconnectMutex)
+			{
+				if(command != null && ConnectedTransport == null)
+				{
+					if(command.IsShutdownInfo)
+					{
+						// Skipping send of ShutdownInfo command when not connected.
+						return;
+					}
+					else if(command.IsRemoveInfo || command.IsMessageAck)
+					{
+						// Simulate response to RemoveInfo command or a MessageAck
+                        // since it would be stale at this point.
+                        if(command.ResponseRequired)
+                        {
+						    OnCommand(this, new Response() { CorrelationId = command.CommandId });
+                        }
+						return;
+					}
+				}
+
+				// Keep trying until the message is sent.
+				for(int i = 0; !disposed; i++)
+				{
+					try
+					{
+                        // Any Ack that was being sent when the connection dropped is now
+                        // stale so we don't send it here as it would cause an unmatched ack
+                        // on the broker side and probably prevent a consumer from getting
+                        // any new messages.
+                        if(command.IsMessageAck && i > 0)
+                        {
+                            Tracer.Debug("Inflight MessageAck being dropped as stale.");
+                            if(command.ResponseRequired)
+                            {
+                                OnCommand(this, new Response() { CorrelationId = command.CommandId });
+                            }
+                            return;
+                        }
+
+						// Wait for transport to be connected.
+						ITransport transport = ConnectedTransport;
+						DateTime start = DateTime.Now;
+						bool timedout = false;
+						while(transport == null && !disposed && connectionFailure == null)
+						{
+							Tracer.Info("Waiting for transport to reconnect.");
+
+							int elapsed = (int) (DateTime.Now - start).TotalMilliseconds;
+							if(this.timeout > 0 && elapsed > timeout)
+							{
+								timedout = true;
+								Tracer.DebugFormat("FailoverTransport.oneway - timed out after {0} mills", elapsed);
+								break;
+							}
+
+							// Release so that the reconnect task can run
+							try
+							{
+								// Wait for something
+								Monitor.Wait(reconnectMutex, 100);
+							}
+							catch(ThreadInterruptedException e)
+							{
+								Tracer.DebugFormat("Interrupted: {0}", e.Message);
+							}
+
+							transport = ConnectedTransport;
+						}
+
+						if(transport == null)
+						{
+							// Previous loop may have exited due to use being disposed.
+							if(disposed)
+							{
+								error = new IOException("Transport disposed.");
+							}
+							else if(connectionFailure != null)
+							{
+								error = connectionFailure;
+							}
+							else if(timedout)
+							{
+								error = new IOException("Failover oneway timed out after " + timeout + " milliseconds.");
+							}
+							else
+							{
+								error = new IOException("Unexpected failure.");
+							}
+							break;
+						}
+
+						// If it was a request and it was not being tracked by
+						// the state tracker, then hold it in the requestMap so
+						// that we can replay it later.
+						Tracked tracked = stateTracker.Track(command);
+						lock(((ICollection) requestMap).SyncRoot)
+						{
+							if(tracked != null && tracked.WaitingForResponse)
+							{
+								requestMap.Add(command.CommandId, tracked);
+							}
+							else if(tracked == null && command.ResponseRequired)
+							{
+								requestMap.Add(command.CommandId, command);
+							}
+						}
+
+						// Send the message.
+						try
+						{
+							transport.Oneway(command);
+							stateTracker.TrackBack(command);
+						}
+						catch(Exception e)
+						{
+							// If the command was not tracked.. we will retry in
+							// this method
+							if(tracked == null)
+							{
+								// since we will retry in this method.. take it
+								// out of the request map so that it is not
+								// sent 2 times on recovery
+								if(command.ResponseRequired)
+								{
+									lock(((ICollection) requestMap).SyncRoot)
+									{
+										requestMap.Remove(command.CommandId);
+									}
+								}
+
+								// Rethrow the exception so it will handled by
+								// the outer catch
+								throw e;
+							}
+						}
+
+						return;
+					}
+					catch(Exception e)
+					{
+						Tracer.DebugFormat("Send Oneway attempt: {0} failed: Message = {1}", i, e.Message);
+						Tracer.DebugFormat("Failed Message Was: {0}", command);
+						HandleTransportFailure(e);
+					}
+				}
+			}
+
+			if(!disposed)
+			{
+				if(error != null)
+				{
+					throw error;
+				}
+			}
+		}
+
+		public void Add(bool rebalance, Uri[] u)
+		{
+			lock(uris)
+			{
+				for(int i = 0; i < u.Length; i++)
+				{
+					if(!uris.Contains(u[i]))
+					{
+						uris.Add(u[i]);
+					}
+				}
+			}
+
+			Reconnect(rebalance);
+		}
+
+		public void Add(bool rebalance, String u)
+		{
+			try
+			{
+				Add(rebalance, new Uri[] { new Uri(u) });
+			}
+			catch(Exception e)
+			{
+				Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, e.Message);
+			}
+		}
+
+		public void Remove(bool rebalance, Uri[] u)
+		{
+			lock(uris)
+			{
+				for(int i = 0; i < u.Length; i++)
+				{
+					uris.Remove(u[i]);
+				}
+			}
+
+			Reconnect(rebalance);
+		}
+
+		public void Remove(bool rebalance, String u)
+		{
+			try
+			{
+				Remove(rebalance, new Uri[] { new Uri(u) });
+			}
+			catch(Exception e)
+			{
+				Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, e.Message);
+			}
+		}
+
+		public void Reconnect(Uri uri)
+		{
+			Add(true, new Uri[] { uri });
+		}
+
+		public void Reconnect(bool rebalance)
+		{
+			lock(reconnectMutex)
+			{
+				if(started)
+				{
+					if(rebalance)
+					{
+						ITransport transport = connectedTransport.GetAndSet(null);
+						if(transport != null)
+						{
+							transport.Command = disposedOnCommand;
+							transport.Exception = disposedOnException;
+							try
+							{
+								transport.Stop();
+							}
+							catch(Exception ex)
+							{
+								ex.GetType();   // Ignore errors but this lets us see the error during debugging
+							}
+						}
+					}
+
+					Tracer.Debug("Waking up reconnect task");
+					try
+					{
+						reconnectTask.Wakeup();
+					}
+					catch(ThreadInterruptedException)
+					{
+					}
+				}
+				else
+				{
+					Tracer.Debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
+				}
+			}
+		}
+
+		private List<Uri> ConnectList
+		{
+			get
+			{
+				List<Uri> l = new List<Uri>(uris);
+				bool removed = false;
+				if(failedConnectTransportURI != null)
+				{
+					removed = l.Remove(failedConnectTransportURI);
+				}
+
+				if(Randomize)
+				{
+					// Randomly, reorder the list by random swapping
+					Random r = new Random(DateTime.Now.Millisecond);
+					for(int i = 0; i < l.Count; i++)
+					{
+						int p = r.Next(l.Count);
+						Uri t = l[p];
+						l[p] = l[i];
+						l[i] = t;
+					}
+				}
+
+				if(removed)
+				{
+					l.Add(failedConnectTransportURI);
+				}
+
+				return l;
+			}
+		}
+
+		protected void RestoreTransport(ITransport t)
+		{
+			Tracer.Info("Restoring previous transport connection.");
+			t.Start();
+
+			// Send information to the broker - informing it we are a fault tolerant client
+			t.Oneway(new ConnectionControl() { FaultTolerant = true });
+			stateTracker.DoRestore(t);
+
+			Tracer.Info("Sending queued commands...");
+			Dictionary<int, Command> tmpMap = null;
+			lock(((ICollection) requestMap).SyncRoot)
+			{
+				tmpMap = new Dictionary<int, Command>(requestMap);
+			}
+
+			foreach(Command command in tmpMap.Values)
+			{
+                if(command.IsMessageAck)
+                {
+                    Tracer.Debug("Stored MessageAck being dropped as stale.");
+                    OnCommand(this, new Response() { CorrelationId = command.CommandId });
+                    continue;
+                }
+
+                t.Oneway(command);
+			}
+		}
+
+		public Uri RemoteAddress
+		{
+			get
+			{
+				if(ConnectedTransport != null)
+				{
+					return ConnectedTransport.RemoteAddress;
+				}
+				return null;
+			}
+		}
+
+		public Object Narrow(Type type)
+		{
+			if(this.GetType().Equals(type))
+			{
+				return this;
+			}
+			else if(ConnectedTransport != null)
+			{
+				return ConnectedTransport.Narrow(type);
+			}
+
+			return null;
+		}
+
+		private bool DoConnect()
+		{
+			lock(reconnectMutex)
+			{
+				if(ConnectedTransport != null || disposed || connectionFailure != null)
+				{
+					return false;
+				}
+				else
+				{
+					List<Uri> connectList = ConnectList;
+					if(connectList.Count == 0)
+					{
+						Failure = new NMSConnectionException("No URIs available for connection.");
+					}
+					else
+					{
+						if(!UseExponentialBackOff)
+						{
+							ReconnectDelay = InitialReconnectDelay;
+						}
+
+						try
+						{
+							backupMutex.WaitOne();
+							if(Backup && backups.Count != 0)
+							{
+								BackupTransport bt = backups[0];
+								backups.RemoveAt(0);
+								ITransport t = bt.Transport;
+								Uri uri = bt.Uri;
+								t.Command = OnCommand;
+								t.Exception = OnException;
+								try
+								{
+									if(started)
+									{
+										RestoreTransport(t);
+									}
+									ReconnectDelay = InitialReconnectDelay;
+									failedConnectTransportURI = null;
+									ConnectedTransportURI = uri;
+									ConnectedTransport = t;
+									connectFailures = 0;
+									connected = true;
+                                    Monitor.PulseAll(reconnectMutex);
+									if(this.Resumed != null)
+									{
+										this.Resumed(t);
+									}
+									Tracer.InfoFormat("Successfully reconnected to backup {0}", uri.ToString());
+									return false;
+								}
+								catch(Exception e)
+								{
+									Tracer.DebugFormat("Backup transport failed: {0}", e.Message);
+								}
+							}
+						}
+						finally
+						{
+							backupMutex.ReleaseMutex();
+						}
+
+						ManualResetEvent allDone = new ManualResetEvent(false);
+						ITransport transport = null;
+						Uri chosenUri = null;
+						object syncLock = new object();
+
+						try
+						{
+							foreach(Uri uri in connectList)
+							{
+								if(ConnectedTransport != null || disposed)
+								{
+									break;
+								}
+
+								if(asyncConnect)
+								{
+									Tracer.DebugFormat("Attempting async connect to: {0}", uri);
+									// set connector up
+									Connector connector = new Connector(
+										delegate(ITransport transportToUse, Uri uriToUse)
+										{
+											if(transport == null)
+											{
+												lock(syncLock)
+												{
+													if(transport == null)
+													{
+														//the transport has not yet been set asynchronously so set it
+														transport = transportToUse;
+														chosenUri = uriToUse;
+													}
+													//notify issuing thread to move on
+													allDone.Set();
+												}
+											}
+										}, uri, this);
+
+									// initiate a thread to try connecting to broker
+									Thread thread = new Thread(connector.DoConnect) {Name = uri.ToString()};
+								    thread.Start();
+								}
+								else
+								{
+									// synchronous connect
+									try
+									{
+										Tracer.DebugFormat("Attempting sync connect to: {0}", uri);
+										transport = TransportFactory.CompositeConnect(uri);
+										chosenUri = transport.RemoteAddress;
+										break;
+									}
+									catch(Exception e)
+									{
+										Failure = e;
+										Tracer.DebugFormat("Connect fail to: {0}, reason: {1}", uri, e.Message);
+									}
+								}
+							}
+
+							if(asyncConnect)
+							{
+								// now wait for transport to be populated, but timeout eventually
+								allDone.WaitOne(asyncTimeout, false);
+							}
+
+							if(transport != null)
+							{
+								transport.Command = OnCommand;
+								transport.Exception = OnException;
+								transport.Start();
+
+								if(started)
+								{
+									RestoreTransport(transport);
+								}
+
+								if(this.Resumed != null)
+								{
+									this.Resumed(transport);
+								}
+
+								Tracer.Debug("Connection established");
+								ReconnectDelay = InitialReconnectDelay;
+								ConnectedTransportURI = chosenUri;
+								ConnectedTransport = transport;
+								connectFailures = 0;
+								connected = true;
+                                Monitor.PulseAll(reconnectMutex);
+
+								if(firstConnection)
+								{
+									firstConnection = false;
+									Tracer.InfoFormat("Successfully connected to: {0}", chosenUri.ToString());
+								}
+								else
+								{
+									Tracer.InfoFormat("Successfully reconnected to: {0}", chosenUri.ToString());
+								}
+
+								return false;
+							}
+
+							if(asyncConnect)
+							{
+								Tracer.DebugFormat("Connect failed after waiting for asynchronous callback.");
+							}
+						}
+						catch(Exception e)
+						{
+							Failure = e;
+							Tracer.DebugFormat("Connect attempt failed.  Reason: {0}", e.Message);
+						}
+					}
+
+                    int reconnectAttempts = 0;
+                    if( firstConnection ) {
+                        if( StartupMaxReconnectAttempts != 0 ) {
+                            reconnectAttempts = StartupMaxReconnectAttempts;
+                        }
+                    }
+                    if( reconnectAttempts == 0 ) {
+                        reconnectAttempts = MaxReconnectAttempts;
+                    }
+        
+					if(reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts)
+					{
+						Tracer.ErrorFormat("Failed to connect to transport after {0} attempt(s)", connectFailures);
+						connectionFailure = Failure;
+						this.Exception(this, connectionFailure);
+						return false;
+					}
+				}
+			}
+
+			if(!disposed)
+			{
+				Tracer.DebugFormat("Waiting {0}ms before attempting connection.", ReconnectDelay);
+				lock(sleepMutex)
+				{
+					try
+					{
+						Thread.Sleep(ReconnectDelay);
+					}
+					catch(ThreadInterruptedException)
+					{
+					}
+				}
+
+				if(UseExponentialBackOff)
+				{
+					// Exponential increment of reconnect delay.
+					ReconnectDelay *= ReconnectDelayExponent;
+					if(ReconnectDelay > MaxReconnectDelay)
+					{
+						ReconnectDelay = MaxReconnectDelay;
+					}
+				}
+			}
+			return !disposed;
+		}
+
+		/// <summary>
+		/// This class is a helper for the asynchronous connect option
+		/// </summary>
+		public class Connector
+		{
+			/// <summary>
+			/// callback to properly set chosen transport
+			/// </summary>
+			readonly SetTransport _setTransport;
+
+			/// <summary>
+			/// Uri to try connecting to
+			/// </summary>
+			readonly Uri _uri;
+
+			/// <summary>
+			/// Failover transport issuing the connection attempt
+			/// </summary>
+			private readonly FailoverTransport _transport;
+
+			/// <summary>
+			/// Initializes a new instance of the <see cref="Connector"/> class.
+			/// </summary>
+			/// <param name="setTransport">The set transport.</param>
+			/// <param name="uri">The URI.</param>
+			/// <param name="transport">The transport.</param>
+			public Connector(SetTransport setTransport, Uri uri, FailoverTransport transport)
+			{
+				_uri = uri;
+				_setTransport = setTransport;
+				_transport = transport;
+			}
+
+			/// <summary>
+			/// Does the connect.
+			/// </summary>
+			public void DoConnect()
+			{
+				try
+				{
+					TransportFactory.AsyncCompositeConnect(_uri, _setTransport);
+				}
+				catch(Exception e)
+				{
+					_transport.Failure = e;
+					Tracer.DebugFormat("Connect fail to: {0}, reason: {1}", _uri, e.Message);
+				}
+
+			}
+		}
+
+		private bool BuildBackups()
+		{
+			lock(backupMutex)
+			{
+				if(!disposed && Backup && backups.Count < BackupPoolSize)
+				{
+					List<Uri> connectList = ConnectList;
+					foreach(BackupTransport bt in backups)
+					{
+						if(bt.Disposed)
+						{
+							backups.Remove(bt);
+						}
+					}
+
+					foreach(Uri uri in connectList)
+					{
+						if(ConnectedTransportURI != null && !ConnectedTransportURI.Equals(uri))
+						{
+							try
+							{
+								BackupTransport bt = new BackupTransport(this)
+								{
+									Uri = uri
+								};
+
+								if(!backups.Contains(bt))
+								{
+									ITransport t = TransportFactory.CompositeConnect(uri);
+									t.Command = bt.OnCommand;
+									t.Exception = bt.OnException;
+									t.Start();
+									bt.Transport = t;
+									backups.Add(bt);
+								}
+							}
+							catch(Exception e)
+							{
+								Tracer.DebugFormat("Failed to build backup: {0}", e.Message);
+							}
+						}
+
+						if(backups.Count == BackupPoolSize)
+						{
+							break;
+						}
+					}
+				}
+			}
+
+			return false;
+		}
+
+		public void ConnectionInterruptProcessingComplete(ConnectionId connectionId)
+		{
+			lock(reconnectMutex)
+			{
+				Tracer.Debug("Connection Interrupt Processing is complete for ConnectionId: " + connectionId);
+				stateTracker.ConnectionInterruptProcessingComplete(this, connectionId);
+			}
+		}
+
+		public void UpdateURIs(bool rebalance, Uri[] updatedURIs)
+		{
+			if(IsUpdateURIsSupported)
+			{
+				List<Uri> copy = new List<Uri>(this.updated);
+				List<Uri> added = new List<Uri>();
+
+				if(updatedURIs != null && updatedURIs.Length > 0)
+				{
+					Dictionary<Uri, bool> uriSet = new Dictionary<Uri, bool>();
+					for(int i = 0; i < updatedURIs.Length; i++)
+					{
+						Uri uri = updatedURIs[i];
+						if(uri != null)
+						{
+							uriSet[uri] = true;
+						}
+					}
+
+					foreach(Uri uri in uriSet.Keys)
+					{
+						if(copy.Remove(uri) == false)
+						{
+							added.Add(uri);
+						}
+					}
+
+					lock(reconnectMutex)
+					{
+						this.updated.Clear();
+						this.updated.AddRange(added);
+
+						foreach(Uri uri in copy)
+						{
+							this.uris.Remove(uri);
+						}
+
+						this.Add(rebalance, added.ToArray());
+					}
+				}
+			}
+		}
+
+		public void HandleConnectionControl(ConnectionControl control)
+		{
+			string reconnectStr = control.ReconnectTo;
+
+			if(reconnectStr != null)
+			{
+				reconnectStr = reconnectStr.Trim();
+				if(reconnectStr.Length > 0)
+				{
+					try
+					{
+						Uri uri = new Uri(reconnectStr);
+						if(IsReconnectSupported)
+						{
+							Reconnect(uri);
+							Tracer.Info("Reconnected to: " + uri.OriginalString);
+						}
+					}
+					catch(Exception e)
+					{
+						Tracer.ErrorFormat("Failed to handle ConnectionControl reconnect to {0}: {1}", reconnectStr, e);
+					}
+				}
+			}
+
+			ProcessNewTransports(control.RebalanceConnection, control.ConnectedBrokers);
+		}
+
+		private void ProcessNewTransports(bool rebalance, String newTransports)
+		{
+			if(newTransports != null)
+			{
+				newTransports = newTransports.Trim();
+
+				if(newTransports.Length > 0 && IsUpdateURIsSupported)
+				{
+					List<Uri> list = new List<Uri>();
+					String[] tokens = newTransports.Split(new Char[] { ',' });
+
+					foreach(String str in tokens)
+					{
+						try
+						{
+							Uri uri = new Uri(str);
+							list.Add(uri);
+						}
+						catch
+						{
+							Tracer.Error("Failed to parse broker address: " + str);
+						}
+					}
+
+					if(list.Count != 0)
+					{
+						try
+						{
+							UpdateURIs(rebalance, list.ToArray());
+						}
+						catch
+						{
+							Tracer.Error("Failed to update transport URI's from: " + newTransports);
+						}
+					}
+				}
+			}
+		}
+
+		public void Dispose()
+		{
+			Dispose(true);
+			GC.SuppressFinalize(this);
+		}
+
+		public void Dispose(bool disposing)
+		{
+			if(disposing)
+			{
+				// get rid of unmanaged stuff
+			}
+
+            this.Stop();
+
+			disposed = true;
+		}
+
+		public int CompareTo(Object o)
+		{
+			if(o is FailoverTransport)
+			{
+				FailoverTransport oo = o as FailoverTransport;
+
+				return this.id - oo.id;
+			}
+			else
+			{
+				throw new ArgumentException();
+			}
+		}
+
+		public override String ToString()
+		{
+			return ConnectedTransportURI == null ? "unconnected" : ConnectedTransportURI.ToString();
+		}
+	}
+}
diff --git a/src/main/csharp/Transport/InactivityMonitor.cs b/src/main/csharp/Transport/InactivityMonitor.cs
index f4b9203..b5f90d7 100644
--- a/src/main/csharp/Transport/InactivityMonitor.cs
+++ b/src/main/csharp/Transport/InactivityMonitor.cs
@@ -262,9 +262,9 @@
 		public override void Oneway(Command command)
 		{
 			// Disable inactivity monitoring while processing a command.
-			//synchronize this method - its not synchronized
-			//further down the transport stack and gets called by more
-			//than one thread  by this class
+			// synchronize this method - its not synchronized
+			// further down the transport stack and gets called by more
+			// than one thread  by this class
 			lock(inWrite)
 			{
 				inWrite.Value = true;
@@ -330,10 +330,10 @@
 					Math.Min(
 						localWireFormatInfo.MaxInactivityDuration,
 						remoteWireFormatInfo.MaxInactivityDuration);
-				initialDelayTime =
-					Math.Min(
-						localWireFormatInfo.MaxInactivityDurationInitialDelay,
-						remoteWireFormatInfo.MaxInactivityDurationInitialDelay);
+				initialDelayTime = remoteWireFormatInfo.MaxInactivityDurationInitialDelay > 0 ?
+					Math.Min(localWireFormatInfo.MaxInactivityDurationInitialDelay,
+						     remoteWireFormatInfo.MaxInactivityDurationInitialDelay) :
+                    localWireFormatInfo.MaxInactivityDurationInitialDelay;
 
 				if(readCheckTime > 0)
 				{