More small tweaks.
diff --git a/src/DotPulsar/Internal/ProcessManager.cs b/src/DotPulsar/Internal/ProcessManager.cs
index 9be1653..29b1e0b 100644
--- a/src/DotPulsar/Internal/ProcessManager.cs
+++ b/src/DotPulsar/Internal/ProcessManager.cs
@@ -34,9 +34,7 @@
public async ValueTask DisposeAsync()
{
- var processes = _processes.Values.ToArray();
-
- foreach (var proc in processes)
+ foreach (var proc in _processes.Values.ToArray())
await proc.DisposeAsync().ConfigureAwait(false);
await _connectionPool.DisposeAsync().ConfigureAwait(false);
diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs b/src/DotPulsar/Internal/ProducerBuilder.cs
index 8916cd0..40f4599 100644
--- a/src/DotPulsar/Internal/ProducerBuilder.cs
+++ b/src/DotPulsar/Internal/ProducerBuilder.cs
@@ -53,13 +53,11 @@
if (string.IsNullOrEmpty(_topic))
throw new ConfigurationException("ProducerOptions.Topic may not be null or empty");
- var options = new ProducerOptions(_topic!)
+ return _pulsarClient.CreateProducer(new ProducerOptions(_topic!)
{
InitialSequenceId = _initialSequenceId,
ProducerName = _producerName
- };
-
- return _pulsarClient.CreateProducer(options);
+ });
}
}
}
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs
index 537104c..d6181e7 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -106,9 +106,9 @@
}
finally
{
+ // Reset in case the user reuse the MessageMetadata, but is not explicitly setting the sequenceId
if (autoAssignSequenceId)
- _cachedSendPackage.Metadata.SequenceId =
- 0; // Reset in case the user reuse the MessageMetadata, but is not explicitly setting the sequenceId
+ _cachedSendPackage.Metadata.SequenceId = 0;
}
}
}
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index 95827e6..bd1b166 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -145,19 +145,17 @@
{
throw new InvalidSchemeException($"Invalid scheme '{scheme}'. Expected '{Constants.PulsarScheme}' or '{Constants.PulsarSslScheme}'");
}
-
+
var connector = new Connector(_clientCertificates, _trustedCertificateAuthority, _verifyCertificateAuthority, _verifyCertificateName);
var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval);
- var exceptionHandlers = new List<IHandleException>(_exceptionHandlers)
+ var processManager = new ProcessManager(connectionPool);
+
+ var exceptionHandlerPipeline = new ExceptionHandlerPipeline(new List<IHandleException>(_exceptionHandlers)
{
new DefaultExceptionHandler(_retryInterval)
- };
-
- var exceptionHandlerPipeline = new ExceptionHandlerPipeline(exceptionHandlers);
-
- var processManager = new ProcessManager(connectionPool);
+ });
return new PulsarClient(connectionPool, processManager, exceptionHandlerPipeline);
}
diff --git a/src/DotPulsar/Internal/ReaderBuilder.cs b/src/DotPulsar/Internal/ReaderBuilder.cs
index 41ff928..15b7c64 100644
--- a/src/DotPulsar/Internal/ReaderBuilder.cs
+++ b/src/DotPulsar/Internal/ReaderBuilder.cs
@@ -71,14 +71,12 @@
if (string.IsNullOrEmpty(_topic))
throw new ConfigurationException("Topic may not be null or empty");
- var options = new ReaderOptions(_startMessageId, _topic!)
+ return _pulsarClient.CreateReader(new ReaderOptions(_startMessageId, _topic!)
{
MessagePrefetchCount = _messagePrefetchCount,
ReadCompacted = _readCompacted,
ReaderName = _readerName
- };
-
- return _pulsarClient.CreateReader(options);
+ });
}
}
}
diff --git a/src/DotPulsar/Internal/StateTask.cs b/src/DotPulsar/Internal/StateTask.cs
index dbd5a08..5943f09 100644
--- a/src/DotPulsar/Internal/StateTask.cs
+++ b/src/DotPulsar/Internal/StateTask.cs
@@ -30,15 +30,10 @@
public CancelableCompletionSource<TState> CancelableCompletionSource { get; }
+ public bool IsAwaiting(TState state)
+ => _change == StateChanged.To ? _state.Equals(state) : !_state.Equals(state);
+
public void Dispose()
=> CancelableCompletionSource.Dispose();
-
- public bool IsAwaiting(TState state)
- {
- if (_change == StateChanged.To)
- return _state.Equals(state);
-
- return !_state.Equals(state);
- }
}
}
diff --git a/src/DotPulsar/Internal/StateTaskCollection.cs b/src/DotPulsar/Internal/StateTaskCollection.cs
index d8d2547..7606f02 100644
--- a/src/DotPulsar/Internal/StateTaskCollection.cs
+++ b/src/DotPulsar/Internal/StateTaskCollection.cs
@@ -39,6 +39,7 @@
}
node.Value.CancelableCompletionSource.SetupCancellation(() => TaskWasCanceled(node), cancellationToken);
+
return node.Value.CancelableCompletionSource.Task;
}