| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using System; |
| using System.Collections; |
| using System.Collections.Generic; |
| using System.Threading; |
| using System.Text; |
| using System.Net; |
| using Apache.NMS.ActiveMQ.Commands; |
| using Apache.NMS.ActiveMQ.State; |
| using Apache.NMS.ActiveMQ.Threads; |
| using Apache.NMS.Util; |
| |
| namespace Apache.NMS.ActiveMQ.Transport.Failover |
| { |
| /// <summary> |
| /// A Transport that is made reliable by being able to fail over to another |
| /// transport when a transport failure is detected. |
| /// </summary> |
| public class FailoverTransport : ICompositeTransport, IComparable |
| { |
| private static int DEFAULT_INITIAL_RECONNECT_DELAY = 10; |
| private static int INFINITE = -1; |
| |
| private static int idCounter = 0; |
| private readonly int id; |
| |
| private bool disposed; |
| private bool connected; |
| private readonly List<Uri> uris = new List<Uri>(); |
| private readonly List<Uri> updated = new List<Uri>(); |
| |
| private CommandHandler commandHandler; |
| private ExceptionHandler exceptionHandler; |
| private InterruptedHandler interruptedHandler; |
| private ResumedHandler resumedHandler; |
| |
| private readonly CountDownLatch listenerLatch = new CountDownLatch(4); |
| private readonly Mutex reconnectMutex = new Mutex(); |
| private readonly Mutex backupMutex = new Mutex(); |
| private readonly Mutex sleepMutex = new Mutex(); |
| private readonly ConnectionStateTracker stateTracker = new ConnectionStateTracker(); |
| private readonly Dictionary<int, Command> requestMap = new Dictionary<int, Command>(); |
| |
| private Uri connectedTransportURI; |
| private Uri failedConnectTransportURI; |
| private readonly AtomicReference<ITransport> connectedTransport = new AtomicReference<ITransport>(null); |
| private TaskRunner reconnectTask = null; |
| private bool started; |
| private bool initialized; |
| private int initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY; |
| private int maxReconnectDelay = 1000 * 30; |
| private int backOffMultiplier = 2; |
| private int timeout = INFINITE; |
| private bool useExponentialBackOff = true; |
| private bool randomize = true; |
| private int maxReconnectAttempts = INFINITE; |
| private int startupMaxReconnectAttempts = INFINITE; |
| private int connectFailures; |
| private int reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY; |
| private Exception connectionFailure; |
| private bool firstConnection = true; |
| private bool backup = false; |
| private readonly List<BackupTransport> backups = new List<BackupTransport>(); |
| private int backupPoolSize = 1; |
| private bool trackMessages = false; |
| private bool trackTransactionProducers = true; |
| private int maxCacheSize = 256; |
| private volatile Exception failure; |
| private readonly object mutex = new object(); |
| private bool reconnectSupported = true; |
| private bool updateURIsSupported = true; |
| private bool doRebalance = false; |
| private bool connectedToPriority = false; |
| private bool priorityBackup = false; |
| private List<Uri> priorityList = new List<Uri>(); |
| private bool priorityBackupAvailable = false; |
| |
| // Not Sure how to work these back in with all the changes. |
| //private int asyncTimeout = 45000; |
| //private bool asyncConnect = false; |
| |
| public FailoverTransport() |
| { |
| id = idCounter++; |
| |
| stateTracker.TrackTransactions = true; |
| reconnectTask = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner( |
| new FailoverTask(this), "ActiveMQ Failover Worker: " + this.GetHashCode().ToString()); |
| } |
| |
| ~FailoverTransport() |
| { |
| Dispose(false); |
| } |
| |
| #region FailoverTask |
| |
| private class FailoverTask : Task |
| { |
| private readonly FailoverTransport parent; |
| |
| public FailoverTask(FailoverTransport p) |
| { |
| parent = p; |
| } |
| |
| public bool Iterate() |
| { |
| bool result = false; |
| if (!parent.IsStarted) |
| { |
| return false; |
| } |
| |
| bool buildBackup = true; |
| lock (parent.backupMutex) |
| { |
| if ((parent.connectedTransport.Value == null || parent.doRebalance || parent.priorityBackupAvailable) && !parent.disposed) |
| { |
| result = parent.DoConnect(); |
| buildBackup = false; |
| } |
| } |
| if (buildBackup) |
| { |
| parent.BuildBackups(); |
| if (parent.priorityBackup && !parent.connectedToPriority) |
| { |
| try |
| { |
| parent.DoDelay(); |
| if (parent.reconnectTask == null) |
| { |
| return true; |
| } |
| parent.reconnectTask.Wakeup(); |
| } |
| catch (ThreadInterruptedException) |
| { |
| Tracer.Debug("Reconnect task has been interrupted."); |
| } |
| } |
| } |
| else |
| { |
| try |
| { |
| if (parent.reconnectTask == null) |
| { |
| return true; |
| } |
| parent.reconnectTask.Wakeup(); |
| } |
| catch (ThreadInterruptedException) |
| { |
| Tracer.Debug("Reconnect task has been interrupted."); |
| } |
| } |
| return result; |
| } |
| } |
| |
| #endregion |
| |
| #region Property Accessors |
| |
| public CommandHandler Command |
| { |
| get { return commandHandler; } |
| set |
| { |
| commandHandler = value; |
| listenerLatch.countDown(); |
| } |
| } |
| |
| public ExceptionHandler Exception |
| { |
| get { return exceptionHandler; } |
| set |
| { |
| exceptionHandler = value; |
| listenerLatch.countDown(); |
| } |
| } |
| |
| public InterruptedHandler Interrupted |
| { |
| get { return interruptedHandler; } |
| set |
| { |
| this.interruptedHandler = value; |
| this.listenerLatch.countDown(); |
| } |
| } |
| |
| public ResumedHandler Resumed |
| { |
| get { return resumedHandler; } |
| set |
| { |
| this.resumedHandler = value; |
| this.listenerLatch.countDown(); |
| } |
| } |
| |
| internal Exception Failure |
| { |
| get { return failure; } |
| set |
| { |
| lock(mutex) |
| { |
| failure = value; |
| } |
| } |
| } |
| |
| public int Timeout |
| { |
| get { return this.timeout; } |
| set { this.timeout = value; } |
| } |
| |
| public int InitialReconnectDelay |
| { |
| get { return initialReconnectDelay; } |
| set { initialReconnectDelay = value; } |
| } |
| |
| public int MaxReconnectDelay |
| { |
| get { return maxReconnectDelay; } |
| set { maxReconnectDelay = value; } |
| } |
| |
| public int ReconnectDelay |
| { |
| get { return reconnectDelay; } |
| set { reconnectDelay = value; } |
| } |
| |
| public int ReconnectDelayExponent |
| { |
| get { return backOffMultiplier; } |
| set { backOffMultiplier = value; } |
| } |
| |
| public ITransport ConnectedTransport |
| { |
| get { return connectedTransport.Value; } |
| set { connectedTransport.Value = value; } |
| } |
| |
| public Uri ConnectedTransportURI |
| { |
| get { return connectedTransportURI; } |
| set { connectedTransportURI = value; } |
| } |
| |
| public int MaxReconnectAttempts |
| { |
| get { return maxReconnectAttempts; } |
| set { maxReconnectAttempts = value; } |
| } |
| |
| public int StartupMaxReconnectAttempts |
| { |
| get { return startupMaxReconnectAttempts; } |
| set { startupMaxReconnectAttempts = value; } |
| } |
| |
| public bool Randomize |
| { |
| get { return randomize; } |
| set { randomize = value; } |
| } |
| |
| public bool Backup |
| { |
| get { return backup; } |
| set { backup = value; } |
| } |
| |
| public bool PriorityBackup |
| { |
| get { return priorityBackup; } |
| set { this.priorityBackup = value; } |
| } |
| |
| public String PriorityURIs |
| { |
| get { return PrintableUriList(priorityList); } |
| set { this.ProcessDelimitedUriList(value, priorityList); } |
| } |
| |
| public int BackupPoolSize |
| { |
| get { return backupPoolSize; } |
| set { backupPoolSize = value; } |
| } |
| |
| public bool TrackMessages |
| { |
| get { return trackMessages; } |
| set { trackMessages = value; } |
| } |
| |
| public bool TrackTransactionProducers |
| { |
| get { return trackTransactionProducers; } |
| set { this.trackTransactionProducers = value; } |
| } |
| |
| public int MaxCacheSize |
| { |
| get { return maxCacheSize; } |
| set { maxCacheSize = value; } |
| } |
| |
| public bool UseExponentialBackOff |
| { |
| get { return useExponentialBackOff; } |
| set { useExponentialBackOff = value; } |
| } |
| |
| public IWireFormat WireFormat |
| { |
| get |
| { |
| ITransport transport = ConnectedTransport; |
| if(transport != null) |
| { |
| return transport.WireFormat; |
| } |
| |
| return null; |
| } |
| } |
| |
| /// <summary> |
| /// Gets or sets a value indicating whether to asynchronously connect to sockets |
| /// </summary> |
| /// <value><c>true</c> if [async connect]; otherwise, <c>false</c>.</value> |
| public bool AsyncConnect |
| { |
| set { } |
| } |
| |
| /// <summary> |
| /// If doing an asynchronous connect, the milliseconds before timing out if no connection can be made |
| /// </summary> |
| /// <value>The async timeout.</value> |
| public int AsyncTimeout |
| { |
| get { return 0; } |
| set { } |
| } |
| |
| public ConnectionStateTracker StateTracker |
| { |
| get { return this.stateTracker; } |
| } |
| |
| #endregion |
| |
| public bool IsFaultTolerant |
| { |
| get { return true; } |
| } |
| |
| public bool IsDisposed |
| { |
| get { return disposed; } |
| } |
| |
| public bool IsConnected |
| { |
| get { return connected; } |
| } |
| |
| public bool IsConnectedToPriority |
| { |
| get { return connectedToPriority; } |
| } |
| |
| public bool IsStarted |
| { |
| get { return started; } |
| } |
| |
| public bool IsReconnectSupported |
| { |
| get { return this.reconnectSupported; } |
| } |
| |
| public bool IsUpdateURIsSupported |
| { |
| get { return this.updateURIsSupported; } |
| } |
| |
| public void OnException(ITransport sender, Exception error) |
| { |
| try |
| { |
| HandleTransportFailure(error); |
| } |
| catch(Exception) |
| { |
| this.Exception(this, new IOException("Unexpected Transport Failure.")); |
| } |
| } |
| |
| public void DisposedOnCommand(ITransport sender, Command c) |
| { |
| } |
| |
| public void DisposedOnException(ITransport sender, Exception e) |
| { |
| } |
| |
| public void HandleTransportFailure(Exception e) |
| { |
| ITransport transport = connectedTransport.GetAndSet(null); |
| if (transport == null) |
| { |
| // sync with possible in progress reconnect |
| lock(reconnectMutex) |
| { |
| transport = connectedTransport.GetAndSet(null); |
| } |
| } |
| |
| if(transport != null) |
| { |
| DisposeTransport(transport); |
| |
| bool reconnectOk = false; |
| lock(reconnectMutex) |
| { |
| if (CanReconnect()) |
| { |
| Tracer.WarnFormat("Transport failed to {0}, attempting to automatically reconnect due to: {1}", |
| ConnectedTransportURI, e.Message); |
| reconnectOk = true; |
| } |
| |
| initialized = false; |
| failedConnectTransportURI = ConnectedTransportURI; |
| ConnectedTransportURI = null; |
| connectedToPriority = false; |
| connected = false; |
| |
| if(this.Interrupted != null) |
| { |
| this.Interrupted(transport); |
| } |
| |
| if (reconnectOk) |
| { |
| updated.Remove(failedConnectTransportURI); |
| reconnectTask.Wakeup(); |
| } |
| else if (!disposed) |
| { |
| PropagateFailureToExceptionListener(e); |
| } |
| } |
| } |
| } |
| |
| private bool CanReconnect() |
| { |
| return started && 0 != CalculateReconnectAttemptLimit(); |
| } |
| |
| public void Start() |
| { |
| lock(reconnectMutex) |
| { |
| if(started) |
| { |
| Tracer.Debug("FailoverTransport Already Started."); |
| return; |
| } |
| |
| Tracer.Debug("FailoverTransport Started."); |
| started = true; |
| stateTracker.MaxCacheSize = MaxCacheSize; |
| stateTracker.TrackMessages = TrackMessages; |
| stateTracker.TrackTransactionProducers = TrackTransactionProducers; |
| if(ConnectedTransport != null) |
| { |
| Tracer.Debug("FailoverTransport already connected, start is restoring."); |
| stateTracker.DoRestore(ConnectedTransport); |
| } |
| else |
| { |
| Tracer.Debug("FailoverTransport not connected, start is reconnecting."); |
| Reconnect(false); |
| } |
| } |
| } |
| |
| public virtual void Stop() |
| { |
| ITransport transportToStop = null; |
| List<ITransport> backupsToStop = new List<ITransport>(backups.Count); |
| |
| try |
| { |
| lock(reconnectMutex) |
| { |
| if(!started) |
| { |
| Tracer.Debug("FailoverTransport Already Stopped."); |
| return; |
| } |
| |
| Tracer.Debug("FailoverTransport Stopped."); |
| started = false; |
| disposed = true; |
| connected = false; |
| if(ConnectedTransport != null) |
| { |
| transportToStop = connectedTransport.GetAndSet(null); |
| } |
| } |
| lock(sleepMutex) |
| { |
| Monitor.PulseAll(sleepMutex); |
| } |
| } |
| finally |
| { |
| if(reconnectTask != null) |
| { |
| reconnectTask.Shutdown(); |
| } |
| } |
| |
| lock(backupMutex) |
| { |
| foreach (BackupTransport backup in backups) |
| { |
| backup.Disposed = true; |
| ITransport transport = backup.Transport; |
| if (transport != null) |
| { |
| transport.Command = DisposedOnCommand; |
| transport.Exception = DisposedOnException; |
| backupsToStop.Add(transport); |
| } |
| } |
| backups.Clear(); |
| } |
| |
| foreach (ITransport transport in backupsToStop) |
| { |
| try |
| { |
| if (Tracer.IsDebugEnabled) |
| { |
| Tracer.Debug("Stopped backup: " + transport); |
| } |
| DisposeTransport(transport); |
| } |
| catch (Exception) |
| { |
| } |
| } |
| |
| if(transportToStop != null) |
| { |
| transportToStop.Stop(); |
| } |
| } |
| |
| public FutureResponse AsyncRequest(Command command) |
| { |
| throw new ApplicationException("FailoverTransport does not implement AsyncRequest(Command)"); |
| } |
| |
| public Response Request(Command command) |
| { |
| throw new ApplicationException("FailoverTransport does not implement Request(Command)"); |
| } |
| |
| public Response Request(Command command, TimeSpan ts) |
| { |
| throw new ApplicationException("FailoverTransport does not implement Request(Command, TimeSpan)"); |
| } |
| |
| public void OnCommand(ITransport sender, Command command) |
| { |
| if(command != null) |
| { |
| if(command.IsResponse) |
| { |
| Command request = null; |
| lock(((ICollection) requestMap).SyncRoot) |
| { |
| int v = ((Response) command).CorrelationId; |
| try |
| { |
| if(requestMap.TryGetValue(v, out request)) |
| { |
| requestMap.Remove(v); |
| } |
| } |
| catch |
| { |
| } |
| } |
| |
| Tracked tracked = request as Tracked; |
| if(tracked != null) |
| { |
| tracked.OnResponse(); |
| } |
| } |
| |
| if(!initialized) |
| { |
| initialized = true; |
| } |
| |
| if(command.IsConnectionControl) |
| { |
| this.HandleConnectionControl(command as ConnectionControl); |
| } |
| } |
| |
| this.Command(sender, command); |
| } |
| |
| public void Oneway(Command command) |
| { |
| Exception error = null; |
| |
| lock(reconnectMutex) |
| { |
| if(command != null && ConnectedTransport == null) |
| { |
| if(command.IsShutdownInfo) |
| { |
| // Skipping send of ShutdownInfo command when not connected. |
| return; |
| } |
| else if(command.IsRemoveInfo || command.IsMessageAck) |
| {
|
| stateTracker.Track(command); |
| // Simulate response to RemoveInfo command or a MessageAck |
| // since it would be stale at this point. |
| if(command.ResponseRequired) |
| { |
| OnCommand(this, new Response() { CorrelationId = command.CommandId }); |
| } |
| return; |
| } |
| else if(command.IsMessagePull) |
| { |
| // Simulate response to MessagePull if timed as we can't honor that now. |
| MessagePull pullRequest = command as MessagePull; |
| if (pullRequest.Timeout != 0) |
| { |
| MessageDispatch dispatch = new MessageDispatch(); |
| dispatch.ConsumerId = pullRequest.ConsumerId; |
| dispatch.Destination = pullRequest.Destination; |
| OnCommand(this, dispatch); |
| } |
| return; |
| } |
| } |
| |
| // Keep trying until the message is sent. |
| for(int i = 0; !disposed; i++) |
| { |
| try |
| { |
| // Any Ack that was being sent when the connection dropped is now |
| // stale so we don't send it here as it would cause an unmatched ack |
| // on the broker side and probably prevent a consumer from getting |
| // any new messages. |
| if(command.IsMessageAck && i > 0) |
| { |
| Tracer.Debug("Inflight MessageAck being dropped as stale."); |
| if(command.ResponseRequired) |
| { |
| OnCommand(this, new Response() { CorrelationId = command.CommandId }); |
| } |
| return; |
| } |
| |
| // Wait for transport to be connected. |
| ITransport transport = ConnectedTransport; |
| DateTime start = DateTime.Now; |
| bool timedout = false;
|
| while(transport == null && !disposed && connectionFailure == null) |
| { |
| Tracer.Info("Waiting for transport to reconnect."); |
| |
| int elapsed = (int) (DateTime.Now - start).TotalMilliseconds; |
| if(this.timeout > 0 && elapsed > this.timeout) |
| { |
| timedout = true; |
| Tracer.DebugFormat("FailoverTransport.oneway - timed out after {0} mills", elapsed); |
| break; |
| }
|
|
|
| // Release so that the reconnect task can run |
| try |
| { |
| // Wait for something. The mutex will be pulsed if we connect. |
| Monitor.Wait(reconnectMutex, 100); |
| } |
| catch(ThreadInterruptedException e) |
| { |
| Tracer.DebugFormat("Interrupted: {0}", e.Message); |
| } |
| |
| transport = ConnectedTransport; |
| } |
| |
| if(transport == null) |
| { |
| // Previous loop may have exited due to use being disposed. |
| if(disposed) |
| { |
| error = new IOException("Transport disposed."); |
| } |
| else if(connectionFailure != null) |
| { |
| error = connectionFailure; |
| } |
| else if(timedout) |
| { |
| error = new IOException("Failover oneway timed out after " + timeout + " milliseconds."); |
| } |
| else |
| { |
| error = new IOException("Unexpected failure."); |
| } |
| break; |
| } |
| |
| // If it was a request and it was not being tracked by |
| // the state tracker, then hold it in the requestMap so |
| // that we can replay it later. |
| Tracked tracked = stateTracker.Track(command); |
| lock(((ICollection) requestMap).SyncRoot) |
| { |
| if(tracked != null && tracked.WaitingForResponse) |
| { |
| requestMap.Add(command.CommandId, tracked); |
| } |
| else if(tracked == null && command.ResponseRequired) |
| { |
| requestMap.Add(command.CommandId, command); |
| } |
| } |
| |
| // Send the message. |
| try |
| { |
| transport.Oneway(command); |
| stateTracker.TrackBack(command); |
| } |
| catch(Exception e) |
| { |
| // If the command was not tracked.. we will retry in this method |
| // otherwise we need to trigger a reconnect before returning as |
| // the transport is failed. |
| if (tracked == null) |
| { |
| // since we will retry in this method.. take it |
| // out of the request map so that it is not |
| // sent 2 times on recovery |
| if(command.ResponseRequired) |
| { |
| lock(((ICollection) requestMap).SyncRoot) |
| { |
| requestMap.Remove(command.CommandId); |
| } |
| } |
| |
| // Rethrow the exception so it will handled by |
| // the outer catch |
| throw; |
| } |
| else |
| { |
| if (Tracer.IsDebugEnabled) |
| { |
| Tracer.DebugFormat("Send Oneway attempt: {0} failed: Message = {1}", |
| i, e.Message); |
| Tracer.DebugFormat("Failed Message Was: {0}", command); |
| } |
| HandleTransportFailure(e); |
| } |
| } |
| |
| return; |
| } |
| catch(Exception e) |
| { |
| if (Tracer.IsDebugEnabled) |
| { |
| Tracer.DebugFormat("Send Oneway attempt: {0} failed: Message = {1}", |
| i, e.Message); |
| Tracer.DebugFormat("Failed Message Was: {0}", command); |
| } |
| HandleTransportFailure(e); |
| } |
| } |
| } |
| |
| if(!disposed) |
| { |
| if(error != null) |
| { |
| throw error; |
| } |
| } |
| } |
| |
| public void Add(bool rebalance, Uri[] urisToAdd) |
| { |
| bool newUri = false; |
| lock(uris) |
| { |
| foreach (Uri uri in urisToAdd) |
| { |
| if(!Contains(uri)) |
| { |
| uris.Add(uri); |
| newUri = true; |
| } |
| } |
| } |
| |
| if (newUri) |
| { |
| Reconnect(rebalance); |
| } |
| } |
| |
| public void Add(bool rebalance, String u) |
| { |
| try |
| { |
| Add(rebalance, new Uri[] { new Uri(u) }); |
| } |
| catch(Exception e) |
| { |
| Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, e.Message); |
| } |
| } |
| |
| public void Remove(bool rebalance, Uri[] u) |
| { |
| lock(uris) |
| { |
| for(int i = 0; i < u.Length; i++) |
| { |
| uris.Remove(u[i]); |
| } |
| } |
| |
| Reconnect(rebalance); |
| } |
| |
| public void Remove(bool rebalance, String u) |
| { |
| try |
| { |
| Remove(rebalance, new Uri[] { new Uri(u) }); |
| } |
| catch(Exception e) |
| { |
| Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, e.Message); |
| } |
| } |
| |
| public void Reconnect(Uri uri) |
| { |
| Add(true, new Uri[] { uri }); |
| } |
| |
| public void Reconnect(bool rebalance) |
| { |
| lock(reconnectMutex) |
| { |
| if(started) |
| { |
| if (rebalance) |
| { |
| doRebalance = true; |
| } |
| Tracer.Debug("Waking up reconnect task"); |
| try |
| { |
| reconnectTask.Wakeup(); |
| } |
| catch (ThreadInterruptedException) |
| { |
| } |
| } |
| else |
| { |
| Tracer.Debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport."); |
| } |
| } |
| } |
| |
| private List<Uri> ConnectList |
| { |
| get |
| { |
| if (updated.Count != 0) |
| { |
| return updated; |
| } |
| |
| List<Uri> l = new List<Uri>(uris); |
| bool removed = false; |
| if(failedConnectTransportURI != null) |
| { |
| removed = l.Remove(failedConnectTransportURI); |
| } |
| |
| if(Randomize) |
| { |
| Shuffle(l); |
| } |
| |
| if(removed) |
| { |
| l.Add(failedConnectTransportURI); |
| } |
| |
| if (Tracer.IsDebugEnabled) |
| { |
| Tracer.DebugFormat("Uri connection list: {0} from: {1}", |
| PrintableUriList(l), PrintableUriList(uris)); |
| } |
| |
| return l; |
| } |
| } |
| |
| protected void RestoreTransport(ITransport t) |
| { |
| Tracer.Info("Restoring previous transport connection."); |
| t.Start(); |
| |
| // Send information to the broker - informing it we are a fault tolerant client |
| t.Oneway(new ConnectionControl() { FaultTolerant = true }); |
| stateTracker.DoRestore(t); |
| |
| Tracer.Info("Sending queued commands..."); |
| Dictionary<int, Command> tmpMap = null; |
| lock(((ICollection) requestMap).SyncRoot) |
| { |
| tmpMap = new Dictionary<int, Command>(requestMap); |
| } |
| |
| foreach(Command command in tmpMap.Values) |
| { |
| if(command.IsMessageAck) |
| { |
| Tracer.Debug("Stored MessageAck being dropped as stale."); |
| OnCommand(this, new Response() { CorrelationId = command.CommandId }); |
| continue; |
| } |
| |
| t.Oneway(command); |
| } |
| } |
| |
| public Uri RemoteAddress |
| { |
| get |
| { |
| if(ConnectedTransport != null) |
| { |
| return ConnectedTransport.RemoteAddress; |
| } |
| return null; |
| } |
| } |
| |
| public Object Narrow(Type type) |
| { |
| if(this.GetType().Equals(type)) |
| { |
| return this; |
| } |
| else if(ConnectedTransport != null) |
| { |
| return ConnectedTransport.Narrow(type); |
| } |
| |
| return null; |
| } |
| |
| private bool DoConnect() |
| { |
| lock(reconnectMutex) |
| { |
| if (disposed || connectionFailure != null) |
| { |
| Monitor.PulseAll(reconnectMutex); |
| } |
| |
| if ((connectedTransport.Value != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) |
| { |
| return false; |
| } |
| else |
| { |
| List<Uri> connectList = ConnectList; |
| if(connectList.Count == 0) |
| { |
| Failure = new NMSConnectionException("No URIs available for connection."); |
| } |
| else |
| { |
| if (doRebalance) |
| { |
| if (connectedToPriority || CompareUris(connectList[0], connectedTransportURI)) |
| { |
| // already connected to first in the list, no need to rebalance |
| doRebalance = false; |
| return false; |
| } |
| else |
| { |
| if (Tracer.IsDebugEnabled) |
| { |
| Tracer.DebugFormat("Doing rebalance from: {0} to {1}", |
| connectedTransportURI, PrintableUriList(connectList)); |
| } |
| try |
| { |
| ITransport current = this.connectedTransport.GetAndSet(null); |
| if (current != null) |
| { |
| DisposeTransport(current); |
| } |
| } |
| catch (Exception e) |
| { |
| if (Tracer.IsDebugEnabled) |
| { |
| Tracer.DebugFormat("Caught an exception stopping existing " + |
| "transport for rebalance {0}", e.Message); |
| } |
| } |
| } |
| |
| doRebalance = false; |
| } |
| |
| ResetReconnectDelay(); |
| |
| ITransport transport = null; |
| Uri uri = null; |
| |
| // If we have a backup already waiting lets try it. |
| lock(backupMutex) |
| { |
| if ((priorityBackup || backup) && backups.Count > 0) |
| { |
| List<BackupTransport> l = new List<BackupTransport>(backups); |
| if (randomize) |
| { |
| Shuffle(l); |
| } |
| BackupTransport bt = l[0]; |
| l.RemoveAt(0); |
| backups.Remove(bt); |
| transport = bt.Transport; |
| uri = bt.Uri; |
| if (priorityBackup && priorityBackupAvailable) |
| { |
| ITransport old = this.connectedTransport.GetAndSet(null); |
| if (old != null) |
| { |
| DisposeTransport(old); |
| } |
| priorityBackupAvailable = false; |
| } |
| } |
| } |
| |
| // Sleep for the reconnectDelay if there's no backup and we aren't trying |
| // for the first time, or we were disposed for some reason. |
| if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) |
| { |
| lock(sleepMutex) |
| { |
| if (Tracer.IsDebugEnabled) |
| { |
| Tracer.DebugFormat("Waiting {0} ms before attempting connection.", reconnectDelay); |
| } |
| try |
| { |
| Monitor.Wait(sleepMutex, reconnectDelay); |
| } |
| catch (ThreadInterruptedException) |
| { |
| } |
| } |
| } |
| |
| IEnumerator<Uri> iter = connectList.GetEnumerator(); |
| while ((transport != null || iter.MoveNext()) && (connectedTransport.Value == null && !disposed)) |
| { |
| try |
| { |
| if (Tracer.IsDebugEnabled) |
| { |
| Tracer.DebugFormat("Attempting {0}th connect to: {1}", |
| connectFailures, uri); |
| } |
| |
| // We could be starting with a backup and if so we wait to grab a |
| // URI from the pool until next time around. |
| if (transport == null) |
| { |
| uri = iter.Current; |
| transport = TransportFactory.CompositeConnect(uri); |
| } |
| |
| transport.Command = OnCommand; |
| transport.Exception = OnException; |
| transport.Start(); |
| |
| if (started && !firstConnection) |
| { |
| RestoreTransport(transport); |
| } |
| |
| if (Tracer.IsDebugEnabled) |
| { |
| Tracer.Debug("Connection established"); |
| } |
| reconnectDelay = initialReconnectDelay; |
| connectedTransportURI = uri; |
| connectedTransport.Value = transport; |
| connectedToPriority = IsPriority(connectedTransportURI); |
| Monitor.PulseAll(reconnectMutex); |
| connectFailures = 0; |
| |
| // Try to wait long enough for client to init the event callbacks. |
| listenerLatch.await(TimeSpan.FromSeconds(2)); |
| |
| if (Resumed != null) |
| { |
| Resumed(transport); |
| } |
| else |
| { |
| if (Tracer.IsDebugEnabled) |
| { |
| Tracer.Debug("transport resumed by transport listener not set"); |
| } |
| } |
| |
| if (firstConnection) |
| { |
| firstConnection = false; |
| Tracer.Info("Successfully connected to " + uri); |
| } |
| else |
| { |
| Tracer.Info("Successfully reconnected to " + uri); |
| } |
| |
| connected = true; |
| return false; |
| } |
| catch (Exception e) |
| { |
| failure = e; |
| if (Tracer.IsDebugEnabled) |
| { |
| Tracer.Debug("Connect fail to: " + uri + ", reason: " + e.Message); |
| } |
| if (transport != null) |
| { |
| try |
| { |
| transport.Stop(); |
| transport = null; |
| } |
| catch (Exception ee) |
| { |
| if (Tracer.IsDebugEnabled) |
| { |
| Tracer.Debug("Stop of failed transport: " + transport + |
| " failed with reason: " + ee.Message); |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| int reconnectLimit = CalculateReconnectAttemptLimit(); |
| |
| connectFailures++; |
| if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) |
| { |
| Tracer.ErrorFormat("Failed to connect to {0} after: {1} attempt(s)", |
| PrintableUriList(uris), connectFailures); |
| connectionFailure = failure; |
| |
| // Make sure on initial startup, that the transportListener has been |
| // initialized for this instance. |
| listenerLatch.await(TimeSpan.FromSeconds(2)); |
| PropagateFailureToExceptionListener(connectionFailure); |
| return false; |
| } |
| } |
| |
| if(!disposed) |
| { |
| DoDelay(); |
| } |
| |
| return !disposed; |
| } |
| |
| private bool BuildBackups() |
| { |
| lock(backupMutex) |
| { |
| if (!disposed && (backup || priorityBackup) && backups.Count < backupPoolSize) |
| { |
| List<Uri> backupList = new List<Uri>(priorityList); |
| List<Uri> connectList = ConnectList; |
| foreach(Uri uri in connectList) |
| { |
| if (!backupList.Contains(uri)) |
| { |
| backupList.Add(uri); |
| } |
| } |
| foreach(BackupTransport bt in backups) |
| { |
| if(bt.Disposed) |
| { |
| backups.Remove(bt); |
| } |
| } |
| |
| foreach(Uri uri in connectList) |
| { |
| if (disposed) |
| { |
| break; |
| } |
| |
| if(ConnectedTransportURI != null && !ConnectedTransportURI.Equals(uri)) |
| { |
| try |
| { |
| BackupTransport bt = new BackupTransport(this) |
| { |
| Uri = uri |
| }; |
| |
| if(!backups.Contains(bt)) |
| { |
| ITransport t = TransportFactory.CompositeConnect(uri); |
| t.Command = bt.OnCommand; |
| t.Exception = bt.OnException; |
| t.Start(); |
| bt.Transport = t; |
| if (priorityBackup && IsPriority(uri)) |
| { |
| priorityBackupAvailable = true; |
| backups.Insert(0, bt); |
| } |
| else |
| { |
| backups.Add(bt); |
| } |
| } |
| } |
| catch(Exception e) |
| { |
| Tracer.DebugFormat("Failed to build backup: {0}", e.Message); |
| } |
| } |
| |
| if(backups.Count == BackupPoolSize) |
| { |
| break; |
| } |
| } |
| } |
| } |
| |
| return false; |
| } |
| |
| public void ConnectionInterruptProcessingComplete(ConnectionId connectionId) |
| { |
| lock(reconnectMutex) |
| { |
| Tracer.Debug("Connection Interrupt Processing is complete for ConnectionId: " + connectionId); |
| stateTracker.ConnectionInterruptProcessingComplete(this, connectionId); |
| } |
| } |
| |
| public void UpdateURIs(bool rebalance, Uri[] updatedURIs) |
| { |
| if(IsUpdateURIsSupported) |
| { |
| Dictionary<Uri, bool> copy = new Dictionary<Uri, bool>(); |
| foreach(Uri uri in updated) |
| { |
| if(uri != null) |
| { |
| copy[uri] = true; |
| } |
| } |
| |
| updated.Clear(); |
| |
| if(updatedURIs != null && updatedURIs.Length > 0) |
| { |
| Dictionary<Uri, bool> uriSet = new Dictionary<Uri, bool>(); |
| for(int i = 0; i < updatedURIs.Length; i++) |
| { |
| Uri uri = updatedURIs[i]; |
| if(uri != null) |
| { |
| uriSet[uri] = true; |
| } |
| } |
| |
| foreach(Uri uri in uriSet.Keys) |
| { |
| if(!updated.Contains(uri)) |
| { |
| updated.Add(uri); |
| } |
| } |
| |
| if (Tracer.IsDebugEnabled) |
| { |
| Tracer.DebugFormat("Updated URIs list {0}", PrintableUriList(updated)); |
| } |
| |
| if (!(copy.Count == 0 && updated.Count == 0) && !copy.Keys.Equals(updated)) |
| { |
| BuildBackups(); |
| lock(reconnectMutex) |
| { |
| Reconnect(rebalance); |
| } |
| } |
| } |
| } |
| } |
| |
| public void HandleConnectionControl(ConnectionControl control) |
| { |
| string reconnectStr = control.ReconnectTo; |
| |
| if(reconnectStr != null) |
| { |
| reconnectStr = reconnectStr.Trim(); |
| if(reconnectStr.Length > 0) |
| { |
| try |
| { |
| Uri uri = new Uri(reconnectStr); |
| if(IsReconnectSupported) |
| { |
| Tracer.Info("Reconnecting to: " + uri.OriginalString); |
| Reconnect(uri); |
| } |
| } |
| catch(Exception e) |
| { |
| Tracer.ErrorFormat("Failed to handle ConnectionControl reconnect to {0}: {1}", reconnectStr, e); |
| } |
| } |
| } |
| |
| ProcessNewTransports(control.RebalanceConnection, control.ConnectedBrokers); |
| } |
| |
| private void ProcessNewTransports(bool rebalance, String newTransports) |
| { |
| if(newTransports != null) |
| { |
| newTransports = newTransports.Trim(); |
| |
| if(newTransports.Length > 0 && IsUpdateURIsSupported) |
| { |
| List<Uri> list = new List<Uri>(); |
| ProcessDelimitedUriList(newTransports, list); |
| |
| if(list.Count != 0) |
| { |
| try |
| { |
| UpdateURIs(rebalance, list.ToArray()); |
| } |
| catch |
| { |
| Tracer.Error("Failed to update transport URI's from: " + newTransports); |
| } |
| } |
| } |
| } |
| } |
| |
| private void ProcessDelimitedUriList(String priorityUris, List<Uri> target) |
| { |
| String[] tokens = priorityUris.Split(new Char[] { ',' }); |
| |
| foreach(String str in tokens) |
| { |
| try |
| { |
| Uri uri = new Uri(str); |
| target.Add(uri); |
| |
| if (Tracer.IsDebugEnabled) |
| { |
| Tracer.DebugFormat("Adding new Uri[{0}] to list,", uri); |
| } |
| } |
| catch (Exception e) |
| { |
| Tracer.ErrorFormat("Failed to parse broker address: {0} because of: {1}", |
| str, e.Message); |
| } |
| } |
| } |
| |
| public void Dispose() |
| { |
| Dispose(true); |
| GC.SuppressFinalize(this); |
| } |
| |
| public void Dispose(bool disposing) |
| { |
| this.Stop(); |
| disposed = true; |
| } |
| |
| public int CompareTo(Object o) |
| { |
| if(o is FailoverTransport) |
| { |
| FailoverTransport oo = o as FailoverTransport; |
| |
| return this.id - oo.id; |
| } |
| else |
| { |
| throw new ArgumentException(); |
| } |
| } |
| |
| public override String ToString() |
| { |
| return ConnectedTransportURI == null ? "unconnected" : ConnectedTransportURI.ToString(); |
| } |
| |
| internal bool IsPriority(Uri uri) |
| { |
| if (priorityBackup) |
| { |
| if (priorityList.Count > 0) |
| { |
| return priorityList.Contains(uri); |
| } |
| |
| if (this.uris.Count > 0) |
| { |
| return uris[0].Equals(uri); |
| } |
| } |
| return false; |
| } |
| |
| public void DisposeTransport(ITransport transport) |
| { |
| transport.Command = DisposedOnCommand; |
| transport.Exception = DisposedOnException; |
| |
| try |
| { |
| transport.Stop(); |
| } |
| catch (Exception e) |
| { |
| Tracer.DebugFormat("Could not stop transport: {0]. Reason: {1}", transport, e.Message); |
| } |
| } |
| |
| private void ResetReconnectDelay() |
| { |
| if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) |
| { |
| reconnectDelay = initialReconnectDelay; |
| } |
| } |
| |
| private void DoDelay() |
| { |
| if (reconnectDelay > 0) |
| { |
| lock(sleepMutex) |
| { |
| if (Tracer.IsDebugEnabled) |
| { |
| Tracer.DebugFormat("Waiting {0} ms before attempting connection", reconnectDelay); |
| } |
| try |
| { |
| Monitor.Wait(sleepMutex, reconnectDelay); |
| } |
| catch (ThreadInterruptedException) |
| { |
| } |
| } |
| } |
| |
| if (useExponentialBackOff) |
| { |
| // Exponential increment of reconnect delay. |
| reconnectDelay *= backOffMultiplier; |
| if (reconnectDelay > maxReconnectDelay) |
| { |
| reconnectDelay = maxReconnectDelay; |
| } |
| } |
| } |
| |
| private void PropagateFailureToExceptionListener(Exception exception) |
| { |
| if (Exception != null) |
| { |
| Exception(this, exception); |
| } |
| else |
| { |
| Exception(this, new IOException()); |
| } |
| Monitor.PulseAll(reconnectMutex); |
| } |
| |
| private int CalculateReconnectAttemptLimit() |
| { |
| int maxReconnectValue = this.maxReconnectAttempts; |
| if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) |
| { |
| maxReconnectValue = this.startupMaxReconnectAttempts; |
| } |
| return maxReconnectValue; |
| } |
| |
| public void Shuffle<T>(List<T> list) |
| { |
| Random random = new Random(DateTime.Now.Millisecond); |
| int index = list.Count; |
| while (index > 1) |
| { |
| index--; |
| int k = random.Next(index + 1); |
| T value = list[k]; |
| list[k] = list[index]; |
| list[index] = value; |
| } |
| } |
| |
| private String PrintableUriList(List<Uri> uriList) |
| { |
| if (uriList.Count == 0) |
| { |
| return ""; |
| } |
| |
| StringBuilder builder = new StringBuilder(); |
| for (int i = 0; i < uriList.Count; ++i) |
| { |
| builder.Append(uriList[i]); |
| if (i < (uriList.Count - 1)) |
| { |
| builder.Append(","); |
| } |
| } |
| |
| return builder.ToString(); |
| } |
| |
| private bool CompareUris(Uri first, Uri second) |
| { |
| bool result = false; |
| if (first.Port == second.Port) |
| { |
| IPHostEntry firstAddr = null; |
| IPHostEntry secondAddr = null; |
| try |
| { |
| firstAddr = Dns.GetHostEntry(first.Host); |
| secondAddr = Dns.GetHostEntry(second.Host); |
| |
| if (firstAddr.Equals(secondAddr)) |
| { |
| result = true; |
| } |
| } |
| catch(Exception e) |
| { |
| if (firstAddr == null) |
| { |
| Tracer.WarnFormat("Failed to Lookup IPHostEntry for URI[{0}] : {1}", first, e); |
| } |
| else |
| { |
| Tracer.WarnFormat("Failed to Lookup IPHostEntry for URI[{0}] : {1}", second, e); |
| } |
| |
| if(String.Equals(first.Host, second.Host, StringComparison.CurrentCultureIgnoreCase)) |
| { |
| result = true; |
| } |
| } |
| |
| } |
| |
| return result; |
| } |
| |
| private bool Contains(Uri newURI) |
| { |
| bool result = false; |
| foreach (Uri uri in uris) |
| { |
| if (CompareUris(newURI, uri)) |
| { |
| result = true; |
| break; |
| } |
| } |
| |
| return result; |
| } |
| } |
| } |