﻿/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace DotPulsar.Internal.Extensions;

using DotPulsar.Exceptions;
using Exceptions;
using PulsarApi;

public static class CommandExtensions
{
    public static void Expect(this BaseCommand command, BaseCommand.Type type)
    {
        if (command.CommandType == type)
            return;

        if (command.CommandType == BaseCommand.Type.Error)
            command.Error.Throw();

        if (command.CommandType == BaseCommand.Type.SendError)
            command.SendError.Throw();

        throw new UnexpectedResponseException($"Expected '{type}' but got '{command.CommandType}'");
    }

    public static void Throw(this CommandSendError command)
        => Throw(command.Error, command.Message);

    public static void Throw(this CommandLookupTopicResponse command)
        => Throw(command.Error, command.Message);

    public static void Throw(this CommandError command)
        => Throw(command.Error, command.Message);

    public static void Throw(this CommandGetOrCreateSchemaResponse command)
        => Throw(command.ErrorCode, command.ErrorMessage);

    public static void Throw(this CommandPartitionedTopicMetadataResponse command)
        => Throw(command.Error, command.Message);

    private static void Throw(ServerError error, string message)
        => throw (error switch
        {
            ServerError.AuthenticationError => new AuthenticationException(message),
            ServerError.AuthorizationError => new AuthorizationException(message),
            ServerError.ChecksumError => new ChecksumException(message),
            ServerError.ConsumerAssignError => new ConsumerAssignException(message),
            ServerError.ConsumerBusy => new ConsumerBusyException(message),
            ServerError.ConsumerNotFound => new ConsumerNotFoundException(message),
            ServerError.IncompatibleSchema => new IncompatibleSchemaException(message),
            ServerError.InvalidTopicName => new InvalidTopicNameException(message),
            ServerError.InvalidTxnStatus => new InvalidTransactionStatusException(message),
            ServerError.MetadataError => new MetadataException(message),
            ServerError.NotAllowedError => new NotAllowedException(message),
            ServerError.PersistenceError => new PersistenceException(message),
            ServerError.ProducerBlockedQuotaExceededError => new ProducerBlockedQuotaExceededException($"{message}. Error code: {error}"),
            ServerError.ProducerBlockedQuotaExceededException => new ProducerBlockedQuotaExceededException($"{message}. Error code: {error}"),
            ServerError.ProducerBusy => new ProducerBusyException(message),
            ServerError.ServiceNotReady => new ServiceNotReadyException(message),
            ServerError.SubscriptionNotFound => new SubscriptionNotFoundException(message),
            ServerError.TooManyRequests => new TooManyRequestsException(message),
            ServerError.TopicNotFound => new TopicNotFoundException(message),
            ServerError.TopicTerminatedError => new TopicTerminatedException(message),
            ServerError.TransactionConflict => new TransactionConflictException(message),
            ServerError.TransactionCoordinatorNotFound => new TransactionCoordinatorNotFoundException(message),
            ServerError.UnknownError => new UnknownException($"{message}. Error code: {error}"),
            ServerError.UnsupportedVersionError => new UnsupportedVersionException(message),
            _ => new UnknownException($"{message}. Error code: {error}")
        });

    public static BaseCommand AsBaseCommand(this CommandAck command)
        => new()
        {
            CommandType = BaseCommand.Type.Ack,
            Ack = command
        };

    public static BaseCommand AsBaseCommand(this CommandConnect command)
        => new()
        {
            CommandType = BaseCommand.Type.Connect,
            Connect = command
        };

    public static BaseCommand AsBaseCommand(this CommandPing command)
        => new()
        {
            CommandType = BaseCommand.Type.Ping,
            Ping = command
        };

    public static BaseCommand AsBaseCommand(this CommandAuthResponse command)
        => new()
        {
            CommandType = BaseCommand.Type.AuthResponse,
            AuthResponse = command
        };

    public static BaseCommand AsBaseCommand(this CommandPong command)
        => new()
        {
            CommandType = BaseCommand.Type.Pong,
            Pong = command
        };

    public static BaseCommand AsBaseCommand(this CommandProducer command)
        => new()
        {
            CommandType = BaseCommand.Type.Producer,
            Producer = command
        };

    public static BaseCommand AsBaseCommand(this CommandGetLastMessageId command)
        => new()
        {
            CommandType = BaseCommand.Type.GetLastMessageId,
            GetLastMessageId = command
        };

    public static BaseCommand AsBaseCommand(this CommandUnsubscribe command)
        => new()
        {
            CommandType = BaseCommand.Type.Unsubscribe,
            Unsubscribe = command
        };

    public static BaseCommand AsBaseCommand(this CommandSubscribe command)
        => new()
        {
            CommandType = BaseCommand.Type.Subscribe,
            Subscribe = command
        };

    public static BaseCommand AsBaseCommand(this CommandLookupTopic command)
        => new()
        {
            CommandType = BaseCommand.Type.Lookup,
            LookupTopic = command
        };

    public static BaseCommand AsBaseCommand(this CommandSend command)
        => new()
        {
            CommandType = BaseCommand.Type.Send,
            Send = command
        };

    public static BaseCommand AsBaseCommand(this CommandFlow command)
        => new()
        {
            CommandType = BaseCommand.Type.Flow,
            Flow = command
        };

    public static BaseCommand AsBaseCommand(this CommandCloseProducer command)
        => new()
        {
            CommandType = BaseCommand.Type.CloseProducer,
            CloseProducer = command
        };

    public static BaseCommand AsBaseCommand(this CommandCloseConsumer command)
        => new()
        {
            CommandType = BaseCommand.Type.CloseConsumer,
            CloseConsumer = command
        };

    public static BaseCommand AsBaseCommand(this CommandSeek command)
        => new()
        {
            CommandType = BaseCommand.Type.Seek,
            Seek = command
        };

    public static BaseCommand AsBaseCommand(this CommandRedeliverUnacknowledgedMessages command)
        => new()
        {
            CommandType = BaseCommand.Type.RedeliverUnacknowledgedMessages,
            RedeliverUnacknowledgedMessages = command
        };

    public static BaseCommand AsBaseCommand(this CommandGetOrCreateSchema command)
        => new()
        {
            CommandType = BaseCommand.Type.GetOrCreateSchema,
            GetOrCreateSchema = command
        };

    public static BaseCommand AsBaseCommand(this CommandPartitionedTopicMetadata command)
        => new()
        {
            CommandType = BaseCommand.Type.PartitionedMetadata,
            PartitionMetadata = command
        };
}
