| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.server.store; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.UUID; |
| |
| import org.apache.qpid.server.configuration.IllegalConfigurationException; |
| import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener; |
| import org.apache.qpid.server.filter.FilterSupport; |
| import org.apache.qpid.server.model.AbstractConfigurationChangeListener; |
| import org.apache.qpid.server.model.AbstractConfiguredObject; |
| import org.apache.qpid.server.model.Binding; |
| import org.apache.qpid.server.model.Broker; |
| import org.apache.qpid.server.model.ConfiguredObject; |
| import org.apache.qpid.server.model.Exchange; |
| import org.apache.qpid.server.model.Queue; |
| import org.apache.qpid.server.model.UUIDGenerator; |
| import org.apache.qpid.server.model.VirtualHost; |
| import org.apache.qpid.server.model.VirtualHostNode; |
| import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; |
| import org.apache.qpid.server.util.FixedKeyMapCreator; |
| |
| public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationStoreUpgraderAndRecoverer |
| { |
| private final VirtualHostNode<?> _virtualHostNode; |
| |
| @SuppressWarnings("serial") |
| private static final Map<String, String> DEFAULT_EXCHANGES = Collections.unmodifiableMap(new HashMap<String, String>() |
| {{ |
| put("amq.direct", "direct"); |
| put("amq.topic", "topic"); |
| put("amq.fanout", "fanout"); |
| put("amq.match", "headers"); |
| }}); |
| |
| private final Map<String, UUID> _defaultExchangeIds; |
| |
| public VirtualHostStoreUpgraderAndRecoverer(VirtualHostNode<?> virtualHostNode) |
| { |
| super("0.0"); |
| _virtualHostNode = virtualHostNode; |
| register(new Upgrader_0_0_to_0_1()); |
| register(new Upgrader_0_1_to_0_2()); |
| register(new Upgrader_0_2_to_0_3()); |
| register(new Upgrader_0_3_to_0_4()); |
| register(new Upgrader_0_4_to_2_0()); |
| register(new Upgrader_2_0_to_3_0()); |
| register(new Upgrader_3_0_to_6_0()); |
| register(new Upgrader_6_0_to_6_1()); |
| register(new Upgrader_6_1_to_7_0()); |
| register(new Upgrader_7_0_to_7_1()); |
| register(new Upgrader_7_1_to_8_0()); |
| register(new Upgrader_8_0_to_9_0()); |
| |
| Map<String, UUID> defaultExchangeIds = new HashMap<String, UUID>(); |
| for (String exchangeName : DEFAULT_EXCHANGES.keySet()) |
| { |
| UUID id = UUIDGenerator.generateExchangeUUID(exchangeName, virtualHostNode.getName()); |
| defaultExchangeIds.put(exchangeName, id); |
| } |
| _defaultExchangeIds = Collections.unmodifiableMap(defaultExchangeIds); |
| } |
| |
| /* |
| * Removes filters from queue bindings to exchanges other than topic exchanges. In older versions of the broker |
| * such bindings would have been ignored, starting from the point at which the config version changed, these |
| * arguments would actually cause selectors to be enforced, thus changing which messages would reach a queue. |
| */ |
| private class Upgrader_0_0_to_0_1 extends StoreUpgraderPhase |
| { |
| private final Map<UUID, ConfiguredObjectRecord> _records = new HashMap<UUID, ConfiguredObjectRecord>(); |
| |
| public Upgrader_0_0_to_0_1() |
| { |
| super("modelVersion", "0.0", "0.1"); |
| } |
| |
| @Override |
| public void configuredObject(final ConfiguredObjectRecord record) |
| { |
| _records.put(record.getId(), record); |
| } |
| |
| private void removeSelectorArguments(Map<String, Object> binding) |
| { |
| @SuppressWarnings("unchecked") |
| Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get("arguments")); |
| |
| FilterSupport.removeFilters(arguments); |
| binding.put("arguments", arguments); |
| } |
| |
| private boolean isTopicExchange(ConfiguredObjectRecord entry) |
| { |
| UUID exchangeId = entry.getParents().get("Exchange"); |
| if (exchangeId == null) |
| { |
| return false; |
| } |
| |
| if(_records.containsKey(exchangeId)) |
| { |
| return "topic".equals(_records.get(exchangeId) |
| .getAttributes() |
| .get(org.apache.qpid.server.model.Exchange.TYPE)); |
| } |
| else |
| { |
| if (_defaultExchangeIds.get("amq.topic").equals(exchangeId)) |
| { |
| return true; |
| } |
| |
| return false; |
| } |
| |
| } |
| |
| private boolean hasSelectorArguments(Map<String, Object> binding) |
| { |
| @SuppressWarnings("unchecked") |
| Map<String, Object> arguments = (Map<String, Object>) binding.get("arguments"); |
| return (arguments != null) && FilterSupport.argumentsContainFilter(arguments); |
| } |
| |
| @Override |
| public void complete() |
| { |
| for(Map.Entry<UUID,ConfiguredObjectRecord> entry : _records.entrySet()) |
| { |
| ConfiguredObjectRecord record = entry.getValue(); |
| String type = record.getType(); |
| Map<String, Object> attributes = record.getAttributes(); |
| UUID id = record.getId(); |
| if ("org.apache.qpid.server.model.VirtualHost".equals(type)) |
| { |
| upgradeRootRecord(record); |
| } |
| else if(type.equals(Binding.class.getName()) && hasSelectorArguments(attributes) && !isTopicExchange(record)) |
| { |
| attributes = new LinkedHashMap<String, Object>(attributes); |
| removeSelectorArguments(attributes); |
| |
| record = new ConfiguredObjectRecordImpl(id, type, attributes, record.getParents()); |
| getUpdateMap().put(id, record); |
| entry.setValue(record); |
| |
| } |
| } |
| } |
| |
| } |
| |
| /* |
| * Change the type string from org.apache.qpid.server.model.Foo to Foo (in line with the practice in the broker |
| * configuration store). Also remove bindings which reference nonexistent queues or exchanges. |
| */ |
| private class Upgrader_0_1_to_0_2 extends StoreUpgraderPhase |
| { |
| public Upgrader_0_1_to_0_2() |
| { |
| super("modelVersion", "0.1", "0.2"); |
| } |
| |
| @Override |
| public void configuredObject(final ConfiguredObjectRecord record) |
| { |
| String type = record.getType().substring(1 + record.getType().lastIndexOf('.')); |
| ConfiguredObjectRecord newRecord = new ConfiguredObjectRecordImpl(record.getId(), type, record.getAttributes(), record.getParents()); |
| getUpdateMap().put(record.getId(), newRecord); |
| |
| if ("VirtualHost".equals(type)) |
| { |
| upgradeRootRecord(newRecord); |
| } |
| } |
| |
| @Override |
| public void complete() |
| { |
| for (Iterator<Map.Entry<UUID, ConfiguredObjectRecord>> iterator = getUpdateMap().entrySet().iterator(); iterator.hasNext();) |
| { |
| Map.Entry<UUID, ConfiguredObjectRecord> entry = iterator.next(); |
| final ConfiguredObjectRecord record = entry.getValue(); |
| final UUID exchangeParent = record.getParents().get(Exchange.class.getSimpleName()); |
| final UUID queueParent = record.getParents().get(Queue.class.getSimpleName()); |
| if(isBinding(record.getType()) && (exchangeParent == null || unknownExchange(exchangeParent) |
| || queueParent == null || unknownQueue(queueParent))) |
| { |
| getDeleteMap().put(entry.getKey(), entry.getValue()); |
| iterator.remove(); |
| } |
| } |
| } |
| |
| private boolean unknownExchange(final UUID exchangeId) |
| { |
| if (_defaultExchangeIds.containsValue(exchangeId)) |
| { |
| return false; |
| } |
| ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId); |
| return !(localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName())); |
| } |
| |
| private boolean unknownQueue(final UUID queueId) |
| { |
| ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId); |
| return !(localRecord != null && localRecord.getType().equals(Queue.class.getSimpleName())); |
| } |
| |
| private boolean isBinding(final String type) |
| { |
| return Binding.class.getSimpleName().equals(type); |
| } |
| } |
| |
| |
| /* |
| * Convert the storage of queue attributes to remove the separate "ARGUMENT" attribute, and flatten the |
| * attributes into the map using the model attribute names rather than the wire attribute names |
| */ |
| private class Upgrader_0_2_to_0_3 extends StoreUpgraderPhase |
| { |
| private static final String ARGUMENTS = "arguments"; |
| |
| public Upgrader_0_2_to_0_3() |
| { |
| super("modelVersion", "0.2", "0.3"); |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public void configuredObject(ConfiguredObjectRecord record) |
| { |
| if ("VirtualHost".equals(record.getType())) |
| { |
| upgradeRootRecord(record); |
| } |
| else if ("Queue".equals(record.getType())) |
| { |
| Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(); |
| if (record.getAttributes().get(ARGUMENTS) instanceof Map) |
| { |
| newAttributes.putAll(convertWireArgsToModel((Map<String, Object>) record.getAttributes() |
| .get(ARGUMENTS))); |
| } |
| newAttributes.putAll(record.getAttributes()); |
| |
| record = new ConfiguredObjectRecordImpl(record.getId(), |
| record.getType(), |
| newAttributes, |
| record.getParents()); |
| getUpdateMap().put(record.getId(), record); |
| } |
| } |
| |
| @Override |
| public void complete() |
| { |
| } |
| |
| private final Map<String, String> ATTRIBUTE_MAPPINGS = new LinkedHashMap<>(); |
| |
| { |
| ATTRIBUTE_MAPPINGS.put("x-qpid-minimum-alert-repeat-gap", "alertRepeatGap"); |
| ATTRIBUTE_MAPPINGS.put("x-qpid-maximum-message-age", "alertThresholdMessageAge"); |
| ATTRIBUTE_MAPPINGS.put("x-qpid-maximum-message-size", "alertThresholdMessageSize"); |
| ATTRIBUTE_MAPPINGS.put("x-qpid-maximum-message-count", "alertThresholdQueueDepthMessages"); |
| ATTRIBUTE_MAPPINGS.put("x-qpid-maximum-queue-depth", "alertThresholdQueueDepthBytes"); |
| ATTRIBUTE_MAPPINGS.put("qpid.alert_count", "alertThresholdQueueDepthMessages"); |
| ATTRIBUTE_MAPPINGS.put("qpid.alert_size", "alertThresholdQueueDepthBytes"); |
| ATTRIBUTE_MAPPINGS.put("qpid.alert_repeat_gap", "alertRepeatGap"); |
| ATTRIBUTE_MAPPINGS.put("x-qpid-maximum-delivery-count", "maximumDeliveryAttempts"); |
| ATTRIBUTE_MAPPINGS.put("x-qpid-capacity", "queueFlowControlSizeBytes"); |
| ATTRIBUTE_MAPPINGS.put("x-qpid-flow-resume-capacity", "queueFlowResumeSizeBytes"); |
| ATTRIBUTE_MAPPINGS.put("qpid.queue_sort_key", "sortKey"); |
| ATTRIBUTE_MAPPINGS.put("qpid.last_value_queue_key", "lvqKey"); |
| ATTRIBUTE_MAPPINGS.put("x-qpid-priorities", "priorities"); |
| ATTRIBUTE_MAPPINGS.put("x-qpid-description", "description"); |
| ATTRIBUTE_MAPPINGS.put("x-qpid-dlq-enabled", "x-qpid-dlq-enabled"); |
| ATTRIBUTE_MAPPINGS.put("qpid.group_header_key", "messageGroupKey"); |
| ATTRIBUTE_MAPPINGS.put("qpid.default-message-group", "messageGroupDefaultGroup"); |
| ATTRIBUTE_MAPPINGS.put("no-local", "noLocal"); |
| ATTRIBUTE_MAPPINGS.put("qpid.message_durability", "messageDurability"); |
| } |
| |
| private Map<String, Object> convertWireArgsToModel(Map<String, Object> wireArguments) |
| { |
| Map<String, Object> modelArguments = new HashMap<>(); |
| if (wireArguments != null) |
| { |
| for (Map.Entry<String, String> entry : ATTRIBUTE_MAPPINGS.entrySet()) |
| { |
| if (wireArguments.containsKey(entry.getKey())) |
| { |
| modelArguments.put(entry.getValue(), wireArguments.get(entry.getKey())); |
| } |
| } |
| if (wireArguments.containsKey("qpid.last_value_queue") |
| && !wireArguments.containsKey("qpid.last_value_queue_key")) |
| { |
| modelArguments.put("lvqKey", "qpid.LVQ_key"); |
| } |
| if (wireArguments.containsKey("qpid.shared_msg_group")) |
| { |
| modelArguments.put("messageGroupSharedGroups", |
| "1".equals(String.valueOf(wireArguments.get("qpid.shared_msg_group")))); |
| } |
| if (wireArguments.get("x-qpid-dlq-enabled") != null) |
| { |
| modelArguments.put("x-qpid-dlq-enabled", |
| Boolean.parseBoolean(wireArguments.get("x-qpid-dlq-enabled").toString())); |
| } |
| if (wireArguments.get("no-local") != null) |
| { |
| modelArguments.put("noLocal", Boolean.parseBoolean(wireArguments.get("no-local").toString())); |
| } |
| } |
| return modelArguments; |
| } |
| } |
| |
| /* |
| * Convert the storage of queue attribute exclusive to change exclusive from a boolean to an enum |
| * where exclusive was false it will now be "NONE", and where true it will now be "CONTAINER" |
| * ensure OWNER is null unless the exclusivity policy is CONTAINER |
| */ |
| private class Upgrader_0_3_to_0_4 extends StoreUpgraderPhase |
| { |
| private static final String EXCLUSIVE = "exclusive"; |
| |
| public Upgrader_0_3_to_0_4() |
| { |
| super("modelVersion", "0.3", "0.4"); |
| } |
| |
| |
| @Override |
| public void configuredObject(ConfiguredObjectRecord record) |
| { |
| if("VirtualHost".equals(record.getType())) |
| { |
| upgradeRootRecord(record); |
| } |
| else if(Queue.class.getSimpleName().equals(record.getType())) |
| { |
| Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(record.getAttributes()); |
| if(record.getAttributes().get(EXCLUSIVE) instanceof Boolean) |
| { |
| boolean isExclusive = (Boolean) record.getAttributes().get(EXCLUSIVE); |
| newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : "NONE"); |
| if(!isExclusive && record.getAttributes().containsKey("owner")) |
| { |
| newAttributes.remove("owner"); |
| } |
| } |
| else |
| { |
| newAttributes.remove("owner"); |
| } |
| if(!record.getAttributes().containsKey("durable")) |
| { |
| newAttributes.put("durable", "true"); |
| } |
| |
| record = new ConfiguredObjectRecordImpl(record.getId(),record.getType(),newAttributes, record.getParents()); |
| getUpdateMap().put(record.getId(), record); |
| } |
| } |
| |
| @Override |
| public void complete() |
| { |
| } |
| |
| } |
| |
| private class Upgrader_0_4_to_2_0 extends StoreUpgraderPhase |
| { |
| private static final String ALTERNATE_EXCHANGE = "alternateExchange"; |
| private static final String DLQ_ENABLED_ARGUMENT = "x-qpid-dlq-enabled"; |
| private static final String DEFAULT_DLE_NAME_SUFFIX = "_DLE"; |
| |
| private Map<String, String> _missingAmqpExchanges = new HashMap<String, String>(DEFAULT_EXCHANGES); |
| private ConfiguredObjectRecord _virtualHostRecord; |
| |
| private Map<UUID, String> _queuesMissingAlternateExchange = new HashMap<>(); |
| private Map<String, ConfiguredObjectRecord> _exchanges = new HashMap<>(); |
| |
| public Upgrader_0_4_to_2_0() |
| { |
| super("modelVersion", "0.4", "2.0"); |
| } |
| |
| @Override |
| public void configuredObject(ConfiguredObjectRecord record) |
| { |
| if("VirtualHost".equals(record.getType())) |
| { |
| record = upgradeRootRecord(record); |
| Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(record.getAttributes()); |
| virtualHostAttributes.put("name", _virtualHostNode.getName()); |
| virtualHostAttributes.put("modelVersion", getToVersion()); |
| record = new ConfiguredObjectRecordImpl(record.getId(), "VirtualHost", virtualHostAttributes, Collections.<String, UUID>emptyMap()); |
| _virtualHostRecord = record; |
| } |
| else if("Exchange".equals(record.getType())) |
| { |
| Map<String, Object> attributes = record.getAttributes(); |
| String name = (String)attributes.get("name"); |
| _missingAmqpExchanges.remove(name); |
| _exchanges.put(name, record); |
| } |
| else if("Queue".equals(record.getType())) |
| { |
| updateQueueRecordIfNecessary(record); |
| } |
| } |
| |
| @Override |
| public void complete() |
| { |
| for (UUID queueId : _queuesMissingAlternateExchange.keySet()) |
| { |
| ConfiguredObjectRecord record = getUpdateMap().get(queueId); |
| if (record != null) |
| { |
| String dleExchangeName = _queuesMissingAlternateExchange.get(queueId); |
| ConfiguredObjectRecord alternateExchange = _exchanges.get(dleExchangeName); |
| if (alternateExchange != null) |
| { |
| setAlternateExchangeAttribute(record, alternateExchange); |
| } |
| } |
| } |
| |
| for (Entry<String, String> entry : _missingAmqpExchanges.entrySet()) |
| { |
| String name = entry.getKey(); |
| String type = entry.getValue(); |
| UUID id = _defaultExchangeIds.get(name); |
| |
| Map<String, Object> attributes = new HashMap<String, Object>(); |
| attributes.put("name", name); |
| attributes.put("type", type); |
| attributes.put("lifetimePolicy", "PERMANENT"); |
| |
| ConfiguredObjectRecord record = new ConfiguredObjectRecordImpl(id, Exchange.class.getSimpleName(), attributes, Collections.singletonMap(_virtualHostRecord.getType(), _virtualHostRecord.getId())); |
| getUpdateMap().put(id, record); |
| |
| } |
| |
| } |
| |
| private ConfiguredObjectRecord updateQueueRecordIfNecessary(ConfiguredObjectRecord record) |
| { |
| Map<String, Object> attributes = record.getAttributes(); |
| boolean queueDLQEnabled = Boolean.parseBoolean(String.valueOf(attributes.get(DLQ_ENABLED_ARGUMENT))); |
| if(queueDLQEnabled && attributes.get(ALTERNATE_EXCHANGE) == null) |
| { |
| Object queueName = attributes.get("name"); |
| if (queueName == null || "".equals(queueName)) |
| { |
| throw new IllegalConfigurationException("Queue name is not found in queue configuration entry attributes: " + attributes); |
| } |
| |
| String dleSuffix = System.getProperty("qpid.broker_dead_letter_exchange_suffix", DEFAULT_DLE_NAME_SUFFIX); |
| String dleExchangeName = queueName + dleSuffix; |
| |
| ConfiguredObjectRecord exchangeRecord = findConfiguredObjectRecordInUpdateMap("Exchange", dleExchangeName); |
| if (exchangeRecord == null) |
| { |
| // add record to update Map if it is not there |
| if (!getUpdateMap().containsKey(record.getId())) |
| { |
| getUpdateMap().put(record.getId(), record); |
| } |
| _queuesMissingAlternateExchange.put(record.getId(), dleExchangeName); |
| } |
| else |
| { |
| record = setAlternateExchangeAttribute(record, exchangeRecord); |
| } |
| } |
| return record; |
| } |
| |
| private ConfiguredObjectRecord setAlternateExchangeAttribute(ConfiguredObjectRecord record, ConfiguredObjectRecord alternateExchange) |
| { |
| Map<String, Object> attributes = new LinkedHashMap<>(record.getAttributes()); |
| attributes.put(ALTERNATE_EXCHANGE, alternateExchange.getId()); |
| record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), attributes, record.getParents()); |
| getUpdateMap().put(record.getId(), record); |
| return record; |
| } |
| |
| private ConfiguredObjectRecord findConfiguredObjectRecordInUpdateMap(String type, String name) |
| { |
| for(ConfiguredObjectRecord record: getUpdateMap().values()) |
| { |
| if (type.equals(record.getType()) && name.equals(record.getAttributes().get("name"))) |
| { |
| return record; |
| } |
| } |
| return null; |
| } |
| |
| } |
| |
| private class Upgrader_2_0_to_3_0 extends StoreUpgraderPhase |
| { |
| public Upgrader_2_0_to_3_0() |
| { |
| super("modelVersion", "2.0", "3.0"); |
| } |
| |
| @Override |
| public void configuredObject(ConfiguredObjectRecord record) |
| { |
| |
| if("VirtualHost".equals(record.getType())) |
| { |
| upgradeRootRecord(record); |
| } |
| } |
| |
| @Override |
| public void complete() |
| { |
| } |
| |
| } |
| private class Upgrader_3_0_to_6_0 extends StoreUpgraderPhase |
| { |
| public Upgrader_3_0_to_6_0() |
| { |
| super("modelVersion", "3.0", "6.0"); |
| } |
| |
| @Override |
| public void configuredObject(ConfiguredObjectRecord record) |
| { |
| |
| if("VirtualHost".equals(record.getType())) |
| { |
| upgradeRootRecord(record); |
| } |
| } |
| |
| @Override |
| public void complete() |
| { |
| } |
| |
| } |
| |
| private class Upgrader_6_0_to_6_1 extends StoreUpgraderPhase |
| { |
| public Upgrader_6_0_to_6_1() |
| { |
| super("modelVersion", "6.0", "6.1"); |
| } |
| |
| @Override |
| public void configuredObject(ConfiguredObjectRecord record) |
| { |
| |
| if("VirtualHost".equals(record.getType())) |
| { |
| upgradeRootRecord(record); |
| } |
| } |
| |
| @Override |
| public void complete() |
| { |
| } |
| |
| } |
| |
| private static final FixedKeyMapCreator BINDING_MAP_CREATOR = new FixedKeyMapCreator("bindingKey", "destination", "arguments"); |
| private static final FixedKeyMapCreator NO_ARGUMENTS_BINDING_MAP_CREATOR = new FixedKeyMapCreator("bindingKey", "destination"); |
| |
| private class Upgrader_6_1_to_7_0 extends StoreUpgraderPhase |
| { |
| private final Map<UUID, List<BindingRecord>> _exchangeBindings = new HashMap<>(); |
| private final Map<UUID, ConfiguredObjectRecord> _exchanges = new HashMap<>(); |
| private final Map<UUID, String> _queues = new HashMap<>(); |
| private final Map<String, List<Map<String,Object>>> _queueBindings = new HashMap<>(); |
| private Set<UUID> _destinationsWithAlternateExchange = new HashSet<>(); |
| |
| |
| public Upgrader_6_1_to_7_0() |
| { |
| super("modelVersion", "6.1", "7.0"); |
| } |
| |
| @Override |
| public void configuredObject(ConfiguredObjectRecord record) |
| { |
| if("VirtualHost".equals(record.getType())) |
| { |
| record = upgradeRootRecord(record); |
| Map<String, Object> attributes = new HashMap<>(record.getAttributes()); |
| boolean modified = attributes.remove("queue_deadLetterQueueEnabled") != null; |
| Object context = attributes.get("context"); |
| Map<String,Object> contextMap = null; |
| if(context instanceof Map) |
| { |
| contextMap = new HashMap<>((Map<String,Object>) context); |
| modified |= contextMap.remove("queue.deadLetterQueueEnabled") != null; |
| if (modified) |
| { |
| attributes.put("context", contextMap); |
| } |
| } |
| |
| int brokerStatisticsReportingPeriod = ((Broker) _virtualHostNode.getParent()).getStatisticsReportingPeriod(); |
| if (brokerStatisticsReportingPeriod > 0) |
| { |
| attributes.put("statisticsReportingPeriod", brokerStatisticsReportingPeriod); |
| if (contextMap == null) |
| { |
| contextMap = new HashMap<>(); |
| } |
| |
| contextMap.put("qpid.virtualhost.statisticsReportPattern", "${ancestor:virtualhost:name}: messagesIn=${messagesIn}, bytesIn=${bytesIn:byteunit}, messagesOut=${messagesOut}, bytesOut=${bytesOut:byteunit}"); |
| attributes.put("context", contextMap); |
| modified = true; |
| } |
| |
| if (modified) |
| { |
| record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), attributes, record.getParents()); |
| getUpdateMap().put(record.getId(), record); |
| } |
| |
| } |
| else if("Binding".equals(record.getType())) |
| { |
| BindingRecord binding = new BindingRecord(String.valueOf(record.getAttributes().get("name")), |
| record.getParents().get("Queue").toString(), |
| record.getAttributes().get("arguments")); |
| final UUID exchangeId = record.getParents().get("Exchange"); |
| List<BindingRecord> existingBindings = _exchangeBindings.get(exchangeId); |
| if(existingBindings == null) |
| { |
| existingBindings = new ArrayList<>(); |
| _exchangeBindings.put(exchangeId, existingBindings); |
| } |
| existingBindings.add(binding); |
| getDeleteMap().put(record.getId(), record); |
| } |
| else if("Exchange".equals(record.getType())) |
| { |
| final UUID exchangeId = record.getId(); |
| _exchanges.put(exchangeId, record); |
| if(record.getAttributes().containsKey("bindings")) |
| { |
| List<BindingRecord> existingBindings = _exchangeBindings.get(exchangeId); |
| if(existingBindings == null) |
| { |
| existingBindings = new ArrayList<>(); |
| _exchangeBindings.put(exchangeId, existingBindings); |
| } |
| |
| List<Map<String,Object>> bindingList = |
| (List<Map<String, Object>>) record.getAttributes().get("bindings"); |
| for(Map<String,Object> existingBinding : bindingList) |
| { |
| existingBindings.add(new BindingRecord((String)existingBinding.get("name"), |
| String.valueOf(existingBinding.get("queue")), |
| existingBinding.get("arguments"))); |
| } |
| } |
| |
| if (record.getAttributes().containsKey("alternateExchange")) |
| { |
| _destinationsWithAlternateExchange.add(record.getId()); |
| |
| getUpdateMap().put(record.getId(), record); |
| } |
| } |
| else if("Queue".equals(record.getType())) |
| { |
| Map<String, Object> attributes = new HashMap<>(record.getAttributes()); |
| Object queueFlowControlSizeBytes = attributes.remove("queueFlowControlSizeBytes"); |
| Object queueFlowResumeSizeBytes = attributes.remove("queueFlowResumeSizeBytes"); |
| if (queueFlowControlSizeBytes != null) |
| { |
| long queueFlowControlSizeBytesValue = convertAttributeValueToLong("queueFlowControlSizeBytes", |
| queueFlowControlSizeBytes); |
| if (queueFlowControlSizeBytesValue > 0) |
| { |
| if (queueFlowResumeSizeBytes != null) |
| { |
| long queueFlowResumeSizeBytesValue = |
| convertAttributeValueToLong("queueFlowResumeSizeBytes", queueFlowResumeSizeBytes); |
| double ratio = ((double) queueFlowResumeSizeBytesValue) |
| / ((double) queueFlowControlSizeBytesValue); |
| String flowResumeLimit = String.format("%.2f", ratio * 100.0); |
| |
| Object context = attributes.get("context"); |
| Map<String, String> contextMap; |
| if (context instanceof Map) |
| { |
| contextMap = (Map) context; |
| } |
| else |
| { |
| contextMap = new HashMap<>(); |
| attributes.put("context", contextMap); |
| } |
| contextMap.put("queue.queueFlowResumeLimit", flowResumeLimit); |
| } |
| attributes.put("overflowPolicy", "PRODUCER_FLOW_CONTROL"); |
| attributes.put("maximumQueueDepthBytes", queueFlowControlSizeBytes); |
| } |
| } |
| |
| boolean addToUpdateMap = false; |
| if (attributes.containsKey("alternateExchange")) |
| { |
| _destinationsWithAlternateExchange.add(record.getId()); |
| addToUpdateMap = true; |
| |
| } |
| |
| if(attributes.containsKey("bindings")) |
| { |
| _queueBindings.put(String.valueOf(attributes.get("name")), |
| (List<Map<String, Object>>) attributes.get("bindings")); |
| attributes.remove("bindings"); |
| } |
| |
| if(attributes.containsKey("messageGroupKey")) |
| { |
| if(attributes.containsKey("messageGroupSharedGroups") |
| && convertAttributeValueToBoolean("messageGroupSharedGroups", |
| attributes.remove("messageGroupSharedGroups"))) |
| { |
| attributes.put("messageGroupType", "SHARED_GROUPS"); |
| |
| } |
| else |
| { |
| attributes.put("messageGroupType", "STANDARD"); |
| } |
| Object oldMessageGroupKey = attributes.remove("messageGroupKey"); |
| if (!"JMSXGroupId".equals(oldMessageGroupKey)) |
| { |
| attributes.put("messageGroupKeyOverride", oldMessageGroupKey); |
| } |
| } |
| else |
| { |
| attributes.put("messageGroupType", "NONE"); |
| } |
| |
| _queues.put(record.getId(), (String) attributes.get("name")); |
| |
| if (!attributes.equals(new HashMap<>(record.getAttributes())) || addToUpdateMap) |
| { |
| getUpdateMap().put(record.getId(), |
| new ConfiguredObjectRecordImpl(record.getId(), |
| record.getType(), |
| attributes, |
| record.getParents())); |
| } |
| } |
| else if (record.getType().equals("VirtualHostLogger")) |
| { |
| Map<String,Object> attributes = new HashMap<>(); |
| attributes.put("name", "statistics-" + record.getAttributes().get("name")); |
| attributes.put("level", "INFO"); |
| attributes.put("loggerName", "qpid.statistics.*"); |
| attributes.put("type", "NameAndLevel"); |
| |
| |
| final ConfiguredObjectRecord filterRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), |
| "VirtualHostLogInclusionRule", |
| attributes, |
| Collections.singletonMap("VirtualHostLogger", |
| record.getId())); |
| getUpdateMap().put(filterRecord.getId(), filterRecord); |
| } |
| } |
| |
| private long convertAttributeValueToLong(final String attributeName, |
| final Object attributeValue) |
| { |
| long value; |
| if (attributeValue instanceof Number) |
| { |
| value = ((Number) attributeValue).longValue(); |
| } |
| else if (attributeValue instanceof String) |
| { |
| try |
| { |
| value = Long.parseLong((String) attributeValue); |
| } |
| catch (Exception e) |
| { |
| throw new IllegalConfigurationException(String.format( |
| "Cannot evaluate '%s': %s", |
| attributeName, attributeValue)); |
| } |
| } |
| else |
| { |
| throw new IllegalConfigurationException(String.format("Cannot evaluate '%s': %s", |
| attributeName, |
| String.valueOf(attributeValue))); |
| } |
| return value; |
| } |
| |
| private boolean convertAttributeValueToBoolean(final String attributeName, |
| final Object attributeValue) |
| { |
| boolean value; |
| if (attributeValue instanceof Boolean) |
| { |
| value = (Boolean) attributeValue; |
| } |
| else if (attributeValue instanceof String) |
| { |
| String strValue = (String)attributeValue; |
| if(strValue.equalsIgnoreCase("true")) |
| { |
| value = true; |
| } |
| else if(strValue.equalsIgnoreCase("false")) |
| { |
| value = false; |
| } |
| else |
| { |
| throw new IllegalConfigurationException(String.format( |
| "Cannot evaluate '%s': %s", |
| attributeName, attributeValue)); |
| } |
| |
| } |
| else |
| { |
| throw new IllegalConfigurationException(String.format("Cannot evaluate '%s': %s", |
| attributeName, |
| String.valueOf(attributeValue))); |
| } |
| return value; |
| } |
| |
| |
| @Override |
| public void complete() |
| { |
| for(Map.Entry<String, List<Map<String,Object>>> entry : _queueBindings.entrySet()) |
| { |
| for(Map<String, Object> existingBinding : entry.getValue()) |
| { |
| UUID exchangeId; |
| if(existingBinding.get("exchange") instanceof UUID) |
| { |
| exchangeId = (UUID) existingBinding.get("exchange"); |
| } |
| else |
| { |
| exchangeId = getExchangeIdFromNameOrId( existingBinding.get("exchange").toString()); |
| } |
| List<BindingRecord> existingBindings = _exchangeBindings.get(exchangeId); |
| if(existingBindings == null) |
| { |
| existingBindings = new ArrayList<>(); |
| _exchangeBindings.put(exchangeId, existingBindings); |
| } |
| existingBindings.add(new BindingRecord((String)existingBinding.get("name"), |
| entry.getKey(), |
| existingBinding.get("arguments"))); |
| } |
| } |
| |
| for(Map.Entry<UUID, List<BindingRecord>> entry : _exchangeBindings.entrySet()) |
| { |
| ConfiguredObjectRecord exchangeRecord = _exchanges.get(entry.getKey()); |
| if(exchangeRecord != null) |
| { |
| final List<BindingRecord> bindingRecords = entry.getValue(); |
| List<Map<String,Object>> actualBindings = new ArrayList<>(bindingRecords.size()); |
| for(BindingRecord bindingRecord : bindingRecords) |
| { |
| if(bindingRecord._arguments == null) |
| { |
| actualBindings.add(NO_ARGUMENTS_BINDING_MAP_CREATOR.createMap(bindingRecord._name, |
| getQueueFromIdOrName(bindingRecord))); |
| } |
| else |
| { |
| actualBindings.add(BINDING_MAP_CREATOR.createMap(bindingRecord._name, |
| getQueueFromIdOrName(bindingRecord), bindingRecord._arguments)); |
| } |
| } |
| Map<String, Object> updatedAttributes = new HashMap<>(exchangeRecord.getAttributes()); |
| updatedAttributes.remove("bindings"); |
| updatedAttributes.put("durableBindings", actualBindings); |
| exchangeRecord = new ConfiguredObjectRecordImpl(exchangeRecord.getId(), exchangeRecord.getType(), updatedAttributes, exchangeRecord.getParents()); |
| getUpdateMap().put(exchangeRecord.getId(), exchangeRecord); |
| } |
| |
| } |
| |
| for (UUID recordId : _destinationsWithAlternateExchange) |
| { |
| ConfiguredObjectRecord record = getUpdateMap().get(recordId); |
| Map<String, Object> attributes = new HashMap<>(record.getAttributes()); |
| |
| String exchangeNameOrUuid = String.valueOf(attributes.remove("alternateExchange")); |
| |
| ConfiguredObjectRecord exchangeRecord = getExchangeFromNameOrUUID(exchangeNameOrUuid); |
| if (exchangeRecord != null) |
| { |
| attributes.put("alternateBinding", |
| Collections.singletonMap("destination", exchangeRecord.getAttributes().get("name"))); |
| } |
| else |
| { |
| throw new IllegalConfigurationException(String.format( |
| "Cannot upgrade record UUID '%s' as cannot find exchange with name or UUID '%s'", |
| recordId, |
| exchangeNameOrUuid)); |
| } |
| |
| getUpdateMap().put(record.getId(), |
| new ConfiguredObjectRecordImpl(record.getId(), |
| record.getType(), |
| attributes, |
| record.getParents())); |
| } |
| } |
| |
| private ConfiguredObjectRecord getExchangeFromNameOrUUID(final String exchangeNameOrUuid) |
| { |
| for(ConfiguredObjectRecord record : _exchanges.values()) |
| { |
| if(exchangeNameOrUuid.equals(record.getAttributes().get("name"))) |
| { |
| return record; |
| } |
| else |
| { |
| try |
| { |
| UUID uuid = UUID.fromString(exchangeNameOrUuid); |
| if (uuid.equals(record.getId())) |
| { |
| return record; |
| } |
| } |
| catch (IllegalArgumentException e) |
| { |
| // ignore - not a UUID |
| } |
| } |
| } |
| |
| return null; |
| } |
| |
| private UUID getExchangeIdFromNameOrId(final String exchange) |
| { |
| for(ConfiguredObjectRecord record : _exchanges.values()) |
| { |
| if(exchange.equals(record.getAttributes().get("name"))) |
| { |
| return record.getId(); |
| } |
| } |
| return UUID.fromString(exchange); |
| } |
| |
| private String getQueueFromIdOrName(final BindingRecord bindingRecord) |
| { |
| final String queueIdOrName = bindingRecord._queueIdOrName; |
| try |
| { |
| UUID queueId = UUID.fromString(queueIdOrName); |
| String name = _queues.get(queueId); |
| return name == null ? queueIdOrName : name; |
| } |
| catch(IllegalArgumentException e) |
| { |
| return queueIdOrName; |
| } |
| } |
| |
| private class BindingRecord |
| { |
| private final String _name; |
| private final String _queueIdOrName; |
| private final Object _arguments; |
| |
| public BindingRecord(final String name, final String queueIdOrName, final Object arguments) |
| { |
| _name = name; |
| _queueIdOrName = queueIdOrName; |
| _arguments = arguments; |
| } |
| } |
| } |
| |
| |
| private class Upgrader_7_0_to_7_1 extends StoreUpgraderPhase |
| { |
| |
| public Upgrader_7_0_to_7_1() |
| { |
| super("modelVersion", "7.0", "7.1"); |
| } |
| |
| @Override |
| public void configuredObject(final ConfiguredObjectRecord record) |
| { |
| if("VirtualHost".equals(record.getType())) |
| { |
| upgradeRootRecord(record); |
| } |
| } |
| |
| @Override |
| public void complete() |
| { |
| |
| } |
| } |
| |
| private class Upgrader_7_1_to_8_0 extends StoreUpgraderPhase |
| { |
| |
| public Upgrader_7_1_to_8_0() |
| { |
| super("modelVersion", "7.1", "8.0"); |
| } |
| |
| @Override |
| public void configuredObject(final ConfiguredObjectRecord record) |
| { |
| if("VirtualHost".equals(record.getType())) |
| { |
| upgradeRootRecord(record); |
| } |
| } |
| |
| @Override |
| public void complete() |
| { |
| |
| } |
| } |
| |
| private static class Upgrader_8_0_to_9_0 extends StoreUpgraderPhase |
| { |
| Upgrader_8_0_to_9_0() |
| { |
| super("modelVersion", "8.0", "9.0"); |
| } |
| |
| @Override |
| public void configuredObject(ConfiguredObjectRecord record) |
| { |
| if ("VirtualHost".equals(record.getType())) |
| { |
| record = upgradeRootRecord(record); |
| } |
| renameContextVariables(record, |
| "context", |
| UpgraderHelper.MODEL9_MAPPING_FOR_RENAME_TO_ALLOW_DENY_CONTEXT_VARIABLES); |
| } |
| |
| @Override |
| public void complete() |
| { |
| |
| } |
| } |
| |
| public boolean upgradeAndRecover(final DurableConfigurationStore durableConfigurationStore, |
| final ConfiguredObjectRecord... initialRecords) |
| { |
| final List<ConfiguredObjectRecord> records = new ArrayList<>(); |
| boolean isNew = durableConfigurationStore.openConfigurationStore(new ConfiguredObjectRecordHandler() |
| { |
| @Override |
| public void handle(final ConfiguredObjectRecord record) |
| { |
| records.add(record); |
| } |
| }, initialRecords); |
| |
| List<ConfiguredObjectRecord> upgradedRecords = upgrade(durableConfigurationStore, |
| records, |
| VirtualHost.class.getSimpleName(), |
| VirtualHost.MODEL_VERSION); |
| recover(_virtualHostNode, durableConfigurationStore, upgradedRecords, isNew); |
| return isNew; |
| } |
| |
| public void reloadAndRecover(final DurableConfigurationStore durableConfigurationStore) |
| { |
| reloadAndRecoverInternal(_virtualHostNode, durableConfigurationStore); |
| } |
| |
| public void reloadAndRecoverVirtualHost(final DurableConfigurationStore durableConfigurationStore) |
| { |
| reloadAndRecoverInternal(_virtualHostNode.getVirtualHost(), durableConfigurationStore); |
| } |
| |
| private void reloadAndRecoverInternal(final ConfiguredObject<?> recoveryRoot, |
| final DurableConfigurationStore durableConfigurationStore) |
| { |
| final List<ConfiguredObjectRecord> records = new ArrayList<>(); |
| durableConfigurationStore.reload(records::add); |
| recover(recoveryRoot, durableConfigurationStore, records, false); |
| } |
| |
| private void recover(final ConfiguredObject<?> recoveryRoot, |
| final DurableConfigurationStore durableConfigurationStore, |
| final List<ConfiguredObjectRecord> records, |
| final boolean isNew) |
| { |
| new GenericRecoverer(recoveryRoot).recover(records, isNew); |
| |
| final StoreConfigurationChangeListener |
| configChangeListener = new StoreConfigurationChangeListener(durableConfigurationStore); |
| if(_virtualHostNode.getVirtualHost() != null) |
| { |
| applyRecursively(_virtualHostNode.getVirtualHost(), new RecursiveAction<ConfiguredObject<?>>() |
| { |
| @Override |
| public boolean applyToChildren(final ConfiguredObject<?> object) |
| { |
| return object.isDurable(); |
| } |
| |
| @Override |
| public void performAction(final ConfiguredObject<?> object) |
| { |
| object.addChangeListener(configChangeListener); |
| } |
| }); |
| } |
| |
| if (recoveryRoot instanceof VirtualHostNode) |
| { |
| _virtualHostNode.addChangeListener(new AbstractConfigurationChangeListener() |
| { |
| @Override |
| public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child) |
| { |
| if (child instanceof VirtualHost) |
| { |
| applyRecursively(child, new RecursiveAction<ConfiguredObject<?>>() |
| { |
| @Override |
| public boolean applyToChildren(final ConfiguredObject<?> object) |
| { |
| return object.isDurable(); |
| } |
| |
| @Override |
| public void performAction(final ConfiguredObject<?> object) |
| { |
| if (object.isDurable()) |
| { |
| durableConfigurationStore.update(true, object.asObjectRecord()); |
| object.addChangeListener(configChangeListener); |
| } |
| } |
| }); |
| } |
| } |
| |
| @Override |
| public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child) |
| { |
| if (child instanceof VirtualHost) |
| { |
| child.removeChangeListener(configChangeListener); |
| } |
| } |
| }); |
| if (isNew) |
| { |
| if (_virtualHostNode instanceof AbstractConfiguredObject) |
| { |
| ((AbstractConfiguredObject) _virtualHostNode).forceUpdateAllSecureAttributes(); |
| } |
| } |
| } |
| } |
| } |