blob: f1372882a813ec4b2cab628bef5d063c0884810d [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
public class VirtualHostStoreUpgraderAndRecoverer
{
private final VirtualHostNode<?> _virtualHostNode;
private Map<String, StoreUpgraderPhase> _upgraders = new HashMap<String, StoreUpgraderPhase>();
@SuppressWarnings("serial")
private static final Map<String, String> DEFAULT_EXCHANGES = Collections.unmodifiableMap(new HashMap<String, String>()
{{
put("amq.direct", "direct");
put("amq.topic", "topic");
put("amq.fanout", "fanout");
put("amq.match", "headers");
}});
private final Map<String, UUID> _defaultExchangeIds;
public VirtualHostStoreUpgraderAndRecoverer(VirtualHostNode<?> virtualHostNode)
{
_virtualHostNode = virtualHostNode;
register(new Upgrader_0_0_to_0_1());
register(new Upgrader_0_1_to_0_2());
register(new Upgrader_0_2_to_0_3());
register(new Upgrader_0_3_to_0_4());
register(new Upgrader_0_4_to_2_0());
register(new Upgrader_2_0_to_2_1());
Map<String, UUID> defaultExchangeIds = new HashMap<String, UUID>();
for (String exchangeName : DEFAULT_EXCHANGES.keySet())
{
UUID id = UUIDGenerator.generateExchangeUUID(exchangeName, virtualHostNode.getName());
defaultExchangeIds.put(exchangeName, id);
}
_defaultExchangeIds = Collections.unmodifiableMap(defaultExchangeIds);
}
private void register(StoreUpgraderPhase upgrader)
{
_upgraders.put(upgrader.getFromVersion(), upgrader);
}
/*
* Removes filters from queue bindings to exchanges other than topic exchanges. In older versions of the broker
* such bindings would have been ignored, starting from the point at which the config version changed, these
* arguments would actually cause selectors to be enforced, thus changing which messages would reach a queue.
*/
private class Upgrader_0_0_to_0_1 extends StoreUpgraderPhase
{
private final Map<UUID, ConfiguredObjectRecord> _records = new HashMap<UUID, ConfiguredObjectRecord>();
public Upgrader_0_0_to_0_1()
{
super("modelVersion", "0.0", "0.1");
}
@Override
public void configuredObject(final ConfiguredObjectRecord record)
{
_records.put(record.getId(), record);
}
private void removeSelectorArguments(Map<String, Object> binding)
{
@SuppressWarnings("unchecked")
Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS));
FilterSupport.removeFilters(arguments);
binding.put(Binding.ARGUMENTS, arguments);
}
private boolean isTopicExchange(ConfiguredObjectRecord entry)
{
UUID exchangeId = entry.getParents().get("Exchange");
if (exchangeId == null)
{
return false;
}
if(_records.containsKey(exchangeId))
{
return "topic".equals(_records.get(exchangeId)
.getAttributes()
.get(org.apache.qpid.server.model.Exchange.TYPE));
}
else
{
if (_defaultExchangeIds.get("amq.topic").equals(exchangeId))
{
return true;
}
return false;
}
}
private boolean hasSelectorArguments(Map<String, Object> binding)
{
@SuppressWarnings("unchecked")
Map<String, Object> arguments = (Map<String, Object>) binding.get(Binding.ARGUMENTS);
return (arguments != null) && FilterSupport.argumentsContainFilter(arguments);
}
@Override
public void complete()
{
for(Map.Entry<UUID,ConfiguredObjectRecord> entry : _records.entrySet())
{
ConfiguredObjectRecord record = entry.getValue();
String type = record.getType();
Map<String, Object> attributes = record.getAttributes();
UUID id = record.getId();
if ("org.apache.qpid.server.model.VirtualHost".equals(type))
{
record = upgradeRootRecord(record);
}
else if(type.equals(Binding.class.getName()) && hasSelectorArguments(attributes) && !isTopicExchange(record))
{
attributes = new LinkedHashMap<String, Object>(attributes);
removeSelectorArguments(attributes);
record = new ConfiguredObjectRecordImpl(id, type, attributes, record.getParents());
getUpdateMap().put(id, record);
entry.setValue(record);
}
getNextUpgrader().configuredObject(record);
}
getNextUpgrader().complete();
}
}
/*
* Change the type string from org.apache.qpid.server.model.Foo to Foo (in line with the practice in the broker
* configuration store). Also remove bindings which reference nonexistent queues or exchanges.
*/
private class Upgrader_0_1_to_0_2 extends StoreUpgraderPhase
{
public Upgrader_0_1_to_0_2()
{
super("modelVersion", "0.1", "0.2");
}
@Override
public void configuredObject(final ConfiguredObjectRecord record)
{
String type = record.getType().substring(1 + record.getType().lastIndexOf('.'));
ConfiguredObjectRecord newRecord = new ConfiguredObjectRecordImpl(record.getId(), type, record.getAttributes(), record.getParents());
getUpdateMap().put(record.getId(), newRecord);
if ("VirtualHost".equals(type))
{
newRecord = upgradeRootRecord(newRecord);
}
}
@Override
public void complete()
{
for (Iterator<Map.Entry<UUID, ConfiguredObjectRecord>> iterator = getUpdateMap().entrySet().iterator(); iterator.hasNext();)
{
Map.Entry<UUID, ConfiguredObjectRecord> entry = iterator.next();
final ConfiguredObjectRecord record = entry.getValue();
final UUID exchangeParent = record.getParents().get(Exchange.class.getSimpleName());
final UUID queueParent = record.getParents().get(Queue.class.getSimpleName());
if(isBinding(record.getType()) && (exchangeParent == null || unknownExchange(exchangeParent)
|| queueParent == null || unknownQueue(queueParent)))
{
getDeleteMap().put(entry.getKey(), entry.getValue());
iterator.remove();
}
else
{
getNextUpgrader().configuredObject(record);
}
}
getNextUpgrader().complete();
}
private boolean unknownExchange(final UUID exchangeId)
{
if (_defaultExchangeIds.containsValue(exchangeId))
{
return false;
}
ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId);
return !(localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName()));
}
private boolean unknownQueue(final UUID queueId)
{
ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId);
return !(localRecord != null && localRecord.getType().equals(Queue.class.getSimpleName()));
}
private boolean isBinding(final String type)
{
return Binding.class.getSimpleName().equals(type);
}
}
/*
* Convert the storage of queue attributes to remove the separate "ARGUMENT" attribute, and flatten the
* attributes into the map using the model attribute names rather than the wire attribute names
*/
private class Upgrader_0_2_to_0_3 extends StoreUpgraderPhase
{
private static final String ARGUMENTS = "arguments";
public Upgrader_0_2_to_0_3()
{
super("modelVersion", "0.2", "0.3");
}
@SuppressWarnings("unchecked")
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
if("VirtualHost".equals(record.getType()))
{
record = upgradeRootRecord(record);
}
else if("Queue".equals(record.getType()))
{
Map<String, Object> newAttributes = new LinkedHashMap<String, Object>();
if(record.getAttributes().get(ARGUMENTS) instanceof Map)
{
newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map<String, Object>) record.getAttributes()
.get(ARGUMENTS)));
}
newAttributes.putAll(record.getAttributes());
record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), newAttributes, record.getParents());
getUpdateMap().put(record.getId(), record);
}
getNextUpgrader().configuredObject(record);
}
@Override
public void complete()
{
getNextUpgrader().complete();
}
}
/*
* Convert the storage of queue attribute exclusive to change exclusive from a boolean to an enum
* where exclusive was false it will now be "NONE", and where true it will now be "CONTAINER"
* ensure OWNER is null unless the exclusivity policy is CONTAINER
*/
private class Upgrader_0_3_to_0_4 extends StoreUpgraderPhase
{
private static final String EXCLUSIVE = "exclusive";
public Upgrader_0_3_to_0_4()
{
super("modelVersion", "0.3", "0.4");
}
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
if("VirtualHost".equals(record.getType()))
{
record = upgradeRootRecord(record);
}
else if(Queue.class.getSimpleName().equals(record.getType()))
{
Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(record.getAttributes());
if(record.getAttributes().get(EXCLUSIVE) instanceof Boolean)
{
boolean isExclusive = (Boolean) record.getAttributes().get(EXCLUSIVE);
newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : "NONE");
if(!isExclusive && record.getAttributes().containsKey("owner"))
{
newAttributes.remove("owner");
}
}
else
{
newAttributes.remove("owner");
}
if(!record.getAttributes().containsKey("durable"))
{
newAttributes.put("durable","true");
}
record = new ConfiguredObjectRecordImpl(record.getId(),record.getType(),newAttributes, record.getParents());
getUpdateMap().put(record.getId(), record);
}
getNextUpgrader().configuredObject(record);
}
@Override
public void complete()
{
getNextUpgrader().complete();
}
}
private class Upgrader_0_4_to_2_0 extends StoreUpgraderPhase
{
private Map<String, String> _missingAmqpExchanges = new HashMap<String, String>(DEFAULT_EXCHANGES);
private ConfiguredObjectRecord _virtualHostRecord;
public Upgrader_0_4_to_2_0()
{
super("modelVersion", "0.4", "2.0");
}
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
if("VirtualHost".equals(record.getType()))
{
record = upgradeRootRecord(record);
Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(record.getAttributes());
virtualHostAttributes.put("name", _virtualHostNode.getName());
virtualHostAttributes.put("modelVersion", getToVersion());
record = new ConfiguredObjectRecordImpl(record.getId(), "VirtualHost", virtualHostAttributes, Collections.<String, UUID>emptyMap());
_virtualHostRecord = record;
}
else if("Exchange".equals(record.getType()))
{
Map<String, Object> attributes = record.getAttributes();
String name = (String)attributes.get("name");
_missingAmqpExchanges.remove(name);
}
getNextUpgrader().configuredObject(record);
}
@Override
public void complete()
{
for (Entry<String, String> entry : _missingAmqpExchanges.entrySet())
{
String name = entry.getKey();
String type = entry.getValue();
UUID id = _defaultExchangeIds.get(name);
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put("name", name);
attributes.put("type", type);
attributes.put("lifetimePolicy", "PERMANENT");
ConfiguredObjectRecord record = new ConfiguredObjectRecordImpl(id, Exchange.class.getSimpleName(), attributes, Collections.singletonMap(_virtualHostRecord.getType(), _virtualHostRecord.getId()));
getUpdateMap().put(id, record);
getNextUpgrader().configuredObject(record);
}
getNextUpgrader().complete();
}
}
private class Upgrader_2_0_to_2_1 extends StoreUpgraderPhase
{
public Upgrader_2_0_to_2_1()
{
super("modelVersion", "2.0", "2.1");
}
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
if("VirtualHost".equals(record.getType()))
{
record = upgradeRootRecord(record);
}
getNextUpgrader().configuredObject(record);
}
@Override
public void complete()
{
getNextUpgrader().complete();
}
}
public void perform(DurableConfigurationStore durableConfigurationStore)
{
String virtualHostCategory = VirtualHost.class.getSimpleName();
GenericStoreUpgrader upgraderHandler = new GenericStoreUpgrader(virtualHostCategory, VirtualHost.MODEL_VERSION, durableConfigurationStore, _upgraders);
upgraderHandler.upgrade();
new GenericRecoverer(_virtualHostNode).recover(upgraderHandler.getRecords());
}
}