Add token authentication and fault producer/consumer/reader when host/network is unknown/unreachable.
diff --git a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
index 95ea924..894d1a7 100644
--- a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
@@ -9,6 +9,11 @@
public interface IPulsarClientBuilder
{
/// <summary>
+ /// Authenticate using a (JSON Web) token. This is optional.
+ /// </summary>
+ IPulsarClientBuilder AuthenticateUsingToken(string token);
+
+ /// <summary>
/// Set connection encryption policy. The default is 'EnforceUnencrypted' if the ServiceUrl scheme is 'pulsar' and 'EnforceEncrypted' if it's 'pulsar+ssl'.
/// </summary>
IPulsarClientBuilder ConnectionSecurity(EncryptionPolicy encryptionPolicy);
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index fd0e7c4..6ccaa62 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -12,8 +12,7 @@
public sealed class ConnectionPool : IDisposable
{
private readonly AsyncLock _lock;
- private readonly int _protocolVersion;
- private readonly string _clientVersion;
+ private readonly CommandConnect _commandConnect;
private readonly Uri _serviceUrl;
private readonly Connector _connector;
private readonly EncryptionPolicy _encryptionPolicy;
@@ -22,11 +21,10 @@
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly Task _closeInactiveConnections;
- public ConnectionPool(int protocolVersion, string clientVersion, Uri serviceUrl, Connector connector, EncryptionPolicy encryptionPolicy)
+ public ConnectionPool(CommandConnect commandConnect, Uri serviceUrl, Connector connector, EncryptionPolicy encryptionPolicy)
{
_lock = new AsyncLock();
- _protocolVersion = protocolVersion;
- _clientVersion = clientVersion;
+ _commandConnect = commandConnect;
_serviceUrl = serviceUrl;
_connector = connector;
_encryptionPolicy = encryptionPolicy;
@@ -90,11 +88,11 @@
{
case EncryptionPolicy.EnforceEncrypted:
if (!hasBrokerServiceUrlTls)
- throw new ConnectionSecurityException("Cannot enforce encrypted connections. Lookup response from broker gave no secure alternative.");
+ throw new ConnectionSecurityException("Cannot enforce encrypted connections. The lookup topic response from broker gave no secure alternative.");
return response.BrokerServiceUrlTls;
case EncryptionPolicy.EnforceUnencrypted:
if (!hasBrokerServiceUrl)
- throw new ConnectionSecurityException("Cannot enforce unencrypted connections. Lookup response from broker gave no unsecure alternative.");
+ throw new ConnectionSecurityException("Cannot enforce unencrypted connections. The lookup topic response from broker gave no unsecure alternative.");
return response.BrokerServiceUrl;
case EncryptionPolicy.PreferEncrypted:
return hasBrokerServiceUrlTls ? response.BrokerServiceUrlTls : response.BrokerServiceUrl;
@@ -117,13 +115,7 @@
connection = new Connection(stream);
Register(serviceUrl, connection);
- var connect = new CommandConnect
- {
- ProtocolVersion = _protocolVersion,
- ClientVersion = _clientVersion
- };
-
- var response = await connection.Send(connect);
+ var response = await connection.Send(_commandConnect);
response.Expect(BaseCommand.Type.Connected);
return connection;
diff --git a/src/DotPulsar/Internal/Connector.cs b/src/DotPulsar/Internal/Connector.cs
index f2cb693..2f0c5e9 100644
--- a/src/DotPulsar/Internal/Connector.cs
+++ b/src/DotPulsar/Internal/Connector.cs
@@ -32,30 +32,53 @@
if (port == -1)
port = encrypt ? Constants.DefaultPulsarSSLPort : Constants.DefaultPulsarPort;
+ var stream = await GetStream(host, port);
+
+ if (encrypt)
+ stream = await EncryptStream(stream, host);
+
+ return stream;
+ }
+
+ private async Task<Stream> GetStream(string host, int port)
+ {
var tcpClient = new TcpClient();
- switch (Uri.CheckHostName(host))
- {
- case UriHostNameType.IPv4:
- case UriHostNameType.IPv6:
- await tcpClient.ConnectAsync(IPAddress.Parse(host), port);
- break;
- default:
- await tcpClient.ConnectAsync(host, port);
- break;
- }
-
- if (!encrypt)
- return tcpClient.GetStream();
-
try
{
- var sslStream = new SslStream(tcpClient.GetStream(), false, new RemoteCertificateValidationCallback(ValidateServerCertificate), null);
+ var type = Uri.CheckHostName(host);
+
+ if (type == UriHostNameType.IPv4 || type == UriHostNameType.IPv6)
+ await tcpClient.ConnectAsync(IPAddress.Parse(host), port);
+ else
+ await tcpClient.ConnectAsync(host, port);
+
+ return tcpClient.GetStream();
+ }
+ catch
+ {
+ tcpClient.Dispose();
+ throw;
+ }
+ }
+
+ private async Task<Stream> EncryptStream(Stream stream, string host)
+ {
+ SslStream sslStream = null;
+
+ try
+ {
+ sslStream = new SslStream(stream, false, new RemoteCertificateValidationCallback(ValidateServerCertificate), null);
await sslStream.AuthenticateAsClientAsync(host);
return sslStream;
}
catch (System.Security.Authentication.AuthenticationException exception)
{
+ if (sslStream == null)
+ stream.Dispose();
+ else
+ sslStream.Dispose();
+
throw new AuthenticationException("Got an authentication exception while trying to establish an encrypted connection. See inner exception for details.", exception);
}
}
diff --git a/src/DotPulsar/Internal/FaultStrategy.cs b/src/DotPulsar/Internal/FaultStrategy.cs
index a0775aa..9b3e80b 100644
--- a/src/DotPulsar/Internal/FaultStrategy.cs
+++ b/src/DotPulsar/Internal/FaultStrategy.cs
@@ -2,6 +2,7 @@
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.Exceptions;
using System;
+using System.Net.Sockets;
namespace DotPulsar.Internal
{
@@ -22,6 +23,15 @@
case StreamNotReadyException _: return FaultAction.Relookup;
case ServiceNotReadyException _: return FaultAction.Relookup;
case DotPulsarException _: return FaultAction.Fault;
+ case SocketException socketException:
+ switch (socketException.SocketErrorCode)
+ {
+ case SocketError.HostNotFound:
+ case SocketError.HostUnreachable:
+ case SocketError.NetworkUnreachable:
+ return FaultAction.Fault;
+ }
+ break;
}
return FaultAction.Relookup;
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index 2dda63a..fa7e000 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -1,12 +1,15 @@
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
+using DotPulsar.Internal.PulsarApi;
using System;
using System.Security.Cryptography.X509Certificates;
+using System.Text;
namespace DotPulsar.Internal
{
public sealed class PulsarClientBuilder : IPulsarClientBuilder
{
+ private readonly CommandConnect _commandConnect;
private EncryptionPolicy? _encryptionPolicy;
private TimeSpan _retryInterval;
private Uri _serviceUrl;
@@ -16,12 +19,24 @@
public PulsarClientBuilder()
{
+ _commandConnect = new CommandConnect
+ {
+ ProtocolVersion = Constants.ProtocolVersion,
+ ClientVersion = Constants.ClientVersion
+ };
_retryInterval = TimeSpan.FromSeconds(3);
_serviceUrl = new Uri(Constants.PulsarScheme + "://localhost:" + Constants.DefaultPulsarPort);
_verifyCertificateAuthority = true;
_verifyCertificateName = false;
}
+ public IPulsarClientBuilder AuthenticateUsingToken(string token)
+ {
+ _commandConnect.AuthMethodName = "token";
+ _commandConnect.AuthData = Encoding.ASCII.GetBytes(token);
+ return this;
+ }
+
public IPulsarClientBuilder ConnectionSecurity(EncryptionPolicy encryptionPolicy)
{
_encryptionPolicy = encryptionPolicy;
@@ -82,7 +97,7 @@
throw new InvalidSchemeException($"Invalid scheme '{scheme}'. Expected '{Constants.PulsarScheme}' or '{Constants.PulsarSslScheme}'");
var connector = new Connector(_trustedCertificateAuthority, _verifyCertificateAuthority, _verifyCertificateName);
- var connectionPool = new ConnectionPool(Constants.ProtocolVersion, Constants.ClientVersion, _serviceUrl, connector, _encryptionPolicy.Value);
+ var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value);
return new PulsarClient(connectionPool, new FaultStrategy(_retryInterval));
}
}