/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.systests.jms_1_1.messagegroup;

import static javax.jms.Session.AUTO_ACKNOWLEDGE;
import static javax.jms.Session.CLIENT_ACKNOWLEDGE;
import static javax.jms.Session.SESSION_TRANSACTED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.junit.jupiter.api.Test;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.queue.MessageGroupType;
import org.apache.qpid.systests.JmsTestBase;

public class MessageGroupTest extends JmsTestBase
{
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageGroupTest.class);

    @Test
    public void simpleGroupAssignment() throws Exception
    {
        simpleGroupAssignment(false, false);
    }

    @Test
    public void sharedGroupSimpleGroupAssignment() throws Exception
    {
        simpleGroupAssignment(true, false);
    }

    @Test
    public void simpleGroupAssignmentWithJMSXGroupID() throws Exception
    {
        simpleGroupAssignment(false, true);
    }

    @Test
    public void sharedGroupSimpleGroupAssignmentWithJMSXGroupID() throws Exception
    {
        simpleGroupAssignment(true, true);
    }

    /**
     * Pre populate the queue with messages with groups as follows
     *
     *  ONE
     *  TWO
     *  ONE
     *  TWO
     *
     *  Create two consumers with prefetch of 1, the first consumer should then be assigned group ONE, the second
     *  consumer assigned group TWO if they are started in sequence.
     *
     *  Thus doing
     *
     *  c1 <--- (ONE)
     *  c2 <--- (TWO)
     *  c2 ack --->
     *
     *  c2 should now be able to receive a second message from group TWO (skipping over the message from group ONE)
     *
     *  i.e.
     *
     *  c2 <--- (TWO)
     *  c2 ack --->
     *  c1 <--- (ONE)
     *  c1 ack --->
     *
     */
    private void simpleGroupAssignment(boolean sharedGroups, final boolean useDefaultGroup) throws Exception
    {
        final String groupKey = getGroupKey(useDefaultGroup);
        String queueName = getTestName();
        Destination queue = createQueue(queueName, sharedGroups, useDefaultGroup);

        final Connection producerConnection = getConnection();
        try
        {
            producerConnection.start();
            final Session producerSession = producerConnection.createSession(true, SESSION_TRANSACTED);
            final MessageProducer producer = producerSession.createProducer(queue);

            String[] groups = { "ONE", "TWO"};

            for (int msg = 0; msg < 4; msg++)
            {
                producer.send(createMessage(producerSession, msg, groups[msg % groups.length], useDefaultGroup));
            }
            producerSession.commit();
        }
        finally
        {
            producerConnection.close();
        }

        final Connection consumerConnection = getConnectionBuilder().setPrefetch(0).build();
        try
        {
            Session cs1 = consumerConnection.createSession(false, CLIENT_ACKNOWLEDGE);
            Session cs2 = consumerConnection.createSession(false, CLIENT_ACKNOWLEDGE);

            MessageConsumer consumer1 = cs1.createConsumer(queue);
            MessageConsumer consumer2 = cs2.createConsumer(queue);

            consumerConnection.start();
            Message cs1Received = consumer1.receive(getReceiveTimeout());
            assertNotNull(cs1Received, "Consumer 1 should have received first message");

            Message cs2Received = consumer2.receive(getReceiveTimeout());

            assertNotNull(cs2Received, "Consumer 2 should have received first message");

            cs2Received.acknowledge();

            Message cs2Received2 = consumer2.receive(getReceiveTimeout());

            assertNotNull(cs2Received2, "Consumer 2 should have received second message");
            assertEquals(cs2Received2.getStringProperty(groupKey), cs2Received.getStringProperty(groupKey),
                    "Differing groups");

            cs1Received.acknowledge();
            Message cs1Received2 = consumer1.receive(getReceiveTimeout());

            assertNotNull(cs1Received2, "Consumer 1 should have received second message");
            assertEquals(cs1Received2.getStringProperty(groupKey), cs1Received.getStringProperty(groupKey),
                    "Differing groups");

            cs1Received2.acknowledge();
            cs2Received2.acknowledge();

            assertNull(consumer1.receive(getReceiveTimeout()));
            assertNull(consumer2.receive(getReceiveTimeout()));
        }
        finally
        {
            consumerConnection.close();
        }
    }

    @Test
    public void consumerCloseGroupAssignment() throws Exception
    {
        consumerCloseGroupAssignment(false, false);
    }

    @Test
    public void sharedGroupConsumerCloseGroupAssignment() throws Exception
    {
        consumerCloseGroupAssignment(true, false);
    }

    /**
     *
     * Tests that upon closing a consumer, groups previously assigned to that consumer are reassigned to a different
     * consumer.
     *
     * Pre-populate the queue as ONE, ONE, TWO, ONE
     *
     * create in sequence two consumers
     *
     * receive first from c1 then c2 (thus ONE is assigned to c1, TWO to c2)
     *
     * Then close c1 before acking.
     *
     * If we now attempt to receive from c2, then the remaining messages in group ONE should be available (which
     * requires c2 to go "backwards" in the queue).
     *
     **/
    private void consumerCloseGroupAssignment(boolean sharedGroups, final boolean useDefaultGroup) throws Exception
    {
        final String groupKey = getGroupKey(useDefaultGroup);
        String queueName = getTestName();
        Destination queue = createQueue(queueName, sharedGroups, false);

        final Connection producerConnection = getConnection();
        try
        {
            final Session producerSession = producerConnection.createSession(true, SESSION_TRANSACTED);
            final MessageProducer producer = producerSession.createProducer(queue);

            producer.send(createMessage(producerSession, 1, "ONE", useDefaultGroup));
            producer.send(createMessage(producerSession, 2, "ONE", useDefaultGroup));
            producer.send(createMessage(producerSession, 3, "TWO", useDefaultGroup));
            producer.send(createMessage(producerSession, 4, "ONE", useDefaultGroup));
            producerSession.commit();
        }
        finally
        {
            producerConnection.close();
        }

        final Connection consumerConnection = getConnectionBuilder().setPrefetch(0).build();
        try
        {
            consumerConnection.start();
            Session cs1 = consumerConnection.createSession(true, SESSION_TRANSACTED);
            Session cs2 = consumerConnection.createSession(true, SESSION_TRANSACTED);

            MessageConsumer consumer1 = cs1.createConsumer(queue);
            MessageConsumer consumer2 = cs2.createConsumer(queue);

            Message cs1Received = consumer1.receive(getReceiveTimeout());
            assertNotNull(cs1Received, "Consumer 1 should have received first message");
            assertEquals(1, cs1Received.getIntProperty("msg"), "incorrect message received");

            Message cs2Received = consumer2.receive(getReceiveTimeout());

            assertNotNull(cs2Received, "Consumer 2 should have received first message");
            assertEquals(3, cs2Received.getIntProperty("msg"), "incorrect message received");
            cs2.commit();

            Message cs2Received2 = consumer2.receive(getReceiveTimeout());

            assertNull(cs2Received2, "Consumer 2 should not yet have received a second message");

            consumer1.close();

            cs1.commit();
            Message cs2Received3 = consumer2.receive(getReceiveTimeout());

            assertNotNull(cs2Received3, "Consumer 2 should have received second message");
            assertEquals("ONE", cs2Received3.getStringProperty(groupKey), "Unexpected group");
            assertEquals(2, cs2Received3.getIntProperty("msg"), "incorrect message received");

            cs2.commit();

            Message cs2Received4 = consumer2.receive(getReceiveTimeout());

            assertNotNull(cs2Received4, "Consumer 2 should have received third message");
            assertEquals("ONE", cs2Received4.getStringProperty(groupKey), "Unexpected group");
            assertEquals(4, cs2Received4.getIntProperty("msg"), "incorrect message received");
            cs2.commit();

            assertNull(consumer2.receive(getReceiveTimeout()));
        }
        finally
        {
            consumerConnection.close();
        }
    }



    @Test
    public void consumerCloseWithRelease() throws Exception
    {
        consumerCloseWithRelease(false, false);
    }

    @Test
    public void sharedGroupConsumerCloseWithRelease() throws Exception
    {
        consumerCloseWithRelease(true, false);
    }

    /**
     *
     * Tests that upon closing a consumer and its session, groups previously assigned to that consumer are reassigned
     * to a different consumer, including messages which were previously delivered but have now been released.
     *
     * Pre-populate the queue as ONE, ONE, TWO, ONE
     *
     * create in sequence two consumers
     *
     * receive first from c1 then c2 (thus ONE is assigned to c1, TWO to c2)
     *
     * Then close c1 and its session without acking.
     *
     * If we now attempt to receive from c2, then the all messages in group ONE should be available (which
     * requires c2 to go "backwards" in the queue). The first such message should be marked as redelivered
     *
     */
    private void consumerCloseWithRelease(boolean sharedGroups, final boolean useDefaultGroup) throws Exception
    {
        String queueName = getTestName();
        final String groupKey = getGroupKey(useDefaultGroup);
        Destination queue = createQueue(queueName, sharedGroups, false);

        final Connection producerConnection = getConnection();
        try
        {
            producerConnection.start();
            final Session producerSession = producerConnection.createSession(true, SESSION_TRANSACTED);
            final MessageProducer producer = producerSession.createProducer(queue);

            producer.send(createMessage(producerSession, 1, "ONE", useDefaultGroup));
            producer.send(createMessage(producerSession, 2, "ONE", useDefaultGroup));
            producer.send(createMessage(producerSession, 3, "TWO", useDefaultGroup));
            producer.send(createMessage(producerSession, 4, "ONE", useDefaultGroup));
            producerSession.commit();
        }
        finally
        {
            producerConnection.close();
        }

        final Connection consumerConnection = getConnectionBuilder().setPrefetch(0).build();
        try
        {
            consumerConnection.start();

            Session cs1 = consumerConnection.createSession(true, SESSION_TRANSACTED);
            Session cs2 = consumerConnection.createSession(true, SESSION_TRANSACTED);

            MessageConsumer consumer1 = cs1.createConsumer(queue);
            MessageConsumer consumer2 = cs2.createConsumer(queue);

            Message cs1Received = consumer1.receive(getReceiveTimeout());
            assertNotNull(cs1Received, "Consumer 1 should have received its first message");
            assertEquals(1, cs1Received.getIntProperty("msg"), "incorrect message received");

            Message received = consumer2.receive(getReceiveTimeout());

            assertNotNull(received, "Consumer 2 should have received its first message");
            assertEquals(3, received.getIntProperty("msg"), "incorrect message received");

            Message received2 = consumer2.receive(getReceiveTimeout());

            assertNull(received2, "Consumer 2 should not yet have received second message");

            consumer1.close();
            cs1.close();
            cs2.commit();

            received = consumer2.receive(getReceiveTimeout());

            assertNotNull(received, "Consumer 2 should now have received second message");
            assertEquals("ONE", received.getStringProperty(groupKey), "Unexpected group");
            assertEquals(1, received.getIntProperty("msg"), "incorrect message received");
            assertTrue(received.getJMSRedelivered(),
                    "Expected second message to be marked as redelivered " + received.getIntProperty("msg"));

            cs2.commit();

            received = consumer2.receive(getReceiveTimeout());

            assertNotNull(received, "Consumer 2 should have received a third message");
            assertEquals("ONE", received.getStringProperty(groupKey), "Unexpected group");
            assertEquals(2, received.getIntProperty("msg"), "incorrect message received");

            cs2.commit();

            received = consumer2.receive(getReceiveTimeout());

            assertNotNull(received, "Consumer 2 should have received a fourth message");
            assertEquals("ONE", received.getStringProperty(groupKey), "Unexpected group");
            assertEquals(4, received.getIntProperty("msg"), "incorrect message received");

            cs2.commit();

            assertNull(consumer2.receive(getReceiveTimeout()));
        }
        finally
        {
            consumerConnection.close();
        }
    }

    @Test
    public void groupAssignmentSurvivesEmpty() throws Exception
    {
        groupAssignmentOnEmpty(false, false);
    }

    @Test
    public void sharedGroupAssignmentDoesNotSurviveEmpty() throws Exception
    {
        groupAssignmentOnEmpty(true, false);
    }

    private void groupAssignmentOnEmpty(boolean sharedGroups, final boolean useDefaultGroup) throws Exception
    {
        String queueName = getTestName();
        Destination queue = createQueue(queueName, sharedGroups, false);

        final Connection producerConnection = getConnection();
        try
        {
            producerConnection.start();
            final Session producerSession = producerConnection.createSession(true, SESSION_TRANSACTED);
            final MessageProducer producer = producerSession.createProducer(queue);

            producer.send(createMessage(producerSession, 1, "ONE", useDefaultGroup));
            producer.send(createMessage(producerSession, 2, "TWO", useDefaultGroup));
            producer.send(createMessage(producerSession, 3, "THREE", useDefaultGroup));
            producer.send(createMessage(producerSession, 4, "ONE", useDefaultGroup));
            producerSession.commit();
        }
        finally
        {
            producerConnection.close();
        }

        final Connection consumerConnection = getConnectionBuilder().setPrefetch(0).build();
        try
        {
            consumerConnection.start();

            Session cs1 = consumerConnection.createSession(true, SESSION_TRANSACTED);
            Session cs2 = consumerConnection.createSession(true, SESSION_TRANSACTED);

            MessageConsumer consumer1 = cs1.createConsumer(queue);
            MessageConsumer consumer2 = cs2.createConsumer(queue);

            Message cs1Received = consumer1.receive(getReceiveTimeout());
            assertNotNull(cs1Received, "Consumer 1 should have received its first message");
            assertEquals(1, cs1Received.getIntProperty("msg"), "incorrect message received");

            Message cs2Received = consumer2.receive(getReceiveTimeout());

            assertNotNull(cs2Received, "Consumer 2 should have received its first message");
            assertEquals(2, cs2Received.getIntProperty("msg"), "incorrect message received");

            cs1.commit();

            cs1Received = consumer1.receive(getReceiveTimeout());
            assertNotNull(cs1Received, "Consumer 1 should have received its second message");
            assertEquals(3, cs1Received.getIntProperty("msg"), "incorrect message received");

            // We expect different behaviours from "shared groups": here the assignment of a subscription to a group
            // is terminated when there are no outstanding delivered but unacknowledged messages.  In contrast, with a
            // standard message grouping queue the assignment will be retained until the subscription is no longer
            // registered
            if (sharedGroups)
            {
                cs2.commit();
                cs2Received = consumer2.receive(getReceiveTimeout());

                assertNotNull(cs2Received, "Consumer 2 should have received its second message");
                assertEquals(4, cs2Received.getIntProperty("msg"), "incorrect message received");

                cs2.commit();
            }
            else
            {
                cs2.commit();
                cs2Received = consumer2.receive(getReceiveTimeout());

                assertNull(cs2Received, "Consumer 2 should not have received a second message");

                cs1.commit();

                cs1Received = consumer1.receive(getReceiveTimeout());
                assertNotNull(cs1Received, "Consumer 1 should have received its third message");
                assertEquals(4, cs1Received.getIntProperty("msg"), "incorrect message received");
            }
        }
        finally
        {
            consumerConnection.close();
        }

    }

    /**
     * Tests that when a number of new messages for a given groupid are arriving while the delivery group
     * state is also in the process of being emptied (due to acking a message while using prefetch=1), that only
     * 1 of a number of existing consumers is ever receiving messages for the shared group at a time.
     */
    @Test
    public void singleSharedGroupWithMultipleConsumers() throws Exception
    {
        String queueName = getTestName();
        final boolean useDefaultGroup = false;
        Destination queue = createQueue(queueName, true, useDefaultGroup);

        final Connection consumerConnection = getConnectionBuilder().setPrefetch(1).build();
        try
        {
            int numMessages = 100;
            SharedGroupTestMessageListener groupingTestMessageListener =
                    new SharedGroupTestMessageListener(numMessages);

            Session cs1 = consumerConnection.createSession(false, AUTO_ACKNOWLEDGE);
            Session cs2 = consumerConnection.createSession(false, AUTO_ACKNOWLEDGE);
            Session cs3 = consumerConnection.createSession(false, AUTO_ACKNOWLEDGE);
            Session cs4 = consumerConnection.createSession(false, AUTO_ACKNOWLEDGE);

            MessageConsumer consumer1 = cs1.createConsumer(queue);
            consumer1.setMessageListener(groupingTestMessageListener);
            MessageConsumer consumer2 = cs2.createConsumer(queue);
            consumer2.setMessageListener(groupingTestMessageListener);
            MessageConsumer consumer3 = cs3.createConsumer(queue);
            consumer3.setMessageListener(groupingTestMessageListener);
            MessageConsumer consumer4 = cs4.createConsumer(queue);
            consumer4.setMessageListener(groupingTestMessageListener);
            consumerConnection.start();

            final Connection producerConnection = getConnection();
            try
            {
                final Session producerSession = producerConnection.createSession(true, SESSION_TRANSACTED);
                final MessageProducer producer = producerSession.createProducer(queue);

                for (int i = 1; i <= numMessages; i++)
                {
                    producer.send(createMessage(producerSession, i, "GROUP", useDefaultGroup));
                }

                producerSession.commit();
            }
            finally
            {
                producerConnection.close();
            }

            assertTrue(groupingTestMessageListener.waitForLatch(30),
                    "Mesages not all received in the allowed timeframe");
            assertEquals(0, groupingTestMessageListener.getConcurrentProcessingCases(),
                    "Unexpected concurrent processing of messages for the group");
            assertNull(groupingTestMessageListener.getThrowable(), "Unexpected throwable in message listeners");
        }
        finally
        {
            consumerConnection.close();
        }
    }

    private Destination createQueue(final String queueName, final boolean sharedGroups,
                                    final boolean useDefaultKey) throws Exception
    {
        final Map<String, Object> arguments = new HashMap<>();
        if(!useDefaultKey)
        {
            arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_GROUP_KEY_OVERRIDE, "group");
        }
        arguments.put(ConfiguredObject.DURABLE, "false");
        arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_GROUP_TYPE, sharedGroups ? MessageGroupType.SHARED_GROUPS.name() : MessageGroupType.STANDARD.name());

        createEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", arguments);

        return createQueue(queueName);
    }

    private Message createMessage(final Session session, int msg, String group, final boolean useDefaultGroup) throws JMSException
    {
        Message send = session.createTextMessage("Message: " + msg);
        send.setIntProperty("msg", msg);
        send.setStringProperty(getGroupKey(useDefaultGroup), group);

        return send;
    }

    private String getGroupKey(final boolean useDefaultGroup)
    {
        return useDefaultGroup ? "JMSXGroupID" : "group";
    }

    public static class SharedGroupTestMessageListener implements MessageListener
    {

        private final CountDownLatch _count;
        private final AtomicInteger _activeListeners = new AtomicInteger();
        private final AtomicInteger _concurrentProcessingCases = new AtomicInteger();
        private Throwable _throwable;

        SharedGroupTestMessageListener(int numMessages)
        {
            _count = new CountDownLatch(numMessages);
        }

        @Override
        public void onMessage(Message message)
        {
            try
            {
                int currentActiveListeners = _activeListeners.incrementAndGet();

                if (currentActiveListeners > 1)
                {
                    _concurrentProcessingCases.incrementAndGet();

                    LOGGER.error("Concurrent processing when handling message: " + message.getIntProperty("msg"));
                }

                try
                {
                    Thread.sleep(25);
                }
                catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt();
                }

                _activeListeners.decrementAndGet();
            }
            catch (Throwable t)
            {
                LOGGER.error("Unexpected throwable received by listener", t);
                _throwable = t;
            }
            finally
            {
                _count.countDown();
            }
        }

        boolean waitForLatch(int seconds) throws Exception
        {
            return _count.await(seconds, TimeUnit.SECONDS);
        }

        int getConcurrentProcessingCases()
        {
            return _concurrentProcessingCases.get();
        }

        Throwable getThrowable()
        {
            return _throwable;
        }
    }
}
