/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.server.store;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;

import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.util.Action;

public class VirtualHostStoreUpgraderAndRecoverer
{
    private final VirtualHostNode<?> _virtualHostNode;
    private Map<String, StoreUpgraderPhase> _upgraders = new HashMap<String, StoreUpgraderPhase>();

    @SuppressWarnings("serial")
    private static final Map<String, String> DEFAULT_EXCHANGES = Collections.unmodifiableMap(new HashMap<String, String>()
    {{
        put("amq.direct", "direct");
        put("amq.topic", "topic");
        put("amq.fanout", "fanout");
        put("amq.match", "headers");
    }});

    private final Map<String, UUID> _defaultExchangeIds;

    /*
     * Creates an upgrader/recoverer for the given node.  Every known upgrade phase is registered and the ids of
     * the default exchanges are generated from the exchange name and the node name.
     */
    public VirtualHostStoreUpgraderAndRecoverer(VirtualHostNode<?> virtualHostNode)
    {
        _virtualHostNode = virtualHostNode;

        final StoreUpgraderPhase[] phases =
                {
                        new Upgrader_0_0_to_0_1(),
                        new Upgrader_0_1_to_0_2(),
                        new Upgrader_0_2_to_0_3(),
                        new Upgrader_0_3_to_0_4(),
                        new Upgrader_0_4_to_2_0(),
                        new Upgrader_2_0_to_3_0(),
                        new Upgrader_3_0_to_6_0(),
                        new Upgrader_6_0_to_6_1()
                };
        for (StoreUpgraderPhase phase : phases)
        {
            register(phase);
        }

        final Map<String, UUID> idsByExchangeName = new HashMap<String, UUID>();
        for (String defaultExchangeName : DEFAULT_EXCHANGES.keySet())
        {
            idsByExchangeName.put(defaultExchangeName,
                                  UUIDGenerator.generateExchangeUUID(defaultExchangeName, virtualHostNode.getName()));
        }
        _defaultExchangeIds = Collections.unmodifiableMap(idsByExchangeName);
    }

    /* Indexes the given phase under the model version it upgrades from. */
    private void register(StoreUpgraderPhase upgrader)
    {
        final String fromVersion = upgrader.getFromVersion();
        _upgraders.put(fromVersion, upgrader);
    }

    /*
     * Strips filter (selector) arguments from queue bindings whose exchange is not a topic exchange.  Older
     * broker versions ignored such arguments; from the point at which the config version changed they would be
     * enforced as selectors and so alter which messages reach a queue.
     *
     * All records are buffered as they arrive and only processed in complete(), because deciding whether a
     * binding belongs to a topic exchange requires the exchange records to be known.
     */
    private class Upgrader_0_0_to_0_1  extends StoreUpgraderPhase
    {
        private final Map<UUID, ConfiguredObjectRecord> _records = new HashMap<UUID, ConfiguredObjectRecord>();

        public Upgrader_0_0_to_0_1()
        {
            super("modelVersion", "0.0", "0.1");
        }

        /* Buffers the record by id; nothing is forwarded until complete(). */
        @Override
        public void configuredObject(final ConfiguredObjectRecord record)
        {
            _records.put(record.getId(), record);
        }

        /*
         * Upgrades the root record, rewrites affected bindings and forwards every buffered record (upgraded or
         * untouched) to the next upgrader before completing it.
         */
        @Override
        public void complete()
        {
            for (Map.Entry<UUID, ConfiguredObjectRecord> buffered : _records.entrySet())
            {
                ConfiguredObjectRecord current = buffered.getValue();
                final String type = current.getType();

                if ("org.apache.qpid.server.model.VirtualHost".equals(type))
                {
                    current = upgradeRootRecord(current);
                }
                else if (type.equals(Binding.class.getName())
                         && hasSelectorArguments(current.getAttributes())
                         && !isTopicExchange(current))
                {
                    current = withoutSelectorArguments(current);
                    getUpdateMap().put(current.getId(), current);
                    // keep the buffer in step with what is written to the store
                    buffered.setValue(current);
                }

                getNextUpgrader().configuredObject(current);
            }

            getNextUpgrader().complete();
        }

        /* Returns a copy of the binding record whose arguments map has had its filters removed. */
        private ConfiguredObjectRecord withoutSelectorArguments(final ConfiguredObjectRecord bindingRecord)
        {
            final Map<String, Object> attributesCopy = new LinkedHashMap<String, Object>(bindingRecord.getAttributes());

            @SuppressWarnings("unchecked")
            final Map<String, Object> argumentsCopy =
                    new LinkedHashMap<String, Object>((Map<String, Object>) attributesCopy.get(Binding.ARGUMENTS));
            FilterSupport.removeFilters(argumentsCopy);
            attributesCopy.put(Binding.ARGUMENTS, argumentsCopy);

            return new ConfiguredObjectRecordImpl(bindingRecord.getId(),
                                                  bindingRecord.getType(),
                                                  attributesCopy,
                                                  bindingRecord.getParents());
        }

        /* True when the binding attributes hold a non-null arguments map that contains a filter. */
        private boolean hasSelectorArguments(Map<String, Object> binding)
        {
            @SuppressWarnings("unchecked")
            final Map<String, Object> bindingArguments = (Map<String, Object>) binding.get(Binding.ARGUMENTS);
            if (bindingArguments == null)
            {
                return false;
            }
            return FilterSupport.argumentsContainFilter(bindingArguments);
        }

        /*
         * True when the record's "Exchange" parent is a buffered exchange of type "topic" or, if the exchange
         * has no buffered record, when it is the default amq.topic exchange.
         */
        private boolean isTopicExchange(ConfiguredObjectRecord entry)
        {
            final UUID parentExchangeId = entry.getParents().get("Exchange");
            if (parentExchangeId == null)
            {
                return false;
            }

            if (!_records.containsKey(parentExchangeId))
            {
                return _defaultExchangeIds.get("amq.topic").equals(parentExchangeId);
            }

            final Object exchangeType = _records.get(parentExchangeId).getAttributes().get(Exchange.TYPE);
            return "topic".equals(exchangeType);
        }

    }

    /*
     * Shortens record type strings from the fully qualified form (org.apache.qpid.server.model.Foo) to the simple
     * form (Foo), in line with the practice in the broker configuration store.  Bindings that reference a
     * nonexistent queue or exchange are deleted.
     */
    private class Upgrader_0_1_to_0_2 extends StoreUpgraderPhase
    {
        public Upgrader_0_1_to_0_2()
        {
            super("modelVersion", "0.1", "0.2");
        }

        /*
         * Stores a copy of the record under its shortened type name.  Nothing is forwarded here; forwarding
         * happens in complete() once dangling bindings can be detected.
         */
        @Override
        public void configuredObject(final ConfiguredObjectRecord record)
        {
            final String qualifiedType = record.getType();
            final String simpleType = qualifiedType.substring(qualifiedType.lastIndexOf('.') + 1);

            final ConfiguredObjectRecord renamed =
                    new ConfiguredObjectRecordImpl(record.getId(), simpleType, record.getAttributes(), record.getParents());
            getUpdateMap().put(record.getId(), renamed);

            if ("VirtualHost".equals(simpleType))
            {
                // Return value is not needed locally; presumably upgradeRootRecord registers the upgraded root
                // record itself - confirm in StoreUpgraderPhase.
                upgradeRootRecord(renamed);
            }
        }

        /*
         * Walks the update map: bindings with a missing or unknown exchange/queue parent are moved to the delete
         * map, every other record is forwarded to the next upgrader, which is then completed.
         */
        @Override
        public void complete()
        {
            final Iterator<Map.Entry<UUID, ConfiguredObjectRecord>> pending = getUpdateMap().entrySet().iterator();
            while (pending.hasNext())
            {
                final Map.Entry<UUID, ConfiguredObjectRecord> entry = pending.next();
                final ConfiguredObjectRecord record = entry.getValue();

                final Map<String, UUID> parents = record.getParents();
                final UUID exchangeId = parents.get(Exchange.class.getSimpleName());
                final UUID queueId = parents.get(Queue.class.getSimpleName());

                final boolean danglingBinding = isBinding(record.getType())
                                                && (exchangeId == null
                                                    || unknownExchange(exchangeId)
                                                    || queueId == null
                                                    || unknownQueue(queueId));
                if (danglingBinding)
                {
                    getDeleteMap().put(entry.getKey(), record);
                    pending.remove();
                }
                else
                {
                    getNextUpgrader().configuredObject(record);
                }
            }
            getNextUpgrader().complete();
        }

        /* True when the type is the simple name of the Binding category. */
        private boolean isBinding(final String type)
        {
            return Binding.class.getSimpleName().equals(type);
        }

        /* An exchange is known if it is a default exchange or has an Exchange record in the update map. */
        private boolean unknownExchange(final UUID exchangeId)
        {
            if (_defaultExchangeIds.containsValue(exchangeId))
            {
                return false;
            }
            final ConfiguredObjectRecord candidate = getUpdateMap().get(exchangeId);
            return candidate == null || !candidate.getType().equals(Exchange.class.getSimpleName());
        }

        /* A queue is known only if it has a Queue record in the update map. */
        private boolean unknownQueue(final UUID queueId)
        {
            final ConfiguredObjectRecord candidate = getUpdateMap().get(queueId);
            return candidate == null || !candidate.getType().equals(Queue.class.getSimpleName());
        }
    }


    /*
     * Flattens queue arguments into the queue's attribute map.  When a Queue record holds a map under
     * "arguments", that map is converted from wire argument names to model attribute names and merged into the
     * record's attributes.  The record's existing attributes are applied afterwards, so they take precedence
     * over a converted entry with the same key.
     *
     * NOTE(review): because every existing attribute is copied across, the original "arguments" entry is still
     * present in the upgraded record - confirm whether it was intended to be dropped.
     */
    private class Upgrader_0_2_to_0_3 extends StoreUpgraderPhase
    {
        private static final String ARGUMENTS = "arguments";

        public Upgrader_0_2_to_0_3()
        {
            super("modelVersion", "0.2", "0.3");
        }

        /* Upgrades root and queue records, then forwards the (possibly replaced) record to the next upgrader. */
        @SuppressWarnings("unchecked")
        @Override
        public void configuredObject(ConfiguredObjectRecord record)
        {
            final String type = record.getType();
            if ("VirtualHost".equals(type))
            {
                record = upgradeRootRecord(record);
            }
            else if ("Queue".equals(type))
            {
                final Map<String, Object> original = record.getAttributes();
                final Map<String, Object> flattened = new LinkedHashMap<String, Object>();

                final Object wireArguments = original.get(ARGUMENTS);
                if (wireArguments instanceof Map)
                {
                    flattened.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map<String, Object>) wireArguments));
                }
                // applied last so that explicitly stored attributes win over converted arguments
                flattened.putAll(original);

                record = new ConfiguredObjectRecordImpl(record.getId(), type, flattened, record.getParents());
                getUpdateMap().put(record.getId(), record);
            }

            getNextUpgrader().configuredObject(record);
        }

        @Override
        public void complete()
        {
            getNextUpgrader().complete();
        }

    }

    /*
     * Converts the queue attribute "exclusive" from a boolean to an enum name: false becomes "NONE" and true
     * becomes "CONTAINER".  The "owner" attribute is kept only when exclusive was boolean true; in every other
     * case it is removed.  Queues without a "durable" attribute are given "durable" = "true".
     */
    private class Upgrader_0_3_to_0_4 extends StoreUpgraderPhase
    {
        private static final String EXCLUSIVE = "exclusive";

        public Upgrader_0_3_to_0_4()
        {
            super("modelVersion", "0.3", "0.4");
        }


        /* Upgrades root and queue records, then forwards the (possibly replaced) record to the next upgrader. */
        @Override
        public void configuredObject(ConfiguredObjectRecord record)
        {
            final String type = record.getType();
            if ("VirtualHost".equals(type))
            {
                record = upgradeRootRecord(record);
            }
            else if (Queue.class.getSimpleName().equals(type))
            {
                final Map<String, Object> upgraded = new LinkedHashMap<String, Object>(record.getAttributes());

                final Object storedExclusive = upgraded.get(EXCLUSIVE);
                final boolean containerExclusive = storedExclusive instanceof Boolean && (Boolean) storedExclusive;
                if (storedExclusive instanceof Boolean)
                {
                    upgraded.put(EXCLUSIVE, containerExclusive ? "CONTAINER" : "NONE");
                }
                if (!containerExclusive)
                {
                    // an owner only makes sense for a queue that is exclusive to a container
                    upgraded.remove("owner");
                }

                if (!upgraded.containsKey("durable"))
                {
                    upgraded.put("durable", "true");
                }

                record = new ConfiguredObjectRecordImpl(record.getId(), type, upgraded, record.getParents());
                getUpdateMap().put(record.getId(), record);
            }

            getNextUpgrader().configuredObject(record);
        }

        @Override
        public void complete()
        {
            getNextUpgrader().complete();
        }

    }

    private class Upgrader_0_4_to_2_0 extends StoreUpgraderPhase
    {
        private static final String ALTERNATE_EXCHANGE = "alternateExchange";
        private static final String DLQ_ENABLED_ARGUMENT = "x-qpid-dlq-enabled";
        private static final String  DEFAULT_DLE_NAME_SUFFIX = "_DLE";

        private Map<String, String> _missingAmqpExchanges = new HashMap<String, String>(DEFAULT_EXCHANGES);
        private ConfiguredObjectRecord _virtualHostRecord;

        private Map<UUID, String> _queuesMissingAlternateExchange = new HashMap<>();
        private Map<String, ConfiguredObjectRecord> _exchanges = new HashMap<>();

        public Upgrader_0_4_to_2_0()
        {
            super("modelVersion", "0.4", "2.0");
        }

        @Override
        public void configuredObject(ConfiguredObjectRecord record)
        {
            if("VirtualHost".equals(record.getType()))
            {
                record = upgradeRootRecord(record);
                Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(record.getAttributes());
                virtualHostAttributes.put("name", _virtualHostNode.getName());
                virtualHostAttributes.put("modelVersion", getToVersion());
                record = new ConfiguredObjectRecordImpl(record.getId(), "VirtualHost", virtualHostAttributes, Collections.<String, UUID>emptyMap());
                _virtualHostRecord = record;
            }
            else if("Exchange".equals(record.getType()))
            {
                Map<String, Object> attributes = record.getAttributes();
                String name = (String)attributes.get("name");
                _missingAmqpExchanges.remove(name);
                _exchanges.put(name, record);
            }
            else if("Queue".equals(record.getType()))
            {
                record = updateQueueRecordIfNecessary(record);
            }
            getNextUpgrader().configuredObject(record);
        }

        @Override
        public void complete()
        {
            for (UUID queueId : _queuesMissingAlternateExchange.keySet())
            {
                ConfiguredObjectRecord record = getUpdateMap().get(queueId);
                if (record != null)
                {
                    String dleExchangeName = _queuesMissingAlternateExchange.get(queueId);
                    ConfiguredObjectRecord alternateExchange = _exchanges.get(dleExchangeName);
                    if (alternateExchange != null)
                    {
                        setAlternateExchangeAttribute(record, alternateExchange);
                    }
                }
            }

            for (Entry<String, String> entry : _missingAmqpExchanges.entrySet())
            {
                String name = entry.getKey();
                String type = entry.getValue();
                UUID id = _defaultExchangeIds.get(name);

                Map<String, Object> attributes = new HashMap<String, Object>();
                attributes.put("name", name);
                attributes.put("type", type);
                attributes.put("lifetimePolicy", "PERMANENT");

                ConfiguredObjectRecord record = new ConfiguredObjectRecordImpl(id, Exchange.class.getSimpleName(), attributes, Collections.singletonMap(_virtualHostRecord.getType(), _virtualHostRecord.getId()));
                getUpdateMap().put(id, record);

                getNextUpgrader().configuredObject(record);

            }

            getNextUpgrader().complete();
        }

        private ConfiguredObjectRecord updateQueueRecordIfNecessary(ConfiguredObjectRecord record)
        {
            Map<String, Object> attributes = record.getAttributes();
            boolean queueDLQEnabled = Boolean.parseBoolean(String.valueOf(attributes.get(DLQ_ENABLED_ARGUMENT)));
            if(queueDLQEnabled && attributes.get(ALTERNATE_EXCHANGE) == null)
            {
                Object queueName =  attributes.get("name");
                if (queueName == null || "".equals(queueName))
                {
                    throw new IllegalConfigurationException("Queue name is not found in queue configuration entry attributes: " + attributes);
                }

                String dleSuffix = System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, DEFAULT_DLE_NAME_SUFFIX);
                String dleExchangeName = queueName + dleSuffix;

                ConfiguredObjectRecord exchangeRecord = findConfiguredObjectRecordInUpdateMap("Exchange", dleExchangeName);
                if (exchangeRecord == null)
                {
                    // add record to update Map if it is not there
                    if (!getUpdateMap().containsKey(record.getId()))
                    {
                        getUpdateMap().put(record.getId(), record);
                    }
                    _queuesMissingAlternateExchange.put(record.getId(), dleExchangeName);
                }
                else
                {
                    record = setAlternateExchangeAttribute(record, exchangeRecord);
                }
            }
            return record;
        }

        private ConfiguredObjectRecord setAlternateExchangeAttribute(ConfiguredObjectRecord record, ConfiguredObjectRecord alternateExchange)
        {
            Map<String, Object> attributes = new LinkedHashMap<>(record.getAttributes());
            attributes.put(ALTERNATE_EXCHANGE, alternateExchange.getId());
            record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), attributes, record.getParents());
            getUpdateMap().put(record.getId(), record);
            return record;
        }

        private ConfiguredObjectRecord findConfiguredObjectRecordInUpdateMap(String type, String name)
        {
            for(ConfiguredObjectRecord record: getUpdateMap().values())
            {
                if (type.equals(record.getType()) && name.equals(record.getAttributes().get("name")))
                {
                    return record;
                }
            }
            return null;
        }

    }

    /* Upgrade from 2.0 to 3.0: only the VirtualHost root record is upgraded, all other records pass through. */
    private class Upgrader_2_0_to_3_0 extends StoreUpgraderPhase
    {
        public Upgrader_2_0_to_3_0()
        {
            super("modelVersion", "2.0", "3.0");
        }

        @Override
        public void configuredObject(ConfiguredObjectRecord record)
        {
            final ConfiguredObjectRecord forwarded =
                    "VirtualHost".equals(record.getType()) ? upgradeRootRecord(record) : record;
            getNextUpgrader().configuredObject(forwarded);
        }

        @Override
        public void complete()
        {
            getNextUpgrader().complete();
        }

    }
    /* Upgrade from 3.0 to 6.0: only the VirtualHost root record is upgraded, all other records pass through. */
    private class Upgrader_3_0_to_6_0 extends StoreUpgraderPhase
    {
        public Upgrader_3_0_to_6_0()
        {
            super("modelVersion", "3.0", "6.0");
        }

        @Override
        public void configuredObject(ConfiguredObjectRecord record)
        {
            final ConfiguredObjectRecord forwarded =
                    "VirtualHost".equals(record.getType()) ? upgradeRootRecord(record) : record;
            getNextUpgrader().configuredObject(forwarded);
        }

        @Override
        public void complete()
        {
            getNextUpgrader().complete();
        }

    }

    /* Upgrade from 6.0 to 6.1: only the VirtualHost root record is upgraded, all other records pass through. */
    private class Upgrader_6_0_to_6_1 extends StoreUpgraderPhase
    {
        public Upgrader_6_0_to_6_1()
        {
            super("modelVersion", "6.0", "6.1");
        }

        @Override
        public void configuredObject(ConfiguredObjectRecord record)
        {
            final ConfiguredObjectRecord forwarded =
                    "VirtualHost".equals(record.getType()) ? upgradeRootRecord(record) : record;
            getNextUpgrader().configuredObject(forwarded);
        }

        @Override
        public void complete()
        {
            getNextUpgrader().complete();
        }

    }

    /*
     * Opens the configuration store, collecting every record it hands back, runs the registered upgraders over
     * those records and recovers the configured objects from the upgraded result.
     *
     * Returns the value reported by openConfigurationStore, which is also passed to recovery as "isNew".
     */
    public boolean upgradeAndRecover(final DurableConfigurationStore durableConfigurationStore,
                                     final ConfiguredObjectRecord... initialRecords)
    {
        final List<ConfiguredObjectRecord> storedRecords = new ArrayList<>();
        final ConfiguredObjectRecordHandler collector = new ConfiguredObjectRecordHandler()
        {
            @Override
            public void handle(final ConfiguredObjectRecord record)
            {
                storedRecords.add(record);
            }
        };
        final boolean isNew = durableConfigurationStore.openConfigurationStore(collector, initialRecords);

        final GenericStoreUpgrader storeUpgrader = new GenericStoreUpgrader(VirtualHost.class.getSimpleName(),
                                                                            VirtualHost.MODEL_VERSION,
                                                                            durableConfigurationStore,
                                                                            _upgraders);
        storeUpgrader.upgrade(storedRecords);

        recover(durableConfigurationStore, storeUpgrader.getRecords(), isNew);
        return isNew;
    }

    /*
     * Reloads the configuration store, collecting every record it hands back, and recovers the configured
     * objects from them.  No upgrade is performed and recovery is run with isNew = false.
     */
    public void reloadAndRecover(final DurableConfigurationStore durableConfigurationStore)
    {
        final List<ConfiguredObjectRecord> reloadedRecords = new ArrayList<>();
        final ConfiguredObjectRecordHandler collector = new ConfiguredObjectRecordHandler()
        {
            @Override
            public void handle(final ConfiguredObjectRecord record)
            {
                reloadedRecords.add(record);
            }
        };
        durableConfigurationStore.reload(collector);
        recover(durableConfigurationStore, reloadedRecords, false);
    }

    /*
     * Recovers the configured objects from the given records and wires up change listeners:
     *  - a store-backed change listener is added to the node's current virtual host (if any) and, recursively,
     *    to all of its descendants;
     *  - a listener on the node persists each durable object of a virtual host added later and attaches the
     *    store-backed listener to it, and detaches that listener from a virtual host that is removed.
     * When isNew is set and the node is an AbstractConfiguredObject, all of its secure attributes are force
     * updated.
     */
    private void recover(final DurableConfigurationStore durableConfigurationStore,
                         final List<ConfiguredObjectRecord> records, final boolean isNew)
    {
        new GenericRecoverer(_virtualHostNode).recover(records, isNew);

        final StoreConfigurationChangeListener configChangeListener =
                new StoreConfigurationChangeListener(durableConfigurationStore);

        final Action<ConfiguredObject<?>> attachStoreListener = new Action<ConfiguredObject<?>>()
        {
            @Override
            public void performAction(final ConfiguredObject<?> object)
            {
                object.addChangeListener(configChangeListener);
            }
        };

        // Used for virtual hosts added after recovery: durable objects are written to the store first
        final Action<ConfiguredObject<?>> persistAndAttachStoreListener = new Action<ConfiguredObject<?>>()
        {
            @Override
            public void performAction(final ConfiguredObject<?> object)
            {
                if (!object.isDurable())
                {
                    return;
                }
                durableConfigurationStore.update(true, object.asObjectRecord());
                object.addChangeListener(configChangeListener);
            }
        };

        final ConfiguredObject<?> recoveredVirtualHost = _virtualHostNode.getVirtualHost();
        if (recoveredVirtualHost != null)
        {
            applyRecursively(recoveredVirtualHost, attachStoreListener);
        }

        final ConfigurationChangeListener virtualHostLifecycleListener = new ConfigurationChangeListener()
        {
            @Override
            public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
            {
                // not of interest
            }

            @Override
            public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
            {
                if (child instanceof VirtualHost)
                {
                    applyRecursively(child, persistAndAttachStoreListener);
                }
            }

            @Override
            public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
            {
                if (child instanceof VirtualHost)
                {
                    child.removeChangeListener(configChangeListener);
                }
            }

            @Override
            public void attributeSet(final ConfiguredObject<?> object,
                                     final String attributeName,
                                     final Object oldAttributeValue,
                                     final Object newAttributeValue)
            {
                // not of interest
            }

            @Override
            public void bulkChangeStart(final ConfiguredObject<?> object)
            {
                // not of interest
            }

            @Override
            public void bulkChangeEnd(final ConfiguredObject<?> object)
            {
                // not of interest
            }
        };
        _virtualHostNode.addChangeListener(virtualHostLifecycleListener);

        if (isNew && _virtualHostNode instanceof AbstractConfiguredObject)
        {
            ((AbstractConfiguredObject)_virtualHostNode).forceUpdateAllSecureAttributes();
        }
    }

    /* Applies the action to the object and all of its descendants, starting with an empty visited set. */
    private void applyRecursively(final ConfiguredObject<?> object, final Action<ConfiguredObject<?>> action)
    {
        final HashSet<ConfiguredObject<?>> nothingVisitedYet = new HashSet<ConfiguredObject<?>>();
        applyRecursively(object, action, nothingVisitedYet);
    }

    /*
     * Applies the action to the object, then recurses into its children for every child type the model defines
     * for the object's category.  Objects already in the visited set are skipped, so each object is processed
     * at most once.
     */
    private void applyRecursively(final ConfiguredObject<?> object,
                                  final Action<ConfiguredObject<?>> action,
                                  final HashSet<ConfiguredObject<?>> visited)
    {
        if (!visited.add(object))
        {
            // add() reports false when the object was already present, i.e. already handled
            return;
        }

        action.performAction(object);

        for (Class<? extends ConfiguredObject> childType : object.getModel().getChildTypes(object.getCategoryClass()))
        {
            final Collection<? extends ConfiguredObject> childrenOfType = object.getChildren(childType);
            if (childrenOfType == null)
            {
                continue;
            }
            for (ConfiguredObject<?> child : childrenOfType)
            {
                applyRecursively(child, action, visited);
            }
        }
    }

}
