﻿/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Kafka.Client.IntegrationTests
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Reflection;
    using System.Text;
    using System.Threading;
    using Kafka.Client.Consumers;
    using Kafka.Client.Exceptions;
    using Kafka.Client.Messages;
    using Kafka.Client.Producers.Sync;
    using Kafka.Client.Requests;
    using NUnit.Framework;

    [TestFixture]
    public class ConsumerTests : IntegrationFixtureBase
    {
        [Test]
        public void ConsumerConnectorIsCreatedConnectsDisconnectsAndShutsDown()
        {
            var config = this.ZooKeeperBasedConsumerConfig;
            using (new ZookeeperConsumerConnector(config, true))
            {
            }
        }

        [Test]
        public void SimpleSyncProducerSends2MessagesAndConsumerConnectorGetsThemBack()
        {
            var prodConfig = this.SyncProducerConfig1;
            var consumerConfig = this.ZooKeeperBasedConsumerConfig;

            // first producing
            string payload1 = "kafka 1.";
            byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
            var msg1 = new Message(payloadData1);

            string payload2 = "kafka 2.";
            byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
            var msg2 = new Message(payloadData2);

            var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg1, msg2 });
            using (var producer = new SyncProducer(prodConfig))
            {
                producer.Send(producerRequest);
            }

            // now consuming
            var resultMessages = new List<Message>();
            using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true))
            {
                var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
                var messages = consumerConnector.CreateMessageStreams(topicCount);
                var sets = messages[CurrentTestTopic];
                try
                {
                    foreach (var set in sets)
                    {
                        foreach (var message in set)
                        {
                            resultMessages.Add(message);
                        }
                    }
                }
                catch (ConsumerTimeoutException)
                {
                    // do nothing, this is expected
                }
            }

            Assert.AreEqual(2, resultMessages.Count);
            Assert.AreEqual(msg1.ToString(), resultMessages[0].ToString());
            Assert.AreEqual(msg2.ToString(), resultMessages[1].ToString());
        }

        [Test]
        public void OneMessageIsSentAndReceivedThenExceptionsWhenNoMessageThenAnotherMessageIsSentAndReceived()
        {
            var prodConfig = this.SyncProducerConfig1;
            var consumerConfig = this.ZooKeeperBasedConsumerConfig;

            // first producing
            string payload1 = "kafka 1.";
            byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
            var msg1 = new Message(payloadData1);
            using (var producer = new SyncProducer(prodConfig))
            {
                var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg1 });
                producer.Send(producerRequest);

                // now consuming
                using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true))
                {
                    var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
                    var messages = consumerConnector.CreateMessageStreams(topicCount);
                    var sets = messages[CurrentTestTopic];
                    KafkaMessageStream myStream = sets[0];
                    var enumerator = myStream.GetEnumerator();

                    Assert.IsTrue(enumerator.MoveNext());
                    Assert.AreEqual(msg1.ToString(), enumerator.Current.ToString());

                    Assert.Throws<ConsumerTimeoutException>(() => enumerator.MoveNext());

                    Assert.Throws<IllegalStateException>(() => enumerator.MoveNext()); // iterator is in failed state

                    enumerator.Reset();

                    // producing again
                    string payload2 = "kafka 2.";
                    byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
                    var msg2 = new Message(payloadData2);

                    var producerRequest2 = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg2 });
                    producer.Send(producerRequest2);
                    Thread.Sleep(3000);

                    Assert.IsTrue(enumerator.MoveNext());
                    Assert.AreEqual(msg2.ToString(), enumerator.Current.ToString());
                }
            }
        }

        [Test]
        public void ConsumerConnectorConsumesTwoDifferentTopics()
        {
            var prodConfig = this.SyncProducerConfig1;
            var consumerConfig = this.ZooKeeperBasedConsumerConfig;

            string topic1 = CurrentTestTopic + "1";
            string topic2 = CurrentTestTopic + "2";

            // first producing
            string payload1 = "kafka 1.";
            byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
            var msg1 = new Message(payloadData1);

            string payload2 = "kafka 2.";
            byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
            var msg2 = new Message(payloadData2);

            using (var producer = new SyncProducer(prodConfig))
            {
                var producerRequest1 = new ProducerRequest(topic1, 0, new List<Message> { msg1 });
                producer.Send(producerRequest1);
                var producerRequest2 = new ProducerRequest(topic2, 0, new List<Message> { msg2 });
                producer.Send(producerRequest2);
            }

            // now consuming
            var resultMessages1 = new List<Message>();
            var resultMessages2 = new List<Message>();
            using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true))
            {
                var topicCount = new Dictionary<string, int> { { topic1, 1 }, { topic2, 1 } };
                var messages = consumerConnector.CreateMessageStreams(topicCount);

                Assert.IsTrue(messages.ContainsKey(topic1));
                Assert.IsTrue(messages.ContainsKey(topic2));

                var sets1 = messages[topic1];
                try
                {
                    foreach (var set in sets1)
                    {
                        foreach (var message in set)
                        {
                            resultMessages1.Add(message);
                        }
                    }
                }
                catch (ConsumerTimeoutException)
                {
                    // do nothing, this is expected
                }

                var sets2 = messages[topic2];
                try
                {
                    foreach (var set in sets2)
                    {
                        foreach (var message in set)
                        {
                            resultMessages2.Add(message);
                        }
                    }
                }
                catch (ConsumerTimeoutException)
                {
                    // do nothing, this is expected
                }
            }

            Assert.AreEqual(1, resultMessages1.Count);
            Assert.AreEqual(msg1.ToString(), resultMessages1[0].ToString());

            Assert.AreEqual(1, resultMessages2.Count);
            Assert.AreEqual(msg2.ToString(), resultMessages2[0].ToString());
        }

        [Test]
        public void ConsumerConnectorReceivesAShutdownSignal()
        {
            var consumerConfig = this.ZooKeeperBasedConsumerConfig;

            // now consuming
            using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true))
            {
                var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
                var messages = consumerConnector.CreateMessageStreams(topicCount);

                // putting the shutdown command into the queue
                FieldInfo fi = typeof(ZookeeperConsumerConnector).GetField(
                    "queues", BindingFlags.NonPublic | BindingFlags.Instance);
                var value =
                    (IDictionary<Tuple<string, string>, BlockingCollection<FetchedDataChunk>>)
                    fi.GetValue(consumerConnector);
                foreach (var topicConsumerQueueMap in value)
                {
                    topicConsumerQueueMap.Value.Add(ZookeeperConsumerConnector.ShutdownCommand);
                }

                var sets = messages[CurrentTestTopic];
                var resultMessages = new List<Message>();

                foreach (var set in sets)
                {
                    foreach (var message in set)
                    {
                        resultMessages.Add(message);
                    }
                }

                Assert.AreEqual(0, resultMessages.Count);
            }
        }

        [Test]
        public void ProducersSendMessagesToDifferentPartitionsAndConsumerConnectorGetsThemBack()
        {
            var prodConfig = this.SyncProducerConfig1;
            var consumerConfig = this.ZooKeeperBasedConsumerConfig;

            // first producing
            string payload1 = "kafka 1.";
            byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
            var msg1 = new Message(payloadData1);

            string payload2 = "kafka 2.";
            byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
            var msg2 = new Message(payloadData2);

            using (var producer = new SyncProducer(prodConfig))
            {
                var producerRequest1 = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg1 });
                producer.Send(producerRequest1);
                var producerRequest2 = new ProducerRequest(CurrentTestTopic, 1, new List<Message> { msg2 });
                producer.Send(producerRequest2);
            }

            // now consuming
            var resultMessages = new List<Message>();
            using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true))
            {
                var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
                var messages = consumerConnector.CreateMessageStreams(topicCount);
                var sets = messages[CurrentTestTopic];
                try
                {
                    foreach (var set in sets)
                    {
                        foreach (var message in set)
                        {
                            resultMessages.Add(message);
                        }
                    }
                }
                catch (ConsumerTimeoutException)
                {
                    // do nothing, this is expected
                }
            }

            Assert.AreEqual(2, resultMessages.Count);
            Assert.AreEqual(msg1.ToString(), resultMessages[0].ToString());
            Assert.AreEqual(msg2.ToString(), resultMessages[1].ToString());
        }
    }
}
