blob: 46b5dbb9fc5ed177322dacb9a1966cd308aa2045 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.virtualhost;
import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
import org.apache.qpid.server.store.DurableConfigurationRecoverer;
import org.apache.qpid.server.store.DurableConfigurationStoreUpgrader;
import org.apache.qpid.server.store.NonNullUpgrader;
import org.apache.qpid.server.store.NullUpgrader;
import org.apache.qpid.server.store.UpgraderProvider;
public class DefaultUpgraderProvider implements UpgraderProvider
{
private static final Logger LOGGER = Logger.getLogger(DefaultUpgraderProvider.class);
public static final String EXCLUSIVE = "exclusive";
public static final String NAME = "name";
private final VirtualHost _virtualHost;
@SuppressWarnings("serial")
private static final Map<String, String> DEFAULT_EXCHANGES = Collections.unmodifiableMap(new HashMap<String, String>()
{{
put("amq.direct", "direct");
put("amq.topic", "topic");
put("amq.fanout", "fanout");
put("amq.match", "headers");
}});
private final Map<String, UUID> _defaultExchangeIds;
public DefaultUpgraderProvider(final VirtualHost virtualHost)
{
_virtualHost = virtualHost;
Map<String, UUID> defaultExchangeIds = new HashMap<String, UUID>();
for (String exchangeName : DEFAULT_EXCHANGES.keySet())
{
UUID id = UUIDGenerator.generateExchangeUUID(exchangeName, _virtualHost.getName());
defaultExchangeIds.put(exchangeName, id);
}
_defaultExchangeIds = Collections.unmodifiableMap(defaultExchangeIds);
}
public DurableConfigurationStoreUpgrader getUpgrader(final int configVersion, DurableConfigurationRecoverer recoverer)
{
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Getting upgrader for configVersion: " + configVersion);
}
DurableConfigurationStoreUpgrader currentUpgrader = null;
switch(configVersion)
{
case 0:
currentUpgrader = addUpgrader(currentUpgrader, new Version0Upgrader());
case 1:
currentUpgrader = addUpgrader(currentUpgrader, new Version1Upgrader());
case 2:
currentUpgrader = addUpgrader(currentUpgrader, new Version2Upgrader());
case 3:
currentUpgrader = addUpgrader(currentUpgrader, new Version3Upgrader());
case 4:
currentUpgrader = addUpgrader(currentUpgrader, new Version4Upgrader());
case CURRENT_CONFIG_VERSION:
currentUpgrader = addUpgrader(currentUpgrader, new NullUpgrader(recoverer));
break;
default:
throw new IllegalStateException("Unknown configuration model version: " + configVersion
+ ". Attempting to run an older instance against an upgraded configuration?");
}
return currentUpgrader;
}
private DurableConfigurationStoreUpgrader addUpgrader(DurableConfigurationStoreUpgrader currentUpgrader,
final DurableConfigurationStoreUpgrader nextUpgrader)
{
if(currentUpgrader == null)
{
currentUpgrader = nextUpgrader;
}
else
{
currentUpgrader.setNextUpgrader(nextUpgrader);
}
return currentUpgrader;
}
/*
* Removes filters from queue bindings to exchanges other than topic exchanges. In older versions of the broker
* such bindings would have been ignored, starting from the point at which the config version changed, these
* arguments would actually cause selectors to be enforced, thus changing which messages would reach a queue.
*/
private class Version0Upgrader extends NonNullUpgrader
{
private final Map<UUID, ConfiguredObjectRecord> _records = new HashMap<UUID, ConfiguredObjectRecord>();
public Version0Upgrader()
{
}
@Override
public void configuredObject(final ConfiguredObjectRecord record)
{
_records.put(record.getId(), record);
}
private void removeSelectorArguments(Map<String, Object> binding)
{
@SuppressWarnings("unchecked")
Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS));
FilterSupport.removeFilters(arguments);
binding.put(Binding.ARGUMENTS, arguments);
}
private boolean isTopicExchange(ConfiguredObjectRecord entry)
{
ConfiguredObjectRecord exchangeRecord = entry.getParents().get("Exchange");
if (exchangeRecord == null)
{
return false;
}
UUID exchangeId = exchangeRecord.getId();
if(_records.containsKey(exchangeId))
{
return "topic".equals(_records.get(exchangeId)
.getAttributes()
.get(org.apache.qpid.server.model.Exchange.TYPE));
}
else
{
if (_defaultExchangeIds.get("amq.topic").equals(exchangeId))
{
return true;
}
return _virtualHost.getExchange(exchangeId) != null
&& _virtualHost.getExchange(exchangeId).getExchangeType() == TopicExchange.TYPE;
}
}
private boolean hasSelectorArguments(Map<String, Object> binding)
{
@SuppressWarnings("unchecked")
Map<String, Object> arguments = (Map<String, Object>) binding.get(Binding.ARGUMENTS);
return (arguments != null) && FilterSupport.argumentsContainFilter(arguments);
}
@Override
public void complete()
{
for(Map.Entry<UUID,ConfiguredObjectRecord> entry : _records.entrySet())
{
ConfiguredObjectRecord record = entry.getValue();
String type = record.getType();
Map<String, Object> attributes = record.getAttributes();
UUID id = record.getId();
if(type.equals(Binding.class.getName()) && hasSelectorArguments(attributes) && !isTopicExchange(record))
{
attributes = new LinkedHashMap<String, Object>(attributes);
removeSelectorArguments(attributes);
record = new ConfiguredObjectRecordImpl(id, type, attributes, record.getParents());
getUpdateMap().put(id, record);
entry.setValue(record);
}
getNextUpgrader().configuredObject(record);
}
getNextUpgrader().complete();
}
}
/*
* Change the type string from org.apache.qpid.server.model.Foo to Foo (in line with the practice in the broker
* configuration store). Also remove bindings which reference nonexistent queues or exchanges.
*/
private class Version1Upgrader extends NonNullUpgrader
{
@Override
public void configuredObject(final ConfiguredObjectRecord record)
{
String type = record.getType().substring(1 + record.getType().lastIndexOf('.'));
getUpdateMap().put(record.getId(),
new ConfiguredObjectRecordImpl(record.getId(), type, record.getAttributes(), record.getParents()));
}
@Override
public void complete()
{
for(Map.Entry<UUID, ConfiguredObjectRecord> entry : getUpdateMap().entrySet())
{
final ConfiguredObjectRecord record = entry.getValue();
final ConfiguredObjectRecord exchangeParent = record.getParents().get(Exchange.class.getSimpleName());
final ConfiguredObjectRecord queueParent = record.getParents().get(Queue.class.getSimpleName());
if(isBinding(record.getType()) && (exchangeParent == null || unknownExchange(exchangeParent.getId())
|| queueParent == null || unknownQueue(queueParent.getId())))
{
getDeleteMap().put(entry.getKey(), entry.getValue());
entry.setValue(null);
}
else
{
getNextUpgrader().configuredObject(record);
}
}
getNextUpgrader().complete();
}
private boolean unknownExchange(final UUID exchangeId)
{
if (_defaultExchangeIds.containsValue(exchangeId))
{
return false;
}
ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId);
return !((localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName()))
|| _virtualHost.getExchange(exchangeId) != null);
}
private boolean unknownQueue(final UUID queueId)
{
ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId);
return !((localRecord != null && localRecord.getType().equals(Queue.class.getSimpleName()))
|| _virtualHost.getQueue(queueId) != null);
}
private boolean isBinding(final String type)
{
return Binding.class.getSimpleName().equals(type);
}
}
/*
* Convert the storage of queue attributes to remove the separate "ARGUMENT" attribute, and flatten the
* attributes into the map using the model attribute names rather than the wire attribute names
*/
private class Version2Upgrader extends NonNullUpgrader
{
private static final String ARGUMENTS = "arguments";
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
if(Queue.class.getSimpleName().equals(record.getType()))
{
Map<String, Object> newAttributes = new LinkedHashMap<String, Object>();
if(record.getAttributes().get(ARGUMENTS) instanceof Map)
{
newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map<String, Object>) record.getAttributes()
.get(ARGUMENTS)));
}
newAttributes.putAll(record.getAttributes());
record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), newAttributes, record.getParents());
getUpdateMap().put(record.getId(), record);
}
getNextUpgrader().configuredObject(record);
}
@Override
public void complete()
{
getNextUpgrader().complete();
}
}
/*
* Convert the storage of queue attribute exclusive to change exclusive from a boolean to an enum
* where exclusive was false it will now be "NONE", and where true it will now be "CONTAINER"
* ensure OWNER is null unless the exclusivity policy is CONTAINER
*/
private class Version3Upgrader extends NonNullUpgrader
{
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
if(Queue.class.getSimpleName().equals(record.getType()))
{
Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(record.getAttributes());
if(record.getAttributes().get(EXCLUSIVE) instanceof Boolean)
{
boolean isExclusive = (Boolean) record.getAttributes().get(EXCLUSIVE);
newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : "NONE");
if(!isExclusive && record.getAttributes().containsKey("owner"))
{
newAttributes.remove("owner");
}
}
else
{
newAttributes.remove("owner");
}
if(!record.getAttributes().containsKey("durable"))
{
newAttributes.put("durable","true");
}
record = new ConfiguredObjectRecordImpl(record.getId(),record.getType(),newAttributes, record.getParents());
getUpdateMap().put(record.getId(), record);
}
getNextUpgrader().configuredObject(record);
}
@Override
public void complete()
{
getNextUpgrader().complete();
}
}
private class Version4Upgrader extends NonNullUpgrader
{
private Map<String, String> _missingAmqpExchanges = new HashMap<String, String>(DEFAULT_EXCHANGES);
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
if(Exchange.class.getSimpleName().equals(record.getType()))
{
Map<String, Object> attributes = record.getAttributes();
String name = (String)attributes.get(NAME);
_missingAmqpExchanges.remove(name);
}
getNextUpgrader().configuredObject(record);
}
@Override
public void complete()
{
for (Entry<String, String> entry : _missingAmqpExchanges.entrySet())
{
String name = entry.getKey();
String type = entry.getValue();
UUID id = _defaultExchangeIds.get(name);
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Creating amqp exchange " + name + " with id " + id);
}
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(org.apache.qpid.server.model.Exchange.NAME, name);
attributes.put(org.apache.qpid.server.model.Exchange.TYPE, type);
attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true);
ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(_virtualHost.getId(), org.apache.qpid.server.model.VirtualHost.class.getSimpleName(), Collections.<String, Object>emptyMap());
ConfiguredObjectRecord record = new ConfiguredObjectRecordImpl(id, Exchange.class.getSimpleName(), attributes, Collections.singletonMap(virtualHostRecord.getType(), virtualHostRecord));
getUpdateMap().put(id, record);
getNextUpgrader().configuredObject(record);
}
getNextUpgrader().complete();
}
}
}