blob: 77b4afb7654e16a5b93169bb0a4e20d088d57024 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace DotPulsar.Internal
{
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using PulsarApi;
using System;
using System.Collections.Generic;
using System.Security.Cryptography.X509Certificates;
using System.Text;
public sealed class PulsarClientBuilder : IPulsarClientBuilder
{
private readonly CommandConnect _commandConnect;
private readonly List<IHandleException> _exceptionHandlers;
private EncryptionPolicy? _encryptionPolicy;
private string? _listenerName;
private TimeSpan _retryInterval;
private Uri _serviceUrl;
private X509Certificate2? _trustedCertificateAuthority;
private readonly X509Certificate2Collection _clientCertificates;
private bool _verifyCertificateAuthority;
private bool _verifyCertificateName;
private TimeSpan _closeInactiveConnectionsInterval;
public PulsarClientBuilder()
{
_commandConnect = new CommandConnect
{
ProtocolVersion = Constants.ProtocolVersion,
ClientVersion = $"{Constants.ClientName} {Constants.ClientVersion}"
};
_exceptionHandlers = new List<IHandleException>();
_retryInterval = TimeSpan.FromSeconds(3);
_serviceUrl = new Uri($"{Constants.PulsarScheme}://localhost:{Constants.DefaultPulsarPort}");
_clientCertificates = new X509Certificate2Collection();
_verifyCertificateAuthority = true;
_verifyCertificateName = false;
_closeInactiveConnectionsInterval = TimeSpan.FromSeconds(60);
}
public IPulsarClientBuilder AuthenticateUsingClientCertificate(X509Certificate2 clientCertificate)
{
_commandConnect.AuthMethodName = "tls";
_clientCertificates.Add(clientCertificate);
return this;
}
public IPulsarClientBuilder AuthenticateUsingToken(string token)
{
_commandConnect.AuthMethodName = "token";
_commandConnect.AuthData = Encoding.UTF8.GetBytes(token);
return this;
}
public IPulsarClientBuilder ConnectionSecurity(EncryptionPolicy encryptionPolicy)
{
_encryptionPolicy = encryptionPolicy;
return this;
}
public IPulsarClientBuilder ExceptionHandler(IHandleException exceptionHandler)
{
_exceptionHandlers.Add(exceptionHandler);
return this;
}
public IPulsarClientBuilder ListenerName(string listenerName)
{
_listenerName = listenerName;
return this;
}
public IPulsarClientBuilder RetryInterval(TimeSpan interval)
{
_retryInterval = interval;
return this;
}
public IPulsarClientBuilder ServiceUrl(Uri uri)
{
_serviceUrl = uri;
return this;
}
public IPulsarClientBuilder TrustedCertificateAuthority(X509Certificate2 trustedCertificateAuthority)
{
_trustedCertificateAuthority = trustedCertificateAuthority;
return this;
}
public IPulsarClientBuilder VerifyCertificateAuthority(bool verifyCertificateAuthority)
{
_verifyCertificateAuthority = verifyCertificateAuthority;
return this;
}
public IPulsarClientBuilder VerifyCertificateName(bool verifyCertificateName)
{
_verifyCertificateName = verifyCertificateName;
return this;
}
public IPulsarClientBuilder CloseInactiveConnectionsInterval(TimeSpan interval)
{
_closeInactiveConnectionsInterval = interval;
return this;
}
public IPulsarClient Build()
{
var scheme = _serviceUrl.Scheme;
if (scheme == Constants.PulsarScheme)
{
_encryptionPolicy ??= EncryptionPolicy.EnforceUnencrypted;
if (_encryptionPolicy.Value == EncryptionPolicy.EnforceEncrypted)
throw new ConnectionSecurityException(
$"The scheme of the ServiceUrl ({_serviceUrl}) is '{Constants.PulsarScheme}' and cannot be used with an encryption policy of 'EnforceEncrypted'");
}
else if (scheme == Constants.PulsarSslScheme)
{
_encryptionPolicy ??= EncryptionPolicy.EnforceEncrypted;
if (_encryptionPolicy.Value == EncryptionPolicy.EnforceUnencrypted)
throw new ConnectionSecurityException(
$"The scheme of the ServiceUrl ({_serviceUrl}) is '{Constants.PulsarSslScheme}' and cannot be used with an encryption policy of 'EnforceUnencrypted'");
}
else
throw new InvalidSchemeException($"Invalid scheme '{scheme}'. Expected '{Constants.PulsarScheme}' or '{Constants.PulsarSslScheme}'");
var connector = new Connector(_clientCertificates, _trustedCertificateAuthority, _verifyCertificateAuthority, _verifyCertificateName);
var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval, _listenerName);
var processManager = new ProcessManager(connectionPool);
var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) { new DefaultExceptionHandler(_retryInterval) };
var exceptionHandlerPipeline = new ExceptionHandlerPipeline(exceptionHandlers);
return new PulsarClient(connectionPool, processManager, exceptionHandlerPipeline, _serviceUrl);
}
}
}