Dispose channels to ensure the consumer and reader attach to the new queue.
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 2090cdf..80e6369 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -145,10 +145,19 @@
}, cancellationToken).ConfigureAwait(false);
}
- internal void SetChannel(IConsumerChannel channel)
+ internal async ValueTask SetChannel(IConsumerChannel channel)
{
- ThrowIfDisposed();
+ if (_isDisposed != 0)
+ {
+ await channel.DisposeAsync().ConfigureAwait(false);
+ return;
+ }
+
+ var oldChannel = _channel;
_channel = channel;
+
+ if (oldChannel != null)
+ await oldChannel.DisposeAsync().ConfigureAwait(false);
}
private void ThrowIfDisposed()
diff --git a/src/DotPulsar/Internal/ConsumerProcess.cs b/src/DotPulsar/Internal/ConsumerProcess.cs
index fabf4d8..449aedb 100644
--- a/src/DotPulsar/Internal/ConsumerProcess.cs
+++ b/src/DotPulsar/Internal/ConsumerProcess.cs
@@ -84,18 +84,8 @@
private async void SetupChannel()
{
- IConsumerChannel? channel = null;
-
- try
- {
- channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
- _consumer.SetChannel(channel);
- }
- catch
- {
- if (channel != null)
- await channel.DisposeAsync().ConfigureAwait(false);
- }
+ var channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
+ await _consumer.SetChannel(channel).ConfigureAwait(false);
}
}
}
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index a40c494..3303d01 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -108,10 +108,19 @@
return new MessageId(response.MessageId);
}
- internal void SetChannel(IProducerChannel channel)
+ internal async ValueTask SetChannel(IProducerChannel channel)
{
- ThrowIfDisposed();
+ if (_isDisposed != 0)
+ {
+ await channel.DisposeAsync().ConfigureAwait(false);
+ return;
+ }
+
+ var oldChannel = _channel;
_channel = channel;
+
+ if (oldChannel != null)
+ await oldChannel.DisposeAsync().ConfigureAwait(false);
}
private void ThrowIfDisposed()
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs b/src/DotPulsar/Internal/ProducerProcess.cs
index c2f1cba..fad638a 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -68,18 +68,8 @@
private async void SetupChannel()
{
- IProducerChannel? channel = null;
-
- try
- {
- channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
- _producer.SetChannel(channel);
- }
- catch
- {
- if (channel != null)
- await channel.DisposeAsync().ConfigureAwait(false);
- }
+ var channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
+ await _producer.SetChannel(channel).ConfigureAwait(false);
}
}
}
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 2cab4a9..03f4e4f 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -89,10 +89,19 @@
await _channel.DisposeAsync().ConfigureAwait(false);
}
- internal void SetChannel(IReaderChannel channel)
+ internal async ValueTask SetChannel(IReaderChannel channel)
{
- ThrowIfDisposed();
+ if (_isDisposed != 0)
+ {
+ await channel.DisposeAsync().ConfigureAwait(false);
+ return;
+ }
+
+ var oldChannel = _channel;
_channel = channel;
+
+ if (oldChannel != null)
+ await oldChannel.DisposeAsync().ConfigureAwait(false);
}
private void ThrowIfDisposed()
diff --git a/src/DotPulsar/Internal/ReaderProcess.cs b/src/DotPulsar/Internal/ReaderProcess.cs
index f5ef119..9b9a438 100644
--- a/src/DotPulsar/Internal/ReaderProcess.cs
+++ b/src/DotPulsar/Internal/ReaderProcess.cs
@@ -71,18 +71,8 @@
private async void SetupChannel()
{
- IReaderChannel? channel = null;
-
- try
- {
- channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
- _reader.SetChannel(channel);
- }
- catch
- {
- if (channel != null)
- await channel.DisposeAsync().ConfigureAwait(false);
- }
+ var channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
+ await _reader.SetChannel(channel).ConfigureAwait(false);
}
}
}