blob: 18c1dee330fd981e93defd7ff0f866a02bc57ade [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Apache.NMS.AMQP.Message;
using Apache.NMS.AMQP.Meta;
using Apache.NMS.AMQP.Util;
using Apache.NMS.Util;
namespace Apache.NMS.AMQP.Provider.Failover
{
public class FailoverProvider : IProvider, IProviderListener
{
private readonly object SyncRoot = new object();
private static int UNDEFINED = -1;
public static int DEFAULT_INITIAL_RECONNECT_DELAY = 0;
public static long DEFAULT_RECONNECT_DELAY = 10;
public static double DEFAULT_RECONNECT_BACKOFF_MULTIPLIER = 2.0d;
public static long DEFAULT_MAX_RECONNECT_DELAY = (long) Math.Round(TimeSpan.FromSeconds(30).TotalMilliseconds);
public static int DEFAULT_STARTUP_MAX_RECONNECT_ATTEMPTS = UNDEFINED;
public static int DEFAULT_MAX_RECONNECT_ATTEMPTS = UNDEFINED;
public static bool DEFAULT_USE_RECONNECT_BACKOFF = true;
public static int DEFAULT_WARN_AFTER_RECONNECT_ATTEMPTS = 10;
public static bool DEFAULT_RANDOMIZE_ENABLED = false;
private readonly ReconnectControls reconnectControl;
private readonly FailoverUriPool uris;
private readonly AtomicBool closed = new AtomicBool();
private readonly Atomic<bool> failed = new Atomic<bool>();
private long requestTimeout;
private long sendTimeout;
private Uri connectedUri;
private ConnectionInfo connectionInfo;
private IProvider provider;
private IProviderListener listener;
private List<FailoverRequest> requests = new List<FailoverRequest>();
internal IProvider ActiveProvider => provider;
public FailoverProvider(IEnumerable<Uri> uris)
{
this.uris = new FailoverUriPool(uris);
reconnectControl = new ReconnectControls(this);
}
public long InitialReconnectDelay { get; set; } = DEFAULT_INITIAL_RECONNECT_DELAY;
public long ReconnectDelay { get; set; } = DEFAULT_RECONNECT_DELAY;
public bool UseReconnectBackOff { get; set; } = DEFAULT_USE_RECONNECT_BACKOFF;
public double ReconnectBackOffMultiplier { get; set; } = DEFAULT_RECONNECT_BACKOFF_MULTIPLIER;
public long MaxReconnectDelay { get; set; } = DEFAULT_MAX_RECONNECT_DELAY;
public int StartupMaxReconnectAttempts { get; set; } = DEFAULT_STARTUP_MAX_RECONNECT_ATTEMPTS;
public int MaxReconnectAttempts { get; set; } = DEFAULT_MAX_RECONNECT_ATTEMPTS;
public int WarnAfterReconnectAttempts { get; set; } = DEFAULT_WARN_AFTER_RECONNECT_ATTEMPTS;
public Uri RemoteUri => connectedUri;
public void Start()
{
CheckClosed();
if (listener == null)
{
throw new IllegalStateException("No ProviderListener registered.");
}
}
public Task Connect(ConnectionInfo info)
{
CheckClosed();
requestTimeout = info.requestTimeout;
sendTimeout = info.SendTimeout;
connectionInfo = info;
Tracer.Debug("Initiating initial connection attempt task");
return TriggerReconnectionAttempt();
}
private Task TriggerReconnectionAttempt()
{
if (closed)
return Task.CompletedTask;
return reconnectControl.ScheduleReconnect(Reconnect);
async Task Reconnect()
{
IProvider provider = null;
Exception failure = null;
long reconnectAttempts = reconnectControl.RecordNextAttempt();
try
{
if (uris.Any())
{
for (int i = 0; i < uris.Size(); i++)
{
var target = uris.GetNext();
if (target == null)
{
Tracer.Debug("Failover URI collection unexpectedly modified during connection attempt.");
continue;
}
try
{
Tracer.Debug($"Connection attempt:[{reconnectAttempts}] to: {target.Scheme}://{target.Host}:{target.Port} in-progress");
provider = ProviderFactory.Create(target);
await provider.Connect(connectionInfo).ConfigureAwait(false);
await InitializeNewConnection(provider).ConfigureAwait(false);
return;
}
catch (Exception e)
{
Tracer.Info($"Connection attempt:[{reconnectAttempts}] to: {target.Scheme}://{target.Host}:{target.Port} failed");
failure = e;
try
{
provider?.Close();
}
catch
{
}
finally
{
provider = null;
}
}
}
}
else
{
Tracer.Debug("No remote URI available to connect to in failover list");
// TODO Handle this one.
failure = new IOException("No remote URI available for reconnection during connection attempt: " + reconnectAttempts);
}
}
catch (Exception unknownFailure)
{
Tracer.Warn($"Connection attempt:[{reconnectAttempts}] failed abnormally.");
failure = failure ?? unknownFailure;
}
finally
{
if (provider == null)
{
Tracer.Debug($"Connection attempt:[{reconnectControl.ReconnectAttempts}] failed error: {failure?.Message}");
if (!reconnectControl.IsReconnectAllowed(failure))
{
ReportReconnectFailure(failure);
}
else
{
await reconnectControl.ScheduleReconnect(Reconnect).ConfigureAwait(false);
}
}
}
}
}
private void ReportReconnectFailure(Exception lastFailure)
{
Tracer.Error($"Failed to connect after: {reconnectControl.ReconnectAttempts} attempt(s)");
if (failed.CompareAndSet(false, true))
{
if (lastFailure == null)
{
lastFailure = new IOException($"Failed to connect after: {reconnectControl.ReconnectAttempts} attempt(s)");
}
var exception = NMSExceptionSupport.Create(lastFailure);
listener.OnConnectionFailure(exception);
throw exception;
}
}
private async Task InitializeNewConnection(IProvider provider)
{
if (closed)
{
try
{
provider.Close();
}
catch (Exception e)
{
Tracer.Debug($"Ignoring failure to close failed provider: {provider} {e.Message}");
}
return;
}
this.provider = provider;
this.provider.SetProviderListener(this);
this.connectedUri = provider.RemoteUri;
if (reconnectControl.IsRecoveryRequired())
{
Tracer.Debug($"Signalling connection recovery: {provider}");
// Allow listener to recover its resources
await listener.OnConnectionRecovery(provider).ConfigureAwait(false);
// Restart consumers, send pull commands, etc.
await listener.OnConnectionRecovered(provider).ConfigureAwait(false);
// Let the client know that connection has restored.
listener.OnConnectionRestored(connectedUri);
// If we try to run pending requests right after the connection is reestablished
// it will result in timeout on the first send request
await Task.Delay(50).ConfigureAwait(false);
foreach (FailoverRequest request in GetPendingRequests())
{
await request.Run().ConfigureAwait(false);
}
reconnectControl.ConnectionEstablished();
}
else
{
listener.OnConnectionEstablished(connectedUri);
reconnectControl.ConnectionEstablished();
}
}
public override string ToString()
{
return "FailoverProvider: " + (connectedUri == null ? "unconnected" : connectedUri.ToString());
}
public void Close()
{
if (closed.CompareAndSet(false, true))
{
try
{
provider.Close();
}
catch (Exception e)
{
Tracer.Warn("Error caught while closing Provider: " + e.Message);
}
}
}
public void SetProviderListener(IProviderListener providerListener)
{
CheckClosed();
listener = providerListener;
}
public Task CreateResource(ResourceInfo resourceInfo)
{
CheckClosed();
FailoverRequest request = new FailoverRequest(this, requestTimeout)
{
DoTask = activeProvider => provider.CreateResource(resourceInfo),
Name = nameof(CreateResource)
};
request.Run();
return request.Task;
}
public Task DestroyResource(ResourceInfo resourceInfo)
{
CheckClosed();
FailoverRequest request = new FailoverRequest(this, requestTimeout)
{
DoTask = activeProvider => activeProvider.DestroyResource(resourceInfo),
Name = nameof(DestroyResource),
// Allow this to succeed, resource won't get recreated on reconnect.
SucceedsWhenOffline = true
};
request.Run();
return request.Task;
}
public Task StartResource(ResourceInfo resourceInfo)
{
CheckClosed();
FailoverRequest request = new FailoverRequest(this, requestTimeout)
{
DoTask = activeProvider => activeProvider.StartResource(resourceInfo),
Name = nameof(StartResource)
};
request.Run();
return request.Task;
}
public Task StopResource(ResourceInfo resourceInfo)
{
CheckClosed();
FailoverRequest request = new FailoverRequest(this, requestTimeout)
{
DoTask = activeProvider => activeProvider.StopResource(resourceInfo),
Name = nameof(StopResource)
};
request.Run();
return request.Task;
}
public Task Recover(Id sessionId)
{
CheckClosed();
FailoverRequest request = new FailoverRequest(this, requestTimeout)
{
DoTask = activeProvider => activeProvider.Recover(sessionId),
SucceedsWhenOffline = true,
Name = nameof(Recover)
};
request.Run();
return request.Task;
}
public Task Acknowledge(Id sessionId, AckType ackType)
{
CheckClosed();
FailoverRequest request = new FailoverRequest(this, requestTimeout)
{
DoTask = activeProvider => activeProvider.Acknowledge(sessionId, ackType),
FailureWhenOffline = true,
Name = nameof(Acknowledge)
};
request.Run();
return request.Task;
}
public Task Acknowledge(InboundMessageDispatch envelope, AckType ackType)
{
CheckClosed();
FailoverRequest request = new FailoverRequest(this, requestTimeout)
{
DoTask = activeProvider => activeProvider.Acknowledge(envelope, ackType),
FailureWhenOffline = true,
Name = nameof(Acknowledge)
};
request.Run();
return request.Task;
}
public INmsMessageFactory MessageFactory => provider.MessageFactory;
public long SendTimeout => sendTimeout;
public Task Send(OutboundMessageDispatch envelope)
{
CheckClosed();
FailoverRequest request = new FailoverRequest(this, SendTimeout)
{
DoTask = activeProvider => activeProvider.Send(envelope),
Name = nameof(Send)
};
request.Run();
return request.Task;
}
public Task Unsubscribe(string name)
{
CheckClosed();
FailoverRequest request = new FailoverRequest(this, SendTimeout)
{
DoTask = activeProvider => activeProvider.Unsubscribe(name),
Name = nameof(Unsubscribe)
};
request.Run();
return request.Task;
}
public Task Rollback(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
{
CheckClosed();
FailoverRequest request = new FailoverRequest(this, SendTimeout)
{
DoTask = activeProvider => activeProvider.Rollback(transactionInfo, nextTransactionInfo),
Name = nameof(Rollback),
SucceedsWhenOffline = true
};
request.Run();
return request.Task;
}
public Task Commit(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
{
CheckClosed();
FailoverRequest request = new FailoverRequest(this, SendTimeout)
{
DoTask = activeProvider => activeProvider.Commit(transactionInfo, nextTransactionInfo),
Name = nameof(Commit),
FailureWhenOffline = true
};
request.Run();
return request.Task;
}
public void OnInboundMessage(InboundMessageDispatch envelope)
{
if (closed)
return;
listener.OnInboundMessage(envelope);
}
public void OnConnectionFailure(NMSException exception)
{
Tracer.Debug($"Failover: the provider reports failure: {exception.Message}");
HandleProviderError(provider, exception);
}
public Task OnConnectionRecovery(IProvider provider)
{
return Task.CompletedTask;
}
public void OnConnectionEstablished(Uri remoteUri)
{
}
public Task OnConnectionRecovered(IProvider provider)
{
return Task.CompletedTask;
}
public void OnConnectionRestored(Uri remoteUri)
{
}
public void OnConnectionInterrupted(Uri failedUri)
{
}
public void OnResourceClosed(ResourceInfo resourceInfo, Exception error)
{
if (closed)
return;
listener.OnResourceClosed(resourceInfo, error);
}
internal void HandleProviderError(IProvider provider, NMSException cause)
{
if (closed)
return;
lock (SyncRoot)
{
// It is possible that another failed request signaled an error for the same provider
// and we already cleaned up the old failed provider and scheduled a reconnect that
// has already succeeded, so we need to ensure that we don't kill a valid provider.
if (provider == this.provider)
{
Tracer.Debug($"handling Provider failure: {cause.ToString()}");
this.provider = null;
provider.SetProviderListener(null);
Uri failedUri = provider.RemoteUri;
try
{
provider.Close();
}
catch (Exception exception)
{
Tracer.Debug($"Caught exception while closing failed provider: {exception.Message}");
}
if (reconnectControl.IsReconnectAllowed(cause))
{
// Start watching for request timeouts while we are offline, unless we already are.
foreach (FailoverRequest failoverRequest in GetPendingRequests())
{
failoverRequest.ScheduleTimeout();
}
Task.Run(TriggerReconnectionAttempt);
listener?.OnConnectionInterrupted(failedUri);
}
else
{
listener?.OnConnectionFailure(cause);
}
}
else
{
Tracer.Debug($"Ignoring duplicate provider failed event for provider: {provider}");
}
}
}
private void CheckClosed()
{
if (closed)
throw new IOException("The Provider is already closed");
}
internal void AddFailoverRequest(FailoverRequest request)
{
lock (requests)
{
requests.Add(request);
}
}
internal void RemoveFailoverRequest(FailoverRequest request)
{
lock (requests)
{
requests.Remove(request);
}
}
internal IEnumerable<FailoverRequest> GetPendingRequests()
{
lock (requests)
{
return requests.ToArray();
}
}
private class ReconnectControls
{
private readonly FailoverProvider failoverProvider;
// Reconnection state tracking
private volatile bool recoveryRequired;
private long reconnectAttempts;
private long nextReconnectDelay = -1;
public ReconnectControls(FailoverProvider failoverProvider)
{
this.failoverProvider = failoverProvider;
}
public long ReconnectAttempts => reconnectAttempts;
public async Task ScheduleReconnect(Func<Task> action)
{
// If no connection recovery required then we have never fully connected to a remote
// so we proceed down the connect with one immediate connection attempt and then follow
// on delayed attempts based on configuration.
if (!recoveryRequired)
{
if (ReconnectAttempts == 0)
{
Tracer.Debug("Initial connect attempt will be performed immediately");
await action();
}
else if (ReconnectAttempts == 1 && failoverProvider.InitialReconnectDelay > 0)
{
Tracer.Debug($"Delayed initial reconnect attempt will be in {failoverProvider.InitialReconnectDelay} milliseconds");
await Task.Delay(TimeSpan.FromMilliseconds(failoverProvider.InitialReconnectDelay));
await action();
}
else
{
double delay = NextReconnectDelay();
Tracer.Debug($"Next reconnect attempt will be in {delay} milliseconds");
await Task.Delay(TimeSpan.FromMilliseconds(delay));
await action();
}
}
else if (ReconnectAttempts == 0)
{
if (failoverProvider.InitialReconnectDelay > 0)
{
Tracer.Debug($"Delayed initial reconnect attempt will be in {failoverProvider.InitialReconnectDelay} milliseconds");
await Task.Delay(TimeSpan.FromMilliseconds(failoverProvider.InitialReconnectDelay));
await action();
}
else
{
Tracer.Debug("Initial Reconnect attempt will be performed immediately");
await action();
}
}
else
{
double delay = NextReconnectDelay();
Tracer.Debug($"Next reconnect attempt will be in {delay} milliseconds");
await Task.Delay(TimeSpan.FromMilliseconds(delay));
await action();
}
}
private long NextReconnectDelay()
{
if (nextReconnectDelay == -1)
{
nextReconnectDelay = failoverProvider.ReconnectDelay;
}
if (failoverProvider.UseReconnectBackOff && ReconnectAttempts > 1)
{
// Exponential increment of reconnect delay.
nextReconnectDelay = (long) Math.Round(nextReconnectDelay * failoverProvider.ReconnectBackOffMultiplier);
if (nextReconnectDelay > failoverProvider.MaxReconnectDelay)
{
nextReconnectDelay = failoverProvider.MaxReconnectDelay;
}
}
return nextReconnectDelay;
}
public long RecordNextAttempt()
{
return ++reconnectAttempts;
}
public void ConnectionEstablished()
{
recoveryRequired = true;
nextReconnectDelay = -1;
reconnectAttempts = 0;
}
public bool IsReconnectAllowed(Exception cause)
{
// If a connection attempts fail due to Security errors than we abort
// reconnection as there is a configuration issue and we want to avoid
// a spinning reconnect cycle that can never complete.
if (IsStoppageCause(cause))
{
return false;
}
return !IsLimitExceeded();
}
private bool IsStoppageCause(Exception cause)
{
// TODO: Check if fail is due to Security errors
return false;
}
public bool IsLimitExceeded()
{
int reconnectLimit = ReconnectAttemptLimit();
if (reconnectLimit != UNDEFINED && reconnectAttempts >= reconnectLimit)
{
return true;
}
return false;
}
private int ReconnectAttemptLimit()
{
int maxReconnectValue = failoverProvider.MaxReconnectAttempts;
if (!recoveryRequired && failoverProvider.StartupMaxReconnectAttempts != UNDEFINED)
{
// If this is the first connection attempt and a specific startup retry limit
// is configured then use it, otherwise use the main reconnect limit
maxReconnectValue = failoverProvider.StartupMaxReconnectAttempts;
}
return maxReconnectValue;
}
public bool IsRecoveryRequired()
{
return recoveryRequired;
}
}
}
}