﻿/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Kafka.Client.Producers.Async
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Kafka.Client.Cfg;
    using Kafka.Client.Messages;
    using Kafka.Client.Requests;
    using Kafka.Client.Utils;

    /// <summary>
    /// Sends messages encapsulated in request to Kafka server asynchronously
    /// </summary>
    public class AsyncProducer : IAsyncProducer
    {
        private readonly ICallbackHandler callbackHandler;
        private readonly KafkaConnection connection;
        private volatile bool disposed;

        /// <summary>
        /// Gets producer config
        /// </summary>
        public AsyncProducerConfiguration Config { get; private set; }

        /// <summary>
        /// Initializes a new instance of the <see cref="AsyncProducer"/> class.
        /// </summary>
        /// <param name="config">
        /// The producer config.
        /// </param>
        public AsyncProducer(AsyncProducerConfiguration config)
            : this(
                config,
                ReflectionHelper.Instantiate<ICallbackHandler>(config.CallbackHandlerClass))
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="AsyncProducer"/> class.
        /// </summary>
        /// <param name="config">
        /// The producer config.
        /// </param>
        /// <param name="callbackHandler">
        /// The callback invoked when a request is finished being sent.
        /// </param>
        public AsyncProducer(
            AsyncProducerConfiguration config,
            ICallbackHandler callbackHandler)
        {
            Guard.NotNull(config, "config");

            this.Config = config;
            this.callbackHandler = callbackHandler;
            this.connection = new KafkaConnection(
                this.Config.Host,
                this.Config.Port,
                this.Config.BufferSize,
                this.Config.SocketTimeout);
        }

        /// <summary>
        /// Sends request to Kafka server asynchronously
        /// </summary>
        /// <param name="request">
        /// The request.
        /// </param>
        public void Send(ProducerRequest request)
        {
            this.EnsuresNotDisposed();
            Guard.NotNull(request, "request");
            Guard.Assert<ArgumentException>(() => request.MessageSet.Messages.All(x => x.PayloadSize <= this.Config.MaxMessageSize));
            if (this.callbackHandler != null)
            {
                this.Send(request, this.callbackHandler.Handle);
            }
            else
            {
                this.connection.BeginWrite(request);
            }
        }

        /// <summary>
        /// Sends request to Kafka server asynchronously
        /// </summary>
        /// <param name="request">
        /// The request.
        /// </param>
        /// <param name="callback">
        /// The callback invoked when a request is finished being sent.
        /// </param>
        public void Send(ProducerRequest request, MessageSent<ProducerRequest> callback)
        {
            this.EnsuresNotDisposed();
            Guard.NotNull(request, "request");
            Guard.NotNull(request.MessageSet, "request.MessageSet");
            Guard.NotNull(request.MessageSet.Messages, "request.MessageSet.Messages");
            Guard.Assert<ArgumentException>(
                () => request.MessageSet.Messages.All(x => x.PayloadSize <= this.Config.MaxMessageSize));

            connection.BeginWrite(request, callback);
        }

        /// <summary>
        /// Constructs request and sent it to Kafka server asynchronously
        /// </summary>
        /// <param name="topic">
        /// The topic.
        /// </param>
        /// <param name="partition">
        /// The partition.
        /// </param>
        /// <param name="messages">
        /// The list of messages to sent.
        /// </param>
        public void Send(string topic, int partition, IEnumerable<Message> messages)
        {
            this.EnsuresNotDisposed();
            Guard.NotNullNorEmpty(topic, "topic");
            Guard.NotNull(messages, "messages");
            Guard.Assert<ArgumentException>(() => messages.All(x => x.PayloadSize <= this.Config.MaxMessageSize));

            this.Send(new ProducerRequest(topic, partition, messages));
        }

        /// <summary>
        /// Constructs request and sent it to Kafka server asynchronously
        /// </summary>
        /// <param name="topic">
        /// The topic.
        /// </param>
        /// <param name="partition">
        /// The partition.
        /// </param>
        /// <param name="messages">
        /// The list of messages to sent.
        /// </param>
        /// <param name="callback">
        /// The callback invoked when a request is finished being sent.
        /// </param>
        public void Send(string topic, int partition, IEnumerable<Message> messages, MessageSent<ProducerRequest> callback)
        {
            this.EnsuresNotDisposed();
            Guard.NotNullNorEmpty(topic, "topic");
            Guard.NotNull(messages, "messages");
            Guard.Assert<ArgumentException>(() => messages.All(x => x.PayloadSize <= this.Config.MaxMessageSize));

            this.Send(new ProducerRequest(topic, partition, messages), callback);
        }

        /// <summary>
        /// Releases all unmanaged and managed resources
        /// </summary>
        public void Dispose()
        {
            this.Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (!disposing)
            {
                return;
            }

            if (this.disposed)
            {
                return;
            }

            this.disposed = true;
            if (this.connection != null)
            {
                this.connection.Dispose();
            }
        }

        /// <summary>
        /// Ensures that object was not disposed
        /// </summary>
        private void EnsuresNotDisposed()
        {
            if (this.disposed)
            {
                throw new ObjectDisposedException(this.GetType().Name);
            }
        }
    }
}
