Add token authentication and fault producer/consumer/reader when host/network is unknown/unreachable.
diff --git a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
index 95ea924..894d1a7 100644
--- a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
@@ -9,6 +9,11 @@
     public interface IPulsarClientBuilder
     {
         /// <summary>
+        /// Authenticate using a (JSON Web) token. This is optional.
+        /// </summary>
+        IPulsarClientBuilder AuthenticateUsingToken(string token);
+
+        /// <summary>
         /// Set connection encryption policy. The default is 'EnforceUnencrypted' if the ServiceUrl scheme is 'pulsar' and 'EnforceEncrypted' if it's 'pulsar+ssl'.
         /// </summary>
         IPulsarClientBuilder ConnectionSecurity(EncryptionPolicy encryptionPolicy);
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index fd0e7c4..6ccaa62 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -12,8 +12,7 @@
     public sealed class ConnectionPool : IDisposable
     {
         private readonly AsyncLock _lock;
-        private readonly int _protocolVersion;
-        private readonly string _clientVersion;
+        private readonly CommandConnect _commandConnect;
         private readonly Uri _serviceUrl;
         private readonly Connector _connector;
         private readonly EncryptionPolicy _encryptionPolicy;
@@ -22,11 +21,10 @@
         private readonly CancellationTokenSource _cancellationTokenSource;
         private readonly Task _closeInactiveConnections;
 
-        public ConnectionPool(int protocolVersion, string clientVersion, Uri serviceUrl, Connector connector, EncryptionPolicy encryptionPolicy)
+        public ConnectionPool(CommandConnect commandConnect, Uri serviceUrl, Connector connector, EncryptionPolicy encryptionPolicy)
         {
             _lock = new AsyncLock();
-            _protocolVersion = protocolVersion;
-            _clientVersion = clientVersion;
+            _commandConnect = commandConnect;
             _serviceUrl = serviceUrl;
             _connector = connector;
             _encryptionPolicy = encryptionPolicy;
@@ -90,11 +88,11 @@
             {
                 case EncryptionPolicy.EnforceEncrypted:
                     if (!hasBrokerServiceUrlTls)
-                        throw new ConnectionSecurityException("Cannot enforce encrypted connections. Lookup response from broker gave no secure alternative.");
+                        throw new ConnectionSecurityException("Cannot enforce encrypted connections. The lookup topic response from broker gave no secure alternative.");
                     return response.BrokerServiceUrlTls;
                 case EncryptionPolicy.EnforceUnencrypted:
                     if (!hasBrokerServiceUrl)
-                        throw new ConnectionSecurityException("Cannot enforce unencrypted connections. Lookup response from broker gave no unsecure alternative.");
+                        throw new ConnectionSecurityException("Cannot enforce unencrypted connections. The lookup topic response from broker gave no unsecure alternative.");
                     return response.BrokerServiceUrl;
                 case EncryptionPolicy.PreferEncrypted:
                     return hasBrokerServiceUrlTls ? response.BrokerServiceUrlTls : response.BrokerServiceUrl;
@@ -117,13 +115,7 @@
                 connection = new Connection(stream);
                 Register(serviceUrl, connection);
 
-                var connect = new CommandConnect
-                {
-                    ProtocolVersion = _protocolVersion,
-                    ClientVersion = _clientVersion
-                };
-
-                var response = await connection.Send(connect);
+                var response = await connection.Send(_commandConnect);
                 response.Expect(BaseCommand.Type.Connected);
 
                 return connection;
diff --git a/src/DotPulsar/Internal/Connector.cs b/src/DotPulsar/Internal/Connector.cs
index f2cb693..2f0c5e9 100644
--- a/src/DotPulsar/Internal/Connector.cs
+++ b/src/DotPulsar/Internal/Connector.cs
@@ -32,30 +32,53 @@
             if (port == -1)
                 port = encrypt ? Constants.DefaultPulsarSSLPort : Constants.DefaultPulsarPort;
 
+            var stream = await GetStream(host, port);
+
+            if (encrypt)
+                stream = await EncryptStream(stream, host);
+
+            return stream;
+        }
+
+        private async Task<Stream> GetStream(string host, int port)
+        {
             var tcpClient = new TcpClient();
 
-            switch (Uri.CheckHostName(host))
-            {
-                case UriHostNameType.IPv4:
-                case UriHostNameType.IPv6:
-                    await tcpClient.ConnectAsync(IPAddress.Parse(host), port);
-                    break;
-                default:
-                    await tcpClient.ConnectAsync(host, port);
-                    break;
-            }
-
-            if (!encrypt)
-                return tcpClient.GetStream();
-
             try
             {
-                var sslStream = new SslStream(tcpClient.GetStream(), false, new RemoteCertificateValidationCallback(ValidateServerCertificate), null);
+                var type = Uri.CheckHostName(host);
+
+                if (type == UriHostNameType.IPv4 || type == UriHostNameType.IPv6)
+                    await tcpClient.ConnectAsync(IPAddress.Parse(host), port);
+                else
+                    await tcpClient.ConnectAsync(host, port);
+
+                return tcpClient.GetStream();
+            }
+            catch
+            {
+                tcpClient.Dispose();
+                throw;
+            }
+        }
+
+        private async Task<Stream> EncryptStream(Stream stream, string host)
+        {
+            SslStream sslStream = null;
+
+            try
+            {
+                sslStream = new SslStream(stream, false, new RemoteCertificateValidationCallback(ValidateServerCertificate), null);
                 await sslStream.AuthenticateAsClientAsync(host);
                 return sslStream;
             }
             catch (System.Security.Authentication.AuthenticationException exception)
             {
+                if (sslStream == null)
+                    stream.Dispose();
+                else
+                    sslStream.Dispose();
+
                 throw new AuthenticationException("Got an authentication exception while trying to establish an encrypted connection. See inner exception for details.", exception);
             }
         }
diff --git a/src/DotPulsar/Internal/FaultStrategy.cs b/src/DotPulsar/Internal/FaultStrategy.cs
index a0775aa..9b3e80b 100644
--- a/src/DotPulsar/Internal/FaultStrategy.cs
+++ b/src/DotPulsar/Internal/FaultStrategy.cs
@@ -2,6 +2,7 @@
 using DotPulsar.Internal.Abstractions;
 using DotPulsar.Internal.Exceptions;
 using System;
+using System.Net.Sockets;
 
 namespace DotPulsar.Internal
 {
@@ -22,6 +23,15 @@
                 case StreamNotReadyException _: return FaultAction.Relookup;
                 case ServiceNotReadyException _: return FaultAction.Relookup;
                 case DotPulsarException _: return FaultAction.Fault;
+                case SocketException socketException:
+                    switch (socketException.SocketErrorCode)
+                    {
+                        case SocketError.HostNotFound:
+                        case SocketError.HostUnreachable:
+                        case SocketError.NetworkUnreachable:
+                            return FaultAction.Fault;
+                    }
+                    break;
             }
 
             return FaultAction.Relookup;
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index 2dda63a..fa7e000 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -1,12 +1,15 @@
 using DotPulsar.Abstractions;
 using DotPulsar.Exceptions;
+using DotPulsar.Internal.PulsarApi;
 using System;
 using System.Security.Cryptography.X509Certificates;
+using System.Text;
 
 namespace DotPulsar.Internal
 {
     public sealed class PulsarClientBuilder : IPulsarClientBuilder
     {
+        private readonly CommandConnect _commandConnect;
         private EncryptionPolicy? _encryptionPolicy;
         private TimeSpan _retryInterval;
         private Uri _serviceUrl;
@@ -16,12 +19,24 @@
 
         public PulsarClientBuilder()
         {
+            _commandConnect = new CommandConnect
+            {
+                ProtocolVersion = Constants.ProtocolVersion,
+                ClientVersion = Constants.ClientVersion
+            };
             _retryInterval = TimeSpan.FromSeconds(3);
             _serviceUrl = new Uri(Constants.PulsarScheme + "://localhost:" + Constants.DefaultPulsarPort);
             _verifyCertificateAuthority = true;
             _verifyCertificateName = false;
         }
 
+        public IPulsarClientBuilder AuthenticateUsingToken(string token)
+        {
+            _commandConnect.AuthMethodName = "token";
+            _commandConnect.AuthData = Encoding.ASCII.GetBytes(token);
+            return this;
+        }
+
         public IPulsarClientBuilder ConnectionSecurity(EncryptionPolicy encryptionPolicy)
         {
             _encryptionPolicy = encryptionPolicy;
@@ -82,7 +97,7 @@
                 throw new InvalidSchemeException($"Invalid scheme '{scheme}'. Expected '{Constants.PulsarScheme}' or '{Constants.PulsarSslScheme}'");
 
             var connector = new Connector(_trustedCertificateAuthority, _verifyCertificateAuthority, _verifyCertificateName);
-            var connectionPool = new ConnectionPool(Constants.ProtocolVersion, Constants.ClientVersion, _serviceUrl, connector, _encryptionPolicy.Value);
+            var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value);
             return new PulsarClient(connectionPool, new FaultStrategy(_retryInterval));
         }
     }