// blob: ddd1a527565059b22f523fdcc5d3cf94391bb47b [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace DotPulsar.Internal;
using Abstractions;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using PulsarApi;
using System;
using System.Collections.Generic;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading.Tasks;
/// <summary>
/// Fluent builder that collects connection settings and creates an <see cref="IPulsarClient"/>.
/// </summary>
public sealed class PulsarClientBuilder : IPulsarClientBuilder
{
    private readonly CommandConnect _commandConnect;
    private readonly List<IHandleException> _exceptionHandlers;
    private EncryptionPolicy? _encryptionPolicy;
    private TimeSpan _keepAliveInterval;
    private string? _listenerName;
    private TimeSpan _retryInterval;
    private Uri _serviceUrl;
    private X509Certificate2? _trustedCertificateAuthority;
    private readonly X509Certificate2Collection _clientCertificates;
    private bool _verifyCertificateAuthority;
    private bool _verifyCertificateName;
    private TimeSpan _closeInactiveConnectionsInterval;
    private Func<Task<string>>? _tokenFactory;

    /// <summary>
    /// Creates a builder with its defaults: a 30 second keep-alive interval, a 3 second retry interval,
    /// a service URL of localhost on the default Pulsar port using the plain Pulsar scheme,
    /// certificate authority verification enabled, certificate name verification disabled
    /// and a 60 second interval for closing inactive connections.
    /// </summary>
    public PulsarClientBuilder()
    {
        _commandConnect = new CommandConnect
        {
            ProtocolVersion = Constants.ProtocolVersion,
            ClientVersion = $"{Constants.ClientName} {Constants.ClientVersion}"
        };

        _exceptionHandlers = new List<IHandleException>();
        _keepAliveInterval = TimeSpan.FromSeconds(30);
        _retryInterval = TimeSpan.FromSeconds(3);
        _serviceUrl = new Uri($"{Constants.PulsarScheme}://localhost:{Constants.DefaultPulsarPort}");
        _clientCertificates = new X509Certificate2Collection();
        _verifyCertificateAuthority = true;
        _verifyCertificateName = false;
        _closeInactiveConnectionsInterval = TimeSpan.FromSeconds(60);
    }

    /// <summary>
    /// Sets the authentication method name to "tls" and adds the certificate to the client certificates.
    /// </summary>
    /// <param name="clientCertificate">The client certificate to add.</param>
    /// <returns>This builder.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="clientCertificate"/> is null.</exception>
    public IPulsarClientBuilder AuthenticateUsingClientCertificate(X509Certificate2 clientCertificate)
    {
        // Validate before touching any state so a failed call leaves the builder unchanged.
        if (clientCertificate is null)
            throw new ArgumentNullException(nameof(clientCertificate));

        _commandConnect.AuthMethodName = "tls";
        _clientCertificates.Add(clientCertificate);
        return this;
    }

    /// <summary>
    /// Sets the authentication method name to "token" and the authentication data to the UTF-8 bytes of the token.
    /// </summary>
    /// <param name="token">The token to send as authentication data.</param>
    /// <returns>This builder.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="token"/> is null.</exception>
    public IPulsarClientBuilder AuthenticateUsingToken(string token)
    {
        // Validate before touching any state so a failed call leaves the builder unchanged.
        if (token is null)
            throw new ArgumentNullException(nameof(token));

        _commandConnect.AuthMethodName = "token";
        _commandConnect.AuthData = Encoding.UTF8.GetBytes(token);
        return this;
    }

    /// <summary>
    /// Stores a token factory that is handed to the connection pool on <see cref="Build"/> and
    /// flags the connect command as supporting authentication refresh.
    /// </summary>
    /// <param name="tokenFactory">The asynchronous factory producing tokens.</param>
    /// <returns>This builder.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="tokenFactory"/> is null.</exception>
    public IPulsarClientBuilder AuthenticateUsingToken(Func<Task<string>> tokenFactory)
    {
        // Without this guard a null factory would still switch on SupportsAuthRefresh.
        _tokenFactory = tokenFactory ?? throw new ArgumentNullException(nameof(tokenFactory));

        // Reuse existing feature flags, if any, so other flags already set are kept.
        var featureFlags = _commandConnect.FeatureFlags ?? new FeatureFlags();
        featureFlags.SupportsAuthRefresh = true;
        _commandConnect.FeatureFlags = featureFlags;
        return this;
    }

    /// <summary>
    /// Sets the encryption policy. When not set, <see cref="Build"/> derives it from the scheme of the service URL.
    /// </summary>
    /// <param name="encryptionPolicy">The encryption policy to use.</param>
    /// <returns>This builder.</returns>
    public IPulsarClientBuilder ConnectionSecurity(EncryptionPolicy encryptionPolicy)
    {
        _encryptionPolicy = encryptionPolicy;
        return this;
    }

    /// <summary>
    /// Registers an exception handler. On <see cref="Build"/> the registered handlers are placed,
    /// in registration order, ahead of the default exception handler.
    /// </summary>
    /// <param name="exceptionHandler">The exception handler to register.</param>
    /// <returns>This builder.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="exceptionHandler"/> is null.</exception>
    public IPulsarClientBuilder ExceptionHandler(IHandleException exceptionHandler)
    {
        // Reject null here rather than letting a null entry end up in the handler pipeline.
        if (exceptionHandler is null)
            throw new ArgumentNullException(nameof(exceptionHandler));

        _exceptionHandlers.Add(exceptionHandler);
        return this;
    }

    /// <summary>
    /// Sets the keep-alive interval handed to the connection pool.
    /// </summary>
    /// <param name="interval">The keep-alive interval.</param>
    /// <returns>This builder.</returns>
    public IPulsarClientBuilder KeepAliveInterval(TimeSpan interval)
    {
        _keepAliveInterval = interval;
        return this;
    }

    /// <summary>
    /// Sets the listener name handed to the connection pool.
    /// </summary>
    /// <param name="listenerName">The listener name.</param>
    /// <returns>This builder.</returns>
    public IPulsarClientBuilder ListenerName(string listenerName)
    {
        _listenerName = listenerName;
        return this;
    }

    /// <summary>
    /// Sets the retry interval handed to the default exception handler.
    /// </summary>
    /// <param name="interval">The retry interval.</param>
    /// <returns>This builder.</returns>
    public IPulsarClientBuilder RetryInterval(TimeSpan interval)
    {
        _retryInterval = interval;
        return this;
    }

    /// <summary>
    /// Sets the service URL. Its scheme is validated by <see cref="Build"/>.
    /// </summary>
    /// <param name="uri">The service URL.</param>
    /// <returns>This builder.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="uri"/> is null.</exception>
    public IPulsarClientBuilder ServiceUrl(Uri uri)
    {
        // Fail here instead of with a NullReferenceException when Build reads the scheme.
        _serviceUrl = uri ?? throw new ArgumentNullException(nameof(uri));
        return this;
    }

    /// <summary>
    /// Sets the trusted certificate authority handed to the connector.
    /// </summary>
    /// <param name="trustedCertificateAuthority">The trusted certificate authority.</param>
    /// <returns>This builder.</returns>
    public IPulsarClientBuilder TrustedCertificateAuthority(X509Certificate2 trustedCertificateAuthority)
    {
        _trustedCertificateAuthority = trustedCertificateAuthority;
        return this;
    }

    /// <summary>
    /// Sets whether the connector should verify the certificate authority.
    /// </summary>
    /// <param name="verifyCertificateAuthority">The value handed to the connector.</param>
    /// <returns>This builder.</returns>
    public IPulsarClientBuilder VerifyCertificateAuthority(bool verifyCertificateAuthority)
    {
        _verifyCertificateAuthority = verifyCertificateAuthority;
        return this;
    }

    /// <summary>
    /// Sets whether the connector should verify the certificate name.
    /// </summary>
    /// <param name="verifyCertificateName">The value handed to the connector.</param>
    /// <returns>This builder.</returns>
    public IPulsarClientBuilder VerifyCertificateName(bool verifyCertificateName)
    {
        _verifyCertificateName = verifyCertificateName;
        return this;
    }

    /// <summary>
    /// Sets the interval for closing inactive connections handed to the connection pool.
    /// </summary>
    /// <param name="interval">The interval.</param>
    /// <returns>This builder.</returns>
    public IPulsarClientBuilder CloseInactiveConnectionsInterval(TimeSpan interval)
    {
        _closeInactiveConnectionsInterval = interval;
        return this;
    }

    /// <summary>
    /// Validates the service URL scheme against the encryption policy and creates the client.
    /// </summary>
    /// <returns>A new <see cref="IPulsarClient"/>.</returns>
    /// <exception cref="ConnectionSecurityException">
    /// The scheme of the service URL contradicts the explicitly configured encryption policy.
    /// </exception>
    /// <exception cref="InvalidSchemeException">
    /// The scheme of the service URL is neither the Pulsar scheme nor the Pulsar SSL scheme.
    /// </exception>
    public IPulsarClient Build()
    {
        var scheme = _serviceUrl.Scheme;

        // Resolve the policy into a local instead of writing the default back to the field.
        // Otherwise a defaulted policy would stick to the builder, and changing the ServiceUrl
        // scheme before a second Build would be rejected against a policy the caller never chose.
        EncryptionPolicy encryptionPolicy;

        if (scheme == Constants.PulsarScheme)
        {
            encryptionPolicy = _encryptionPolicy ?? EncryptionPolicy.EnforceUnencrypted;

            if (encryptionPolicy == EncryptionPolicy.EnforceEncrypted)
                throw new ConnectionSecurityException(
                    $"The scheme of the ServiceUrl ({_serviceUrl}) is '{Constants.PulsarScheme}' and cannot be used with an encryption policy of 'EnforceEncrypted'");
        }
        else if (scheme == Constants.PulsarSslScheme)
        {
            encryptionPolicy = _encryptionPolicy ?? EncryptionPolicy.EnforceEncrypted;

            if (encryptionPolicy == EncryptionPolicy.EnforceUnencrypted)
                throw new ConnectionSecurityException(
                    $"The scheme of the ServiceUrl ({_serviceUrl}) is '{Constants.PulsarSslScheme}' and cannot be used with an encryption policy of 'EnforceUnencrypted'");
        }
        else
            throw new InvalidSchemeException($"Invalid scheme '{scheme}'. Expected '{Constants.PulsarScheme}' or '{Constants.PulsarSslScheme}'");

        var connector = new Connector(_clientCertificates, _trustedCertificateAuthority, _verifyCertificateAuthority, _verifyCertificateName);

        // Copy the registered handlers so the default handler is appended last without changing the builder's list.
        var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) { new DefaultExceptionHandler(_retryInterval) };
        var exceptionHandlerPipeline = new ExceptionHandlerPipeline(exceptionHandlers);

        // The connection pool gets its own executor, created with Guid.Empty and a no-op event register.
        var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, encryptionPolicy, _closeInactiveConnectionsInterval, _listenerName,
            _keepAliveInterval,
            new Executor(Guid.Empty, new EmptyRegisterEvent(), exceptionHandlerPipeline),
            _tokenFactory);

        var processManager = new ProcessManager(connectionPool);
        return new PulsarClient(connectionPool, processManager, exceptionHandlerPipeline, _serviceUrl);
    }
}
/// <summary>
/// An <see cref="IRegisterEvent"/> implementation that ignores every event it is given.
/// </summary>
internal class EmptyRegisterEvent : IRegisterEvent
{
    /// <summary>
    /// Accepts the event and does nothing with it.
    /// </summary>
    /// <param name="event">The event to ignore.</param>
    public void Register(IEvent @event)
    {
        // Intentionally empty.
    }
}