﻿/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace DotPulsar.Internal;

using Abstractions;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using Extensions;
using PulsarApi;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public sealed class ConsumerChannel<TMessage> : IConsumerChannel<TMessage>
{
    private readonly ulong _id;
    private readonly AsyncQueue<MessagePackage> _queue;
    private readonly IConnection _connection;
    private readonly BatchHandler<TMessage> _batchHandler;
    private readonly CommandFlow _cachedCommandFlow;
    private readonly IMessageFactory<TMessage> _messageFactory;
    private readonly IDecompress?[] _decompressors;
    private readonly AsyncLock _lock;
    private uint _sendWhenZero;
    private bool _firstFlow;

    public ConsumerChannel(
        ulong id,
        uint messagePrefetchCount,
        AsyncQueue<MessagePackage> queue,
        IConnection connection,
        BatchHandler<TMessage> batchHandler,
        IMessageFactory<TMessage> messageFactory,
        IEnumerable<IDecompressorFactory> decompressorFactories)
    {
        _id = id;
        _queue = queue;
        _connection = connection;
        _batchHandler = batchHandler;
        _messageFactory = messageFactory;

        _decompressors = new IDecompress[5];

        foreach (var decompressorFactory in decompressorFactories)
        {
            _decompressors[(int) decompressorFactory.CompressionType] = decompressorFactory.Create();
        }

        _lock = new AsyncLock();

        _cachedCommandFlow = new CommandFlow
        {
            ConsumerId = id,
            MessagePermits = messagePrefetchCount
        };

        _sendWhenZero = 0;
        _firstFlow = true;
    }

    public async ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellationToken)
    {
        using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
        {
            while (true)
            {
                if (_sendWhenZero == 0)
                    await SendFlow(cancellationToken).ConfigureAwait(false);

                _sendWhenZero--;

                var message = _batchHandler.GetNext();

                if (message is not null)
                    return message;

                var messagePackage = await _queue.Dequeue(cancellationToken).ConfigureAwait(false);

                if (!messagePackage.ValidateMagicNumberAndChecksum())
                {
                    await RejectPackage(messagePackage, CommandAck.ValidationErrorType.ChecksumMismatch, cancellationToken).ConfigureAwait(false);
                    continue;
                }

                var metadataSize = messagePackage.GetMetadataSize();
                var metadata = messagePackage.ExtractMetadata(metadataSize);
                var data = messagePackage.ExtractData(metadataSize);

                if (metadata.Compression != CompressionType.None)
                {
                    var decompressor = _decompressors[(int) metadata.Compression];
                    if (decompressor is null)
                        throw new CompressionException($"Support for {metadata.Compression} compression was not found");

                    try
                    {
                        data = decompressor.Decompress(data, (int) metadata.UncompressedSize);
                    }
                    catch
                    {
                        await RejectPackage(messagePackage, CommandAck.ValidationErrorType.DecompressionError, cancellationToken).ConfigureAwait(false);
                        continue;
                    }
                }

                var messageId = messagePackage.MessageId;
                var redeliveryCount = messagePackage.RedeliveryCount;

                if (metadata.ShouldSerializeNumMessagesInBatch())
                {
                    try
                    {
                        return _batchHandler.Add(messageId, redeliveryCount, metadata, data);
                    }
                    catch
                    {
                        await RejectPackage(messagePackage, CommandAck.ValidationErrorType.BatchDeSerializeError, cancellationToken).ConfigureAwait(false);
                        continue;
                    }
                }

                return _messageFactory.Create(messageId.ToMessageId(), redeliveryCount, data, metadata);
            }
        }
    }

    public async Task Send(CommandAck command, CancellationToken cancellationToken)
    {
        var messageId = command.MessageIds[0];

        if (messageId.BatchIndex != -1)
        {
            var batchMessageId = _batchHandler.Acknowledge(messageId);

            if (batchMessageId is null)
                return;

            command.MessageIds[0] = batchMessageId;
        }

        command.ConsumerId = _id;
        await _connection.Send(command, cancellationToken).ConfigureAwait(false);
    }

    public async Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken)
    {
        command.ConsumerId = _id;
        await _connection.Send(command, cancellationToken).ConfigureAwait(false);
    }

    public async Task Send(CommandUnsubscribe command, CancellationToken cancellationToken)
    {
        command.ConsumerId = _id;
        var response = await _connection.Send(command, cancellationToken).ConfigureAwait(false);
        response.Expect(BaseCommand.Type.Success);
    }

    public async Task Send(CommandSeek command, CancellationToken cancellationToken)
    {
        command.ConsumerId = _id;
        var response = await _connection.Send(command, cancellationToken).ConfigureAwait(false);
        response.Expect(BaseCommand.Type.Success);
        _batchHandler.Clear();
    }

    public async Task<MessageId> Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
    {
        command.ConsumerId = _id;
        var response = await _connection.Send(command, cancellationToken).ConfigureAwait(false);
        response.Expect(BaseCommand.Type.GetLastMessageIdResponse);
        return response.GetLastMessageIdResponse.LastMessageId.ToMessageId();
    }

    public async ValueTask DisposeAsync()
    {
        _queue.Dispose();

        for (var i = 0; i < _decompressors.Length; ++i)
        {
            _decompressors[i]?.Dispose();
        }

        await _lock.DisposeAsync().ConfigureAwait(false);
    }

    private async ValueTask SendFlow(CancellationToken cancellationToken)
    {
        await _connection.Send(_cachedCommandFlow, cancellationToken).ConfigureAwait(false);

        if (_firstFlow)
        {
            _cachedCommandFlow.MessagePermits = (uint) Math.Ceiling(_cachedCommandFlow.MessagePermits * 0.5);
            _firstFlow = false;
        }

        _sendWhenZero = _cachedCommandFlow.MessagePermits;
    }

    private async Task RejectPackage(MessagePackage messagePackage, CommandAck.ValidationErrorType validationErrorType, CancellationToken cancellationToken)
    {
        var ack = new CommandAck
        {
            Type = CommandAck.AckType.Individual,
            ValidationError = validationErrorType
        };

        ack.MessageIds.Add(messagePackage.MessageId);

        await Send(ack, cancellationToken).ConfigureAwait(false);
    }

    public async ValueTask ClosedByClient(CancellationToken cancellationToken)
    {
        try
        {
            var closeConsumer = new CommandCloseConsumer { ConsumerId = _id };
            await _connection.Send(closeConsumer, cancellationToken).ConfigureAwait(false);
        }
        catch
        {
            // Ignore
        }
    }
}
