/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.systests.jms_1_1.acknowledge;

import static org.apache.qpid.systests.Utils.INDEX;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.junit.jupiter.api.Test;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.qpid.systests.JmsTestBase;
import org.apache.qpid.systests.Utils;

public class RecoverTest extends JmsTestBase
{
    private static final Logger LOGGER = LoggerFactory.getLogger(RecoverTest.class);
    private static final int SENT_COUNT = 4;

    @Test
    public void testRecoverForClientAcknowledge() throws Exception
    {
        Queue queue = createQueue(getTestName());
        Connection connection = getConnection();
        try
        {
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(queue);
            Utils.sendMessages(connection, queue, SENT_COUNT);
            connection.start();

            Message message = receiveAndValidateMessage(consumer, 0);
            message.acknowledge();

            receiveAndValidateMessage(consumer, 1);
            receiveAndValidateMessage(consumer, 2);
            session.recover();

            receiveAndValidateMessage(consumer, 1);
            receiveAndValidateMessage(consumer, 2);
            receiveAndValidateMessage(consumer, 3);
        }
        finally
        {
            connection.close();
        }
    }

    @Test
    public void testMessageOrderForClientAcknowledge() throws Exception
    {
        Queue queue = createQueue(getTestName());
        Connection connection = getConnection();
        try
        {
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(queue);
            Utils.sendMessages(connection, queue, SENT_COUNT);
            connection.start();

            int messageSeen = 0;
            int expectedIndex = 0;
            while (expectedIndex < SENT_COUNT)
            {
                Message message = receiveAndValidateMessage(consumer, expectedIndex);

                //don't ack the message until we receive it 5 times
                if (messageSeen < 5)
                {
                    LOGGER.debug(String.format("Recovering message with index %d", expectedIndex));
                    session.recover();
                    messageSeen++;
                }
                else
                {
                    LOGGER.debug(String.format("Acknowledging message with index %d", expectedIndex));
                    messageSeen = 0;
                    expectedIndex++;
                    message.acknowledge();
                }
            }
        }
        finally
        {
            connection.close();
        }
    }

    @Test
    public void testAcknowledgePerConsumer() throws Exception
    {
        Queue queue1 = createQueue("Q1");
        Queue queue2 = createQueue("Q2");

        Connection consumerConnection = getConnection();
        try
        {
            Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            MessageConsumer consumer1 = consumerSession.createConsumer(queue1);
            MessageConsumer consumer2 = consumerSession.createConsumer(queue2);

            Connection producerConnection = getConnection();
            try
            {
                Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                MessageProducer producer1 = producerSession.createProducer(queue1);
                MessageProducer producer2 = producerSession.createProducer(queue2);

                producer1.send(producerSession.createTextMessage("msg1"));
                producer2.send(producerSession.createTextMessage("msg2"));
            }
            finally
            {
                producerConnection.close();
            }
            consumerConnection.start();

            TextMessage message2 = (TextMessage) consumer2.receive(getReceiveTimeout());
            assertNotNull(message2);
            assertEquals("msg2", message2.getText());

            message2.acknowledge();
            consumerSession.recover();

            TextMessage message1 = (TextMessage) consumer1.receive(getReceiveTimeout());
            assertNotNull(message1);
            assertEquals("msg1", message1.getText());
        }
        finally
        {
            consumerConnection.close();
        }
    }

    @Test
    public void testRecoverInAutoAckListener() throws Exception
    {
        Queue queue = createQueue(getTestName());
        Connection connection = getConnection();
        try
        {
            Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = consumerSession.createConsumer(queue);
            Utils.sendMessages(connection, queue, 1);

            final CountDownLatch awaitMessages = new CountDownLatch(2);
            final AtomicReference<Throwable> listenerCaughtException = new AtomicReference<>();
            final AtomicInteger deliveryCounter = new AtomicInteger();
            consumer.setMessageListener(message -> {
                try
                {
                    deliveryCounter.incrementAndGet();
                    assertEquals(deliveryCounter.get() > 1, message.getJMSRedelivered(), "Unexpected JMSRedelivered");
                    if (deliveryCounter.get() == 1)
                    {
                        consumerSession.recover();
                    }
                }
                catch (Throwable e)
                {
                    LOGGER.error("Unexpected failure on message receiving", e);
                    listenerCaughtException.set(e);
                }
                finally
                {
                    awaitMessages.countDown();
                }
            });

            connection.start();

            assertTrue(awaitMessages.await(getReceiveTimeout() * 2, TimeUnit.MILLISECONDS),
                    "Message is not received in timely manner");
            assertEquals(2, deliveryCounter.get(), "Message not received the correct number of times.");
            assertNull(listenerCaughtException.get(),
                    "No exception should be caught by listener : " + listenerCaughtException.get());
        }
        finally
        {
            connection.close();
        }
    }

    private Message receiveAndValidateMessage(final MessageConsumer consumer,
                                              final int messageIndex)
            throws JMSException
    {
        Message message = consumer.receive(getReceiveTimeout());
        assertNotNull(message, String.format("Expected message '%d' is not received", messageIndex));
        assertEquals(messageIndex, message.getIntProperty(INDEX), "Received message out of order");
        return message;
    }
}
