﻿/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace DotPulsar.Internal;

using Abstractions;
using Extensions;
using PulsarApi;
using System;
using System.Buffers;
using System.Threading.Tasks;

public sealed class ChannelManager : IDisposable
{
    private readonly RequestResponseHandler _requestResponseHandler;
    private readonly IdLookup<IChannel> _consumerChannels;
    private readonly IdLookup<IChannel> _producerChannels;
    private readonly EnumLookup<BaseCommand.Type, Action<BaseCommand>> _incoming;

    public ChannelManager()
    {
        _requestResponseHandler = new RequestResponseHandler();
        _consumerChannels = new IdLookup<IChannel>();
        _producerChannels = new IdLookup<IChannel>();
        _incoming = new EnumLookup<BaseCommand.Type, Action<BaseCommand>>(cmd => _requestResponseHandler.Incoming(cmd));
        _incoming.Set(BaseCommand.Type.CloseConsumer, cmd => Incoming(cmd.CloseConsumer));
        _incoming.Set(BaseCommand.Type.CloseProducer, cmd => Incoming(cmd.CloseProducer));
        _incoming.Set(BaseCommand.Type.ActiveConsumerChange, cmd => Incoming(cmd.ActiveConsumerChange));
        _incoming.Set(BaseCommand.Type.ReachedEndOfTopic, cmd => Incoming(cmd.ReachedEndOfTopic));
    }

    public bool HasChannels()
        => !_consumerChannels.IsEmpty() || !_producerChannels.IsEmpty();

    public Task<ProducerResponse> Outgoing(CommandProducer command, IChannel channel)
    {
        var producerId = _producerChannels.Add(channel);
        command.ProducerId = producerId;
        var response = _requestResponseHandler.Outgoing(command);

        return response.ContinueWith(result =>
        {
            if (result.Result.CommandType == BaseCommand.Type.Error)
            {
                _ = _producerChannels.Remove(producerId);
                result.Result.Error.Throw();
            }

            channel.Connected();

            return new ProducerResponse(producerId, result.Result.ProducerSuccess.ProducerName);
        }, TaskContinuationOptions.OnlyOnRanToCompletion);
    }

    public Task<SubscribeResponse> Outgoing(CommandSubscribe command, IChannel channel)
    {
        var consumerId = _consumerChannels.Add(channel);
        command.ConsumerId = consumerId;
        var response = _requestResponseHandler.Outgoing(command);

        return response.ContinueWith(result =>
        {
            if (result.Result.CommandType == BaseCommand.Type.Error)
            {
                _ = _consumerChannels.Remove(consumerId);
                result.Result.Error.Throw();
            }

            channel.Connected();

            return new SubscribeResponse(consumerId);
        }, TaskContinuationOptions.OnlyOnRanToCompletion);
    }

    public Task<BaseCommand> Outgoing(CommandCloseConsumer command)
    {
        var consumerId = command.ConsumerId;

        Task<BaseCommand> response;

        using (TakeConsumerSenderLock(consumerId))
        {
            response = _requestResponseHandler.Outgoing(command);
        }

        _ = response.ContinueWith(result =>
        {
            if (result.Result.CommandType == BaseCommand.Type.Success)
                _ = _consumerChannels.Remove(consumerId);
        }, TaskContinuationOptions.OnlyOnRanToCompletion);

        return response;
    }

    public Task<BaseCommand> Outgoing(CommandCloseProducer command)
    {
        var producerId = command.ProducerId;

        Task<BaseCommand> response;

        using (TakeProducerSenderLock(producerId))
        {
            response = _requestResponseHandler.Outgoing(command);
        }

        _ = response.ContinueWith(result =>
        {
            if (result.Result.CommandType == BaseCommand.Type.Success)
                _ = _producerChannels.Remove(producerId);
        }, TaskContinuationOptions.OnlyOnRanToCompletion);

        return response;
    }

    public Task<BaseCommand> Outgoing(CommandUnsubscribe command)
    {
        var consumerId = command.ConsumerId;

        Task<BaseCommand> response;

        using (TakeConsumerSenderLock(consumerId))
        {
            response = _requestResponseHandler.Outgoing(command);
        }

        _ = response.ContinueWith(result =>
        {
            if (result.Result.CommandType == BaseCommand.Type.Success)
                _consumerChannels.Remove(consumerId)?.Unsubscribed();
        }, TaskContinuationOptions.OnlyOnRanToCompletion);

        return response;
    }

    public Task<BaseCommand> Outgoing(CommandSend command)
    {
        using (TakeProducerSenderLock(command.ProducerId))
        {
            return _requestResponseHandler.Outgoing(command);
        }
    }

    public Task<BaseCommand> Outgoing(CommandGetOrCreateSchema command)
        => _requestResponseHandler.Outgoing(command);

    public Task<BaseCommand> Outgoing(CommandConnect command)
        => _requestResponseHandler.Outgoing(command);

    public Task<BaseCommand> Outgoing(CommandLookupTopic command)
        => _requestResponseHandler.Outgoing(command);

    public Task<BaseCommand> Outgoing(CommandPartitionedTopicMetadata command)
        => _requestResponseHandler.Outgoing(command);

    public Task<BaseCommand> Outgoing(CommandSeek command)
    {
        using (TakeConsumerSenderLock(command.ConsumerId))
        {
            return _requestResponseHandler.Outgoing(command);
        }
    }

    public Task<BaseCommand> Outgoing(CommandGetLastMessageId command)
    {
        using (TakeConsumerSenderLock(command.ConsumerId))
        {
            return _requestResponseHandler.Outgoing(command);
        }
    }

    public void Incoming(BaseCommand command)
        => _incoming.Get(command.CommandType)(command);

    public void Incoming(CommandMessage command, ReadOnlySequence<byte> data)
        => _consumerChannels[command.ConsumerId]?.Received(new MessagePackage(command.MessageId, command.RedeliveryCount, data));

    public void Dispose()
    {
        _requestResponseHandler.Dispose();

        foreach (var channel in _consumerChannels.RemoveAll())
            channel.Disconnected();

        foreach (var channel in _producerChannels.RemoveAll())
            channel.Disconnected();
    }

    private void Incoming(CommandReachedEndOfTopic command)
        => _consumerChannels[command.ConsumerId]?.ReachedEndOfTopic();

    private void Incoming(CommandCloseConsumer command)
    {
        var channel = _consumerChannels[command.ConsumerId];

        if (channel is null)
            return;

        _ = _consumerChannels.Remove(command.ConsumerId);
        _requestResponseHandler.Incoming(command);
        channel.ClosedByServer();
    }

    private void Incoming(CommandCloseProducer command)
    {
        var channel = _producerChannels[command.ProducerId];

        if (channel is null)
            return;

        _ = _producerChannels.Remove(command.ProducerId);
        _requestResponseHandler.Incoming(command);
        channel.ClosedByServer();
    }

    private void Incoming(CommandActiveConsumerChange command)
    {
        var channel = _consumerChannels[command.ConsumerId];

        if (channel is null)
            return;

        if (command.IsActive)
            channel.Activated();
        else
            channel.Deactivated();
    }

    private IDisposable TakeConsumerSenderLock(ulong consumerId)
    {
        var channel = _consumerChannels[consumerId];
        if (channel is null)
            throw new OperationCanceledException();

        return channel.SenderLock();
    }

    private IDisposable TakeProducerSenderLock(ulong producerId)
    {
        var channel = _producerChannels[producerId];
        if (channel is null)
            throw new OperationCanceledException();

        return channel.SenderLock();
    }
}
