blob: dcdf825df4eff9d5855240d82001bf7f656950a1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Security.Principal;
using System.Threading.Tasks;
using Apache.Qpid.Proton.Buffer;
using Apache.Qpid.Proton.Client.Exceptions;
using Apache.Qpid.Proton.Client.Concurrent;
using Apache.Qpid.Proton.Client.Utilities;
using Apache.Qpid.Proton.Engine.Sasl.Client;
using Apache.Qpid.Proton.Client.Transport;
using Apache.Qpid.Proton.Logging;
namespace Apache.Qpid.Proton.Client.Implementation
{
/// <summary>
/// The client connection class manages a single connection to a remote AMQP
/// peer and handles connection errors and reconnection operations if those
/// are enabled.
/// </summary>
public class ClientConnection : IConnection
{
private static IProtonLogger LOG = ProtonLoggerFactory.GetLogger<ClientConnection>();
private const int UNLIMITED = -1;
private const int UNDEFINED = -1;
private readonly ClientInstance client;
private readonly ConnectionOptions options;
private readonly ClientSessionBuilder sessionBuilder;
private readonly string connectionId;
private readonly ReconnectLocationPool reconnectPool = new ReconnectLocationPool();
private readonly AtomicBoolean closed = new AtomicBoolean();
private readonly ClientConnectionCapabilities capabilities = new ClientConnectionCapabilities();
private readonly TaskCompletionSource<IConnection> openFuture = new TaskCompletionSource<IConnection>();
private readonly TaskCompletionSource<IConnection> closeFuture = new TaskCompletionSource<IConnection>();
private readonly IOContext ioContext;
private Engine.IEngine engine;
private Engine.IConnection protonConnection;
private ITransport transport;
private AtomicReference<Exception> failureCause = new AtomicReference<Exception>();
private ClientSession connectionSession;
private ClientSender connectionSender;
private long totalConnections;
private long reconnectAttempts;
private long nextReconnectDelay = -1;
internal ClientConnection(ClientInstance client, string host, int port, ConnectionOptions options)
{
this.client = client;
this.options = options;
this.connectionId = client.NextConnectionId();
this.sessionBuilder = new ClientSessionBuilder(this);
this.ioContext = new IOContext(options.TransportOptions, options.SslOptions);
reconnectPool.Add(new ReconnectLocation(host, port));
reconnectPool.AddAll(options.ReconnectOptions.ReconnectLocations);
}
#region Connection Properties Access APIs
public IClient Client => client;
public Task<IConnection> OpenTask => openFuture.Task;
public IReadOnlyDictionary<string, object> Properties
{
get
{
WaitForOpenToComplete();
return ClientConversionSupport.ToStringKeyedMap(protonConnection.RemoteProperties);
}
}
public IReadOnlyCollection<string> OfferedCapabilities
{
get
{
WaitForOpenToComplete();
return ClientConversionSupport.ToStringArray(protonConnection.RemoteOfferedCapabilities);
}
}
public IReadOnlyCollection<string> DesiredCapabilities
{
get
{
WaitForOpenToComplete();
return ClientConversionSupport.ToStringArray(protonConnection.RemoteDesiredCapabilities);
}
}
#endregion
public void Close(IErrorCondition error = null)
{
try
{
DoCloseAsync(error).GetAwaiter().GetResult();
}
catch (Exception)
{
// Ignore any exception as we are closed regardless
}
}
public Task<IConnection> CloseAsync(IErrorCondition error = null)
{
return DoCloseAsync(error);
}
public void Dispose()
{
try
{
Close();
}
catch (Exception)
{
}
}
public ISender DefaultSender()
{
CheckClosedOrFailed();
TaskCompletionSource<ISender> defaultSender = new TaskCompletionSource<ISender>();
Execute(() =>
{
try
{
CheckClosedOrFailed();
defaultSender.TrySetResult(LazyCreateConnectionSender());
}
catch (Exception error)
{
defaultSender.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(error));
}
});
return Request(this, defaultSender).Task.GetAwaiter().GetResult();
}
public ISession DefaultSession()
{
CheckClosedOrFailed();
TaskCompletionSource<ISession> defaultSession = new TaskCompletionSource<ISession>();
Execute(() =>
{
try
{
CheckClosedOrFailed();
defaultSession.TrySetResult(LazyCreateConnectionSession());
}
catch (Exception error)
{
defaultSession.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(error));
}
});
return Request(this, defaultSession).Task.GetAwaiter().GetResult();
}
public ISession OpenSession(SessionOptions options = null)
{
CheckClosedOrFailed();
TaskCompletionSource<ISession> createSession = new TaskCompletionSource<ISession>();
Execute(() =>
{
try
{
CheckClosedOrFailed();
createSession.TrySetResult(sessionBuilder.Session(options).Open());
}
catch (Exception error)
{
createSession.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(error));
}
});
return Request(this, createSession).Task.GetAwaiter().GetResult();
}
public IReceiver OpenDurableReceiver(string address, string subscriptionName, ReceiverOptions options = null)
{
CheckClosedOrFailed();
Objects.RequireNonNull(address, "Cannot create a receiver with a null address");
TaskCompletionSource<IReceiver> createReceiver = new TaskCompletionSource<IReceiver>();
Execute(() =>
{
try
{
CheckClosedOrFailed();
createReceiver.TrySetResult(LazyCreateConnectionSession().InternalOpenDurableReceiver(address, subscriptionName, options));
}
catch (Exception error)
{
createReceiver.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(error));
}
});
return Request(this, createReceiver).Task.GetAwaiter().GetResult();
}
public IReceiver OpenDynamicReceiver(IDictionary<string, object> dynamicNodeProperties = null)
{
return OpenDynamicReceiver(null, dynamicNodeProperties);
}
public IReceiver OpenDynamicReceiver(ReceiverOptions options, IDictionary<string, object> dynamicNodeProperties = null)
{
CheckClosedOrFailed();
TaskCompletionSource<IReceiver> createReceiver = new TaskCompletionSource<IReceiver>();
Execute(() =>
{
try
{
CheckClosedOrFailed();
createReceiver.TrySetResult(LazyCreateConnectionSession().InternalOpenDynamicReceiver(dynamicNodeProperties, options));
}
catch (Exception error)
{
createReceiver.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(error));
}
});
return Request(this, createReceiver).Task.GetAwaiter().GetResult();
}
public IReceiver OpenReceiver(string address, ReceiverOptions options = null)
{
CheckClosedOrFailed();
Objects.RequireNonNull(address, "Cannot create a receiver with a null address");
TaskCompletionSource<IReceiver> createReceiver = new TaskCompletionSource<IReceiver>();
Execute(() =>
{
try
{
CheckClosedOrFailed();
createReceiver.TrySetResult(LazyCreateConnectionSession().InternalOpenReceiver(address, options));
}
catch (Exception error)
{
createReceiver.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(error));
}
});
return Request(this, createReceiver).Task.GetAwaiter().GetResult();
}
public ISender OpenAnonymousSender(SenderOptions options = null)
{
CheckClosedOrFailed();
TaskCompletionSource<ISender> createSender = new TaskCompletionSource<ISender>();
Execute(() =>
{
try
{
CheckClosedOrFailed();
createSender.TrySetResult(LazyCreateConnectionSession().InternalOpenAnonymousSender(options));
}
catch (Exception error)
{
createSender.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(error));
}
});
return Request(this, createSender).Task.GetAwaiter().GetResult();
}
public ISender OpenSender(string address, SenderOptions options = null)
{
CheckClosedOrFailed();
Objects.RequireNonNull(address, "Cannot create a sender with a null address");
TaskCompletionSource<ISender> createSender = new TaskCompletionSource<ISender>();
Execute(() =>
{
try
{
CheckClosedOrFailed();
createSender.TrySetResult(LazyCreateConnectionSession().InternalOpenSender(address, options));
}
catch (Exception error)
{
createSender.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(error));
}
});
return Request(this, createSender).Task.GetAwaiter().GetResult();
}
public IStreamReceiver OpenStreamReceiver(string address, StreamReceiverOptions options = null)
{
CheckClosedOrFailed();
TaskCompletionSource<IStreamReceiver> createReceiver = new TaskCompletionSource<IStreamReceiver>();
Execute(() =>
{
try
{
uint sessionCapacity = StreamReceiverOptions.DEFAULT_READ_BUFFER_SIZE;
if (options != null)
{
sessionCapacity = options.ReadBufferSize / 2;
}
// Session capacity cannot be smaller than one frame size so we adjust to the lower bound
sessionCapacity = Math.Max(sessionCapacity, protonConnection.MaxFrameSize);
CheckClosedOrFailed();
SessionOptions sessionOptions = new SessionOptions(sessionBuilder.DefaultSessionOptions);
sessionOptions.IncomingCapacity = sessionCapacity;
ClientStreamSession session = (ClientStreamSession)sessionBuilder.StreamSession(sessionOptions).Open();
createReceiver.TrySetResult(session.InternalOpenStreamReceiver(address, options));
}
catch (Exception error)
{
createReceiver.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(error));
}
});
return Request(this, createReceiver).Task.GetAwaiter().GetResult();
}
public IStreamSender OpenStreamSender(string address, StreamSenderOptions options = null)
{
CheckClosedOrFailed();
Objects.RequireNonNull(address, "Cannot create a sender with a null address");
TaskCompletionSource<IStreamSender> createSender = new TaskCompletionSource<IStreamSender>();
Execute(() =>
{
try
{
uint sessionCapacity = StreamSenderOptions.DEFAULT_PENDING_WRITES_BUFFER_SIZE;
if (options != null)
{
sessionCapacity = options.PendingWriteBufferSize;
}
// Session capacity cannot be smaller than one frame size so we adjust to the lower bound
sessionCapacity = Math.Max(sessionCapacity, protonConnection.MaxFrameSize);
CheckClosedOrFailed();
SessionOptions sessionOptions = new SessionOptions(sessionBuilder.DefaultSessionOptions);
sessionOptions.OutgoingCapacity = sessionCapacity;
ClientStreamSession session = (ClientStreamSession)sessionBuilder.StreamSession(sessionOptions).Open();
createSender.TrySetResult(session.InternalOpenStreamSender(address, options));
}
catch (Exception error)
{
createSender.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(error));
}
});
return Request(this, createSender).Task.GetAwaiter().GetResult();
}
public ITracker Send<T>(IMessage<T> message)
{
CheckClosedOrFailed();
Objects.RequireNonNull(message, "Cannot send a null message");
TaskCompletionSource<ISender> result = new TaskCompletionSource<ISender>();
Execute(() =>
{
try
{
CheckClosedOrFailed();
result.TrySetResult(LazyCreateConnectionSender());
}
catch (Exception error)
{
result.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(error));
}
});
return Request(this, result).Task.GetAwaiter().GetResult().Send(message);
}
public override string ToString()
{
return "ClientConnection:[" + ConnectionId + "]";
}
#region Internal Connection API
internal string ConnectionId => connectionId;
internal bool IsClosed => closed;
internal bool HasOpened => openFuture.Task.IsCompleted;
internal Engine.IEngine ProtonEngine => protonConnection.Engine;
internal Engine.IConnection ProtonConnection => protonConnection;
internal ConnectionOptions Options => options;
internal ClientConnection Connect()
{
try
{
ReconnectLocation remoteLocation = reconnectPool.Next.Value;
// Initial configuration validation happens here, if this step fails then the
// user most likely configured something incorrect or that violates some constraint
// like an invalid SASL mechanism etc.
InitializeProtonResources(remoteLocation);
ScheduleReconnect(remoteLocation);
return this;
}
catch (Exception ex)
{
closed.Set(true);
failureCause.CompareAndSet(null, ClientExceptionSupport.CreateOrPassthroughFatal(ex));
_ = openFuture.TrySetException(failureCause);
_ = closeFuture.TrySetResult(this);
ioContext.Shutdown();
throw failureCause;
}
}
internal void CheckClosedOrFailed()
{
if (closed)
{
throw new ClientIllegalStateException("The Connection was explicitly closed", failureCause);
}
else if (failureCause.Get() != null)
{
throw failureCause;
}
}
internal bool IsAnonymousRelaySupported => capabilities.AnonymousRelaySupported;
internal void CheckAnonymousRelaySupported()
{
if (!capabilities.AnonymousRelaySupported)
{
throw new ClientUnsupportedOperationException("Anonymous relay support not available from this connection");
}
}
internal void Execute(Action action)
{
ioContext.EventLoop.Execute(action);
}
internal void Schedule(Action action, TimeSpan delay)
{
// TODO: Either add scheduling to event loop or handle timeouts here somehow
Task.Delay(delay).ContinueWith((t) => Execute(action));
}
internal TaskCompletionSource<T> Request<T>(Object requestor, TaskCompletionSource<T> request)
{
// TODO : Wait for completion or fail later via tracking map
return request;
}
#endregion
#region Proton Engine and Connection event handlers
private void HandleLocalOpen(Engine.IConnection connection)
{
// TODO connection.TickAuto(Scheduler);
if (options.OpenTimeout > 0)
{
Schedule(() =>
{
if (!openFuture.Task.IsCompleted)
{
// Ensure a close write is attempted and then force failure regardless
// as we don't expect the remote to respond given it hasn't done so yet.
try
{
connection.Close();
}
catch (Exception) { }
connection.Engine.EngineFailed(new ClientOperationTimedOutException(
"Connection Open timed out waiting for remote to open"));
}
}, TimeSpan.FromMilliseconds(options.OpenTimeout));
}
}
private void HandleLocalClose(Engine.IConnection connection)
{
if (connection.IsRemotelyClosed)
{
ClientException failureCause;
if (engine.Connection.RemoteErrorCondition != null)
{
failureCause = ClientExceptionSupport.ConvertToConnectionClosedException(connection.RemoteErrorCondition);
}
else
{
failureCause = new ClientConnectionRemotelyClosedException("Unknown error led to connection disconnect");
}
try
{
connection.Engine.EngineFailed(failureCause);
}
catch (Exception)
{
}
}
else if (!engine.IsShutdown || !engine.IsFailed)
{
// Ensure engine gets shut down and future completed if remote doesn't respond.
Schedule(() =>
{
try
{
connection.Engine.Shutdown();
}
catch (Exception)
{
}
}, TimeSpan.FromMilliseconds(options.CloseTimeout));
}
}
private void HandleRemoteOpen(Engine.IConnection connection)
{
ConnectionEstablished();
capabilities.DetermineCapabilities(connection);
if (totalConnections == 1)
{
LOG.Info("Connection {0} connected to server: {1}", ConnectionId, transport.EndPoint);
SubmitConnectionEvent(options.ConnectedHandler, transport.Host, transport.Port);
}
else
{
LOG.Info("Connection {0} reconnected to server: {1}", ConnectionId, transport.EndPoint);
SubmitConnectionEvent(options.ReconnectedHandler, transport.Host, transport.Port);
}
openFuture.TrySetResult(this);
}
private void HandleRemoteClose(Engine.IConnection connection)
{
// When the connection is already locally closed this implies the application requested
// a close of this connection so this is normal, if not then the remote is closing for
// some reason and we should react as if the connection has failed which we will determine
// in the local close handler based on state.
if (connection.IsLocallyClosed)
{
try
{
connection.Engine.Shutdown();
}
catch (Exception ignore)
{
LOG.Debug("Unexpected exception thrown from engine shutdown: {0}", ignore.Message);
}
}
else
{
try
{
connection.Close();
}
catch (Exception)
{
// Engine handlers will ensure we close down if not already locally closed.
}
}
}
private void HandleEngineOutput(IProtonBuffer buffer, Action ioComplete)
{
bool autoFlush = false;
if (autoFlush)
{
//transport.WriteAndFlush(buffer, ioComplete);
}
else
{
transport.Write(buffer, ioComplete);
}
}
/// <summary>
/// Handle normal engine shutdown which should only happen when the connection is closed
/// by the user, all other cases should lead to engine failed event first which will deal
/// with reconnect cases and avoid this event unless reconnect cannot proceed.
/// </summary>
/// <param name="engine"></param>
private void HandleEngineShutdown(Engine.IEngine engine)
{
// Only handle this on normal shutdown failure will perform its own controlled shutdown
// and or reconnection logic which this method should avoid interfering with.
if (engine.FailureCause == null)
{
try
{
protonConnection.Close();
}
catch (Exception)
{
}
try
{
transport.Close();
}
catch (Exception)
{
}
client.UnregisterClosedConnection(this);
_ = openFuture.TrySetResult(this);
_ = closeFuture.TrySetResult(this);
}
}
private void HandleEngineFailure(Engine.IEngine engine)
{
ClientIOException failureCause;
if (engine.Connection.RemoteErrorCondition != null)
{
failureCause = ClientExceptionSupport.ConvertToConnectionClosedException(engine.Connection.RemoteErrorCondition);
}
else if (engine.FailureCause != null)
{
failureCause = ClientExceptionSupport.ConvertToConnectionClosedException(engine.FailureCause);
}
else
{
failureCause = new ClientConnectionRemotelyClosedException("Unknown error led to connection disconnect");
}
LOG.Trace("Engine reports failure with error: {}", failureCause.Message);
if (IsReconnectAllowed(failureCause))
{
LOG.Info("Connection {0} interrupted to server: {1}", ConnectionId, transport.EndPoint);
SubmitDisconnectionEvent(options.InterruptedHandler, transport.Host, transport.Port, failureCause);
// Initial configuration validation happens here, if this step fails then the
// user most likely configured something incorrect or that violates some constraint
// like an invalid SASL mechanism etc.
try
{
ReconnectLocation remoteLocation = reconnectPool.Next.Value;
InitializeProtonResources(remoteLocation);
ScheduleReconnect(remoteLocation);
}
catch (ClientException initError)
{
FailConnection(ClientExceptionSupport.CreateOrPassthroughFatal(initError));
}
finally
{
engine.Shutdown();
}
}
else
{
FailConnection(failureCause);
}
}
#endregion
#region private connection utility methods
private Task<IConnection> DoCloseAsync(IErrorCondition error)
{
if (closed.CompareAndSet(false, true))
{
try
{
ioContext.EventLoop.Execute(() =>
{
LOG.Trace("Close requested for connection: {0}", this);
if (protonConnection.IsLocallyOpen)
{
protonConnection.ErrorCondition = ClientErrorCondition.AsProtonErrorCondition(error);
try
{
protonConnection.Close();
}
catch (Exception)
{
// Engine error handler will kick in if the write of Close fails
}
}
else
{
engine.Shutdown();
}
});
}
catch (RejectedExecutionException rje)
{
LOG.Trace("Close task rejected from the event loop", rje);
}
finally
{
try
{
// TODO: Blocking here isn't ideal but for now we want to await
/// the remote sending the close performative back to us
/// before dropping the connection. We should probably schedule
/// a task that closes the connection and completes the close
/// future if the remote hasn't responded by then.
closeFuture.Task.GetAwaiter().GetResult();
}
catch (Exception)
{
// Ignore error as we are closed regardless
}
finally
{
try
{
transport.Close();
}
catch (Exception) { }
ioContext.Shutdown();
}
}
}
return closeFuture.Task;
}
private void InitializeProtonResources(ReconnectLocation location)
{
if (options.SaslOptions.SaslEnabled)
{
engine = Engine.IEngineFactory.Proton.CreateEngine();
}
else
{
engine = Engine.IEngineFactory.Proton.CreateNonSaslEngine();
}
if (options.TraceFrames)
{
engine.Configuration.TraceFrames = true;
if (!engine.Configuration.TraceFrames)
{
LOG.Warn("Connection {0} frame tracing was enabled but protocol engine does not support it", ConnectionId);
}
}
engine.OutputHandler(HandleEngineOutput)
.ShutdownHandler(HandleEngineShutdown)
.ErrorHandler(HandleEngineFailure);
protonConnection = engine.Connection;
if (client.ContainerId != null)
{
protonConnection.ContainerId = client.ContainerId;
}
else
{
protonConnection.ContainerId = connectionId;
}
protonConnection.LinkedResource = this;
protonConnection.ChannelMax = options.ChannelMax;
protonConnection.MaxFrameSize = options.MaxFrameSize;
protonConnection.Hostname = location.Host;
protonConnection.IdleTimeout = (uint)options.IdleTimeout;
protonConnection.OfferedCapabilities = ClientConversionSupport.ToSymbolArray(options.OfferedCapabilities);
protonConnection.DesiredCapabilities = ClientConversionSupport.ToSymbolArray(options.DesiredCapabilities);
protonConnection.Properties = ClientConversionSupport.ToSymbolKeyedMap(options.Properties);
protonConnection.LocalOpenHandler(HandleLocalOpen)
.LocalCloseHandler(HandleLocalClose)
.OpenHandler(HandleRemoteOpen)
.CloseHandler(HandleRemoteClose);
if (options.SaslOptions.SaslEnabled)
{
SaslMechanismSelector mechSelector = new SaslMechanismSelector(
ClientConversionSupport.ToSymbolSet(options.SaslOptions.AllowedMechanisms));
engine.SaslDriver.Client().Authenticator =
new SaslAuthenticator(mechSelector, new ClientSaslCredentialsProvider(this));
}
}
private ClientSession LazyCreateConnectionSession()
{
return connectionSession ?? (connectionSession = sessionBuilder.Session().Open());
}
private ClientSender LazyCreateConnectionSender()
{
if (connectionSender == null)
{
if (openFuture.Task.IsCompleted)
{
CheckAnonymousRelaySupported();
}
connectionSender = LazyCreateConnectionSession().InternalOpenAnonymousSender();
connectionSender.SenderRemotelyClosedHandler(sender =>
{
try
{
sender.CloseAsync();
}
catch (Exception) { }
// Clear the old closed sender, a lazy create needs to construct a new sender.
connectionSender = null;
});
}
return connectionSender;
}
private void WaitForOpenToComplete()
{
if (!openFuture.Task.IsCompleted || openFuture.Task.IsFaulted)
{
try
{
openFuture.Task.GetAwaiter().GetResult();
}
catch (Exception e)
{
throw failureCause.Value ?? ClientExceptionSupport.CreateNonFatalOrPassthrough(e);
}
}
}
private void FailConnection(ClientIOException cause)
{
failureCause.CompareAndSet(null, cause);
try
{
protonConnection.Close();
}
catch (Exception) { }
try
{
engine.Shutdown();
}
catch (Exception) { }
_ = openFuture.TrySetException(failureCause);
_ = closeFuture.TrySetResult(this);
LOG.Warn("Connection {0} has failed due to: {1}", ConnectionId, failureCause != null ?
failureCause.Get().GetType().Name + " -> " + failureCause.Get().Message : "No failure details provided.");
SubmitDisconnectionEvent(options.DisconnectedHandler, transport.Host, transport.Port, cause);
}
#endregion
#region Client reconnection support API
private void AttemptConnection(ReconnectLocation location)
{
try
{
reconnectAttempts++;
transport = new ClientTransportProxy(engine, ioContext.NewTransport()).Transport;
LOG.Trace("Connection {0} Attempting connection to remote {1}", ConnectionId, location.Host, location.Port);
transport.Connect(location.Host, location.Port);
}
catch (Exception error)
{
engine.EngineFailed(ClientExceptionSupport.CreateOrPassthroughFatal(error));
}
}
private void ScheduleReconnect(ReconnectLocation location)
{
// Warn of ongoing connection attempts if configured.
int warnInterval = options.ReconnectOptions.WarnAfterReconnectAttempts;
if (reconnectAttempts > 0 && warnInterval > 0 && (reconnectAttempts % warnInterval) == 0)
{
LOG.Warn("Connection {0}: Failed to connect after: {1} attempt(s) continuing to retry.", ConnectionId, reconnectAttempts);
}
// If no connection recovery required then we have never fully connected to a remote
// so we proceed down the connect with one immediate connection attempt and then follow
// on delayed attempts based on configuration.
if (totalConnections == 0)
{
if (reconnectAttempts == 0)
{
LOG.Trace("Initial connect attempt will be performed immediately");
ioContext.EventLoop.Execute(() => AttemptConnection(location));
}
else
{
long delay = NextReconnectDelay();
LOG.Trace("Next connect attempt will be in {0} milliseconds", delay);
// TODO: Executor scheduling would handle connection close, this will
// try and run this even after a close.
Task.Delay((int)delay).ContinueWith(
(t) => ioContext.EventLoop.Execute(() =>
{
if (!IsClosed)
{
AttemptConnection(location);
}
})
);
}
}
else if (reconnectAttempts == 0)
{
LOG.Trace("Initial reconnect attempt will be performed immediately");
ioContext.EventLoop.Execute(() => AttemptConnection(location));
}
else
{
long delay = NextReconnectDelay();
LOG.Trace("Next reconnect attempt will be in {0} milliseconds", delay);
// TODO: Executor scheduling would handle connection close, this will
// try and run this even after a close.
Task.Delay((int)delay).ContinueWith(
(t) => ioContext.EventLoop.Execute(() =>
{
if (!IsClosed)
{
AttemptConnection(location);
}
})
);
}
}
private void ConnectionEstablished()
{
// After each successful connection is made, update stats to
// prepare for the next eventual failure.
totalConnections++;
nextReconnectDelay = -1;
reconnectAttempts = 0;
}
private bool IsLimitExceeded()
{
int reconnectLimit = ReconnectAttemptLimit();
if (reconnectLimit != UNLIMITED && reconnectAttempts >= reconnectLimit)
{
return true;
}
return false;
}
private bool IsReconnectAllowed(ClientException cause)
{
if (options.ReconnectOptions.ReconnectEnabled && !IsClosed)
{
// If a connection attempts fail due to Security errors then we abort
// reconnection as there is a configuration issue and we want to avoid
// a spinning reconnect cycle that can never complete.
if (IsStoppageCause(cause))
{
return false;
}
return !IsLimitExceeded();
}
else
{
return false;
}
}
private bool IsStoppageCause(ClientException cause)
{
if (cause is ClientConnectionSecuritySaslException saslFailure)
{
return !saslFailure.IsTemporaryFailure;
}
else if (cause is ClientConnectionSecurityException)
{
return true;
}
else
{
return false;
}
}
private int ReconnectAttemptLimit()
{
int maxReconnectValue = options.ReconnectOptions.MaxReconnectAttempts;
if (totalConnections == 0 && options.ReconnectOptions.MaxInitialConnectionAttempts != UNDEFINED)
{
// If this is the first connection attempt and a specific startup retry limit
// is configured then use it, otherwise use the main reconnect limit
maxReconnectValue = options.ReconnectOptions.MaxInitialConnectionAttempts;
}
return maxReconnectValue;
}
private long NextReconnectDelay()
{
if (nextReconnectDelay == UNDEFINED)
{
nextReconnectDelay = options.ReconnectOptions.ReconnectDelay;
}
if (options.ReconnectOptions.UseReconnectBackOff && reconnectAttempts > 1)
{
// Exponential increment of reconnect delay.
nextReconnectDelay = (long)(nextReconnectDelay * options.ReconnectOptions.ReconnectBackOffMultiplier);
if (nextReconnectDelay > options.ReconnectOptions.MaxReconnectDelay)
{
nextReconnectDelay = options.ReconnectOptions.MaxReconnectDelay;
}
}
return nextReconnectDelay;
}
private void SubmitConnectionEvent(Action<IConnection, ConnectionEvent> handler, string host, int port)
{
if (handler != null)
{
try
{
Task.Factory.StartNew(() => handler(this, new ConnectionEvent(host, port)));
}
catch (Exception ex)
{
LOG.Trace("Error thrown while attempting to submit event notification ", ex);
}
}
}
private void SubmitDisconnectionEvent(Action<IConnection, DisconnectionEvent> handler, string host, int port, ClientIOException cause)
{
if (handler != null)
{
try
{
Task.Factory.StartNew(() => handler(this, new DisconnectionEvent(host, port, cause)));
}
catch (Exception ex)
{
LOG.Trace("Error thrown while attempting to submit event notification ", ex);
}
}
}
#endregion
#region Connection defined SASL Credentials provider
private class ClientSaslCredentialsProvider : ISaslCredentialsProvider
{
private ClientConnection connection;
private ConnectionOptions options;
public ClientSaslCredentialsProvider(ClientConnection connection)
{
this.connection = connection;
this.options = connection.options;
}
public string VHost => options.VirtualHost;
public string Username => options.User;
public string Password => options.Password;
public IPrincipal LocalPrincipal => connection.transport?.LocalPrincipal;
}
#endregion
}
}