/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

package org.apache.qpid.systest.core.brokerj;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class AmqpManagementFacade
{
    private static final String AMQP_0_X_REPLY_TO_DESTINATION = "ADDR:!response";
    private static final String AMQP_0_X_CONSUMER_REPLY_DESTINATION =
            "ADDR:$management ; {assert : never, node: { type: queue }, link:{name: \"!response\"}}";
    private final String _managementAddress;


    public AmqpManagementFacade()
    {
        _managementAddress = "ADDR:$management";
    }

    @SuppressWarnings("unused")
    Map<String, Object> createEntityUsingAmqpManagement(final String name,
                                                        final String type,
                                                        final Session session)
            throws JMSException
    {
        return createEntityUsingAmqpManagement(name, type, Collections.<String, Object>emptyMap(), session);
    }

    public Map<String, Object> createEntityUsingAmqpManagement(final String name,
                                                        final String type,
                                                        Map<String, Object> attributes,
                                                        final Session session)
            throws JMSException
    {
        Destination replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION);
        Destination replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION);

        MessageConsumer consumer = session.createConsumer(replyConsumerDestination);
        MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));

        MapMessage createMessage = session.createMapMessage();
        createMessage.setStringProperty("type", type);
        createMessage.setStringProperty("operation", "CREATE");
        createMessage.setString("name", name);
        createMessage.setString("object-path", name);
        createMessage.setJMSReplyTo(replyToDestination);
        for (Map.Entry<String, Object> entry : attributes.entrySet())
        {
            createMessage.setObject(entry.getKey(), entry.getValue());
        }
        producer.send(createMessage);
        if (session.getTransacted())
        {
            session.commit();
        }
        producer.close();

        Message response = consumer.receive(getManagementResponseTimeout());
        try
        {
            if (response != null)
            {
                int statusCode = response.getIntProperty("statusCode");
                if (statusCode == 201)
                {
                    if (response instanceof MapMessage)
                    {
                        MapMessage bodyMap = (MapMessage) response;
                        Map<String, Object> result = new HashMap<>();
                        Enumeration keys = bodyMap.getMapNames();
                        while (keys.hasMoreElements())
                        {
                            final String key = String.valueOf(keys.nextElement());
                            Object value = bodyMap.getObject(key);
                            result.put(key, value);
                        }
                        return result;
                    }
                    else if (response instanceof ObjectMessage)
                    {
                        Object body = ((ObjectMessage) response).getObject();
                        if (body instanceof Map)
                        {
                            @SuppressWarnings("unchecked")
                            Map<String, Object> bodyMap = (Map<String, Object>) body;
                            return new HashMap<>(bodyMap);
                        }
                    }
                }
                else
                {
                    throw new OperationUnsuccessfulException(response.getStringProperty("statusDescription"),
                                                             statusCode);
                }
            }

            throw new OperationUnsuccessfulException("Cannot get the results from a management create operation", -1);
        }
        finally
        {
            consumer.close();
        }
    }

    public void updateEntityUsingAmqpManagement(final String name,
                                         final String type,
                                         final Map<String, Object> attributes,
                                         final Session session)
            throws JMSException
    {
        Destination replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION);
        Destination replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION);

        MessageConsumer consumer = session.createConsumer(replyConsumerDestination);

        MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));

        MapMessage createMessage = session.createMapMessage();
        createMessage.setStringProperty("type", type);
        createMessage.setStringProperty("operation", "UPDATE");
        createMessage.setStringProperty("index", "object-path");
        createMessage.setStringProperty("key", name);
        createMessage.setJMSReplyTo(replyToDestination);
        for (Map.Entry<String, Object> entry : attributes.entrySet())
        {
            createMessage.setObject(entry.getKey(), entry.getValue());
        }
        producer.send(createMessage);
        if (session.getTransacted())
        {
            session.commit();
        }
        producer.close();

        Message response = consumer.receive(getManagementResponseTimeout());
        try
        {
            if (response != null)
            {
                int statusCode = response.getIntProperty("statusCode");
                if (statusCode != 200)
                {

                    throw new OperationUnsuccessfulException(response.getStringProperty("statusDescription"),
                                                             statusCode);
                }
            }
            else
            {
                throw new OperationUnsuccessfulException("Cannot get the results from a management update operation",
                                                         -1);
            }
        }
        finally
        {
            consumer.close();
        }
    }

    void deleteEntityUsingAmqpManagement(final String name,
                                         final String type,
                                         final Session session)
            throws JMSException
    {
        Destination replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION);
        Destination replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION);

        MessageConsumer consumer = session.createConsumer(replyConsumerDestination);

        MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));

        MapMessage createMessage = session.createMapMessage();
        createMessage.setStringProperty("type", type);
        createMessage.setStringProperty("operation", "DELETE");
        createMessage.setStringProperty("index", "object-path");
        createMessage.setJMSReplyTo(replyToDestination);

        createMessage.setStringProperty("key", name);
        producer.send(createMessage);
        if (session.getTransacted())
        {
            session.commit();
        }

        Message response = consumer.receive(getManagementResponseTimeout());
        try
        {
            if (response != null)
            {
                int statusCode = response.getIntProperty("statusCode");
                if (statusCode != 200 && statusCode != 204)
                {

                    throw new OperationUnsuccessfulException(response.getStringProperty("statusDescription"),
                                                             statusCode);
                }
            }
            else
            {
                throw new OperationUnsuccessfulException("Cannot get the results from a management delete operation",
                                                         -1);
            }
        }
        finally
        {
            consumer.close();
        }
    }

    public Object performOperationUsingAmqpManagement(final String name,
                                               final String type,
                                               final String operation,
                                               final Map<String, Object> arguments,
                                               final Session session)
            throws JMSException
    {
        Destination replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION);
        Destination replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION);

        MessageConsumer consumer = session.createConsumer(replyConsumerDestination);

        MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));

        MapMessage opMessage = session.createMapMessage();
        opMessage.setStringProperty("type", type);
        opMessage.setStringProperty("operation", operation);
        opMessage.setStringProperty("index", "object-path");
        opMessage.setJMSReplyTo(replyToDestination);

        opMessage.setStringProperty("key", name);
        for (Map.Entry<String, Object> argument : arguments.entrySet())
        {
            Object value = argument.getValue();
            if (value.getClass().isPrimitive() || value instanceof String)
            {
                opMessage.setObjectProperty(argument.getKey(), value);
            }
            else
            {
                ObjectMapper objectMapper = new ObjectMapper();
                String jsonifiedValue;
                try
                {
                    jsonifiedValue = objectMapper.writeValueAsString(value);
                }
                catch (JsonProcessingException e)
                {
                    throw new IllegalArgumentException(String.format(
                            "Cannot convert the argument '%s' to JSON to meet JMS type restrictions",
                            argument.getKey()));
                }
                opMessage.setObjectProperty(argument.getKey(), jsonifiedValue);
            }
        }

        producer.send(opMessage);
        if (session.getTransacted())
        {
            session.commit();
        }

        Message response = consumer.receive(getManagementResponseTimeout());
        try
        {
            int statusCode = response.getIntProperty("statusCode");
            if (statusCode < 200 || statusCode > 299)
            {
                throw new OperationUnsuccessfulException(response.getStringProperty("statusDescription"), statusCode);
            }
            if (response instanceof MapMessage)
            {
                MapMessage bodyMap = (MapMessage) response;
                Map<String, Object> result = new TreeMap<>();
                Enumeration mapNames = bodyMap.getMapNames();
                while (mapNames.hasMoreElements())
                {
                    String key = (String) mapNames.nextElement();
                    result.put(key, bodyMap.getObject(key));
                }
                return result;
            }
            else if (response instanceof ObjectMessage)
            {
                return ((ObjectMessage) response).getObject();
            }
            else if (response instanceof BytesMessage)
            {
                BytesMessage bytesMessage = (BytesMessage) response;
                if (bytesMessage.getBodyLength() == 0)
                {
                    return null;
                }
                else
                {
                    byte[] buf = new byte[(int) bytesMessage.getBodyLength()];
                    bytesMessage.readBytes(buf);
                    return buf;
                }
            }
            throw new IllegalArgumentException(
                    "Cannot parse the results from a management operation.  JMS response message : " + response);
        }
        finally
        {
            if (session.getTransacted())
            {
                session.commit();
            }
            consumer.close();
        }
    }

    @SuppressWarnings(value = {"unused", "unchecked"})
    List<Map<String, Object>> managementQueryObjects(final String type, final Session session)
            throws JMSException
    {
        Destination replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION);
        Destination replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION);

        MessageConsumer consumer = session.createConsumer(replyConsumerDestination);
        MapMessage message = session.createMapMessage();
        message.setStringProperty("identity", "self");
        message.setStringProperty("type", "org.amqp.management");
        message.setStringProperty("operation", "QUERY");
        message.setStringProperty("entityType", type);
        message.setString("attributeNames", "[]");
        message.setJMSReplyTo(replyToDestination);

        MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
        producer.send(message);

        Message response = consumer.receive(getManagementResponseTimeout());
        try
        {
            if (response instanceof MapMessage)
            {
                MapMessage bodyMap = (MapMessage) response;
                List<String> attributeNames = (List<String>) bodyMap.getObject("attributeNames");
                List<List<Object>> attributeValues = (List<List<Object>>) bodyMap.getObject("results");
                return getResultsAsMaps(attributeNames, attributeValues);
            }
            else if (response instanceof ObjectMessage)
            {
                Object body = ((ObjectMessage) response).getObject();
                if (body instanceof Map)
                {
                    Map<String, ?> bodyMap = (Map<String, ?>) body;
                    List<String> attributeNames = (List<String>) bodyMap.get("attributeNames");
                    List<List<Object>> attributeValues = (List<List<Object>>) bodyMap.get("results");
                    return getResultsAsMaps(attributeNames, attributeValues);
                }
            }
            throw new IllegalArgumentException("Cannot parse the results from a management query");
        }
        finally
        {
            consumer.close();
        }
    }

    @SuppressWarnings("unused")
    public Map<String, Object> readEntityUsingAmqpManagement(final String name,
                                                      final String type,
                                                      final boolean actuals,
                                                      final Session session)
            throws JMSException
    {
        Destination replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION);
        Destination replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION);

        MessageConsumer consumer = session.createConsumer(replyConsumerDestination);

        MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));

        MapMessage request = session.createMapMessage();
        request.setStringProperty("type", type);
        request.setStringProperty("operation", "READ");
        request.setString("name", name);
        request.setString("object-path", name);
        request.setStringProperty("index", "object-path");
        request.setStringProperty("key", name);
        request.setBooleanProperty("actuals", actuals);
        request.setJMSReplyTo(replyToDestination);

        producer.send(request);
        if (session.getTransacted())
        {
            session.commit();
        }

        Message response = consumer.receive(getManagementResponseTimeout());
        if (session.getTransacted())
        {
            session.commit();
        }
        try
        {
            if (response instanceof MapMessage)
            {
                MapMessage bodyMap = (MapMessage) response;
                Map<String, Object> data = new HashMap<>();
                @SuppressWarnings("unchecked")
                Enumeration<String> keys = bodyMap.getMapNames();
                while (keys.hasMoreElements())
                {
                    String key = keys.nextElement();
                    data.put(key, bodyMap.getObject(key));
                }
                return data;
            }
            else if (response instanceof ObjectMessage)
            {
                Object body = ((ObjectMessage) response).getObject();
                if (body instanceof Map)
                {
                    @SuppressWarnings("unchecked")
                    Map<String, ?> bodyMap = (Map<String, ?>) body;
                    return new HashMap<>(bodyMap);
                }
            }
            throw new IllegalArgumentException("Management read failed : "
                                               + response.getStringProperty("statusCode")
                                               + " - "
                                               + response.getStringProperty("statusDescription"));
        }
        finally
        {
            consumer.close();
        }
    }

    @SuppressWarnings("unused")
    long getQueueDepth(final Queue destination, final Session session) throws Exception
    {
        final String escapedName = getEscapedName(destination);
        Map<String, Object> arguments =
                Collections.singletonMap("statistics", (Object) Collections.singletonList("queueDepthMessages"));

        Object statistics = performOperationUsingAmqpManagement(escapedName,
                                                                "org.apache.qpid.Queue", "getStatistics",
                                                                arguments, session
                                                               );
        @SuppressWarnings("unchecked")
        Map<String, Object> statisticsMap = (Map<String, Object>) statistics;
        return ((Number) statisticsMap.get("queueDepthMessages")).intValue();
    }

    @SuppressWarnings("unused")
    boolean isQueueExist(final Queue destination, final Session session) throws Exception
    {
        final String escapedName = getEscapedName(destination);
        try
        {
            performOperationUsingAmqpManagement(escapedName,
                                                "org.apache.qpid.Queue",
                                                "READ",
                                                Collections.<String, Object>emptyMap(),
                                                session);
            return true;
        }
        catch (AmqpManagementFacade.OperationUnsuccessfulException e)
        {
            if (e.getStatusCode() == 404)
            {
                return false;
            }
            else
            {
                throw e;
            }
        }
    }

    private String getEscapedName(final Queue destination) throws JMSException
    {
        return destination.getQueueName().replaceAll("([/\\\\])", "\\\\$1");
    }

    private List<Map<String, Object>> getResultsAsMaps(final List<String> attributeNames,
                                                       final List<List<Object>> attributeValues)
    {
        List<Map<String, Object>> results = new ArrayList<>();
        for (List<Object> resultObject : attributeValues)
        {
            Map<String, Object> result = new HashMap<>();
            for (int i = 0; i < attributeNames.size(); ++i)
            {
                result.put(attributeNames.get(i), resultObject.get(i));
            }
            results.add(result);
        }
        return results;
    }

    private int getManagementResponseTimeout()
    {
        return Integer.getInteger("qpid.systests.management_response_timeout", 5000);
    }

    static class OperationUnsuccessfulException extends RuntimeException
    {
        private final int _statusCode;

        private OperationUnsuccessfulException(final String message, final int statusCode)
        {
            super(message == null ? String.format("Unexpected status code %d", statusCode) : message);
            _statusCode = statusCode;
        }

        int getStatusCode()
        {
            return _statusCode;
        }
    }
}
