﻿/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace DotPulsar;

using Abstractions;
using DotPulsar.Internal.Compression;
using DotPulsar.Internal.PulsarApi;
using Exceptions;
using Internal;
using Internal.Abstractions;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Client used to create producers, consumers and readers for the service at <see cref="ServiceUrl"/>.
/// </summary>
public sealed class PulsarClient : IPulsarClient
{
    private readonly IConnectionPool _connectionPool;
    private readonly ProcessManager _processManager;
    private readonly IHandleException _exceptionHandler;

    // 0 while the client is usable; flipped to 1 (atomically) by the first DisposeAsync call.
    private int _isDisposed;

    /// <summary>
    /// The service URL this client was created with.
    /// </summary>
    public Uri ServiceUrl { get; }

    internal PulsarClient(
        IConnectionPool connectionPool,
        ProcessManager processManager,
        IHandleException exceptionHandler,
        Uri serviceUrl)
    {
        _isDisposed = 0;
        ServiceUrl = serviceUrl;
        _exceptionHandler = exceptionHandler;
        _processManager = processManager;
        _connectionPool = connectionPool;

        DotPulsarEventSource.Log.ClientCreated();
    }

    /// <summary>
    /// Get a new builder for configuring and building a PulsarClient instance.
    /// </summary>
    public static IPulsarClientBuilder Builder()
    {
        return new PulsarClientBuilder();
    }

    /// <summary>
    /// Create a producer from the given options.
    /// </summary>
    /// <exception cref="PulsarClientDisposedException">The client has been disposed.</exception>
    /// <exception cref="CompressionException">The requested compression type has no matching compressor factory.</exception>
    public IProducer<TMessage> CreateProducer<TMessage>(ProducerOptions<TMessage> options)
    {
        ThrowIfDisposed();

        ICompressorFactory? compressorFactory = ResolveCompressorFactory(options);

        Producer<TMessage> producer = new(
            ServiceUrl,
            options,
            _processManager,
            _exceptionHandler,
            _connectionPool,
            compressorFactory);

        // The monitoring task is intentionally not awaited.
        if (options.StateChangedHandler is { } stateChangedHandler)
            _ = StateMonitor.MonitorProducer(producer, stateChangedHandler);

        return producer;
    }

    /// <summary>
    /// Create a consumer from the given options.
    /// </summary>
    /// <exception cref="PulsarClientDisposedException">The client has been disposed.</exception>
    public IConsumer<TMessage> CreateConsumer<TMessage>(ConsumerOptions<TMessage> options)
    {
        ThrowIfDisposed();

        Guid correlationId = Guid.NewGuid();

        // Fall back to a generated name when the caller did not provide one.
        CommandSubscribe subscribe = new()
        {
            ConsumerName = options.ConsumerName ?? $"Consumer-{correlationId:N}",
            InitialPosition = (CommandSubscribe.InitialPositionType) options.InitialPosition,
            PriorityLevel = options.PriorityLevel,
            ReadCompacted = options.ReadCompacted,
            Subscription = options.SubscriptionName,
            Topic = options.Topic,
            Type = (CommandSubscribe.SubType) options.SubscriptionType
        };

        var prefetchCount = options.MessagePrefetchCount;
        MessageFactory<TMessage> messageFactory = new(options.Schema);
        BatchHandler<TMessage> batchHandler = new(true, messageFactory);
        var decompressors = CompressionFactories.DecompressorFactories();

        ConsumerChannelFactory<TMessage> channelFactory = new(
            correlationId,
            _processManager,
            _connectionPool,
            subscribe,
            prefetchCount,
            batchHandler,
            messageFactory,
            decompressors);

        StateManager<ConsumerState> stateManager = new(
            ConsumerState.Disconnected,
            ConsumerState.Closed,
            ConsumerState.ReachedEndOfTopic,
            ConsumerState.Faulted);

        NotReadyChannel<TMessage> placeholderChannel = new();
        Executor executor = new(correlationId, _processManager, _exceptionHandler);

        Consumer<TMessage> consumer = new(
            correlationId,
            ServiceUrl,
            options.SubscriptionName,
            options.Topic,
            _processManager,
            placeholderChannel,
            executor,
            stateManager,
            channelFactory);

        // The monitoring task is intentionally not awaited.
        if (options.StateChangedHandler is { } stateChangedHandler)
            _ = StateMonitor.MonitorConsumer(consumer, stateChangedHandler);

        var isFailover = options.SubscriptionType == SubscriptionType.Failover;
        ConsumerProcess consumerProcess = new(correlationId, stateManager, consumer, isFailover);

        // Register the process with the manager before starting it.
        _processManager.Add(consumerProcess);
        consumerProcess.Start();

        return consumer;
    }

    /// <summary>
    /// Create a reader from the given options.
    /// </summary>
    /// <exception cref="PulsarClientDisposedException">The client has been disposed.</exception>
    public IReader<TMessage> CreateReader<TMessage>(ReaderOptions<TMessage> options)
    {
        ThrowIfDisposed();

        Guid correlationId = Guid.NewGuid();

        // The generated subscription name doubles as the reader name when none is supplied.
        string subscriptionName = $"Reader-{correlationId:N}";

        CommandSubscribe subscribe = new()
        {
            ConsumerName = options.ReaderName ?? subscriptionName,
            Durable = false,
            ReadCompacted = options.ReadCompacted,
            StartMessageId = options.StartMessageId.ToMessageIdData(),
            Subscription = subscriptionName,
            Topic = options.Topic
        };

        var prefetchCount = options.MessagePrefetchCount;
        MessageFactory<TMessage> messageFactory = new(options.Schema);
        BatchHandler<TMessage> batchHandler = new(false, messageFactory);
        var decompressors = CompressionFactories.DecompressorFactories();

        ConsumerChannelFactory<TMessage> channelFactory = new(
            correlationId,
            _processManager,
            _connectionPool,
            subscribe,
            prefetchCount,
            batchHandler,
            messageFactory,
            decompressors);

        StateManager<ReaderState> stateManager = new(
            ReaderState.Disconnected,
            ReaderState.Closed,
            ReaderState.ReachedEndOfTopic,
            ReaderState.Faulted);

        NotReadyChannel<TMessage> placeholderChannel = new();
        Executor executor = new(correlationId, _processManager, _exceptionHandler);

        Reader<TMessage> reader = new(
            correlationId,
            ServiceUrl,
            options.Topic,
            _processManager,
            placeholderChannel,
            executor,
            stateManager,
            channelFactory);

        // The monitoring task is intentionally not awaited.
        if (options.StateChangedHandler is { } stateChangedHandler)
            _ = StateMonitor.MonitorReader(reader, stateChangedHandler);

        ReaderProcess readerProcess = new(correlationId, stateManager, reader);

        // Register the process with the manager before starting it.
        _processManager.Add(readerProcess);
        readerProcess.Start();

        return reader;
    }

    /// <summary>
    /// Dispose the client and all its producers, consumers and readers.
    /// Only the first call does any work; later calls return immediately.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        // Atomically mark the client as disposed so that only one caller proceeds.
        var wasAlreadyDisposed = Interlocked.Exchange(ref _isDisposed, 1) != 0;
        if (wasAlreadyDisposed)
            return;

        if (_processManager is IAsyncDisposable asyncDisposable)
            await asyncDisposable.DisposeAsync().ConfigureAwait(false);

        DotPulsarEventSource.Log.ClientDisposed();
    }

    /// <summary>
    /// Find the compressor factory matching the compression type in the producer options.
    /// Returns null when the options specify no compression.
    /// </summary>
    /// <exception cref="CompressionException">No compressor factory matches the requested compression type.</exception>
    private static ICompressorFactory? ResolveCompressorFactory<TMessage>(ProducerOptions<TMessage> options)
    {
        if (options.CompressionType == CompressionType.None)
            return null;

        var requestedType = (Internal.PulsarApi.CompressionType) options.CompressionType;

        ICompressorFactory? match = CompressionFactories
            .CompressorFactories()
            .SingleOrDefault(candidate => candidate.CompressionType == requestedType);

        if (match is null)
            throw new CompressionException($"Support for {requestedType} compression was not found");

        return match;
    }

    /// <summary>
    /// Guard used by the Create* methods to reject calls made after disposal.
    /// </summary>
    private void ThrowIfDisposed()
    {
        if (_isDisposed == 0)
            return;

        throw new PulsarClientDisposedException();
    }
}
