Added support for setting the listener name on the client
diff --git a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
index d632aae..839b1bf 100644
--- a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
@@ -43,6 +43,11 @@
IPulsarClientBuilder ExceptionHandler(IHandleException exceptionHandler);
/// <summary>
+ /// Set the listener name. This is optional.
+ /// </summary>
+ IPulsarClientBuilder ListenerName(string listenerName);
+
+ /// <summary>
/// The time to wait before retrying an operation or a reconnect. The default is 3 seconds.
/// </summary>
IPulsarClientBuilder RetryInterval(TimeSpan interval);
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index f6eeb60..dd4f366 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -32,22 +32,24 @@
private readonly Connector _connector;
private readonly EncryptionPolicy _encryptionPolicy;
private readonly ConcurrentDictionary<PulsarUrl, Connection> _connections;
-
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly Task _closeInactiveConnections;
+ private readonly string? _listenerName;
public ConnectionPool(
CommandConnect commandConnect,
Uri serviceUrl,
Connector connector,
EncryptionPolicy encryptionPolicy,
- TimeSpan closeInactiveConnectionsInterval)
+ TimeSpan closeInactiveConnectionsInterval,
+ string? listenerName)
{
_lock = new AsyncLock();
_commandConnect = commandConnect;
_serviceUrl = serviceUrl;
_connector = connector;
_encryptionPolicy = encryptionPolicy;
+ _listenerName = listenerName;
_connections = new ConcurrentDictionary<PulsarUrl, Connection>();
_cancellationTokenSource = new CancellationTokenSource();
_closeInactiveConnections = CloseInactiveConnections(closeInactiveConnectionsInterval, _cancellationTokenSource.Token);
@@ -72,7 +74,8 @@
var lookup = new CommandLookupTopic
{
Topic = topic,
- Authoritative = false
+ Authoritative = false,
+ AdvertisedListenerName = _listenerName
};
var physicalUrl = _serviceUrl;
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index fc1a2e0..77b4afb 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -27,6 +27,7 @@
private readonly CommandConnect _commandConnect;
private readonly List<IHandleException> _exceptionHandlers;
private EncryptionPolicy? _encryptionPolicy;
+ private string? _listenerName;
private TimeSpan _retryInterval;
private Uri _serviceUrl;
private X509Certificate2? _trustedCertificateAuthority;
@@ -78,6 +79,12 @@
return this;
}
+ public IPulsarClientBuilder ListenerName(string listenerName)
+ {
+ _listenerName = listenerName;
+ return this;
+ }
+
public IPulsarClientBuilder RetryInterval(TimeSpan interval)
{
_retryInterval = interval;
@@ -139,7 +146,7 @@
var connector = new Connector(_clientCertificates, _trustedCertificateAuthority, _verifyCertificateAuthority, _verifyCertificateName);
- var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval);
+ var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval, _listenerName);
var processManager = new ProcessManager(connectionPool);
var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) { new DefaultExceptionHandler(_retryInterval) };
var exceptionHandlerPipeline = new ExceptionHandlerPipeline(exceptionHandlers);