﻿/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Kafka.Client.Messages
{
    using System;
    using System.IO;
    using System.Linq;
    using System.Text;
    using Kafka.Client.Exceptions;
    using Kafka.Client.Serialization;
    using Kafka.Client.Utils;

    /// <summary>
    /// Message send to Kafaka server
    /// </summary>
    /// <remarks>
    /// Format:
    /// 1 byte "magic" identifier to allow format changes
    /// 4 byte CRC32 of the payload
    /// N - 5 byte payload
    /// </remarks>
    public class Message : IWritable
    {
        private const byte DefaultMagicValue = 1;
        private const byte DefaultMagicLength = 1;
        private const byte DefaultCrcLength = 4;
        private const int DefaultHeaderSize = DefaultMagicLength + DefaultCrcLength;
        private const byte CompressionCodeMask = 3;

        public CompressionCodecs CompressionCodec
        {
            get
            {
                switch (Magic)
                {
                    case 0:
                        return CompressionCodecs.NoCompressionCodec;
                    case 1:
                        return Messages.CompressionCodec.GetCompressionCodec(Attributes & CompressionCodeMask);
                    default:
                        throw new KafkaException(KafkaException.InvalidMessageCode);
                }
            }
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="Message"/> class.
        /// </summary>
        /// <param name="payload">
        /// The payload.
        /// </param>
        /// <param name="checksum">
        /// The checksum.
        /// </param>
        /// <remarks>
        /// Initializes with default magic number
        /// </remarks>
        public Message(byte[] payload, byte[] checksum)
            : this(payload, checksum, CompressionCodecs.NoCompressionCodec)
        {
            Guard.NotNull(payload, "payload");
            Guard.NotNull(checksum, "checksum");
            Guard.Count(checksum, 4, "checksum");
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="Message"/> class.
        /// </summary>
        /// <param name="payload">
        /// The payload.
        /// </param>
        /// <remarks>
        /// Initializes the magic number as default and the checksum as null. It will be automatically computed.
        /// </remarks>
        public Message(byte[] payload)
            : this(payload, CompressionCodecs.NoCompressionCodec)
        {
            Guard.NotNull(payload, "payload");
        }

        /// <summary>
        /// Initializes a new instance of the Message class.
        /// </summary>
        /// <remarks>
        /// Initializes the checksum as null.  It will be automatically computed.
        /// </remarks>
        /// <param name="payload">The data for the payload.</param>
        /// <param name="magic">The magic identifier.</param>
        public Message(byte[] payload, CompressionCodecs compressionCodec)
            : this(payload, Crc32Hasher.Compute(payload), compressionCodec)
        {
            Guard.NotNull(payload, "payload");
        }

        /// <summary>
        /// Initializes a new instance of the Message class.
        /// </summary>
        /// <param name="payload">The data for the payload.</param>
        /// <param name="magic">The magic identifier.</param>
        /// <param name="checksum">The checksum for the payload.</param>
        public Message(byte[] payload, byte[] checksum, CompressionCodecs compressionCodec)
        {
            Guard.NotNull(payload, "payload");
            Guard.NotNull(checksum, "checksum");
            Guard.Count(checksum, 4, "checksum");

            int length = DefaultHeaderSize + payload.Length;
            this.Payload = payload;
            this.Magic = DefaultMagicValue;
            
            if (compressionCodec != CompressionCodecs.NoCompressionCodec)
            {
                this.Attributes |=
                    (byte)(CompressionCodeMask & Messages.CompressionCodec.GetCompressionCodecValue(compressionCodec));
            }

            if (Magic == 1)
            {
                length++;
            }

            this.Checksum = checksum;
            this.Size = length;
        }

        /// <summary>
        /// Gets the payload.
        /// </summary>
        public byte[] Payload { get; private set; }

        /// <summary>
        /// Gets the magic bytes.
        /// </summary>
        public byte Magic { get; private set; }

        /// <summary>
        /// Gets the CRC32 checksum for the payload.
        /// </summary>
        public byte[] Checksum { get; private set; }

        /// <summary>
        /// Gets the Attributes for the message.
        /// </summary>
        public byte Attributes { get; private set; }

        /// <summary>
        /// Gets the total size of message.
        /// </summary>
        public int Size { get; private set; }

        /// <summary>
        /// Gets the payload size.
        /// </summary>
        public int PayloadSize
        {
            get
            {
                return this.Payload.Length;
            }
        }

        /// <summary>
        /// Writes message data into given message buffer
        /// </summary>
        /// <param name="output">
        /// The output.
        /// </param>
        public void WriteTo(MemoryStream output)
        {
            Guard.NotNull(output, "output");

            using (var writer = new KafkaBinaryWriter(output))
            {
                this.WriteTo(writer);
            }
        }

        /// <summary>
        /// Writes message data using given writer
        /// </summary>
        /// <param name="writer">
        /// The writer.
        /// </param>
        public void WriteTo(KafkaBinaryWriter writer)
        {
            Guard.NotNull(writer, "writer");
            writer.Write(this.Magic);
            writer.Write(this.Attributes);
            writer.Write(this.Checksum);
            writer.Write(this.Payload);
        }

        /// <summary>
        /// Try to show the payload as decoded to UTF-8.
        /// </summary>
        /// <returns>The decoded payload as string.</returns>
        public override string ToString()
        {
            var sb = new StringBuilder();
            sb.Append("Magic: ");
            sb.Append(this.Magic);
            if (this.Magic == 1)
            {
                sb.Append(", Attributes: ");
                sb.Append(this.Attributes);
            }

            sb.Append(", Checksum: ");
            for (int i = 0; i < 4; i++)
            {
                sb.Append("[");
                sb.Append(this.Checksum[i]);
                sb.Append("]");
            }

            sb.Append(", topic: ");
            try
            {
                sb.Append(Encoding.UTF8.GetString(this.Payload));
            }
            catch (Exception)
            {
                sb.Append("n/a");
            }

            return sb.ToString();
        }

        [Obsolete("Use KafkaBinaryReader instead")]
        public static Message FromMessageBytes(byte[] data)
        {
            byte magic = data[0];
            byte[] checksum;
            byte[] payload;
            byte attributes;
            if (magic == (byte)1)
            {
                attributes = data[1];
                checksum = data.Skip(2).Take(4).ToArray();
                payload = data.Skip(6).ToArray();
                return new Message(payload, checksum, Messages.CompressionCodec.GetCompressionCodec(attributes & CompressionCodeMask));
            }
            else
            {
                checksum = data.Skip(1).Take(4).ToArray();
                payload = data.Skip(5).ToArray();
                return new Message(payload, checksum);
            }
        }

        internal static Message ParseFrom(KafkaBinaryReader reader, int size)
        {
            Message result;
            int readed = 0;
            byte magic = reader.ReadByte();
            readed++;
            byte[] checksum;
            byte[] payload;
            if (magic == 1)
            {
                byte attributes = reader.ReadByte();
                readed++;
                checksum = reader.ReadBytes(4);
                readed += 4;
                payload = reader.ReadBytes(size - (DefaultHeaderSize + 1));
                readed += size - (DefaultHeaderSize + 1);
                result = new Message(payload, checksum, Messages.CompressionCodec.GetCompressionCodec(attributes & CompressionCodeMask));
            }
            else
            {
                checksum = reader.ReadBytes(4);
                readed += 4;
                payload = reader.ReadBytes(size - DefaultHeaderSize);
                readed += size - DefaultHeaderSize;
                result = new Message(payload, checksum);
            }

            if (size != readed)
            {
                throw new KafkaException(KafkaException.InvalidRetchSizeCode);
            }

            return result;
        }

        /// <summary>
        /// Parses a message from a byte array given the format Kafka likes. 
        /// </summary>
        /// <param name="data">The data for a message.</param>
        /// <returns>The message.</returns>
        [Obsolete("Use KafkaBinaryReader instead")]
        public static Message ParseFrom(byte[] data)
        {
            int size = BitConverter.ToInt32(BitWorks.ReverseBytes(data.Take(4).ToArray()), 0);
            byte magic = data[4];
            byte[] checksum;
            byte[] payload;
            byte attributes;
            if (magic == 1)
            {
                attributes = data[5];
                checksum = data.Skip(6).Take(4).ToArray();
                payload = data.Skip(10).Take(size).ToArray();
                return new Message(payload, checksum, Messages.CompressionCodec.GetCompressionCodec(attributes & CompressionCodeMask));
            }
            else
            {
                checksum = data.Skip(5).Take(4).ToArray();
                payload = data.Skip(9).Take(size).ToArray();
                return new Message(payload, checksum);
            }
        }
    }
}
