Added support for encrypted connections. Still need work in regards to choosing encrypted vs unencrypted (telling the client which to enforce or prefer).
diff --git a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
index 35fe1cb..a82b965 100644
--- a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
@@ -1,4 +1,5 @@
using System;
+using System.Security.Cryptography.X509Certificates;
namespace DotPulsar.Abstractions
{
@@ -18,6 +19,21 @@
IPulsarClientBuilder ServiceUrl(Uri uri);
/// <summary>
+ /// Add a trusted certificate authority. This is optional.
+ /// </summary>
+ IPulsarClientBuilder TrustedCertificateAuthority(X509Certificate2 trustedCertificateAuthority);
+
+ /// <summary>
+ /// Verify the certificate authority. The default is 'true'.
+ /// </summary>
+ IPulsarClientBuilder VerifyCertificateAuthority(bool verifyCertificateAuthority);
+
+ /// <summary>
+ /// Verify the certificate name with the hostname. The default is 'false'.
+ /// </summary>
+ IPulsarClientBuilder VerifyCertificateName(bool verifyCertificateName);
+
+ /// <summary>
/// Create the client.
/// </summary>
IPulsarClient Build();
diff --git a/src/DotPulsar/Exceptions/AuthenticationException.cs b/src/DotPulsar/Exceptions/AuthenticationException.cs
index 6e9e098..7ef239c 100644
--- a/src/DotPulsar/Exceptions/AuthenticationException.cs
+++ b/src/DotPulsar/Exceptions/AuthenticationException.cs
@@ -1,7 +1,11 @@
-namespace DotPulsar.Exceptions
+using System;
+
+namespace DotPulsar.Exceptions
{
public sealed class AuthenticationException : DotPulsarException
{
public AuthenticationException(string message) : base(message) { }
+
+ public AuthenticationException(string message, Exception innerException) : base(message, innerException) { }
}
}
diff --git a/src/DotPulsar/Exceptions/InvalidSchemeException.cs b/src/DotPulsar/Exceptions/InvalidSchemeException.cs
new file mode 100644
index 0000000..191169d
--- /dev/null
+++ b/src/DotPulsar/Exceptions/InvalidSchemeException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+ public sealed class InvalidSchemeException : DotPulsarException
+ {
+ public InvalidSchemeException(string message) : base(message) { }
+ }
+}
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index 68a2642..b871e39 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -3,8 +3,6 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
-using System.Net;
-using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
@@ -16,16 +14,19 @@
private readonly int _protocolVersion;
private readonly string _clientVersion;
private readonly Uri _serviceUrl;
+ private readonly Connector _connector;
private readonly ConcurrentDictionary<Uri, Connection> _connections;
+
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly Task _closeInactiveConnections;
- public ConnectionPool(int protocolVersion, string clientVersion, Uri serviceUrl)
+ public ConnectionPool(int protocolVersion, string clientVersion, Uri serviceUrl, Connector connector)
{
_lock = new AsyncLock();
_protocolVersion = protocolVersion;
_clientVersion = clientVersion;
_serviceUrl = serviceUrl;
+ _connector = connector;
_connections = new ConcurrentDictionary<Uri, Connection>();
_cancellationTokenSource = new CancellationTokenSource();
_closeInactiveConnections = CloseInactiveConnections(TimeSpan.FromSeconds(60), _cancellationTokenSource.Token);
@@ -52,7 +53,7 @@
Authoritative = false
};
- Uri serviceUrl = _serviceUrl;
+ var serviceUrl = _serviceUrl;
while (true)
{
@@ -69,7 +70,7 @@
if (response.LookupTopicResponse.Response == CommandLookupTopicResponse.LookupType.Redirect || !response.LookupTopicResponse.Authoritative)
continue;
- if (_serviceUrl.IsLoopback) // LookupType is Connect, ServiceUrl is local and response is authoritative. Assume the Pulsar server is a standalone docker
+ if (_serviceUrl.IsLoopback) // LookupType is 'Connect', ServiceUrl is local and response is authoritative. Assume the Pulsar server is a standalone docker.
return connection;
else
return await CreateConnection(serviceUrl, cancellationToken);
@@ -78,25 +79,15 @@
private async Task<Connection> CreateConnection(Uri serviceUrl, CancellationToken cancellationToken)
{
+
using (await _lock.Lock(cancellationToken))
{
if (_connections.TryGetValue(serviceUrl, out Connection connection))
return connection;
- var tcpClient = new TcpClient();
+ var stream = await _connector.Connect(serviceUrl);
- switch (Uri.CheckHostName(serviceUrl.Host))
- {
- case UriHostNameType.IPv4:
- case UriHostNameType.IPv6:
- await tcpClient.ConnectAsync(IPAddress.Parse(serviceUrl.Host), serviceUrl.Port);
- break;
- default:
- await tcpClient.ConnectAsync(serviceUrl.Host, serviceUrl.Port);
- break;
- }
-
- connection = new Connection(tcpClient.GetStream());
+ connection = new Connection(stream);
Register(serviceUrl, connection);
var connect = new CommandConnect
diff --git a/src/DotPulsar/Internal/Connector.cs b/src/DotPulsar/Internal/Connector.cs
new file mode 100644
index 0000000..fadc54e
--- /dev/null
+++ b/src/DotPulsar/Internal/Connector.cs
@@ -0,0 +1,110 @@
+using DotPulsar.Exceptions;
+using System;
+using System.IO;
+using System.Net;
+using System.Net.Security;
+using System.Net.Sockets;
+using System.Security.Cryptography.X509Certificates;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class Connector
+ {
+ private const string PulsarScheme = "pulsar";
+ private const string PulsarSslScheme = "pulsar+ssl";
+ private const int DefaultPulsarPort = 6650;
+ private const int DefaultPulsarSSLPort = 6651;
+
+ private readonly X509Certificate2 _trustedCertificateAuthority;
+ private readonly bool _verifyCertificateAuthority;
+ private readonly bool _verifyCertificateName;
+
+ public Connector(X509Certificate2 trustedCertificateAuthority, bool verifyCertificateAuthority, bool verifyCertificateName)
+ {
+ _trustedCertificateAuthority = trustedCertificateAuthority;
+ _verifyCertificateAuthority = verifyCertificateAuthority;
+ _verifyCertificateName = verifyCertificateName;
+ }
+
+ public async Task<Stream> Connect(Uri serviceUrl)
+ {
+ var scheme = serviceUrl.Scheme;
+ var host = serviceUrl.Host;
+ var port = serviceUrl.Port;
+ var encrypt = false;
+
+ switch (scheme)
+ {
+ case PulsarScheme:
+ if (port == -1)
+ port = DefaultPulsarPort;
+ break;
+ case PulsarSslScheme:
+ if (port == -1)
+ port = DefaultPulsarSSLPort;
+ encrypt = true;
+ break;
+ default:
+ throw new InvalidSchemeException($"Invalid scheme '{scheme}'. Expected '{PulsarScheme}' or '{PulsarSslScheme}'");
+ }
+
+ var tcpClient = new TcpClient();
+
+ switch (Uri.CheckHostName(host))
+ {
+ case UriHostNameType.IPv4:
+ case UriHostNameType.IPv6:
+ await tcpClient.ConnectAsync(IPAddress.Parse(host), port);
+ break;
+ default:
+ await tcpClient.ConnectAsync(host, port);
+ break;
+ }
+
+ if (!encrypt)
+ return tcpClient.GetStream();
+
+ try
+ {
+ var sslStream = new SslStream(tcpClient.GetStream(), false, new RemoteCertificateValidationCallback(ValidateServerCertificate), null);
+ await sslStream.AuthenticateAsClientAsync(host);
+ return sslStream;
+ }
+ catch (System.Security.Authentication.AuthenticationException exception)
+ {
+ throw new AuthenticationException("Got an authentication exception while trying to establish an encrypted connection. See inner exception for details.", exception);
+ }
+ }
+
+ private bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
+ {
+ if (sslPolicyErrors == SslPolicyErrors.None)
+ return true;
+
+ if (sslPolicyErrors.HasFlag(SslPolicyErrors.RemoteCertificateNotAvailable))
+ return false;
+
+ if (sslPolicyErrors.HasFlag(SslPolicyErrors.RemoteCertificateNameMismatch) && _verifyCertificateName)
+ return false;
+
+ if (sslPolicyErrors.HasFlag(SslPolicyErrors.RemoteCertificateChainErrors) && _verifyCertificateAuthority)
+ {
+ if (_trustedCertificateAuthority is null)
+ return false;
+
+ chain.ChainPolicy.ExtraStore.Add(_trustedCertificateAuthority);
+ _ = chain.Build((X509Certificate2)certificate);
+ for (var i = 0; i < chain.ChainElements.Count; i++)
+ {
+ if (chain.ChainElements[i].Certificate.Thumbprint == _trustedCertificateAuthority.Thumbprint)
+ return true;
+ }
+
+ return false;
+ }
+
+ return true;
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index 2e95dea..2056024 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -1,6 +1,7 @@
using DotPulsar.Abstractions;
using System;
using System.Reflection;
+using System.Security.Cryptography.X509Certificates;
namespace DotPulsar.Internal
{
@@ -18,11 +19,16 @@
private TimeSpan _retryInterval;
private Uri _serviceUrl;
+ private X509Certificate2 _trustedCertificateAuthority;
+ private bool _verifyCertificateAuthority;
+ private bool _verifyCertificateName;
public PulsarClientBuilder()
{
_retryInterval = TimeSpan.FromSeconds(3);
_serviceUrl = new Uri("pulsar://localhost:6650");
+ _verifyCertificateAuthority = true;
+ _verifyCertificateName = false;
}
public IPulsarClientBuilder RetryInterval(TimeSpan interval)
@@ -37,9 +43,28 @@
return this;
}
+ public IPulsarClientBuilder TrustedCertificateAuthority(X509Certificate2 trustedCertificateAuthority)
+ {
+ _trustedCertificateAuthority = trustedCertificateAuthority;
+ return this;
+ }
+
+ public IPulsarClientBuilder VerifyCertificateAuthority(bool verifyCertificateAuthority)
+ {
+ _verifyCertificateAuthority = verifyCertificateAuthority;
+ return this;
+ }
+
+ public IPulsarClientBuilder VerifyCertificateName(bool verifyCertificateName)
+ {
+ _verifyCertificateName = verifyCertificateName;
+ return this;
+ }
+
public IPulsarClient Build()
{
- var connectionPool = new ConnectionPool(ProtocolVersion, ClientVersion, _serviceUrl);
+ var connector = new Connector(_trustedCertificateAuthority, _verifyCertificateAuthority, _verifyCertificateName);
+ var connectionPool = new ConnectionPool(ProtocolVersion, ClientVersion, _serviceUrl, connector);
return new PulsarClient(connectionPool, new FaultStrategy(_retryInterval));
}
}