AMQNET-610: Race condition during consumer creation
diff --git a/src/NMS.AMQP/NmsDurableTopicSubscriber.cs b/src/NMS.AMQP/NmsDurableTopicSubscriber.cs
new file mode 100644
index 0000000..f39d7ea
--- /dev/null
+++ b/src/NMS.AMQP/NmsDurableTopicSubscriber.cs
@@ -0,0 +1,17 @@
+using Apache.NMS.AMQP.Util;
+
+namespace Apache.NMS.AMQP
+{
+ public class NmsDurableTopicSubscriber : NmsMessageConsumer
+ {
+ public NmsDurableTopicSubscriber(Id consumerId, NmsSession session, IDestination destination, string selector, bool noLocal) : base(consumerId, session, destination, selector, noLocal)
+ {
+ }
+
+ public NmsDurableTopicSubscriber(Id consumerId, NmsSession session, IDestination destination, string name, string selector, bool noLocal) : base(consumerId, session, destination, name, selector, noLocal)
+ {
+ }
+
+ protected override bool IsDurableSubscription => true;
+ }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs
index 13a4cf0..34afd0c 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -40,7 +40,7 @@
{
}
- public NmsMessageConsumer(Id consumerId, NmsSession session, IDestination destination, string name, string selector, bool noLocal)
+ protected NmsMessageConsumer(Id consumerId, NmsSession session, IDestination destination, string name, string selector, bool noLocal)
{
Session = session;
acknowledgementMode = session.AcknowledgementMode;
@@ -56,9 +56,14 @@
Selector = selector,
NoLocal = noLocal,
SubscriptionName = name,
- LocalMessageExpiry = Session.Connection.ConnectionInfo.LocalMessageExpiry
+ LocalMessageExpiry = Session.Connection.ConnectionInfo.LocalMessageExpiry,
+ IsDurable = IsDurableSubscription
};
deliveryTask = new MessageDeliveryTask(this);
+
+ Session.Connection.CreateResource(Info).ConfigureAwait(false).GetAwaiter().GetResult();
+
+ Session.Add(this);
if (Session.IsStarted)
Start();
@@ -68,6 +73,8 @@
public ConsumerInfo Info { get; }
public IDestination Destination => Info.Destination;
+ protected virtual bool IsDurableSubscription => false;
+
public void Dispose()
{
try
@@ -186,10 +193,9 @@
private event MessageListener Listener;
- public async Task Init()
+ public Task Init()
{
- await Session.Connection.CreateResource(Info);
- await Session.Connection.StartResource(Info);
+ return Session.Connection.StartResource(Info);
}
public void OnInboundMessage(InboundMessageDispatch envelope)
@@ -417,10 +423,10 @@
{
if (closed.CompareAndSet(false, true))
{
- messageQueue.Dispose();
failureCause = exception;
Session.Remove(this);
started.Set(false);
+ messageQueue.Dispose();
}
}
@@ -500,9 +506,9 @@
{
try
{
- Session.Connection.StartResource(Info);
+ Session.Connection.StartResource(Info).ConfigureAwait(false).GetAwaiter().GetResult();
}
- catch (NMSException ex)
+ catch (NMSException)
{
Session.Remove(this);
throw;
diff --git a/src/NMS.AMQP/NmsMessageProducer.cs b/src/NMS.AMQP/NmsMessageProducer.cs
index feca455..c5da8e2 100644
--- a/src/NMS.AMQP/NmsMessageProducer.cs
+++ b/src/NMS.AMQP/NmsMessageProducer.cs
@@ -43,6 +43,10 @@
{
Destination = destination
};
+
+ session.Connection.CreateResource(Info).ConfigureAwait(false).GetAwaiter().GetResult();
+
+ session.Add(this);
}
public ProducerInfo Info { get; }
diff --git a/src/NMS.AMQP/NmsSession.cs b/src/NMS.AMQP/NmsSession.cs
index 5843606..c9f008d 100644
--- a/src/NMS.AMQP/NmsSession.cs
+++ b/src/NMS.AMQP/NmsSession.cs
@@ -20,6 +20,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
+using Amqp;
using Apache.NMS.AMQP.Message;
using Apache.NMS.AMQP.Meta;
using Apache.NMS.AMQP.Provider;
@@ -99,10 +100,7 @@
public IMessageProducer CreateProducer(IDestination destination)
{
- NmsMessageProducer messageProducer = new NmsMessageProducer(producerIdGenerator.GenerateId(), this, destination);
- messageProducer.Init().ConfigureAwait(false).GetAwaiter().GetResult();
- producers.TryAdd(messageProducer.Info.Id, messageProducer);
- return messageProducer;
+ return new NmsMessageProducer(producerIdGenerator.GenerateId(), this, destination);
}
public IMessageConsumer CreateConsumer(IDestination destination)
@@ -121,8 +119,6 @@
NmsMessageConsumer messageConsumer = new NmsMessageConsumer(consumerIdGenerator.GenerateId(), this, destination, selector, noLocal);
messageConsumer.Init().ConfigureAwait(false).GetAwaiter().GetResult();
-
- consumers.TryAdd(messageConsumer.Info.Id, messageConsumer);
return messageConsumer;
}
@@ -131,10 +127,8 @@
{
CheckClosed();
- NmsMessageConsumer messageConsumer = new NmsMessageConsumer(consumerIdGenerator.GenerateId(), this, destination, name, selector, noLocal);
- messageConsumer.Info.IsDurable = true;
+ NmsMessageConsumer messageConsumer = new NmsDurableTopicSubscriber(consumerIdGenerator.GenerateId(), this, destination, name, selector, noLocal);
messageConsumer.Init().ConfigureAwait(false).GetAwaiter().GetResult();
- consumers.TryAdd(messageConsumer.Info.Id, messageConsumer);
return messageConsumer;
}
@@ -474,14 +468,24 @@
return producer;
}
+ public void Add(NmsMessageConsumer messageConsumer)
+ {
+ consumers.TryAdd(messageConsumer.Info.Id, messageConsumer);
+ }
+
+ public void Add(NmsMessageProducer messageProducer)
+ {
+ producers.TryAdd(messageProducer.Info.Id, messageProducer);
+ }
+
public void Remove(NmsMessageConsumer messageConsumer)
{
- consumers.TryRemove(messageConsumer.Info.Id, out messageConsumer);
+ consumers.TryRemove(messageConsumer.Info.Id, out _);
}
public void Remove(NmsMessageProducer messageProducer)
{
- producers.TryRemove(messageProducer.Info.Id, out messageProducer);
+ producers.TryRemove(messageProducer.Info.Id, out _);
}
public void Shutdown(NMSException exception = null)
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
index 03ca15e..3410eb6 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
@@ -74,7 +74,7 @@
underlyingConnection.AddClosedCallback((sender, error) => Provider.OnConnectionClosed(error));
// Wait for connection to be opened
- await tsc.Task;
+ await tsc.Task.ConfigureAwait(false);
// Create a Session for this connection that is used for Temporary Destinations
// and perhaps later on management and advisory monitoring.
@@ -130,7 +130,7 @@
public async Task CreateSession(SessionInfo sessionInfo)
{
var amqpSession = new AmqpSession(this, sessionInfo);
- await amqpSession.Start();
+ await amqpSession.Start().ConfigureAwait(false);
sessions.TryAdd(sessionInfo.Id, amqpSession);
}
@@ -154,8 +154,7 @@
{
return session;
}
-
- throw new Exception();
+ throw new InvalidOperationException($"Amqp Session {sessionId} doesn't exist and cannot be retrieved.");
}
public void RemoveSession(Id sessionId)
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpSession.cs b/src/NMS.AMQP/Provider/Amqp/AmqpSession.cs
index d20ce76..ee81348 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpSession.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpSession.cs
@@ -69,7 +69,6 @@
SessionInfo.remoteChannel = begin.RemoteChannel;
tcs.TrySetResult(true);
});
- UnderlyingSession.AddClosedCallback((sender, error) => { tcs.TrySetException(ExceptionSupport.GetException(error)); });
UnderlyingSession.AddClosedCallback((sender, error) =>
{
if (!tcs.TrySetException(ExceptionSupport.GetException(error)))
diff --git a/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs b/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
index b338555..923ab83 100644
--- a/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
+++ b/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
@@ -128,8 +128,8 @@
{
Tracer.Debug($"Connection attempt:[{reconnectAttempts}] to: {target.Scheme}://{target.Host}:{target.Port} in-progress");
provider = ProviderFactory.Create(target);
- await provider.Connect(connectionInfo);
- await InitializeNewConnection(provider);
+ await provider.Connect(connectionInfo).ConfigureAwait(false);
+ await InitializeNewConnection(provider).ConfigureAwait(false);
return;
}
catch (Exception e)
@@ -171,7 +171,7 @@
}
else
{
- await reconnectControl.ScheduleReconnect(Reconnect);
+ await reconnectControl.ScheduleReconnect(Reconnect).ConfigureAwait(false);
}
}
}
@@ -218,21 +218,21 @@
Tracer.Debug($"Signalling connection recovery: {provider}");
// Allow listener to recover its resources
- await listener.OnConnectionRecovery(provider);
+ await listener.OnConnectionRecovery(provider).ConfigureAwait(false);
// Restart consumers, send pull commands, etc.
- await listener.OnConnectionRecovered(provider);
+ await listener.OnConnectionRecovered(provider).ConfigureAwait(false);
// Let the client know that connection has restored.
listener.OnConnectionRestored(connectedUri);
// If we try to run pending requests right after the connection is reestablished
// it will result in timeout on the first send request
- await Task.Delay(50);
+ await Task.Delay(50).ConfigureAwait(false);
foreach (FailoverRequest request in GetPendingRequests())
{
- await request.Run();
+ await request.Run().ConfigureAwait(false);
}
reconnectControl.ConnectionEstablished();
diff --git a/src/NMS.AMQP/Provider/Failover/FailoverRequest.cs b/src/NMS.AMQP/Provider/Failover/FailoverRequest.cs
index b54a20f..b6a5684 100644
--- a/src/NMS.AMQP/Provider/Failover/FailoverRequest.cs
+++ b/src/NMS.AMQP/Provider/Failover/FailoverRequest.cs
@@ -61,7 +61,7 @@
{
try
{
- await this.DoTask(activeProvider);
+ await this.DoTask(activeProvider).ConfigureAwait(false);
this.taskCompletionSource.TrySetResult(true);
this.failoverProvider.RemoveFailoverRequest(this);
this.cancellationTokenSource?.Dispose();