/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.systests.jms_1_1.extensions.autocreation;

import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeThat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Test;

import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.virtualhost.NodeAutoCreationPolicy;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.systests.JmsTestBase;

public class NodeAutoCreationPolicyTest extends JmsTestBase
{
    private static final String DEAD_LETTER_QUEUE_SUFFIX = "_DLQ";
    private static final String DEAD_LETTER_EXCHANGE_SUFFIX = "_DLE";
    private static final String AUTO_CREATION_POLICIES = createAutoCreationPolicies();
    private static final String TEST_MESSAGE = "Hello world!";
    private static final String VALID_QUEUE_NAME = "fooQueue";
    private static final String TYPE_QUEUE = "queue";
    private static final String TYPE_TOPIC = "topic";


    private static String createAutoCreationPolicies()
    {
        ObjectMapper mapper = new ObjectMapper();
        try
        {

            NodeAutoCreationPolicy[] policies = new NodeAutoCreationPolicy[] {
                    new NodeAutoCreationPolicy()
                    {
                        @Override
                        public String getPattern()
                        {
                            return "fooQ.*";
                        }

                        @Override
                        public boolean isCreatedOnPublish()
                        {
                            return true;
                        }

                        @Override
                        public boolean isCreatedOnConsume()
                        {
                            return true;
                        }

                        @Override
                        public String getNodeType()
                        {
                            return "Queue";
                        }

                        @Override
                        public Map<String, Object> getAttributes()
                        {
                            return Collections.emptyMap();
                        }
                    },
                    new NodeAutoCreationPolicy()
                    {
                        @Override
                        public String getPattern()
                        {
                            return "barE.*";
                        }

                        @Override
                        public boolean isCreatedOnPublish()
                        {
                            return true;
                        }

                        @Override
                        public boolean isCreatedOnConsume()
                        {
                            return false;
                        }

                        @Override
                        public String getNodeType()
                        {
                            return "Exchange";
                        }

                        @Override
                        public Map<String, Object> getAttributes()
                        {
                            return Collections.singletonMap(Exchange.TYPE, "fanout");
                        }
                    },

                    new NodeAutoCreationPolicy()
                    {
                        @Override
                        public String getPattern()
                        {
                            return ".*" + DEAD_LETTER_QUEUE_SUFFIX;
                        }

                        @Override
                        public boolean isCreatedOnPublish()
                        {
                            return true;
                        }

                        @Override
                        public boolean isCreatedOnConsume()
                        {
                            return true;
                        }

                        @Override
                        public String getNodeType()
                        {
                            return "Queue";
                        }

                        @Override
                        public Map<String, Object> getAttributes()
                        {
                            return Collections.emptyMap();
                        }
                    },

                    new NodeAutoCreationPolicy()
                    {
                        @Override
                        public String getPattern()
                        {
                            return ".*" + DEAD_LETTER_EXCHANGE_SUFFIX;
                        }

                        @Override
                        public boolean isCreatedOnPublish()
                        {
                            return true;
                        }

                        @Override
                        public boolean isCreatedOnConsume()
                        {
                            return false;
                        }

                        @Override
                        public String getNodeType()
                        {
                            return "Exchange";
                        }

                        @Override
                        public Map<String, Object> getAttributes()
                        {
                            return Collections.singletonMap(Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
                        }
                    }
            };

            return mapper.writeValueAsString(Arrays.asList(policies));
        }
        catch (IOException e)
        {
            throw new RuntimeException(e);
        }
    }

    private void updateAutoCreationPolicies() throws Exception
    {
        updateEntityUsingAmqpManagement(getVirtualHostName(), "org.apache.qpid.VirtualHost", Collections.singletonMap(QueueManagingVirtualHost.NODE_AUTO_CREATION_POLICIES, AUTO_CREATION_POLICIES));
    }

    @Test
    public void testSendingToQueuePattern() throws Exception
    {
        updateAutoCreationPolicies();

        Connection connection = getConnection();
        try
        {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final Queue queue = session.createQueue(getDestinationAddress(VALID_QUEUE_NAME, TYPE_QUEUE));
            final MessageProducer producer = session.createProducer(queue);
            producer.send(session.createTextMessage(TEST_MESSAGE));

            final MessageConsumer consumer = session.createConsumer(queue);
            Message received = consumer.receive(getReceiveTimeout());
            assertNotNull(received);
            assertTrue(received instanceof TextMessage);
            assertEquals(TEST_MESSAGE, ((TextMessage) received).getText());
        }
        finally
        {
            connection.close();
        }
    }

    @Test
    public void testConcurrentQueueCreation() throws Exception
    {
        updateAutoCreationPolicies();

        final String destination = getDestinationAddress(VALID_QUEUE_NAME, TYPE_QUEUE);
        final int numberOfActors = 3;
        final Connection[] connections = new Connection[numberOfActors];
        try
        {
            final Session[] sessions = new Session[numberOfActors];
            for (int i = 0; i < numberOfActors; i++)
            {
                final Connection connection = getConnection();
                connections[i] = connection;
                sessions[i] = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            }

            final List<CompletableFuture<MessageProducer>> futures = new ArrayList<>(numberOfActors);
            final ExecutorService executorService = Executors.newFixedThreadPool(numberOfActors);
            try
            {
                Stream.of(sessions)
                      .forEach(session -> futures.add(CompletableFuture.supplyAsync(() -> publishMessage(session,
                                                                                                         destination),
                                                                                    executorService)));
                final CompletableFuture<Void> combinedFuture =
                        CompletableFuture.allOf(futures.toArray(new CompletableFuture[numberOfActors]));
                combinedFuture.get(getReceiveTimeout(), TimeUnit.MILLISECONDS);
            }
            finally
            {
                executorService.shutdown();
            }

            final Connection connection = getConnection();
            try
            {
                connection.start();
                final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                final Queue queue = session.createQueue(destination);
                final MessageConsumer consumer = session.createConsumer(queue);

                for (int i = 0; i < numberOfActors; i++)
                {
                    Message received = consumer.receive(getReceiveTimeout());
                    assertNotNull(received);
                    assertTrue(received instanceof TextMessage);
                    assertEquals(TEST_MESSAGE, ((TextMessage) received).getText());
                }
            }
            finally
            {
                connection.close();
            }
        }
        finally
        {
            for (Connection connection : connections)
            {
                if (connection != null)
                {
                    connection.close();
                }
            }
        }
    }

    private MessageProducer publishMessage(final Session session, final String destination)
    {
        try
        {
            final Queue queue = session.createQueue(destination);
            final MessageProducer producer = session.createProducer(queue);
            producer.send(session.createTextMessage(TEST_MESSAGE));
            return producer;
        }
        catch (JMSException e)
        {
            throw new IllegalStateException(e);
        }
    }

    @Test
    public void testSendingToNonMatchingQueuePattern() throws Exception
    {
        updateAutoCreationPolicies();

        Connection connection = getConnection();
        try
        {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final Queue queue = session.createQueue(getDestinationAddress("foQueue", TYPE_QUEUE));
            try
            {
                session.createProducer(queue);
                fail("Creating producer should fail");
            }
            catch (JMSException e)
            {
                // pass
            }
        }
        finally
        {
            connection.close();
        }
    }

    @Test
    public void testSendingToExchangePattern() throws Exception
    {
        updateAutoCreationPolicies();

        Connection connection = getConnection();
        try
        {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final Topic topic = session.createTopic(getDestinationAddress("barExchange/foo", TYPE_TOPIC));
            final MessageProducer producer = session.createProducer(topic);
            producer.send(session.createTextMessage(TEST_MESSAGE));

            final MessageConsumer consumer = session.createConsumer(topic);
            Message received = consumer.receive(getReceiveTimeout() / 4);
            assertNull(received);

            producer.send(session.createTextMessage("Hello world2!"));
            received = consumer.receive(getReceiveTimeout());

            assertNotNull(received);

            assertTrue(received instanceof TextMessage);
            assertEquals("Hello world2!", ((TextMessage) received).getText());
        }
        finally
        {
            connection.close();
        }
    }

    @Test
    public void testSendingToNonMatchingTopicPattern() throws Exception
    {
        updateAutoCreationPolicies();

        Connection connection = getConnection();
        try
        {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final Topic topic = session.createTopic(getDestinationAddress("baa", TYPE_TOPIC));
            try
            {
                session.createProducer(topic);
                fail("Creating producer should fail");
            }
            catch (JMSException e)
            {
                // pass
            }
        }
        finally
        {
            connection.close();
        }
    }

    @Test
    public void testSendingToQueuePatternBURL() throws Exception
    {
        assumeThat("Qpid JMS Client does not support BURL syntax",
                   getProtocol(),
                   is(not(equalTo(Protocol.AMQP_1_0))));
        updateAutoCreationPolicies();

        Connection connection = getConnection();
        try
        {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final Queue queue = session.createQueue("BURL:direct:///fooQ/fooQ");
            final MessageProducer producer = session.createProducer(queue);
            producer.send(session.createTextMessage(TEST_MESSAGE));

            final MessageConsumer consumer = session.createConsumer(queue);
            Message received = consumer.receive(getReceiveTimeout());
            assertNotNull(received);
            assertTrue(received instanceof TextMessage);
            assertEquals(TEST_MESSAGE, ((TextMessage) received).getText());
        }
        finally
        {
            connection.close();
        }
    }

    @Test
    public void testSendingToNonMatchingQueuePatternBURL() throws Exception
    {
        assumeThat("Using AMQP 0-8..0-9-1 to test BURL syntax",
                   getProtocol(),
                   is(not(anyOf(equalTo(Protocol.AMQP_1_0), equalTo(Protocol.AMQP_0_10)))));
        updateAutoCreationPolicies();

        Connection connection = getConnectionBuilder().setSyncPublish(true).build();
        try
        {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final Queue queue = session.createQueue("BURL:direct:///fo/fo");
            try
            {
                final MessageProducer producer = session.createProducer(queue);
                producer.send(session.createTextMessage(TEST_MESSAGE));

                fail("Sending a message should fail");
            }
            catch (JMSException e)
            {
                // pass
            }
        }
        finally
        {
            connection.close();
        }
    }

    @Test
    public void testQueueAlternateBindingCreation() throws Exception
    {
        updateAutoCreationPolicies();

        String queueName = getTestName();
        String deadLetterQueueName = queueName + DEAD_LETTER_QUEUE_SUFFIX;

        final Map<String, Object> attributes = new HashMap<>();
        Map<String, Object> expectedAlternateBinding =
                Collections.singletonMap(AlternateBinding.DESTINATION, deadLetterQueueName);
        attributes.put(org.apache.qpid.server.model.Queue.ALTERNATE_BINDING,
                       new ObjectMapper().writeValueAsString(expectedAlternateBinding));
        createEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", attributes);

        Map<String, Object> queueAttributes = readEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", true);

        Object actualAlternateBinding = queueAttributes.get(org.apache.qpid.server.model.Queue.ALTERNATE_BINDING);
        Map<String, Object> actualAlternateBindingMap = convertIfNecessary(actualAlternateBinding);
        assertEquals("Unexpected alternate binding",
                     new HashMap<>(expectedAlternateBinding),
                     new HashMap<>(actualAlternateBindingMap));

        Map<String, Object> dlqAttributes =
                readEntityUsingAmqpManagement(deadLetterQueueName, "org.apache.qpid.Queue", true);
        assertNotNull("Cannot get dead letter queue", dlqAttributes);
    }

    @Test
    public void testExchangeAlternateBindingCreation() throws Exception
    {
        updateAutoCreationPolicies();

        String exchangeName = getTestName();
        String deadLetterExchangeName = exchangeName + DEAD_LETTER_EXCHANGE_SUFFIX;

        final Map<String, Object> attributes = new HashMap<>();
        Map<String, Object> expectedAlternateBinding =
                Collections.singletonMap(AlternateBinding.DESTINATION, deadLetterExchangeName);
        attributes.put(Exchange.ALTERNATE_BINDING, new ObjectMapper().writeValueAsString(expectedAlternateBinding));
        attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
        createEntityUsingAmqpManagement(exchangeName, "org.apache.qpid.DirectExchange", attributes);

        Map<String, Object> exchangeAttributes = readEntityUsingAmqpManagement(exchangeName, "org.apache.qpid.Exchange", true);

        Object actualAlternateBinding = exchangeAttributes.get(Exchange.ALTERNATE_BINDING);
        Map<String, Object> actualAlternateBindingMap = convertIfNecessary(actualAlternateBinding);
        assertEquals("Unexpected alternate binding",
                     new HashMap<>(expectedAlternateBinding),
                     new HashMap<>(actualAlternateBindingMap));

        Map<String, Object> dlqExchangeAttributes = readEntityUsingAmqpManagement(
                deadLetterExchangeName,
                "org.apache.qpid.FanoutExchange",
                true);
        assertNotNull("Cannot get dead letter exchange", dlqExchangeAttributes);
    }

    @SuppressWarnings("unchecked")
    private Map<String, Object> convertIfNecessary(final Object actualAlternateBinding) throws IOException
    {
        Map<String, Object> actualAlternateBindingMap;
        if (actualAlternateBinding instanceof String)
        {
            actualAlternateBindingMap = new ObjectMapper().readValue((String)actualAlternateBinding, Map.class);
        }
        else
        {
            actualAlternateBindingMap = (Map<String, Object>) actualAlternateBinding;
        }
        return actualAlternateBindingMap;
    }

    private String getDestinationAddress(final String name, final String type)
    {
        return getProtocol() == Protocol.AMQP_1_0
                ? name
                : String.format("ADDR: %s; { assert: never, node: { type: %s } }", name, type);
    }
}
