Added support for setting the listener name on the client
diff --git a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
index d632aae..839b1bf 100644
--- a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
@@ -43,6 +43,11 @@
         IPulsarClientBuilder ExceptionHandler(IHandleException exceptionHandler);
 
         /// <summary>
+        /// Set the listener name. This is optional.
+        /// </summary>
+        IPulsarClientBuilder ListenerName(string listenerName);
+
+        /// <summary>
         /// The time to wait before retrying an operation or a reconnect. The default is 3 seconds.
         /// </summary>
         IPulsarClientBuilder RetryInterval(TimeSpan interval);
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index f6eeb60..dd4f366 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -32,22 +32,24 @@
         private readonly Connector _connector;
         private readonly EncryptionPolicy _encryptionPolicy;
         private readonly ConcurrentDictionary<PulsarUrl, Connection> _connections;
-
         private readonly CancellationTokenSource _cancellationTokenSource;
         private readonly Task _closeInactiveConnections;
+        private readonly string? _listenerName;
 
         public ConnectionPool(
             CommandConnect commandConnect,
             Uri serviceUrl,
             Connector connector,
             EncryptionPolicy encryptionPolicy,
-            TimeSpan closeInactiveConnectionsInterval)
+            TimeSpan closeInactiveConnectionsInterval,
+            string? listenerName)
         {
             _lock = new AsyncLock();
             _commandConnect = commandConnect;
             _serviceUrl = serviceUrl;
             _connector = connector;
             _encryptionPolicy = encryptionPolicy;
+            _listenerName = listenerName;
             _connections = new ConcurrentDictionary<PulsarUrl, Connection>();
             _cancellationTokenSource = new CancellationTokenSource();
             _closeInactiveConnections = CloseInactiveConnections(closeInactiveConnectionsInterval, _cancellationTokenSource.Token);
@@ -72,7 +74,8 @@
             var lookup = new CommandLookupTopic
             {
                 Topic = topic,
-                Authoritative = false
+                Authoritative = false,
+                AdvertisedListenerName = _listenerName
             };
 
             var physicalUrl = _serviceUrl;
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index fc1a2e0..77b4afb 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -27,6 +27,7 @@
         private readonly CommandConnect _commandConnect;
         private readonly List<IHandleException> _exceptionHandlers;
         private EncryptionPolicy? _encryptionPolicy;
+        private string? _listenerName;
         private TimeSpan _retryInterval;
         private Uri _serviceUrl;
         private X509Certificate2? _trustedCertificateAuthority;
@@ -78,6 +79,12 @@
             return this;
         }
 
+        public IPulsarClientBuilder ListenerName(string listenerName)
+        {
+            _listenerName = listenerName;
+            return this;
+        }
+
         public IPulsarClientBuilder RetryInterval(TimeSpan interval)
         {
             _retryInterval = interval;
@@ -139,7 +146,7 @@
 
 
             var connector = new Connector(_clientCertificates, _trustedCertificateAuthority, _verifyCertificateAuthority, _verifyCertificateName);
-            var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval);
+            var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval, _listenerName);
             var processManager = new ProcessManager(connectionPool);
             var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) { new DefaultExceptionHandler(_retryInterval) };
             var exceptionHandlerPipeline = new ExceptionHandlerPipeline(exceptionHandlers);