Added support for encrypted connections. Still need work in regards to choosing encrypted vs unencrypted (telling the client which to enforce or prefer).
diff --git a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
index 35fe1cb..a82b965 100644
--- a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
@@ -1,4 +1,5 @@
 using System;
+using System.Security.Cryptography.X509Certificates;
 
 namespace DotPulsar.Abstractions
 {
@@ -18,6 +19,21 @@
         IPulsarClientBuilder ServiceUrl(Uri uri);
 
         /// <summary>
+        /// Add a trusted certificate authority. This is optional.
+        /// </summary>
+        IPulsarClientBuilder TrustedCertificateAuthority(X509Certificate2 trustedCertificateAuthority);
+
+        /// <summary>
+        /// Verify the certificate authority. The default is 'true'.
+        /// </summary>
+        IPulsarClientBuilder VerifyCertificateAuthority(bool verifyCertificateAuthority);
+
+        /// <summary>
+        /// Verify the certificate name with the hostname. The default is 'false'.
+        /// </summary>
+        IPulsarClientBuilder VerifyCertificateName(bool verifyCertificateName);
+
+        /// <summary>
         /// Create the client.
         /// </summary>
         IPulsarClient Build();
diff --git a/src/DotPulsar/Exceptions/AuthenticationException.cs b/src/DotPulsar/Exceptions/AuthenticationException.cs
index 6e9e098..7ef239c 100644
--- a/src/DotPulsar/Exceptions/AuthenticationException.cs
+++ b/src/DotPulsar/Exceptions/AuthenticationException.cs
@@ -1,7 +1,11 @@
-namespace DotPulsar.Exceptions
+using System;
+
+namespace DotPulsar.Exceptions
 {
     public sealed class AuthenticationException : DotPulsarException
     {
         public AuthenticationException(string message) : base(message) { }
+
+        public AuthenticationException(string message, Exception innerException) : base(message, innerException) { }
     }
 }
diff --git a/src/DotPulsar/Exceptions/InvalidSchemeException.cs b/src/DotPulsar/Exceptions/InvalidSchemeException.cs
new file mode 100644
index 0000000..191169d
--- /dev/null
+++ b/src/DotPulsar/Exceptions/InvalidSchemeException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+    public sealed class InvalidSchemeException : DotPulsarException
+    {
+        public InvalidSchemeException(string message) : base(message) { }
+    }
+}
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index 68a2642..b871e39 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -3,8 +3,6 @@
 using System;
 using System.Collections.Concurrent;
 using System.Linq;
-using System.Net;
-using System.Net.Sockets;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -16,16 +14,19 @@
         private readonly int _protocolVersion;
         private readonly string _clientVersion;
         private readonly Uri _serviceUrl;
+        private readonly Connector _connector;
         private readonly ConcurrentDictionary<Uri, Connection> _connections;
+
         private readonly CancellationTokenSource _cancellationTokenSource;
         private readonly Task _closeInactiveConnections;
 
-        public ConnectionPool(int protocolVersion, string clientVersion, Uri serviceUrl)
+        public ConnectionPool(int protocolVersion, string clientVersion, Uri serviceUrl, Connector connector)
         {
             _lock = new AsyncLock();
             _protocolVersion = protocolVersion;
             _clientVersion = clientVersion;
             _serviceUrl = serviceUrl;
+            _connector = connector;
             _connections = new ConcurrentDictionary<Uri, Connection>();
             _cancellationTokenSource = new CancellationTokenSource();
             _closeInactiveConnections = CloseInactiveConnections(TimeSpan.FromSeconds(60), _cancellationTokenSource.Token);
@@ -52,7 +53,7 @@
                 Authoritative = false
             };
 
-            Uri serviceUrl = _serviceUrl;
+            var serviceUrl = _serviceUrl;
 
             while (true)
             {
@@ -69,7 +70,7 @@
                 if (response.LookupTopicResponse.Response == CommandLookupTopicResponse.LookupType.Redirect || !response.LookupTopicResponse.Authoritative)
                     continue;
 
-                if (_serviceUrl.IsLoopback) // LookupType is Connect, ServiceUrl is local and response is authoritative. Assume the Pulsar server is a standalone docker
+                if (_serviceUrl.IsLoopback) // LookupType is 'Connect', ServiceUrl is local and response is authoritative. Assume the Pulsar server is a standalone docker.
                     return connection;
                 else
                     return await CreateConnection(serviceUrl, cancellationToken);
@@ -78,25 +79,15 @@
 
         private async Task<Connection> CreateConnection(Uri serviceUrl, CancellationToken cancellationToken)
         {
+
             using (await _lock.Lock(cancellationToken))
             {
                 if (_connections.TryGetValue(serviceUrl, out Connection connection))
                     return connection;
 
-                var tcpClient = new TcpClient();
+                var stream = await _connector.Connect(serviceUrl);
 
-                switch (Uri.CheckHostName(serviceUrl.Host))
-                {
-                    case UriHostNameType.IPv4:
-                    case UriHostNameType.IPv6:
-                        await tcpClient.ConnectAsync(IPAddress.Parse(serviceUrl.Host), serviceUrl.Port);
-                        break;
-                    default:
-                        await tcpClient.ConnectAsync(serviceUrl.Host, serviceUrl.Port);
-                        break;
-                }
-
-                connection = new Connection(tcpClient.GetStream());
+                connection = new Connection(stream);
                 Register(serviceUrl, connection);
 
                 var connect = new CommandConnect
diff --git a/src/DotPulsar/Internal/Connector.cs b/src/DotPulsar/Internal/Connector.cs
new file mode 100644
index 0000000..fadc54e
--- /dev/null
+++ b/src/DotPulsar/Internal/Connector.cs
@@ -0,0 +1,110 @@
+using DotPulsar.Exceptions;
+using System;
+using System.IO;
+using System.Net;
+using System.Net.Security;
+using System.Net.Sockets;
+using System.Security.Cryptography.X509Certificates;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class Connector
+    {
+        private const string PulsarScheme = "pulsar";
+        private const string PulsarSslScheme = "pulsar+ssl";
+        private const int DefaultPulsarPort = 6650;
+        private const int DefaultPulsarSSLPort = 6651;
+
+        private readonly X509Certificate2 _trustedCertificateAuthority;
+        private readonly bool _verifyCertificateAuthority;
+        private readonly bool _verifyCertificateName;
+
+        public Connector(X509Certificate2 trustedCertificateAuthority, bool verifyCertificateAuthority, bool verifyCertificateName)
+        {
+            _trustedCertificateAuthority = trustedCertificateAuthority;
+            _verifyCertificateAuthority = verifyCertificateAuthority;
+            _verifyCertificateName = verifyCertificateName;
+        }
+
+        public async Task<Stream> Connect(Uri serviceUrl)
+        {
+            var scheme = serviceUrl.Scheme;
+            var host = serviceUrl.Host;
+            var port = serviceUrl.Port;
+            var encrypt = false;
+
+            switch (scheme)
+            {
+                case PulsarScheme:
+                    if (port == -1)
+                        port = DefaultPulsarPort;
+                    break;
+                case PulsarSslScheme:
+                    if (port == -1)
+                        port = DefaultPulsarSSLPort;
+                    encrypt = true;
+                    break;
+                default:
+                    throw new InvalidSchemeException($"Invalid scheme '{scheme}'. Expected '{PulsarScheme}' or '{PulsarSslScheme}'");
+            }
+
+            var tcpClient = new TcpClient();
+
+            switch (Uri.CheckHostName(host))
+            {
+                case UriHostNameType.IPv4:
+                case UriHostNameType.IPv6:
+                    await tcpClient.ConnectAsync(IPAddress.Parse(host), port);
+                    break;
+                default:
+                    await tcpClient.ConnectAsync(host, port);
+                    break;
+            }
+
+            if (!encrypt)
+                return tcpClient.GetStream();
+
+            try
+            {
+                var sslStream = new SslStream(tcpClient.GetStream(), false, new RemoteCertificateValidationCallback(ValidateServerCertificate), null);
+                await sslStream.AuthenticateAsClientAsync(host);
+                return sslStream;
+            }
+            catch (System.Security.Authentication.AuthenticationException exception)
+            {
+                throw new AuthenticationException("Got an authentication exception while trying to establish an encrypted connection. See inner exception for details.", exception);
+            }
+        }
+
+        private bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
+        {
+            if (sslPolicyErrors == SslPolicyErrors.None)
+                return true;
+
+            if (sslPolicyErrors.HasFlag(SslPolicyErrors.RemoteCertificateNotAvailable))
+                return false;
+
+            if (sslPolicyErrors.HasFlag(SslPolicyErrors.RemoteCertificateNameMismatch) && _verifyCertificateName)
+                return false;
+
+            if (sslPolicyErrors.HasFlag(SslPolicyErrors.RemoteCertificateChainErrors) && _verifyCertificateAuthority)
+            {
+                if (_trustedCertificateAuthority is null)
+                    return false;
+
+                chain.ChainPolicy.ExtraStore.Add(_trustedCertificateAuthority);
+                _ = chain.Build((X509Certificate2)certificate);
+                for (var i = 0; i < chain.ChainElements.Count; i++)
+                {
+                    if (chain.ChainElements[i].Certificate.Thumbprint == _trustedCertificateAuthority.Thumbprint)
+                        return true;
+                }
+
+                return false;
+            }
+
+            return true;
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index 2e95dea..2056024 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -1,6 +1,7 @@
 using DotPulsar.Abstractions;
 using System;
 using System.Reflection;
+using System.Security.Cryptography.X509Certificates;
 
 namespace DotPulsar.Internal
 {
@@ -18,11 +19,16 @@
 
         private TimeSpan _retryInterval;
         private Uri _serviceUrl;
+        private X509Certificate2 _trustedCertificateAuthority;
+        private bool _verifyCertificateAuthority;
+        private bool _verifyCertificateName;
 
         public PulsarClientBuilder()
         {
             _retryInterval = TimeSpan.FromSeconds(3);
             _serviceUrl = new Uri("pulsar://localhost:6650");
+            _verifyCertificateAuthority = true;
+            _verifyCertificateName = false;
         }
 
         public IPulsarClientBuilder RetryInterval(TimeSpan interval)
@@ -37,9 +43,28 @@
             return this;
         }
 
+        public IPulsarClientBuilder TrustedCertificateAuthority(X509Certificate2 trustedCertificateAuthority)
+        {
+            _trustedCertificateAuthority = trustedCertificateAuthority;
+            return this;
+        }
+
+        public IPulsarClientBuilder VerifyCertificateAuthority(bool verifyCertificateAuthority)
+        {
+            _verifyCertificateAuthority = verifyCertificateAuthority;
+            return this;
+        }
+
+        public IPulsarClientBuilder VerifyCertificateName(bool verifyCertificateName)
+        {
+            _verifyCertificateName = verifyCertificateName;
+            return this;
+        }
+
         public IPulsarClient Build()
         {
-            var connectionPool = new ConnectionPool(ProtocolVersion, ClientVersion, _serviceUrl);
+            var connector = new Connector(_trustedCertificateAuthority, _verifyCertificateAuthority, _verifyCertificateName);
+            var connectionPool = new ConnectionPool(ProtocolVersion, ClientVersion, _serviceUrl, connector);
             return new PulsarClient(connectionPool, new FaultStrategy(_retryInterval));
         }
     }