/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Apache.Ignite.Core.Impl.Client.Datastream
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Diagnostics.CodeAnalysis;
    using System.IO;
    using System.Net.Sockets;
    using System.Threading;
    using System.Threading.Tasks;
    using Apache.Ignite.Core.Client;
    using Apache.Ignite.Core.Client.Datastream;
    using Apache.Ignite.Core.Datastream;
    using Apache.Ignite.Core.Impl.Binary;
    using Apache.Ignite.Core.Impl.Common;
    using Apache.Ignite.Core.Impl.Datastream;
    using BinaryWriter = Apache.Ignite.Core.Impl.Binary.BinaryWriter;

    /// <summary>
    /// Thin client data streamer.
    /// </summary>
    internal sealed class DataStreamerClient<TK, TV> : IDataStreamerClient<TK, TV>
    {
        /** Streamer flags. The combined value is written as a single byte into every flush request. */
        [Flags]
        private enum Flags : byte
        {
            /** Set when the AllowOverwrite option is enabled. */
            AllowOverwrite = 1 << 0,

            /** Set when the SkipStore option is enabled. */
            SkipStore = 1 << 1,

            /** Set when the ReceiverKeepBinary option is enabled. */
            KeepBinary = 1 << 2,

            /** Always set by GetFlags. */
            Flush = 1 << 3,

            /** Always set by GetFlags. */
            Close = 1 << 4
        }

        /** Value written as both server-side buffer sizes (per-node and per-thread) in every flush request. */
        private const int ServerBufferSizeAuto = -1;

        /** Maximum number of retries for a single entry add when a retriable error occurs. */
        private const int MaxRetries = 16;

        /** Failover socket; used to resolve the affinity socket for a key, or a default socket. */
        private readonly ClientFailoverSocket _socket;

        /** Cache id, computed from the cache name in the constructor. */
        private readonly int _cacheId;

        /** Cache name. */
        private readonly string _cacheName;

        /** Private copy of the options passed to the constructor. */
        private readonly DataStreamerClientOptions<TK, TV> _options;

        /** Per-node buffers, keyed by the socket they are sent through. */
        private readonly ConcurrentDictionary<ClientSocket, DataStreamerClientPerNodeBuffer<TK, TV>> _buffers =
            new ConcurrentDictionary<ClientSocket, DataStreamerClientPerNodeBuffer<TK, TV>>();

        /** Lock: adding entries takes the read lock, closing takes the write lock. */
        private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim();

        /** Cached flags. */
        private readonly Flags _flags;

        /** Pool of reusable entry arrays. */
        private readonly ConcurrentStack<DataStreamerClientEntry<TK, TV>[]> _arrayPool
            = new ConcurrentStack<DataStreamerClientEntry<TK, TV>[]>();

        /** Auto flush timer. Null when AutoFlushInterval is zero. */
        private readonly Timer _autoFlushTimer;

        /** Number of entry arrays allocated so far (pool misses). */
        private int _arraysAllocated;

        /** Number of entries in successfully flushed buffers. */
        private long _entriesSent;

        /** Exception. When set, the streamer is closed. */
        private volatile Exception _exception;

        /** Cancelled flag. */
        private volatile bool _cancelled;

        /// <summary>
        /// Initializes a new instance of <see cref="DataStreamerClient{TK,TV}"/>.
        /// Copies the options, caches the cache id and flags, and starts the auto flush timer
        /// when a non-zero auto flush interval is configured.
        /// </summary>
        /// <param name="socket">Failover socket.</param>
        /// <param name="cacheName">Name of the cache to stream into.</param>
        /// <param name="options">Streamer options; a private copy is stored.</param>
        public DataStreamerClient(
            ClientFailoverSocket socket,
            string cacheName,
            DataStreamerClientOptions<TK, TV> options)
        {
            Debug.Assert(socket != null);
            Debug.Assert(!string.IsNullOrEmpty(cacheName));

            // Defensive copy: later changes to the caller's instance must not affect this streamer.
            _options = new DataStreamerClientOptions<TK, TV>(options);
            _flags = GetFlags(_options);

            _socket = socket;
            _cacheName = cacheName;
            _cacheId = BinaryUtils.GetCacheId(cacheName);

            var flushPeriod = _options.AutoFlushInterval;

            if (flushPeriod == TimeSpan.Zero)
            {
                // Zero interval: no timer is created.
                return;
            }

            _autoFlushTimer = new Timer(state => AutoFlush(), null, flushPeriod, flushPeriod);
        }

        /** <inheritdoc /> */
        public void Dispose()
        {
            // Non-cancelling close: buffered data is flushed, not discarded.
            Close(false);
        }

        /** <inheritdoc /> */
        public string CacheName
        {
            get
            {
                // Assigned once in the constructor.
                return _cacheName;
            }
        }

        /** <inheritdoc /> */
        public bool IsClosed
        {
            get
            {
                // A stored exception (disposal marker or flush failure) means the streamer is closed.
                var closeReason = _exception;

                return closeReason != null;
            }
        }

        /** <inheritdoc /> */
        public DataStreamerClientOptions<TK, TV> Options
        {
            get
            {
                // Return a fresh copy on every call so callers cannot modify the internal options.
                var snapshot = new DataStreamerClientOptions<TK, TV>(_options);

                return snapshot;
            }
        }

        /** <inheritdoc /> */
        public void Add(TK key, TV val)
        {
            IgniteArgumentCheck.NotNull(key, "key");

            // Wrap the pair into an entry and pass it to the private overload.
            var entry = new DataStreamerClientEntry<TK, TV>(key, val);

            Add(entry);
        }

        /** <inheritdoc /> */
        public void Remove(TK key)
        {
            IgniteArgumentCheck.NotNull(key, "key");

            if (_options.AllowOverwrite)
            {
                // Removal is queued as a key-only entry.
                var removal = new DataStreamerClientEntry<TK, TV>(key);

                Add(removal);

                return;
            }

            throw new IgniteClientException("DataStreamer can't remove data when AllowOverwrite is false.");
        }

        /** <inheritdoc /> */
        public void Flush()
        {
            // Synchronous wrapper: blocks the calling thread until the asynchronous flush finishes.
            var flushTask = FlushAsync();

            flushTask.Wait();
        }

        /** <inheritdoc /> */
        public Task FlushAsync()
        {
            // Throws when the streamer is already closed.
            ThrowIfClosed();

            var flushTask = FlushInternalAsync();

            return flushTask;
        }

        /** <inheritdoc /> */
        public void Close(bool cancel)
        {
            // Synchronous wrapper: blocks the calling thread until the asynchronous close finishes.
            var closeTask = CloseAsync(cancel);

            closeTask.Wait();
        }

        /** <inheritdoc /> */
        public Task CloseAsync(bool cancel)
        {
            // The write lock is held while the closed state is set and the final flush is initiated.
            _rwLock.EnterWriteLock();

            try
            {
                if (_exception != null)
                {
                    // Already closed: nothing more to do.
                    return TaskRunner.CompletedTask;
                }

                // From this point the streamer is considered closed.
                _exception = new ObjectDisposedException("DataStreamerClient", "Data streamer has been disposed");

                var timer = _autoFlushTimer;

                if (timer != null)
                {
                    timer.Dispose();
                }

                if (!cancel)
                {
                    // Graceful close: send whatever is still buffered.
                    return FlushInternalAsync();
                }

                // Cancelled close: disregard current buffers and stop all retry loops.
                _cancelled = true;

                return TaskRunner.CompletedTask;
            }
            finally
            {
                _rwLock.ExitWriteLock();
            }
        }

        /** <inheritdoc /> */
        public override string ToString()
        {
            // Format: "TypeName [CacheName=..., IsClosed=...]".
            var typeName = GetType().Name;
            var cacheName = CacheName;
            var closed = IsClosed;

            return string.Format("{0} [CacheName={1}, IsClosed={2}]", typeName, cacheName, closed);
        }

        /// <summary>
        /// Gets the count of sent entries: the sum of the counts of all successfully flushed buffers.
        /// </summary>
        internal long EntriesSent
        {
            get
            {
                // Atomic 64-bit read; the counter is updated with Interlocked.Add.
                return Interlocked.Read(ref _entriesSent);
            }
        }

        /// <summary>
        /// Gets the count of allocated arrays, that is, the number of pool misses in
        /// <see cref="GetPooledArray"/>.
        /// </summary>
        internal int ArraysAllocated
        {
            get
            {
                // Compare-exchange with identical comparand and value never changes the field;
                // it is used here only to read the current value.
                return Interlocked.CompareExchange(ref _arraysAllocated, 0, 0);
            }
        }

        /// <summary>
        /// Gets the count of pooled arrays: the number of arrays currently held by the pool.
        /// </summary>
        internal int ArraysPooled
        {
            get
            {
                var pooled = _arrayPool.Count;

                return pooled;
            }
        }

        /// <summary>
        /// Gets an entry array: a cleared array from the pool when one is available,
        /// otherwise a newly allocated array of PerNodeBufferSize elements.
        /// </summary>
        /// <returns>Entry array with all elements reset to default.</returns>
        internal DataStreamerClientEntry<TK, TV>[] GetPooledArray()
        {
            DataStreamerClientEntry<TK, TV>[] pooled;

            if (!_arrayPool.TryPop(out pooled))
            {
                // Pool miss: count the allocation and create a new array.
                Interlocked.Increment(ref _arraysAllocated);

                return new DataStreamerClientEntry<TK, TV>[_options.PerNodeBufferSize];
            }

            // Pool hit: wipe entries left over from the previous use.
            Array.Clear(pooled, 0, pooled.Length);

            return pooled;
        }

        /// <summary>
        /// Returns entry array to the pool. The array is not cleared here;
        /// <see cref="GetPooledArray"/> clears it when it is taken out again.
        /// </summary>
        /// <param name="buffer">Array to put back into the pool.</param>
        internal void ReturnPooledArray(DataStreamerClientEntry<TK, TV>[] buffer)
        {
            _arrayPool.Push(buffer);
        }

        /// <summary>
        /// Adds an entry to the streamer under the read lock.
        /// </summary>
        /// <param name="entry">Entry to add.</param>
        /// <exception cref="ObjectDisposedException">
        /// The read lock could not be taken immediately, or the streamer is closed.
        /// </exception>
        private void Add(DataStreamerClientEntry<TK, TV> entry)
        {
            // Zero timeout: never block here. The write lock is taken only by CloseAsync,
            // so a failed attempt is reported as disposal.
            var readLockTaken = _rwLock.TryEnterReadLock(0);

            if (!readLockTaken)
            {
                throw new ObjectDisposedException("DataStreamerClient", "Data streamer has been disposed");
            }

            try
            {
                ThrowIfClosed();
                AddNoLock(entry);
            }
            finally
            {
                _rwLock.ExitReadLock();
            }
        }

        /// <summary>
        /// Adds an entry without RW lock. Loops until a per-node buffer accepts the entry,
        /// the streamer is cancelled, or an error is rethrown.
        /// </summary>
        /// <param name="entry">Entry to add.</param>
        private void AddNoLock(DataStreamerClientEntry<TK, TV> entry)
        {
            var retriesLeft = MaxRetries;

            while (!_cancelled)
            {
                try
                {
                    // Prefer the affinity socket for the key; fall back to a default socket.
                    var target = _socket.GetAffinitySocket(_cacheId, entry.Key) ?? _socket.GetSocket();
                    var nodeBuffer = GetOrAddPerNodeBuffer(target);

                    if (nodeBuffer.Add(entry))
                    {
                        return;
                    }

                    // Entry was rejected by the buffer: loop and try again without using up a retry.
                }
                catch (Exception e)
                {
                    // The retry budget is only decremented for retriable errors.
                    var canRetry = ShouldRetry(e) && retriesLeft-- > 0;

                    if (!canRetry)
                    {
                        throw;
                    }
                }
            }
        }

        /// <summary>
        /// Flushes the streamer asynchronously: starts a flush on every per-node buffer
        /// and combines the flush tasks that are still running.
        /// </summary>
        /// <returns>
        /// Task that completes when all flushes still in progress have finished.
        /// Flush tasks that are null or already completed are not included.
        /// </returns>
        private Task FlushInternalAsync()
        {
            if (_buffers.IsEmpty)
            {
                return TaskRunner.CompletedTask;
            }

            var pending = new List<Task>(_buffers.Count);

            foreach (var nodeAndBuffer in _buffers)
            {
                var flushTask = nodeAndBuffer.Value.FlushAllAsync();

                if (flushTask == null || flushTask.IsCompleted)
                {
                    // Nothing to wait for.
                    continue;
                }

                pending.Add(flushTask);
            }

            return TaskRunner.WhenAll(pending.ToArray());
        }

        /// <summary>
        /// Flushes the specified buffer asynchronously. Blocks until the semaphore is acquired,
        /// starts the send, and releases the semaphore when the flush completes.
        /// </summary>
        /// <param name="buffer">Buffer to send.</param>
        /// <param name="socket">Socket to send the buffer through.</param>
        /// <param name="semaphore">Semaphore acquired before the send and released on completion.</param>
        /// <returns>Task that completes, or faults, together with the flush.</returns>
        internal Task FlushBufferAsync(
            DataStreamerClientBuffer<TK, TV> buffer,
            ClientSocket socket,
            SemaphoreSlim semaphore)
        {
            semaphore.Wait();

            var completion = new TaskCompletionSource<object>();

            FlushBufferInternalAsync(buffer, socket, completion);

            Func<Task<object>, object> onFlushed = flushTask =>
            {
                semaphore.Release();

                // Keep the first recorded exception; a recorded exception marks the streamer as closed.
                _exception = _exception ?? flushTask.Exception;

                // Reading Result rethrows when the flush failed, so the returned task faults too.
                return flushTask.Result;
            };

            return completion.Task.ContinueWith(onFlushed, TaskContinuationOptions.ExecuteSynchronously);
        }

        /// <summary>
        /// Sends the specified buffer through the socket and routes the outcome
        /// (success or exception) to <see cref="FlushBufferCompleteOrRetry"/>.
        /// </summary>
        /// <param name="buffer">Buffer to send.</param>
        /// <param name="socket">Socket to send the buffer through.</param>
        /// <param name="tcs">Completion source that represents the whole flush, including retries.</param>
        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
            Justification = "Any exception should be propagated to TaskCompletionSource.")]
        private void FlushBufferInternalAsync(
            DataStreamerClientBuffer<TK, TV> buffer,
            ClientSocket socket,
            TaskCompletionSource<object> tcs)
        {
            try
            {
                var request = socket.DoOutInOpAsync(
                    ClientOp.DataStreamerStart,
                    ctx => WriteBuffer(buffer, ctx.Writer),
                    ctx => (object)null,
                    syncCallback: true);

                // Asynchronous outcome: Exception is null on success.
                request.ContinueWith(
                    done => FlushBufferCompleteOrRetry(buffer, socket, tcs, done.Exception),
                    TaskContinuationOptions.ExecuteSynchronously);
            }
            catch (Exception sendError)
            {
                // Synchronous failure while starting the operation takes the same path.
                FlushBufferCompleteOrRetry(buffer, socket, tcs, sendError);
            }
        }

        /// <summary>
        /// Completes or retries the buffer flush operation.
        /// </summary>
        /// <param name="buffer">Buffer that was sent.</param>
        /// <param name="socket">Socket the buffer was sent through.</param>
        /// <param name="tcs">Completion source to complete when the flush succeeds or fails for good.</param>
        /// <param name="exception">Flush error, or null when the flush succeeded.</param>
        private void FlushBufferCompleteOrRetry(
            DataStreamerClientBuffer<TK, TV> buffer,
            ClientSocket socket,
            TaskCompletionSource<object> tcs,
            Exception exception)
        {
            if (exception == null)
            {
                // Successful flush: update the counter, recycle the array, complete the task.
                Interlocked.Add(ref _entriesSent, buffer.Count);
                ReturnPooledArray(buffer.Entries);
                tcs.SetResult(null);

                return;
            }

            // Retry only when not cancelled, and the socket is gone or the error is retriable.
            var retry = !_cancelled && (socket.IsDisposed || ShouldRetry(exception));

            if (retry)
            {
                // Release receiver thread, perform retry on a separate thread.
                ThreadPool.QueueUserWorkItem(state => FlushBufferRetry(buffer, socket, tcs));

                return;
            }

            // Cancelled, or the socket is still connected and the error is not retriable: fail the flush.
            ReturnPooledArray(buffer.Entries);
            tcs.SetException(exception);
        }

        /// <summary>
        /// Retries the buffer flush operation: drops the per-node buffer of the failed socket,
        /// re-adds the affected entries through the regular add path, and flushes again.
        /// </summary>
        /// <param name="buffer">Buffer whose flush failed.</param>
        /// <param name="failedSocket">Socket the failed flush was sent through.</param>
        /// <param name="tcs">Completion source that receives the result of the retry.</param>
        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
            Justification = "Any exception should be propagated to TaskCompletionSource.")]
        private void FlushBufferRetry(
            DataStreamerClientBuffer<TK, TV> buffer,
            ClientSocket failedSocket,
            TaskCompletionSource<object> tcs)
        {
            try
            {
                // Connection failed. Remove disconnected socket from the map.
                DataStreamerClientPerNodeBuffer<TK, TV> evicted;
                _buffers.TryRemove(failedSocket, out evicted);

                // Re-add entries to other buffers.
                ReAddEntriesAndReturnBuffer(buffer);

                if (evicted != null)
                {
                    // Walk the chain of buffers that were still attached to the removed per-node buffer.
                    // NOTE(review): MarkFlushed presumably returns true only for the caller that
                    // claims the buffer - confirm against DataStreamerClientBuffer.
                    for (var pending = evicted.Close(); pending != null; pending = pending.Previous)
                    {
                        if (pending.MarkFlushed())
                        {
                            ReAddEntriesAndReturnBuffer(pending);
                        }
                    }
                }

                // Note: if initial flush was caused by full buffer, not requested by the user,
                // we don't need to force flush everything here - just re-add entries to other buffers.
                var flush = FlushInternalAsync();

                flush.ContinueWith(flushTask => flushTask.SetAsResult(tcs));
            }
            catch (Exception retryError)
            {
                tcs.SetException(retryError);
            }
        }

        /// <summary>
        /// Re-adds obsolete buffer entries to new buffers and returns the array to the pool.
        /// </summary>
        /// <param name="buffer">Buffer whose entries should be added again.</param>
        private void ReAddEntriesAndReturnBuffer(DataStreamerClientBuffer<TK, TV> buffer)
        {
            var total = buffer.Count;
            var items = buffer.Entries;

            for (var idx = 0; idx < total; idx++)
            {
                var item = items[idx];

                if (item.IsEmpty)
                {
                    // Empty slot: nothing to re-add.
                    continue;
                }

                AddNoLock(item);
            }

            // The array is recycled only after every entry has been re-added.
            ReturnPooledArray(items);
        }

        /// <summary>
        /// Writes buffer data to the specified writer as a data streamer request:
        /// cache id, flags byte, two server buffer sizes, optional receiver, entry count,
        /// then one key/value pair per entry (null value for removals).
        /// </summary>
        /// <param name="buffer">Buffer with entries to write.</param>
        /// <param name="w">Writer to write the request to.</param>
        private void WriteBuffer(DataStreamerClientBuffer<TK, TV> buffer, BinaryWriter w)
        {
            w.WriteInt(_cacheId);
            w.WriteByte((byte) _flags);
            w.WriteInt(ServerBufferSizeAuto); // Server per-node buffer size.
            w.WriteInt(ServerBufferSizeAuto); // Server per-thread buffer size.

            if (_options.Receiver != null)
            {
                var rcvHolder = new StreamReceiverHolder(_options.Receiver,
                    (rec, grid, cache, stream, keepBinary) =>
                        StreamReceiverHolder.InvokeReceiver((IStreamReceiver<TK, TV>) rec, grid, cache, stream,
                            keepBinary));

                w.WriteObjectDetached(rcvHolder);
                w.WriteByte(ClientPlatformId.Dotnet);
            }
            else
            {
                w.WriteObject<object>(null);
            }

            var count = buffer.Count;
            var entries = buffer.Entries;

            // Empty slots are skipped below, so the count header must only include the entries
            // that are actually written. Otherwise the declared count would exceed the number
            // of pairs in the request.
            var nonEmptyCount = 0;

            for (var i = 0; i < count; i++)
            {
                if (!entries[i].IsEmpty)
                {
                    nonEmptyCount++;
                }
            }

            w.WriteInt(nonEmptyCount);

            for (var i = 0; i < count; i++)
            {
                var entry = entries[i];

                if (entry.IsEmpty)
                {
                    continue;
                }

                w.WriteObjectDetached(entry.Key);

                if (entry.Remove)
                {
                    // Removal: key followed by a null value.
                    w.WriteObject<object>(null);
                }
                else
                {
                    w.WriteObjectDetached(entry.Val);
                }
            }
        }

        /// <summary>
        /// Gets or adds per-node buffer for the specified socket.
        /// </summary>
        /// <param name="socket">Socket that identifies the node.</param>
        /// <returns>The buffer registered for the socket.</returns>
        private DataStreamerClientPerNodeBuffer<TK, TV> GetOrAddPerNodeBuffer(ClientSocket socket)
        {
            DataStreamerClientPerNodeBuffer<TK, TV> registered;

            if (!_buffers.TryGetValue(socket, out registered))
            {
                // No buffer yet: create one and try to register it.
                var fresh = new DataStreamerClientPerNodeBuffer<TK, TV>(this, socket);

                registered = _buffers.GetOrAdd(socket, fresh);

                if (registered != fresh)
                {
                    // Another thread won - return array to the pool.
                    ReturnPooledArray(fresh.Close().Entries);
                }
            }

            return registered;
        }

        /// <summary>
        /// Throws an exception if current streamer instance is closed.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The streamer was closed by disposal.</exception>
        /// <exception cref="IgniteClientException">
        /// The streamer was closed because of an error; the error is the inner exception.
        /// </exception>
        private void ThrowIfClosed()
        {
            // Read the volatile field once so the check and the throw see the same value.
            var ex = _exception;

            if (ex == null)
            {
                return;
            }

            if (ex is ObjectDisposedException)
            {
                // The single-argument constructor treats its argument as the object name,
                // so the two-argument overload is used to set the message.
                throw new ObjectDisposedException("DataStreamerClient", "Streamer is closed.");
            }

            throw new IgniteClientException("Streamer is closed with error, check inner exception for details.", ex);
        }

        /// <summary>
        /// Performs timer-based automatic flush. Does nothing when the streamer is closed,
        /// when the timer field is not assigned yet, or when another timer callback is
        /// currently inside this method. Exceptions thrown while initiating the flush are ignored.
        /// </summary>
        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
            Justification = "Streamer will be closed if an exception occurs during automated flush. " +
                            "Timer thread should not throw.")]
        private void AutoFlush()
        {
            if (_exception != null)
            {
                return;
            }

            // The callback can run before the constructor has stored the timer in the field
            // (for example with a very short interval). Monitor.TryEnter(null) would throw on
            // the timer thread, so skip this tick instead.
            var timer = _autoFlushTimer;

            if (timer == null)
            {
                return;
            }

            // Prevent multiple parallel timer calls.
            if (!Monitor.TryEnter(timer))
            {
                return;
            }

            try
            {
                // Initiate flush, don't wait for completion.
                FlushInternalAsync();
            }
            catch (Exception)
            {
                // Ignore: the timer thread should not throw.
            }
            finally
            {
                Monitor.Exit(timer);
            }
        }

        /// <summary>
        /// Gets a value indicating whether flush should be retried after the specified exception.
        /// The decision is based on the innermost exception in the InnerException chain.
        /// </summary>
        /// <param name="exception">Exception to examine.</param>
        /// <returns>
        /// True for <see cref="SocketException"/>, <see cref="IOException"/>, and
        /// <see cref="IgniteClientException"/> with the InvalidNodeState status code; false otherwise.
        /// </returns>
        private static bool ShouldRetry(Exception exception)
        {
            // Unwrap to the root cause.
            var root = exception;

            while (root.InnerException != null)
            {
                root = root.InnerException;
            }

            if (root is SocketException)
            {
                return true;
            }

            if (root is IOException)
            {
                return true;
            }

            var clientError = root as IgniteClientException;

            return clientError != null && clientError.StatusCode == ClientStatusCode.InvalidNodeState;
        }

        /// <summary>
        /// Gets the flags for the specified options. Flush and Close are always set;
        /// AllowOverwrite, SkipStore and KeepBinary mirror the corresponding options.
        /// </summary>
        /// <param name="options">Streamer options.</param>
        /// <returns>Combined flags.</returns>
        private static Flags GetFlags(DataStreamerClientOptions options)
        {
            var none = default(Flags);
            var result = Flags.Flush | Flags.Close;

            result |= options.AllowOverwrite ? Flags.AllowOverwrite : none;
            result |= options.SkipStore ? Flags.SkipStore : none;
            result |= options.ReceiverKeepBinary ? Flags.KeepBinary : none;

            return result;
        }
    }
}
