/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace DotPulsar.Internal;

using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.Extensions;
using DotPulsar.Internal.PulsarApi;
using System.Collections.Concurrent;

public sealed class ConnectionPool : IConnectionPool
{
    private readonly CommandConnect _commandConnect;
    private readonly Uri _serviceUrl;
    private readonly Connector _connector;
    private readonly EncryptionPolicy _encryptionPolicy;
    private readonly ConcurrentDictionary<PulsarUrl, Connection> _connections;
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly string? _listenerName;
    private readonly TimeSpan _closeInactiveConnectionsInterval;
    private readonly TimeSpan _keepAliveInterval;
    private readonly IAuthentication? _authentication;

    public ConnectionPool(
        CommandConnect commandConnect,
        Uri serviceUrl,
        Connector connector,
        EncryptionPolicy encryptionPolicy,
        TimeSpan closeInactiveConnectionsInterval,
        string? listenerName,
        TimeSpan keepAliveInterval,
        IAuthentication? authentication)
    {
        _commandConnect = commandConnect;
        _serviceUrl = serviceUrl;
        _connector = connector;
        _encryptionPolicy = encryptionPolicy;
        _listenerName = listenerName;
        _connections = new ConcurrentDictionary<PulsarUrl, Connection>();
        _cancellationTokenSource = new CancellationTokenSource();
        _closeInactiveConnectionsInterval = closeInactiveConnectionsInterval;
        _keepAliveInterval = keepAliveInterval;
        _authentication = authentication;
    }

    public async ValueTask DisposeAsync()
    {
        _cancellationTokenSource.Cancel();

        foreach (var entry in _connections.ToArray())
        {
            await DisposeConnection(entry.Key, entry.Value).ConfigureAwait(false);
        }
    }

    public async ValueTask<IConnection> FindConnectionForTopic(string topic, CancellationToken cancellationToken)
    {
        var lookup = new CommandLookupTopic
        {
            Topic = topic,
            Authoritative = false,
            AdvertisedListenerName = _listenerName
        };

        var physicalUrl = _serviceUrl;

        while (true)
        {
            var connection = await GetConnection(physicalUrl, cancellationToken).ConfigureAwait(false);
            var response = await connection.Send(lookup, cancellationToken).ConfigureAwait(false);

            response.Expect(BaseCommand.Type.LookupResponse);

            if (response.LookupTopicResponse.Response == CommandLookupTopicResponse.LookupType.Failed)
                response.LookupTopicResponse.Throw();

            lookup.Authoritative = response.LookupTopicResponse.Authoritative;

            var lookupResponseServiceUrl = new Uri(GetBrokerServiceUrl(response.LookupTopicResponse));

            if (response.LookupTopicResponse.Response == CommandLookupTopicResponse.LookupType.Redirect || !response.LookupTopicResponse.Authoritative)
            {
                physicalUrl = lookupResponseServiceUrl;
                continue;
            }

            if (response.LookupTopicResponse.ProxyThroughServiceUrl)
            {
                var url = new PulsarUrl(physicalUrl, lookupResponseServiceUrl);
                return await GetConnection(url, cancellationToken).ConfigureAwait(false);
            }

            // LookupType is 'Connect', ServiceUrl is local and response is authoritative. Assume the Pulsar server is a standalone docker.
            return lookupResponseServiceUrl.IsLoopback
                ? connection
                : await GetConnection(lookupResponseServiceUrl, cancellationToken).ConfigureAwait(false);
        }
    }

    private string GetBrokerServiceUrl(CommandLookupTopicResponse response)
    {
        var hasBrokerServiceUrl = !string.IsNullOrEmpty(response.BrokerServiceUrl);
        var hasBrokerServiceUrlTls = !string.IsNullOrEmpty(response.BrokerServiceUrlTls);

        switch (_encryptionPolicy)
        {
            case EncryptionPolicy.EnforceEncrypted:
                if (!hasBrokerServiceUrlTls)
                    throw new ConnectionSecurityException("Cannot enforce encrypted connections. The lookup topic response from broker gave no secure alternative.");
                return response.BrokerServiceUrlTls;
            case EncryptionPolicy.EnforceUnencrypted:
                if (!hasBrokerServiceUrl)
                    throw new ConnectionSecurityException("Cannot enforce unencrypted connections. The lookup topic response from broker gave no unsecure alternative.");
                return response.BrokerServiceUrl;
            case EncryptionPolicy.PreferEncrypted:
                return hasBrokerServiceUrlTls ? response.BrokerServiceUrlTls : response.BrokerServiceUrl;
            default:
                return hasBrokerServiceUrl ? response.BrokerServiceUrl : response.BrokerServiceUrlTls;
        }
    }

    private async ValueTask<Connection> GetConnection(Uri serviceUrl, CancellationToken cancellationToken)
        => await GetConnection(new PulsarUrl(serviceUrl, serviceUrl), cancellationToken).ConfigureAwait(false);

    private async ValueTask<Connection> GetConnection(PulsarUrl url, CancellationToken cancellationToken)
    {
        if (_connections.TryGetValue(url, out var connection) && connection is not null)
            return connection;

        return await EstablishNewConnection(url, cancellationToken).ConfigureAwait(false);
    }

    private async Task<Connection> EstablishNewConnection(PulsarUrl url, CancellationToken cancellationToken)
    {
        var stream = await _connector.Connect(url.Physical, cancellationToken).ConfigureAwait(false);

        var commandConnect = _commandConnect;
        if (url.ProxyThroughServiceUrl)
            commandConnect = WithProxyToBroker(commandConnect, url.Logical);

        var connection = Connection.Connect(new PulsarStream(stream), _authentication, _keepAliveInterval, _closeInactiveConnectionsInterval);
        _ = connection.OnStateChangeFrom(ConnectionState.Connected, CancellationToken.None).AsTask().ContinueWith(t => DisposeConnection(url, connection));
        var response = await connection.Send(commandConnect, cancellationToken).ConfigureAwait(false);
        response.Expect(BaseCommand.Type.Connected);
        _connections[url] = connection;
        connection.MaxMessageSize = response.Connected.MaxMessageSize;
        return connection;
    }

    private async ValueTask DisposeConnection(PulsarUrl serviceUrl, Connection connection)
    {
        _connections.TryRemove(serviceUrl, out var _);
        await connection.DisposeAsync().ConfigureAwait(false);
    }

    private static CommandConnect WithProxyToBroker(CommandConnect commandConnect, Uri logicalUrl)
    {
        return new CommandConnect
        {
            AuthData = commandConnect.ShouldSerializeAuthData() ? commandConnect.AuthData : null,
            AuthMethod = commandConnect.ShouldSerializeAuthMethod() ? commandConnect.AuthMethod : AuthMethod.AuthMethodNone,
            AuthMethodName = commandConnect.ShouldSerializeAuthMethodName() ? commandConnect.AuthMethodName : null,
            ClientVersion = commandConnect.ClientVersion,
            OriginalPrincipal = commandConnect.ShouldSerializeOriginalPrincipal() ? commandConnect.OriginalPrincipal : null,
            ProtocolVersion = commandConnect.ProtocolVersion,
            OriginalAuthData = commandConnect.ShouldSerializeOriginalAuthData() ? commandConnect.OriginalAuthData : null,
            OriginalAuthMethod = commandConnect.ShouldSerializeOriginalAuthMethod() ? commandConnect.OriginalAuthMethod : null,
            ProxyToBrokerUrl = $"{logicalUrl.Host}:{logicalUrl.Port}",
            FeatureFlags = commandConnect.FeatureFlags
        };
    }

    private sealed class PulsarUrl : IEquatable<PulsarUrl>
    {
        public PulsarUrl(Uri physical, Uri logical)
        {
            Physical = physical;
            Logical = logical;
            ProxyThroughServiceUrl = physical != logical;
        }

        public Uri Physical { get; }

        public Uri Logical { get; }

        public bool ProxyThroughServiceUrl { get; }

        public bool Equals(PulsarUrl? other)
        {
            if (other is null)
                return false;

            if (ReferenceEquals(this, other))
                return true;

            return Physical.Equals(other.Physical) && Logical.Equals(other.Logical);
        }

        public override bool Equals(object? obj)
            => obj is PulsarUrl url && Equals(url);

        public override int GetHashCode()
            => HashCode.Combine(Physical, Logical);

        public override string ToString()
            => $"{nameof(Physical)}: {Physical}, {nameof(Logical)}: {Logical}, {nameof(ProxyThroughServiceUrl)}: {ProxyThroughServiceUrl}";
    }

    public async ValueTask<uint> GetNumberOfPartitions(string topic, CancellationToken cancellationToken = default)
    {
        var connection = await FindConnectionForTopic(topic, cancellationToken).ConfigureAwait(false);
        var commandPartitionedMetadata = new CommandPartitionedTopicMetadata { Topic = topic };
        var response = await connection.Send(commandPartitionedMetadata, cancellationToken).ConfigureAwait(false);

        response.Expect(BaseCommand.Type.PartitionedMetadataResponse);

        if (response.PartitionMetadataResponse.Response == CommandPartitionedTopicMetadataResponse.LookupType.Failed)
            response.PartitionMetadataResponse.Throw();

        return response.PartitionMetadataResponse.Partitions;
    }
}
