blob: 998605f5913a60d15ecd368f2bab15659a2827c8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Text;
using System.Net;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.State;
using Apache.NMS.ActiveMQ.Threads;
using Apache.NMS.Util;
namespace Apache.NMS.ActiveMQ.Transport.Failover
{
/// <summary>
/// A Transport that is made reliable by being able to fail over to another
/// transport when a transport failure is detected.
/// </summary>
public class FailoverTransport : ICompositeTransport, IComparable
{
private static int DEFAULT_INITIAL_RECONNECT_DELAY = 10;
private static int INFINITE = -1;
private static int idCounter = 0;
private readonly int id;
private bool disposed;
private bool connected;
private readonly List<Uri> uris = new List<Uri>();
private readonly List<Uri> updated = new List<Uri>();
private CommandHandler commandHandler;
private ExceptionHandler exceptionHandler;
private InterruptedHandler interruptedHandler;
private ResumedHandler resumedHandler;
private readonly CountDownLatch listenerLatch = new CountDownLatch(4);
private readonly Mutex reconnectMutex = new Mutex();
private readonly Mutex backupMutex = new Mutex();
private readonly Mutex sleepMutex = new Mutex();
private readonly ConnectionStateTracker stateTracker = new ConnectionStateTracker();
private readonly Dictionary<int, Command> requestMap = new Dictionary<int, Command>();
private Uri connectedTransportURI;
private Uri failedConnectTransportURI;
private readonly AtomicReference<ITransport> connectedTransport = new AtomicReference<ITransport>(null);
private TaskRunner reconnectTask = null;
private bool started;
private bool initialized;
private int initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
private int maxReconnectDelay = 1000 * 30;
private int backOffMultiplier = 2;
private int timeout = INFINITE;
private bool useExponentialBackOff = true;
private bool randomize = true;
private int maxReconnectAttempts = INFINITE;
private int startupMaxReconnectAttempts = INFINITE;
private int connectFailures;
private int reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
private Exception connectionFailure;
private bool firstConnection = true;
private bool backup = false;
private readonly List<BackupTransport> backups = new List<BackupTransport>();
private int backupPoolSize = 1;
private bool trackMessages = false;
private bool trackTransactionProducers = true;
private int maxCacheSize = 256;
private volatile Exception failure;
private readonly object mutex = new object();
private bool reconnectSupported = true;
private bool updateURIsSupported = true;
private bool doRebalance = false;
private bool connectedToPriority = false;
private bool priorityBackup = false;
private List<Uri> priorityList = new List<Uri>();
private bool priorityBackupAvailable = false;
// Not Sure how to work these back in with all the changes.
//private int asyncTimeout = 45000;
//private bool asyncConnect = false;
public FailoverTransport()
{
id = idCounter++;
stateTracker.TrackTransactions = true;
reconnectTask = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner(
new FailoverTask(this), "ActiveMQ Failover Worker: " + this.GetHashCode().ToString());
}
~FailoverTransport()
{
Dispose(false);
}
#region FailoverTask
private class FailoverTask : Task
{
private readonly FailoverTransport parent;
public FailoverTask(FailoverTransport p)
{
parent = p;
}
public bool Iterate()
{
bool result = false;
if (!parent.IsStarted)
{
return false;
}
bool buildBackup = true;
lock (parent.backupMutex)
{
if ((parent.connectedTransport.Value == null || parent.doRebalance || parent.priorityBackupAvailable) && !parent.disposed)
{
result = parent.DoConnect();
buildBackup = false;
}
}
if (buildBackup)
{
parent.BuildBackups();
if (parent.priorityBackup && !parent.connectedToPriority)
{
try
{
parent.DoDelay();
if (parent.reconnectTask == null)
{
return true;
}
parent.reconnectTask.Wakeup();
}
catch (ThreadInterruptedException)
{
Tracer.Debug("Reconnect task has been interrupted.");
}
}
}
else
{
try
{
if (parent.reconnectTask == null)
{
return true;
}
parent.reconnectTask.Wakeup();
}
catch (ThreadInterruptedException)
{
Tracer.Debug("Reconnect task has been interrupted.");
}
}
return result;
}
}
#endregion
#region Property Accessors
public CommandHandler Command
{
get { return commandHandler; }
set
{
commandHandler = value;
listenerLatch.countDown();
}
}
public ExceptionHandler Exception
{
get { return exceptionHandler; }
set
{
exceptionHandler = value;
listenerLatch.countDown();
}
}
public InterruptedHandler Interrupted
{
get { return interruptedHandler; }
set
{
this.interruptedHandler = value;
this.listenerLatch.countDown();
}
}
public ResumedHandler Resumed
{
get { return resumedHandler; }
set
{
this.resumedHandler = value;
this.listenerLatch.countDown();
}
}
internal Exception Failure
{
get { return failure; }
set
{
lock(mutex)
{
failure = value;
}
}
}
public int Timeout
{
get { return this.timeout; }
set { this.timeout = value; }
}
public int InitialReconnectDelay
{
get { return initialReconnectDelay; }
set { initialReconnectDelay = value; }
}
public int MaxReconnectDelay
{
get { return maxReconnectDelay; }
set { maxReconnectDelay = value; }
}
public int ReconnectDelay
{
get { return reconnectDelay; }
set { reconnectDelay = value; }
}
public int ReconnectDelayExponent
{
get { return backOffMultiplier; }
set { backOffMultiplier = value; }
}
public ITransport ConnectedTransport
{
get { return connectedTransport.Value; }
set { connectedTransport.Value = value; }
}
public Uri ConnectedTransportURI
{
get { return connectedTransportURI; }
set { connectedTransportURI = value; }
}
public int MaxReconnectAttempts
{
get { return maxReconnectAttempts; }
set { maxReconnectAttempts = value; }
}
public int StartupMaxReconnectAttempts
{
get { return startupMaxReconnectAttempts; }
set { startupMaxReconnectAttempts = value; }
}
public bool Randomize
{
get { return randomize; }
set { randomize = value; }
}
public bool Backup
{
get { return backup; }
set { backup = value; }
}
public bool PriorityBackup
{
get { return priorityBackup; }
set { this.priorityBackup = value; }
}
public String PriorityURIs
{
get { return PrintableUriList(priorityList); }
set { this.ProcessDelimitedUriList(value, priorityList); }
}
public int BackupPoolSize
{
get { return backupPoolSize; }
set { backupPoolSize = value; }
}
public bool TrackMessages
{
get { return trackMessages; }
set { trackMessages = value; }
}
public bool TrackTransactionProducers
{
get { return trackTransactionProducers; }
set { this.trackTransactionProducers = value; }
}
public int MaxCacheSize
{
get { return maxCacheSize; }
set { maxCacheSize = value; }
}
public bool UseExponentialBackOff
{
get { return useExponentialBackOff; }
set { useExponentialBackOff = value; }
}
public IWireFormat WireFormat
{
get
{
ITransport transport = ConnectedTransport;
if(transport != null)
{
return transport.WireFormat;
}
return null;
}
}
/// <summary>
/// Gets or sets a value indicating whether to asynchronously connect to sockets
/// </summary>
/// <value><c>true</c> if [async connect]; otherwise, <c>false</c>.</value>
public bool AsyncConnect
{
set { }
}
/// <summary>
/// If doing an asynchronous connect, the milliseconds before timing out if no connection can be made
/// </summary>
/// <value>The async timeout.</value>
public int AsyncTimeout
{
get { return 0; }
set { }
}
public ConnectionStateTracker StateTracker
{
get { return this.stateTracker; }
}
#endregion
public bool IsFaultTolerant
{
get { return true; }
}
public bool IsDisposed
{
get { return disposed; }
}
public bool IsConnected
{
get { return connected; }
}
public bool IsConnectedToPriority
{
get { return connectedToPriority; }
}
public bool IsStarted
{
get { return started; }
}
public bool IsReconnectSupported
{
get { return this.reconnectSupported; }
}
public bool IsUpdateURIsSupported
{
get { return this.updateURIsSupported; }
}
public void OnException(ITransport sender, Exception error)
{
try
{
HandleTransportFailure(error);
}
catch(Exception)
{
this.Exception(this, new IOException("Unexpected Transport Failure."));
}
}
public void DisposedOnCommand(ITransport sender, Command c)
{
}
public void DisposedOnException(ITransport sender, Exception e)
{
}
public void HandleTransportFailure(Exception e)
{
ITransport transport = connectedTransport.GetAndSet(null);
if (transport == null)
{
// sync with possible in progress reconnect
lock(reconnectMutex)
{
transport = connectedTransport.GetAndSet(null);
}
}
if(transport != null)
{
DisposeTransport(transport);
bool reconnectOk = false;
lock(reconnectMutex)
{
if (CanReconnect())
{
Tracer.WarnFormat("Transport failed to {0}, attempting to automatically reconnect due to: {1}",
ConnectedTransportURI, e.Message);
reconnectOk = true;
}
initialized = false;
failedConnectTransportURI = ConnectedTransportURI;
ConnectedTransportURI = null;
connectedToPriority = false;
connected = false;
if(this.Interrupted != null)
{
this.Interrupted(transport);
}
if (reconnectOk)
{
updated.Remove(failedConnectTransportURI);
reconnectTask.Wakeup();
}
else if (!disposed)
{
PropagateFailureToExceptionListener(e);
}
}
}
}
private bool CanReconnect()
{
return started && 0 != CalculateReconnectAttemptLimit();
}
public void Start()
{
lock(reconnectMutex)
{
if(started)
{
Tracer.Debug("FailoverTransport Already Started.");
return;
}
Tracer.Debug("FailoverTransport Started.");
started = true;
stateTracker.MaxCacheSize = MaxCacheSize;
stateTracker.TrackMessages = TrackMessages;
stateTracker.TrackTransactionProducers = TrackTransactionProducers;
if(ConnectedTransport != null)
{
Tracer.Debug("FailoverTransport already connected, start is restoring.");
stateTracker.DoRestore(ConnectedTransport);
}
else
{
Tracer.Debug("FailoverTransport not connected, start is reconnecting.");
Reconnect(false);
}
}
}
public virtual void Stop()
{
ITransport transportToStop = null;
List<ITransport> backupsToStop = new List<ITransport>(backups.Count);
try
{
lock(reconnectMutex)
{
if(!started)
{
Tracer.Debug("FailoverTransport Already Stopped.");
return;
}
Tracer.Debug("FailoverTransport Stopped.");
started = false;
disposed = true;
connected = false;
if(ConnectedTransport != null)
{
transportToStop = connectedTransport.GetAndSet(null);
}
}
lock(sleepMutex)
{
Monitor.PulseAll(sleepMutex);
}
}
finally
{
if(reconnectTask != null)
{
reconnectTask.Shutdown();
}
}
lock(backupMutex)
{
foreach (BackupTransport backup in backups)
{
backup.Disposed = true;
ITransport transport = backup.Transport;
if (transport != null)
{
transport.Command = DisposedOnCommand;
transport.Exception = DisposedOnException;
backupsToStop.Add(transport);
}
}
backups.Clear();
}
foreach (ITransport transport in backupsToStop)
{
try
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug("Stopped backup: " + transport);
}
DisposeTransport(transport);
}
catch (Exception)
{
}
}
if(transportToStop != null)
{
transportToStop.Stop();
}
}
public FutureResponse AsyncRequest(Command command)
{
throw new ApplicationException("FailoverTransport does not implement AsyncRequest(Command)");
}
public Response Request(Command command)
{
throw new ApplicationException("FailoverTransport does not implement Request(Command)");
}
public Response Request(Command command, TimeSpan ts)
{
throw new ApplicationException("FailoverTransport does not implement Request(Command, TimeSpan)");
}
public void OnCommand(ITransport sender, Command command)
{
if(command != null)
{
if(command.IsResponse)
{
Command request = null;
lock(((ICollection) requestMap).SyncRoot)
{
int v = ((Response) command).CorrelationId;
try
{
if(requestMap.TryGetValue(v, out request))
{
requestMap.Remove(v);
}
}
catch
{
}
}
Tracked tracked = request as Tracked;
if(tracked != null)
{
tracked.OnResponse();
}
}
if(!initialized)
{
initialized = true;
}
if(command.IsConnectionControl)
{
this.HandleConnectionControl(command as ConnectionControl);
}
}
this.Command(sender, command);
}
public void Oneway(Command command)
{
Exception error = null;
lock(reconnectMutex)
{
if(command != null && ConnectedTransport == null)
{
if(command.IsShutdownInfo)
{
// Skipping send of ShutdownInfo command when not connected.
return;
}
else if(command.IsRemoveInfo || command.IsMessageAck)
{
stateTracker.Track(command);
// Simulate response to RemoveInfo command or a MessageAck
// since it would be stale at this point.
if(command.ResponseRequired)
{
OnCommand(this, new Response() { CorrelationId = command.CommandId });
}
return;
}
else if(command.IsMessagePull)
{
// Simulate response to MessagePull if timed as we can't honor that now.
MessagePull pullRequest = command as MessagePull;
if (pullRequest.Timeout != 0)
{
MessageDispatch dispatch = new MessageDispatch();
dispatch.ConsumerId = pullRequest.ConsumerId;
dispatch.Destination = pullRequest.Destination;
OnCommand(this, dispatch);
}
return;
}
}
// Keep trying until the message is sent.
for(int i = 0; !disposed; i++)
{
try
{
// Any Ack that was being sent when the connection dropped is now
// stale so we don't send it here as it would cause an unmatched ack
// on the broker side and probably prevent a consumer from getting
// any new messages.
if(command.IsMessageAck && i > 0)
{
Tracer.Debug("Inflight MessageAck being dropped as stale.");
if(command.ResponseRequired)
{
OnCommand(this, new Response() { CorrelationId = command.CommandId });
}
return;
}
// Wait for transport to be connected.
ITransport transport = ConnectedTransport;
DateTime start = DateTime.Now;
bool timedout = false;
while(transport == null && !disposed && connectionFailure == null)
{
Tracer.Debug("Waiting for transport to reconnect.");
int elapsed = (int) (DateTime.Now - start).TotalMilliseconds;
if(this.timeout > 0 && elapsed > this.timeout)
{
timedout = true;
Tracer.DebugFormat("FailoverTransport.oneway - timed out after {0} mills", elapsed);
break;
}
// Release so that the reconnect task can run
try
{
// Wait for something. The mutex will be pulsed if we connect.
Monitor.Wait(reconnectMutex, 100);
}
catch(ThreadInterruptedException e)
{
Tracer.DebugFormat("Interrupted: {0}", e.Message);
}
transport = ConnectedTransport;
}
if(transport == null)
{
// Previous loop may have exited due to use being disposed.
if(disposed)
{
error = new IOException("Transport disposed.");
}
else if(connectionFailure != null)
{
error = connectionFailure;
}
else if(timedout)
{
error = new IOException("Failover oneway timed out after " + timeout + " milliseconds.");
}
else
{
error = new IOException("Unexpected failure.");
}
break;
}
// If it was a request and it was not being tracked by
// the state tracker, then hold it in the requestMap so
// that we can replay it later.
Tracked tracked = stateTracker.Track(command);
lock(((ICollection) requestMap).SyncRoot)
{
if(tracked != null && tracked.WaitingForResponse)
{
requestMap.Add(command.CommandId, tracked);
}
else if(tracked == null && command.ResponseRequired)
{
requestMap.Add(command.CommandId, command);
}
}
// Send the message.
try
{
transport.Oneway(command);
stateTracker.TrackBack(command);
}
catch(Exception e)
{
// If the command was not tracked.. we will retry in this method
// otherwise we need to trigger a reconnect before returning as
// the transport is failed.
if (tracked == null)
{
// since we will retry in this method.. take it
// out of the request map so that it is not
// sent 2 times on recovery
if(command.ResponseRequired)
{
lock(((ICollection) requestMap).SyncRoot)
{
requestMap.Remove(command.CommandId);
}
}
// Rethrow the exception so it will handled by
// the outer catch
throw;
}
else
{
if (Tracer.IsDebugEnabled)
{
Tracer.DebugFormat("Send Oneway attempt: {0} failed: Message = {1}",
i, e.Message);
Tracer.DebugFormat("Failed Message Was: {0}", command);
}
HandleTransportFailure(e);
}
}
return;
}
catch(Exception e)
{
if (Tracer.IsDebugEnabled)
{
Tracer.DebugFormat("Send Oneway attempt: {0} failed: Message = {1}",
i, e.Message);
Tracer.DebugFormat("Failed Message Was: {0}", command);
}
HandleTransportFailure(e);
}
}
}
if(!disposed)
{
if(error != null)
{
throw error;
}
}
}
public void Add(bool rebalance, Uri[] urisToAdd)
{
bool newUri = false;
lock(uris)
{
foreach (Uri uri in urisToAdd)
{
if(!Contains(uri))
{
uris.Add(uri);
newUri = true;
}
}
}
if (newUri)
{
Reconnect(rebalance);
}
}
public void Add(bool rebalance, String u)
{
try
{
Add(rebalance, new Uri[] { new Uri(u) });
}
catch(Exception e)
{
Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, e.Message);
}
}
public void Remove(bool rebalance, Uri[] u)
{
lock(uris)
{
for(int i = 0; i < u.Length; i++)
{
uris.Remove(u[i]);
}
}
Reconnect(rebalance);
}
public void Remove(bool rebalance, String u)
{
try
{
Remove(rebalance, new Uri[] { new Uri(u) });
}
catch(Exception e)
{
Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, e.Message);
}
}
public void Reconnect(Uri uri)
{
Add(true, new Uri[] { uri });
}
public void Reconnect(bool rebalance)
{
lock(reconnectMutex)
{
if(started)
{
if (rebalance)
{
doRebalance = true;
}
Tracer.Debug("Waking up reconnect task");
try
{
reconnectTask.Wakeup();
}
catch (ThreadInterruptedException)
{
}
}
else
{
Tracer.Debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
}
}
}
private List<Uri> ConnectList
{
get
{
if (updated.Count != 0)
{
return updated;
}
List<Uri> l = new List<Uri>(uris);
bool removed = false;
if(failedConnectTransportURI != null)
{
removed = l.Remove(failedConnectTransportURI);
}
if(Randomize)
{
Shuffle(l);
}
if(removed)
{
l.Add(failedConnectTransportURI);
}
if (Tracer.IsDebugEnabled)
{
Tracer.DebugFormat("Uri connection list: {0} from: {1}",
PrintableUriList(l), PrintableUriList(uris));
}
return l;
}
}
protected void RestoreTransport(ITransport t)
{
Tracer.Info("Restoring previous transport connection.");
t.Start();
// Send information to the broker - informing it we are a fault tolerant client
t.Oneway(new ConnectionControl() { FaultTolerant = true });
stateTracker.DoRestore(t);
Tracer.Info("Sending queued commands...");
Dictionary<int, Command> tmpMap = null;
lock(((ICollection) requestMap).SyncRoot)
{
tmpMap = new Dictionary<int, Command>(requestMap);
}
foreach(Command command in tmpMap.Values)
{
if(command.IsMessageAck)
{
Tracer.Debug("Stored MessageAck being dropped as stale.");
OnCommand(this, new Response() { CorrelationId = command.CommandId });
continue;
}
t.Oneway(command);
}
}
public Uri RemoteAddress
{
get
{
if(ConnectedTransport != null)
{
return ConnectedTransport.RemoteAddress;
}
return null;
}
}
public Object Narrow(Type type)
{
if(this.GetType().Equals(type))
{
return this;
}
else if(ConnectedTransport != null)
{
return ConnectedTransport.Narrow(type);
}
return null;
}
private bool DoConnect()
{
lock(reconnectMutex)
{
if (disposed || connectionFailure != null)
{
Monitor.PulseAll(reconnectMutex);
}
if ((connectedTransport.Value != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null)
{
return false;
}
else
{
List<Uri> connectList = ConnectList;
if(connectList.Count == 0)
{
Failure = new NMSConnectionException("No URIs available for connection.");
}
else
{
if (doRebalance)
{
if (connectedToPriority || CompareUris(connectList[0], connectedTransportURI))
{
// already connected to first in the list, no need to rebalance
doRebalance = false;
return false;
}
else
{
if (Tracer.IsDebugEnabled)
{
Tracer.DebugFormat("Doing rebalance from: {0} to {1}",
connectedTransportURI, PrintableUriList(connectList));
}
try
{
ITransport current = this.connectedTransport.GetAndSet(null);
if (current != null)
{
DisposeTransport(current);
}
}
catch (Exception e)
{
if (Tracer.IsDebugEnabled)
{
Tracer.DebugFormat("Caught an exception stopping existing " +
"transport for rebalance {0}", e.Message);
}
}
}
doRebalance = false;
}
ResetReconnectDelay();
ITransport transport = null;
Uri uri = null;
// If we have a backup already waiting lets try it.
lock(backupMutex)
{
if ((priorityBackup || backup) && backups.Count > 0)
{
List<BackupTransport> l = new List<BackupTransport>(backups);
if (randomize)
{
Shuffle(l);
}
BackupTransport bt = l[0];
l.RemoveAt(0);
backups.Remove(bt);
transport = bt.Transport;
uri = bt.Uri;
if (priorityBackup && priorityBackupAvailable)
{
ITransport old = this.connectedTransport.GetAndSet(null);
if (old != null)
{
DisposeTransport(old);
}
priorityBackupAvailable = false;
}
}
}
// Sleep for the reconnectDelay if there's no backup and we aren't trying
// for the first time, or we were disposed for some reason.
if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed)
{
lock(sleepMutex)
{
if (Tracer.IsDebugEnabled)
{
Tracer.DebugFormat("Waiting {0} ms before attempting connection.", reconnectDelay);
}
try
{
Monitor.Wait(sleepMutex, reconnectDelay);
}
catch (ThreadInterruptedException)
{
}
}
}
IEnumerator<Uri> iter = connectList.GetEnumerator();
while ((transport != null || iter.MoveNext()) && (connectedTransport.Value == null && !disposed))
{
try
{
if (Tracer.IsDebugEnabled)
{
Tracer.DebugFormat("Attempting {0}th connect to: {1}",
connectFailures, uri);
}
// We could be starting with a backup and if so we wait to grab a
// URI from the pool until next time around.
if (transport == null)
{
uri = iter.Current;
transport = TransportFactory.CompositeConnect(uri);
}
transport.Command = OnCommand;
transport.Exception = OnException;
transport.Start();
if (started && !firstConnection)
{
RestoreTransport(transport);
}
if (Tracer.IsDebugEnabled)
{
Tracer.Debug("Connection established");
}
reconnectDelay = initialReconnectDelay;
connectedTransportURI = uri;
connectedTransport.Value = transport;
connectedToPriority = IsPriority(connectedTransportURI);
Monitor.PulseAll(reconnectMutex);
connectFailures = 0;
// Try to wait long enough for client to init the event callbacks.
listenerLatch.await(TimeSpan.FromSeconds(2));
if (Resumed != null)
{
Resumed(transport);
}
else
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug("transport resumed by transport listener not set");
}
}
if (firstConnection)
{
firstConnection = false;
Tracer.Info("Successfully connected to " + uri);
}
else
{
Tracer.Info("Successfully reconnected to " + uri);
}
connected = true;
return false;
}
catch (Exception e)
{
failure = e;
if (Tracer.IsDebugEnabled)
{
Tracer.Debug("Connect fail to: " + uri + ", reason: " + e.Message);
}
if (transport != null)
{
try
{
transport.Stop();
transport = null;
}
catch (Exception ee)
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug("Stop of failed transport: " + transport +
" failed with reason: " + ee.Message);
}
}
}
}
}
}
}
int reconnectLimit = CalculateReconnectAttemptLimit();
connectFailures++;
if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit)
{
Tracer.ErrorFormat("Failed to connect to {0} after: {1} attempt(s)",
PrintableUriList(uris), connectFailures);
connectionFailure = failure;
// Make sure on initial startup, that the transportListener has been
// initialized for this instance.
listenerLatch.await(TimeSpan.FromSeconds(2));
PropagateFailureToExceptionListener(connectionFailure);
return false;
}
}
if(!disposed)
{
DoDelay();
}
return !disposed;
}
private bool BuildBackups()
{
lock(backupMutex)
{
if (!disposed && (backup || priorityBackup) && backups.Count < backupPoolSize)
{
List<Uri> backupList = new List<Uri>(priorityList);
List<Uri> connectList = ConnectList;
foreach(Uri uri in connectList)
{
if (!backupList.Contains(uri))
{
backupList.Add(uri);
}
}
foreach(BackupTransport bt in backups)
{
if(bt.Disposed)
{
backups.Remove(bt);
}
}
foreach(Uri uri in connectList)
{
if (disposed)
{
break;
}
if(ConnectedTransportURI != null && !ConnectedTransportURI.Equals(uri))
{
try
{
BackupTransport bt = new BackupTransport(this)
{
Uri = uri
};
if(!backups.Contains(bt))
{
ITransport t = TransportFactory.CompositeConnect(uri);
t.Command = bt.OnCommand;
t.Exception = bt.OnException;
t.Start();
bt.Transport = t;
if (priorityBackup && IsPriority(uri))
{
priorityBackupAvailable = true;
backups.Insert(0, bt);
}
else
{
backups.Add(bt);
}
}
}
catch(Exception e)
{
Tracer.DebugFormat("Failed to build backup: {0}", e.Message);
}
}
if(backups.Count == BackupPoolSize)
{
break;
}
}
}
}
return false;
}
public void ConnectionInterruptProcessingComplete(ConnectionId connectionId)
{
lock(reconnectMutex)
{
Tracer.Debug("Connection Interrupt Processing is complete for ConnectionId: " + connectionId);
stateTracker.ConnectionInterruptProcessingComplete(this, connectionId);
}
}
public void UpdateURIs(bool rebalance, Uri[] updatedURIs)
{
if(IsUpdateURIsSupported)
{
Dictionary<Uri, bool> copy = new Dictionary<Uri, bool>();
foreach(Uri uri in updated)
{
if(uri != null)
{
copy[uri] = true;
}
}
updated.Clear();
if(updatedURIs != null && updatedURIs.Length > 0)
{
Dictionary<Uri, bool> uriSet = new Dictionary<Uri, bool>();
for(int i = 0; i < updatedURIs.Length; i++)
{
Uri uri = updatedURIs[i];
if(uri != null)
{
uriSet[uri] = true;
}
}
foreach(Uri uri in uriSet.Keys)
{
if(!updated.Contains(uri))
{
updated.Add(uri);
}
}
if (Tracer.IsDebugEnabled)
{
Tracer.DebugFormat("Updated URIs list {0}", PrintableUriList(updated));
}
if (!(copy.Count == 0 && updated.Count == 0) && !copy.Keys.Equals(updated))
{
BuildBackups();
lock(reconnectMutex)
{
Reconnect(rebalance);
}
}
}
}
}
public void HandleConnectionControl(ConnectionControl control)
{
string reconnectStr = control.ReconnectTo;
if(reconnectStr != null)
{
reconnectStr = reconnectStr.Trim();
if(reconnectStr.Length > 0)
{
try
{
Uri uri = new Uri(reconnectStr);
if(IsReconnectSupported)
{
Tracer.Info("Reconnecting to: " + uri.OriginalString);
Reconnect(uri);
}
}
catch(Exception e)
{
Tracer.ErrorFormat("Failed to handle ConnectionControl reconnect to {0}: {1}", reconnectStr, e);
}
}
}
ProcessNewTransports(control.RebalanceConnection, control.ConnectedBrokers);
}
private void ProcessNewTransports(bool rebalance, String newTransports)
{
if(newTransports != null)
{
newTransports = newTransports.Trim();
if(newTransports.Length > 0 && IsUpdateURIsSupported)
{
List<Uri> list = new List<Uri>();
ProcessDelimitedUriList(newTransports, list);
if(list.Count != 0)
{
try
{
UpdateURIs(rebalance, list.ToArray());
}
catch
{
Tracer.Error("Failed to update transport URI's from: " + newTransports);
}
}
}
}
}
private void ProcessDelimitedUriList(String priorityUris, List<Uri> target)
{
String[] tokens = priorityUris.Split(new Char[] { ',' });
foreach(String str in tokens)
{
try
{
Uri uri = new Uri(str);
target.Add(uri);
if (Tracer.IsDebugEnabled)
{
Tracer.DebugFormat("Adding new Uri[{0}] to list,", uri);
}
}
catch (Exception e)
{
Tracer.ErrorFormat("Failed to parse broker address: {0} because of: {1}",
str, e.Message);
}
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
public void Dispose(bool disposing)
{
this.Stop();
disposed = true;
}
public int CompareTo(Object o)
{
if(o is FailoverTransport)
{
FailoverTransport oo = o as FailoverTransport;
return this.id - oo.id;
}
else
{
throw new ArgumentException();
}
}
public override String ToString()
{
return ConnectedTransportURI == null ? "unconnected" : ConnectedTransportURI.ToString();
}
internal bool IsPriority(Uri uri)
{
if (priorityBackup)
{
if (priorityList.Count > 0)
{
return priorityList.Contains(uri);
}
if (this.uris.Count > 0)
{
return uris[0].Equals(uri);
}
}
return false;
}
public void DisposeTransport(ITransport transport)
{
transport.Command = DisposedOnCommand;
transport.Exception = DisposedOnException;
try
{
transport.Stop();
}
catch (Exception e)
{
Tracer.DebugFormat("Could not stop transport: {0]. Reason: {1}", transport, e.Message);
}
}
private void ResetReconnectDelay()
{
if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY)
{
reconnectDelay = initialReconnectDelay;
}
}
private void DoDelay()
{
if (reconnectDelay > 0)
{
lock(sleepMutex)
{
if (Tracer.IsDebugEnabled)
{
Tracer.DebugFormat("Waiting {0} ms before attempting connection", reconnectDelay);
}
try
{
Monitor.Wait(sleepMutex, reconnectDelay);
}
catch (ThreadInterruptedException)
{
}
}
}
if (useExponentialBackOff)
{
// Exponential increment of reconnect delay.
reconnectDelay *= backOffMultiplier;
if (reconnectDelay > maxReconnectDelay)
{
reconnectDelay = maxReconnectDelay;
}
}
}
private void PropagateFailureToExceptionListener(Exception exception)
{
if (Exception != null)
{
Exception(this, exception);
}
else
{
Exception(this, new IOException());
}
Monitor.PulseAll(reconnectMutex);
}
private int CalculateReconnectAttemptLimit()
{
int maxReconnectValue = this.maxReconnectAttempts;
if (firstConnection && this.startupMaxReconnectAttempts != INFINITE)
{
maxReconnectValue = this.startupMaxReconnectAttempts;
}
return maxReconnectValue;
}
public void Shuffle<T>(List<T> list)
{
Random random = new Random(DateTime.Now.Millisecond);
int index = list.Count;
while (index > 1)
{
index--;
int k = random.Next(index + 1);
T value = list[k];
list[k] = list[index];
list[index] = value;
}
}
private String PrintableUriList(List<Uri> uriList)
{
if (uriList.Count == 0)
{
return "";
}
StringBuilder builder = new StringBuilder();
for (int i = 0; i < uriList.Count; ++i)
{
builder.Append(uriList[i]);
if (i < (uriList.Count - 1))
{
builder.Append(",");
}
}
return builder.ToString();
}
private bool CompareUris(Uri first, Uri second)
{
bool result = false;
if (first.Port == second.Port)
{
IPHostEntry firstAddr = null;
IPHostEntry secondAddr = null;
try
{
firstAddr = Dns.GetHostEntry(first.Host);
secondAddr = Dns.GetHostEntry(second.Host);
if (firstAddr.Equals(secondAddr))
{
result = true;
}
}
catch(Exception e)
{
if (firstAddr == null)
{
Tracer.WarnFormat("Failed to Lookup IPHostEntry for URI[{0}] : {1}", first, e);
}
else
{
Tracer.WarnFormat("Failed to Lookup IPHostEntry for URI[{0}] : {1}", second, e);
}
if(String.Equals(first.Host, second.Host, StringComparison.CurrentCultureIgnoreCase))
{
result = true;
}
}
}
return result;
}
private bool Contains(Uri newURI)
{
bool result = false;
foreach (Uri uri in uris)
{
if (CompareUris(newURI, uri))
{
result = true;
break;
}
}
return result;
}
}
}