﻿/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace DotPulsar.Internal;

using Abstractions;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using PulsarApi;
using System;
using System.Collections.Generic;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading.Tasks;

/// <summary>
/// Collects connection settings through a fluent interface and creates an <see cref="IPulsarClient"/> from them.
/// </summary>
public sealed class PulsarClientBuilder : IPulsarClientBuilder
{
    private readonly CommandConnect _commandConnect;
    private readonly List<IHandleException> _exceptionHandlers;
    private EncryptionPolicy? _encryptionPolicy;
    private TimeSpan _keepAliveInterval;
    private string? _listenerName;
    private TimeSpan _retryInterval;
    private Uri _serviceUrl;
    private X509Certificate2? _trustedCertificateAuthority;
    private readonly X509Certificate2Collection _clientCertificates;
    private bool _verifyCertificateAuthority;
    private bool _verifyCertificateName;
    private TimeSpan _closeInactiveConnectionsInterval;
    private Func<Task<string>>? _tokenFactory;

    /// <summary>
    /// Creates a builder with these defaults: keep-alive interval of 30 seconds, retry interval of 3 seconds,
    /// a service url of localhost on <see cref="Constants.DefaultPulsarPort"/> using <see cref="Constants.PulsarScheme"/>,
    /// certificate authority verification enabled, certificate name verification disabled and
    /// a close-inactive-connections interval of 60 seconds.
    /// </summary>
    public PulsarClientBuilder()
    {
        _commandConnect = new CommandConnect
        {
            ProtocolVersion = Constants.ProtocolVersion,
            ClientVersion = $"{Constants.ClientName} {Constants.ClientVersion}"
        };

        _exceptionHandlers = new List<IHandleException>();
        _keepAliveInterval = TimeSpan.FromSeconds(30);
        _retryInterval = TimeSpan.FromSeconds(3);
        _serviceUrl = new Uri($"{Constants.PulsarScheme}://localhost:{Constants.DefaultPulsarPort}");
        _clientCertificates = new X509Certificate2Collection();
        _verifyCertificateAuthority = true;
        _verifyCertificateName = false;
        _closeInactiveConnectionsInterval = TimeSpan.FromSeconds(60);
    }

    /// <summary>
    /// Adds <paramref name="clientCertificate"/> to the client certificates and sets the authentication method name to "tls".
    /// </summary>
    /// <param name="clientCertificate">The certificate to add to the client certificate collection.</param>
    /// <returns>This builder.</returns>
    public IPulsarClientBuilder AuthenticateUsingClientCertificate(X509Certificate2 clientCertificate)
    {
        // Add first: the collection rejects a null certificate, and the builder must not be left
        // with the authentication method switched to "tls" when no certificate was stored.
        _clientCertificates.Add(clientCertificate);
        _commandConnect.AuthMethodName = "tls";
        return this;
    }

    /// <summary>
    /// Sets the authentication method name to "token" and the authentication data to the UTF-8 bytes of <paramref name="token"/>.
    /// </summary>
    /// <param name="token">The token to send as authentication data.</param>
    /// <returns>This builder.</returns>
    public IPulsarClientBuilder AuthenticateUsingToken(string token)
    {
        // Encode first: GetBytes throws on a null token, and the connect command must not be
        // left with the method name changed but no matching authentication data.
        var authData = Encoding.UTF8.GetBytes(token);
        _commandConnect.AuthMethodName = "token";
        _commandConnect.AuthData = authData;
        return this;
    }

    /// <summary>
    /// Stores a factory that produces tokens and sets the 'SupportsAuthRefresh' feature flag on the connect command.
    /// </summary>
    /// <param name="tokenFactory">The factory handed to the connection pool when the client is built.</param>
    /// <returns>This builder.</returns>
    public IPulsarClientBuilder AuthenticateUsingToken(Func<Task<string>> tokenFactory)
    {
        // NOTE(review): unlike the string overload, the authentication method name is not set here;
        // presumably the connection pool does that when a token factory is present - confirm.
        _tokenFactory = tokenFactory;
        var featureFlags = _commandConnect.FeatureFlags ?? new FeatureFlags();
        featureFlags.SupportsAuthRefresh = true;
        _commandConnect.FeatureFlags = featureFlags;
        return this;
    }

    /// <summary>
    /// Sets the encryption policy. When this is never called, <see cref="Build"/> derives the policy from the scheme of the service url.
    /// </summary>
    /// <param name="encryptionPolicy">The encryption policy to use.</param>
    /// <returns>This builder.</returns>
    public IPulsarClientBuilder ConnectionSecurity(EncryptionPolicy encryptionPolicy)
    {
        _encryptionPolicy = encryptionPolicy;
        return this;
    }

    /// <summary>
    /// Registers an exception handler. Handlers are placed in the pipeline in the order they were registered,
    /// ahead of the default handler that <see cref="Build"/> appends.
    /// </summary>
    /// <param name="exceptionHandler">The handler to register.</param>
    /// <returns>This builder.</returns>
    public IPulsarClientBuilder ExceptionHandler(IHandleException exceptionHandler)
    {
        _exceptionHandlers.Add(exceptionHandler);
        return this;
    }

    /// <summary>
    /// Sets the keep-alive interval passed to the connection pool.
    /// </summary>
    /// <param name="interval">The keep-alive interval.</param>
    /// <returns>This builder.</returns>
    public IPulsarClientBuilder KeepAliveInterval(TimeSpan interval)
    {
        _keepAliveInterval = interval;
        return this;
    }

    /// <summary>
    /// Sets the listener name passed to the connection pool.
    /// </summary>
    /// <param name="listenerName">The listener name.</param>
    /// <returns>This builder.</returns>
    public IPulsarClientBuilder ListenerName(string listenerName)
    {
        _listenerName = listenerName;
        return this;
    }

    /// <summary>
    /// Sets the retry interval given to the default exception handler.
    /// </summary>
    /// <param name="interval">The retry interval.</param>
    /// <returns>This builder.</returns>
    public IPulsarClientBuilder RetryInterval(TimeSpan interval)
    {
        _retryInterval = interval;
        return this;
    }

    /// <summary>
    /// Sets the service url. Its scheme must be <see cref="Constants.PulsarScheme"/> or
    /// <see cref="Constants.PulsarSslScheme"/> for <see cref="Build"/> to succeed.
    /// </summary>
    /// <param name="uri">The service url.</param>
    /// <returns>This builder.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="uri"/> is null.</exception>
    public IPulsarClientBuilder ServiceUrl(Uri uri)
    {
        // Reject null here; otherwise it only surfaces later as a NullReferenceException in Build.
        _serviceUrl = uri ?? throw new ArgumentNullException(nameof(uri));
        return this;
    }

    /// <summary>
    /// Sets the trusted certificate authority passed to the connector.
    /// </summary>
    /// <param name="trustedCertificateAuthority">The certificate of the trusted authority.</param>
    /// <returns>This builder.</returns>
    public IPulsarClientBuilder TrustedCertificateAuthority(X509Certificate2 trustedCertificateAuthority)
    {
        _trustedCertificateAuthority = trustedCertificateAuthority;
        return this;
    }

    /// <summary>
    /// Sets the certificate authority verification flag passed to the connector. The default is true.
    /// </summary>
    /// <param name="verifyCertificateAuthority">The value of the flag.</param>
    /// <returns>This builder.</returns>
    public IPulsarClientBuilder VerifyCertificateAuthority(bool verifyCertificateAuthority)
    {
        _verifyCertificateAuthority = verifyCertificateAuthority;
        return this;
    }

    /// <summary>
    /// Sets the certificate name verification flag passed to the connector. The default is false.
    /// </summary>
    /// <param name="verifyCertificateName">The value of the flag.</param>
    /// <returns>This builder.</returns>
    public IPulsarClientBuilder VerifyCertificateName(bool verifyCertificateName)
    {
        _verifyCertificateName = verifyCertificateName;
        return this;
    }

    /// <summary>
    /// Sets the close-inactive-connections interval passed to the connection pool.
    /// </summary>
    /// <param name="interval">The interval.</param>
    /// <returns>This builder.</returns>
    public IPulsarClientBuilder CloseInactiveConnectionsInterval(TimeSpan interval)
    {
        _closeInactiveConnectionsInterval = interval;
        return this;
    }

    /// <summary>
    /// Creates a client from the current settings. The builder's own settings are not modified,
    /// so changing a setting and calling this method again is evaluated from the settings as the caller left them.
    /// </summary>
    /// <returns>A new <see cref="PulsarClient"/>.</returns>
    /// <exception cref="ConnectionSecurityException">The explicitly chosen encryption policy contradicts the scheme of the service url.</exception>
    /// <exception cref="InvalidSchemeException">The scheme of the service url is not a supported scheme.</exception>
    public IPulsarClient Build()
    {
        var encryptionPolicy = ResolveEncryptionPolicy();

        var connector = new Connector(_clientCertificates, _trustedCertificateAuthority, _verifyCertificateAuthority, _verifyCertificateName);

        // User-registered handlers come first; the default handler is always last in the pipeline.
        var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) { new DefaultExceptionHandler(_retryInterval) };
        var exceptionHandlerPipeline = new ExceptionHandlerPipeline(exceptionHandlers);
        var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, encryptionPolicy, _closeInactiveConnectionsInterval, _listenerName,
            _keepAliveInterval,
            new Executor(Guid.Empty, new EmptyRegisterEvent(), exceptionHandlerPipeline),
            _tokenFactory);
        var processManager = new ProcessManager(connectionPool);

        return new PulsarClient(connectionPool, processManager, exceptionHandlerPipeline, _serviceUrl);
    }

    // Determines the effective encryption policy from the service url scheme and the optional explicit
    // choice, without writing the derived default back to the builder.
    private EncryptionPolicy ResolveEncryptionPolicy()
    {
        var scheme = _serviceUrl.Scheme;

        if (scheme == Constants.PulsarScheme)
        {
            var policy = _encryptionPolicy ?? EncryptionPolicy.EnforceUnencrypted;

            if (policy == EncryptionPolicy.EnforceEncrypted)
                throw new ConnectionSecurityException(
                    $"The scheme of the ServiceUrl ({_serviceUrl}) is '{Constants.PulsarScheme}' and cannot be used with an encryption policy of 'EnforceEncrypted'");

            return policy;
        }

        if (scheme == Constants.PulsarSslScheme)
        {
            var policy = _encryptionPolicy ?? EncryptionPolicy.EnforceEncrypted;

            if (policy == EncryptionPolicy.EnforceUnencrypted)
                throw new ConnectionSecurityException(
                    $"The scheme of the ServiceUrl ({_serviceUrl}) is '{Constants.PulsarSslScheme}' and cannot be used with an encryption policy of 'EnforceUnencrypted'");

            return policy;
        }

        throw new InvalidSchemeException($"Invalid scheme '{scheme}'. Expected '{Constants.PulsarScheme}' or '{Constants.PulsarSslScheme}'");
    }
}

/// <summary>
/// An <see cref="IRegisterEvent"/> implementation whose <see cref="Register"/> method does nothing.
/// </summary>
internal class EmptyRegisterEvent : IRegisterEvent
{
    /// <summary>
    /// Accepts an event and performs no work with it.
    /// </summary>
    /// <param name="event">The event to register; it is not used.</param>
    public void Register(IEvent @event)
    {
        // No-op: the event is not used.
    }
}
