blob: 8b362e6b03708226503ca54a41f2280f4510ee11 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
import org.apache.qpid.server.logging.LogLevel;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ContainerStoreUpgraderAndRecoverer;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.SystemConfig;
import org.apache.qpid.server.model.VirtualHostAlias;
public class BrokerStoreUpgraderAndRecoverer extends AbstractConfigurationStoreUpgraderAndRecoverer implements ContainerStoreUpgraderAndRecoverer<Broker>
{
private static final Logger LOGGER = LoggerFactory.getLogger(BrokerStoreUpgraderAndRecoverer.class);
public static final String VIRTUALHOSTS = "virtualhosts";
private final SystemConfig<?> _systemConfig;
// Note: don't use externally defined constants in upgraders in case they change, the values here MUST stay the same
// no matter what changes are made to the code in the future
public BrokerStoreUpgraderAndRecoverer(SystemConfig<?> systemConfig)
{
super("1.0");
_systemConfig = systemConfig;
register(new Upgrader_1_0_to_1_1());
register(new Upgrader_1_1_to_1_2());
register(new Upgrader_1_2_to_1_3());
register(new Upgrader_1_3_to_2_0());
register(new Upgrader_2_0_to_3_0());
register(new Upgrader_3_0_to_6_0());
register(new Upgrader_6_0_to_6_1());
register(new Upgrader_6_1_to_7_0());
register(new Upgrader_7_0_to_7_1());
register(new Upgrader_7_1_to_8_0());
register(new Upgrader_8_0_to_9_0());
}
private static final class Upgrader_1_0_to_1_1 extends StoreUpgraderPhase
{
private Upgrader_1_0_to_1_1()
{
super("modelVersion", "1.0", "1.1");
}
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
if (record.getType().equals("Broker"))
{
record = upgradeRootRecord(record);
createVirtualHostsRecordsFromBrokerRecordForModel_1_x(record, this);
}
else if (record.getType().equals("VirtualHost") && record.getAttributes().containsKey("storeType"))
{
Map<String, Object> updatedAttributes = new HashMap<String, Object>(record.getAttributes());
updatedAttributes.put("type", "STANDARD");
record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents());
getUpdateMap().put(record.getId(), record);
}
}
@Override
public void complete()
{
}
}
private static final class Upgrader_1_1_to_1_2 extends StoreUpgraderPhase
{
private Upgrader_1_1_to_1_2()
{
super("modelVersion", "1.1", "1.2");
}
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
if (record.getType().equals("Broker"))
{
record = upgradeRootRecord(record);
createVirtualHostsRecordsFromBrokerRecordForModel_1_x(record, this);
}
}
@Override
public void complete()
{
}
}
private static final class Upgrader_1_2_to_1_3 extends StoreUpgraderPhase
{
private Upgrader_1_2_to_1_3()
{
super("modelVersion", "1.2", "1.3");
}
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
if (record.getType().equals("TrustStore") && record.getAttributes().containsKey("type"))
{
Map<String, Object> updatedAttributes = new HashMap<String, Object>(record.getAttributes());
updatedAttributes.put("trustStoreType", updatedAttributes.remove("type"));
record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents());
getUpdateMap().put(record.getId(), record);
}
else if (record.getType().equals("KeyStore") && record.getAttributes().containsKey("type"))
{
Map<String, Object> updatedAttributes = new HashMap<String, Object>(record.getAttributes());
updatedAttributes.put("keyStoreType", updatedAttributes.remove("type"));
record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents());
getUpdateMap().put(record.getId(), record);
}
else if (record.getType().equals("Broker"))
{
record = upgradeRootRecord(record);
createVirtualHostsRecordsFromBrokerRecordForModel_1_x(record, this);
}
}
@Override
public void complete()
{
}
}
private static final class Upgrader_1_3_to_2_0 extends StoreUpgraderPhase
{
private final VirtualHostEntryUpgrader _virtualHostUpgrader;
private Upgrader_1_3_to_2_0()
{
super("modelVersion", "1.3", "2.0");
_virtualHostUpgrader = new VirtualHostEntryUpgrader();
}
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
if (record.getType().equals("VirtualHost"))
{
Map<String, Object> attributes = record.getAttributes();
if (attributes.containsKey("configPath"))
{
throw new IllegalConfigurationException("Auto-upgrade of virtual host " + attributes.get("name") + " having XML configuration is not supported. Virtual host configuration file is " + attributes.get("configPath"));
}
record = _virtualHostUpgrader.upgrade(record);
getUpdateMap().put(record.getId(), record);
}
else if (record.getType().equals("Plugin") && record.getAttributes().containsKey("pluginType"))
{
Map<String, Object> updatedAttributes = new HashMap<String, Object>(record.getAttributes());
updatedAttributes.put("type", updatedAttributes.remove("pluginType"));
record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents());
getUpdateMap().put(record.getId(), record);
}
else if (record.getType().equals("Broker"))
{
record = upgradeRootRecord(record);
createVirtualHostsRecordsFromBrokerRecordForModel_1_x(record, this);
}
}
@Override
public void complete()
{
}
}
private class Upgrader_2_0_to_3_0 extends StoreUpgraderPhase
{
public Upgrader_2_0_to_3_0()
{
super("modelVersion", "2.0", "3.0");
}
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
if(record.getType().equals("Port") && isAmqpPort(record.getAttributes()))
{
createAliasRecord(record, "nameAlias", "nameAlias");
createAliasRecord(record, "defaultAlias", "defaultAlias");
createAliasRecord(record, "hostnameAlias", "hostnameAlias");
}
else if(record.getType().equals("User") && "scram".equals(record.getAttributes().get("type")) )
{
Map<String, Object> updatedAttributes = new HashMap<String, Object>(record.getAttributes());
updatedAttributes.put("type", "managed");
record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents());
getUpdateMap().put(record.getId(), record);
}
else if (record.getType().equals("Broker"))
{
upgradeRootRecord(record);
}
else if("KeyStore".equals(record.getType()))
{
upgradeKeyStoreRecordIfTypeTheSame(record, "FileKeyStore");
}
else if("TrustStore".equals(record.getType()))
{
upgradeKeyStoreRecordIfTypeTheSame(record, "FileTrustStore");
}
}
private ConfiguredObjectRecord upgradeKeyStoreRecordIfTypeTheSame(ConfiguredObjectRecord record, String expectedType)
{
Map<String, Object> attributes = new HashMap<>(record.getAttributes());
// Type may not be present, in which case the default type - which is the type affected - will be being used
if(!attributes.containsKey("type"))
{
attributes.put("type", expectedType);
}
if (expectedType.equals(attributes.get("type")))
{
Object path = attributes.remove("path");
attributes.put("storeUrl", path);
record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), attributes, record.getParents());
getUpdateMap().put(record.getId(), record);
}
return record;
}
private boolean isAmqpPort(final Map<String, Object> attributes)
{
Object type = attributes.get(ConfiguredObject.TYPE);
Object protocols = attributes.get(Port.PROTOCOLS);
String protocolString = protocols == null ? null : protocols.toString();
return "AMQP".equals(type)
|| ((type == null || "".equals(type.toString().trim()))
&& (protocolString == null
|| !protocolString.matches(".*\\w.*")
|| protocolString.contains("AMQP")));
}
private void createAliasRecord(ConfiguredObjectRecord parent, String name, String type)
{
Map<String,Object> attributes = new HashMap<>();
attributes.put(VirtualHostAlias.NAME, name);
attributes.put(VirtualHostAlias.TYPE, type);
final ConfiguredObjectRecord record = new ConfiguredObjectRecordImpl(UUID.randomUUID(),
"VirtualHostAlias",
attributes,
Collections.singletonMap("Port", parent.getId()));
getUpdateMap().put(record.getId(), record);
}
@Override
public void complete()
{
}
}
private class Upgrader_3_0_to_6_0 extends StoreUpgraderPhase
{
private String _defaultVirtualHost;
private final Set<ConfiguredObjectRecord> _knownBdbHaVirtualHostNode = new HashSet<>();
private final Map<String, ConfiguredObjectRecord> _knownNonBdbHaVirtualHostNode = new HashMap<>();
public Upgrader_3_0_to_6_0()
{
super("modelVersion", "3.0", "6.0");
}
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
if (record.getType().equals("Broker"))
{
record = upgradeRootRecord(record);
Map<String, Object> brokerAttributes = new HashMap<>(record.getAttributes());
_defaultVirtualHost = (String)brokerAttributes.remove("defaultVirtualHost");
boolean typeDetected = brokerAttributes.remove("type") != null;
if (_defaultVirtualHost != null || typeDetected)
{
record = new ConfiguredObjectRecordImpl(record.getId(),
record.getType(),
brokerAttributes,
record.getParents());
getUpdateMap().put(record.getId(), record);
}
addLogger(record, "memory", "Memory");
addLogger(record, "logfile", "File");
}
else if (record.getType().equals("VirtualHostNode"))
{
if ("BDB_HA".equals(record.getAttributes().get("type")))
{
_knownBdbHaVirtualHostNode.add(record);
}
else
{
String nodeName = (String) record.getAttributes().get("name");
_knownNonBdbHaVirtualHostNode.put(nodeName, record);
}
}
else if (record.getType().equals("Port") && "AMQP".equals(record.getAttributes().get("type")))
{
Map<String, Object> updatedAttributes = new HashMap<>(record.getAttributes());
if (updatedAttributes.containsKey("receiveBufferSize") || updatedAttributes.containsKey("sendBufferSize"))
{
updatedAttributes.remove("receiveBufferSize");
updatedAttributes.remove("sendBufferSize");
record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents());
getUpdateMap().put(record.getId(), record);
}
}
}
private void addLogger(final ConfiguredObjectRecord record, String name, String type)
{
Map<String,Object> attributes = new HashMap<>();
attributes.put("name", name);
attributes.put("type", type);
final ConfiguredObjectRecord logger = new ConfiguredObjectRecordImpl(UUID.randomUUID(),
"BrokerLogger",
attributes,
Collections.singletonMap("Broker",
record.getId()));
addNameValueFilter("Root", logger, LogLevel.WARN, "ROOT");
addNameValueFilter("Qpid", logger, LogLevel.INFO, "org.apache.qpid.*");
addNameValueFilter("Operational", logger, LogLevel.INFO, "qpid.message.*");
getUpdateMap().put(logger.getId(), logger);
}
private void addNameValueFilter(String inclusionRuleName,
final ConfiguredObjectRecord loggerRecord,
final LogLevel level,
final String loggerName)
{
Map<String,Object> attributes = new HashMap<>();
attributes.put("name", inclusionRuleName);
attributes.put("level", level.name());
attributes.put("loggerName", loggerName);
attributes.put("type", "NameAndLevel");
final ConfiguredObjectRecord filterRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(),
"BrokerLogInclusionRule",
attributes,
Collections.singletonMap("BrokerLogger",
loggerRecord.getId()));
getUpdateMap().put(filterRecord.getId(), filterRecord);
}
@Override
public void complete()
{
if (_defaultVirtualHost != null)
{
final ConfiguredObjectRecord defaultVirtualHostNode;
if (_knownNonBdbHaVirtualHostNode.containsKey(_defaultVirtualHost))
{
defaultVirtualHostNode = _knownNonBdbHaVirtualHostNode.get(_defaultVirtualHost);
}
else if (_knownBdbHaVirtualHostNode.size() == 1)
{
// We had a default VHN but it didn't match the non-BDBHAVHNs and we have only one BDBHAVHN.
// It has to be the target.
defaultVirtualHostNode = _knownBdbHaVirtualHostNode.iterator().next();
}
else
{
LOGGER.warn("Unable to identify the target virtual host node for old default virtualhost name : '{}'",
_defaultVirtualHost);
defaultVirtualHostNode = null;
}
if (defaultVirtualHostNode != null)
{
final Map<String, Object> updatedAttributes = new HashMap<>(defaultVirtualHostNode.getAttributes());
updatedAttributes.put("defaultVirtualHostNode", "true");
ConfiguredObjectRecordImpl updateRecord =
new ConfiguredObjectRecordImpl(defaultVirtualHostNode.getId(),
defaultVirtualHostNode.getType(),
updatedAttributes,
defaultVirtualHostNode.getParents());
getUpdateMap().put(updateRecord.getId(), updateRecord);
}
}
}
}
private class Upgrader_6_0_to_6_1 extends StoreUpgraderPhase
{
private boolean _hasAcl = false;
private UUID _rootRecordId;
public Upgrader_6_0_to_6_1()
{
super("modelVersion", "6.0", "6.1");
}
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
if (record.getType().equals("Broker"))
{
record = upgradeRootRecord(record);
_rootRecordId = record.getId();
}
else if (record.getType().equals("TrustStore"))
{
upgradeTrustStore(record);
}
else
{
Map<String, Object> attributes = record.getAttributes();
String type = (String)attributes.get("type");
if (record.getType().equals("Plugin") && "MANAGEMENT-JMX".equals(type))
{
getDeleteMap().put(record.getId(), record);
}
else if (record.getType().equals("Port"))
{
Object protocols = attributes.get("protocols");
if ((protocols instanceof Collection && (((Collection) protocols).contains("RMI")
|| ((Collection) protocols).contains("JMX_RMI")))
|| "JMX".equals(type)
|| "RMI".equals(type))
{
getDeleteMap().put(record.getId(), record);
}
}
else if (record.getType().equals("AuthenticationProvider") && attributes.containsKey("preferencesproviders"))
{
// removing of Preferences Provider from AuthenticationProvider attribute for JSON configuration store
Map<String, Object> updatedAttributes = new LinkedHashMap<>(attributes);
updatedAttributes.remove("preferencesproviders");
record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents());
getUpdateMap().put(record.getId(), record);
}
else if (record.getType().equals("PreferencesProvider"))
{
// removing of f Preferences Provider record for non-JSON configuration store
getDeleteMap().put(record.getId(), record);
}
}
}
private void upgradeTrustStore(ConfiguredObjectRecord record)
{
Map<String, Object> updatedAttributes = new LinkedHashMap<>(record.getAttributes());
if (updatedAttributes.containsKey("includedVirtualHostMessageSources")
|| updatedAttributes.containsKey("excludedVirtualHostMessageSources"))
{
if (updatedAttributes.containsKey("includedVirtualHostMessageSources"))
{
LOGGER.warn("Detected 'includedVirtualHostMessageSources' attribute during upgrade."
+ " Starting with version 6.1 this attribute has been replaced with"
+ " 'includedVirtualHostNodeMessageSources'. The upgrade is automatic but"
+ " assumes that the VirtualHostNode has the same name as the VirtualHost."
+ " Assumed name: '{}'", updatedAttributes.get("includedVirtualHostMessageSources"));
updatedAttributes.put("includedVirtualHostNodeMessageSources",
updatedAttributes.get("includedVirtualHostMessageSources"));
updatedAttributes.remove("includedVirtualHostMessageSources");
}
if (updatedAttributes.containsKey("excludedVirtualHostMessageSources"))
{
LOGGER.warn("Detected 'excludedVirtualHostMessageSources' attribute during upgrade."
+ " Starting with version 6.1 this attribute has been replaced with"
+ " 'excludedVirtualHostNodeMessageSources'. The upgrade is automatic but"
+ " assumes that the VirtualHostNode has the same name as the VirtualHost."
+ " Assumed name: '{}'", updatedAttributes.get("excludedVirtualHostMessageSources"));
updatedAttributes.put("excludedVirtualHostNodeMessageSources",
updatedAttributes.get("excludedVirtualHostMessageSources"));
updatedAttributes.remove("excludedVirtualHostMessageSources");
}
record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents());
getUpdateMap().put(record.getId(), record);
}
}
@Override
public void complete()
{
if(!_hasAcl)
{
UUID allowAllACLId = UUID.randomUUID();
Map<String,Object> attrs = new HashMap<>();
attrs.put(ConfiguredObject.NAME, "AllowAll");
attrs.put(ConfiguredObject.TYPE, "AllowAll");
attrs.put("priority", 9999);
ConfiguredObjectRecord allowAllAclRecord =
new ConfiguredObjectRecordImpl(allowAllACLId, "AccessControlProvider", attrs, Collections.singletonMap("Broker", _rootRecordId));
getUpdateMap().put(allowAllAclRecord.getId(), allowAllAclRecord);
}
}
}
private class Upgrader_6_1_to_7_0 extends StoreUpgraderPhase
{
private Map<String,String> BROKER_ATTRIBUTES_MOVED_INTO_CONTEXT = new HashMap<>();
{
BROKER_ATTRIBUTES_MOVED_INTO_CONTEXT.put("connection.sessionCountLimit", "qpid.port.sessionCountLimit");
BROKER_ATTRIBUTES_MOVED_INTO_CONTEXT.put("connection.heartBeatDelay", "qpid.port.heartbeatDelay");
BROKER_ATTRIBUTES_MOVED_INTO_CONTEXT.put("connection.closeWhenNoRoute", "qpid.port.closeWhenNoRoute");
};
public Upgrader_6_1_to_7_0()
{
super("modelVersion", "6.1", "7.0");
}
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
if (record.getType().equals("Broker"))
{
boolean rebuildRecord = false;
Map<String, Object> attributes = new HashMap<>(record.getAttributes());
Map<String, String> additionalContext = new HashMap<>();
for (String attributeName : BROKER_ATTRIBUTES_MOVED_INTO_CONTEXT.keySet())
{
Object value = attributes.remove(attributeName);
if (value != null)
{
additionalContext.put(BROKER_ATTRIBUTES_MOVED_INTO_CONTEXT.get(attributeName),
String.valueOf(value));
}
}
if (attributes.containsKey("statisticsReportingResetEnabled"))
{
attributes.remove("statisticsReportingResetEnabled");
rebuildRecord = true;
}
if (attributes.containsKey("statisticsReportingPeriod")
&& Integer.parseInt(String.valueOf(attributes.get("statisticsReportingPeriod"))) > 0)
{
additionalContext.put("qpid.broker.statisticsReportPattern", "messagesIn=${messagesIn}, bytesIn=${bytesIn:byteunit}, messagesOut=${messagesOut}, bytesOut=${bytesOut:byteunit}");
rebuildRecord = true;
}
if (!additionalContext.isEmpty())
{
Map<String, String> newContext = new HashMap<>();
if (attributes.containsKey("context"))
{
newContext.putAll((Map<String, String>) attributes.get("context"));
}
newContext.putAll(additionalContext);
attributes.put("context", newContext);
rebuildRecord = true;
}
if (rebuildRecord)
{
record = new ConfiguredObjectRecordImpl(record.getId(),
record.getType(),
attributes,
record.getParents());
}
upgradeRootRecord(record);
}
else if (record.getType().equals("Port"))
{
Map<String, Object> attributes = record.getAttributes();
Object protocols = attributes.get("protocols");
String type = (String) attributes.get("type");
if ((protocols instanceof Collection && ((Collection) protocols).contains("HTTP"))
|| "HTTP".equals(type))
{
upgradeHttpPortIfRequired(record);
}
}
else if (record.getType().equals("BrokerLogger"))
{
Map<String,Object> attributes = new HashMap<>();
attributes.put("name", "statistics-" + record.getAttributes().get("name"));
attributes.put("level", "INFO");
attributes.put("loggerName", "qpid.statistics.*");
attributes.put("type", "NameAndLevel");
final ConfiguredObjectRecord filterRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(),
"BrokerLogInclusionRule",
attributes,
Collections.singletonMap("BrokerLogger",
record.getId()));
getUpdateMap().put(filterRecord.getId(), filterRecord);
}
}
private void upgradeHttpPortIfRequired(final ConfiguredObjectRecord record)
{
Map<String, Object> attributes = record.getAttributes();
if (attributes.containsKey("context"))
{
Map<String, String> context = (Map<String, String>) attributes.get("context");
if (context != null
&& (context.containsKey("port.http.additionalInternalThreads")
|| context.containsKey("port.http.maximumQueuedRequests")))
{
Map<String, String> updatedContext = new HashMap<>(context);
updatedContext.remove("port.http.additionalInternalThreads");
String acceptorsBacklog = updatedContext.remove("port.http.maximumQueuedRequests");
if (acceptorsBacklog != null)
{
updatedContext.put("qpid.port.http.acceptBacklog", acceptorsBacklog);
}
Map<String, Object> updatedAttributes = new LinkedHashMap<>(attributes);
updatedAttributes.put("context", updatedContext);
ConfiguredObjectRecord upgradedRecord = new ConfiguredObjectRecordImpl(record.getId(),
record.getType(),
updatedAttributes,
record.getParents());
getUpdateMap().put(upgradedRecord.getId(), upgradedRecord);
}
}
}
@Override
public void complete()
{
}
}
private class Upgrader_7_0_to_7_1 extends StoreUpgraderPhase
{
public Upgrader_7_0_to_7_1()
{
super("modelVersion", "7.0", "7.1");
}
@Override
public void configuredObject(final ConfiguredObjectRecord record)
{
if("Broker".equals(record.getType()))
{
upgradeRootRecord(record);
}
}
@Override
public void complete()
{
}
}
private class Upgrader_7_1_to_8_0 extends StoreUpgraderPhase
{
public Upgrader_7_1_to_8_0()
{
super("modelVersion", "7.1", "8.0");
}
@Override
public void configuredObject(final ConfiguredObjectRecord record)
{
if("Broker".equals(record.getType()))
{
upgradeRootRecord(record);
}
}
@Override
public void complete()
{
}
}
private static class Upgrader_8_0_to_9_0 extends StoreUpgraderPhase
{
Upgrader_8_0_to_9_0()
{
super("modelVersion", "8.0", "9.0");
}
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
if ("Broker".equals(record.getType()))
{
record = upgradeRootRecord(record);
}
renameContextVariables(record,
"context",
UpgraderHelper.MODEL9_MAPPING_FOR_RENAME_TO_ALLOW_DENY_CONTEXT_VARIABLES);
}
@Override
public void complete()
{
}
}
private static class VirtualHostEntryUpgrader
{
@SuppressWarnings("serial")
Map<String, AttributesTransformer> _messageStoreToNodeTransformers = new HashMap<String, AttributesTransformer>()
{{
put("DERBY", new AttributesTransformer().
addAttributeTransformer("id", copyAttribute()).
addAttributeTransformer("name", copyAttribute()).
addAttributeTransformer("createdTime", copyAttribute()).
addAttributeTransformer("createdBy", copyAttribute()).
addAttributeTransformer("storePath", copyAttribute()).
addAttributeTransformer("storeUnderfullSize", copyAttribute()).
addAttributeTransformer("storeOverfullSize", copyAttribute()));
put("Memory", new AttributesTransformer().
addAttributeTransformer("id", copyAttribute()).
addAttributeTransformer("name", copyAttribute()).
addAttributeTransformer("createdTime", copyAttribute()).
addAttributeTransformer("createdBy", copyAttribute()));
put("BDB", new AttributesTransformer().
addAttributeTransformer("id", copyAttribute()).
addAttributeTransformer("name", copyAttribute()).
addAttributeTransformer("createdTime", copyAttribute()).
addAttributeTransformer("createdBy", copyAttribute()).
addAttributeTransformer("storePath", copyAttribute()).
addAttributeTransformer("storeUnderfullSize", copyAttribute()).
addAttributeTransformer("storeOverfullSize", copyAttribute()).
addAttributeTransformer("bdbEnvironmentConfig", mutateAttributeName("context")));
put("JDBC", new AttributesTransformer().
addAttributeTransformer("id", copyAttribute()).
addAttributeTransformer("name", copyAttribute()).
addAttributeTransformer("createdTime", copyAttribute()).
addAttributeTransformer("createdBy", copyAttribute()).
addAttributeTransformer("storePath", mutateAttributeName("connectionURL")).
addAttributeTransformer("connectionURL", mutateAttributeName("connectionUrl")).
addAttributeTransformer("connectionPool", new AttributeTransformer()
{
@Override
public MutableEntry transform(MutableEntry entry)
{
Object value = entry.getValue();
if ("DEFAULT".equals(value))
{
value = "NONE";
}
return new MutableEntry("connectionPoolType", value);
}
}).
addAttributeTransformer("jdbcBigIntType", addContextVar("qpid.jdbcstore.bigIntType")).
addAttributeTransformer("jdbcBytesForBlob", addContextVar("qpid.jdbcstore.useBytesForBlob")).
addAttributeTransformer("jdbcBlobType", addContextVar("qpid.jdbcstore.blobType")).
addAttributeTransformer("jdbcVarbinaryType", addContextVar("qpid.jdbcstore.varBinaryType")).
addAttributeTransformer("partitionCount", addContextVar("qpid.jdbcstore.bonecp.partitionCount")).
addAttributeTransformer("maxConnectionsPerPartition", addContextVar("qpid.jdbcstore.bonecp.maxConnectionsPerPartition")).
addAttributeTransformer("minConnectionsPerPartition", addContextVar("qpid.jdbcstore.bonecp.minConnectionsPerPartition")));
put("BDB_HA", new AttributesTransformer().
addAttributeTransformer("id", copyAttribute()).
addAttributeTransformer("createdTime", copyAttribute()).
addAttributeTransformer("createdBy", copyAttribute()).
addAttributeTransformer("storePath", copyAttribute()).
addAttributeTransformer("storeUnderfullSize", copyAttribute()).
addAttributeTransformer("storeOverfullSize", copyAttribute()).
addAttributeTransformer("haNodeName", mutateAttributeName("name")).
addAttributeTransformer("haGroupName", mutateAttributeName("groupName")).
addAttributeTransformer("haHelperAddress", mutateAttributeName("helperAddress")).
addAttributeTransformer("haNodeAddress", mutateAttributeName("address")).
addAttributeTransformer("haDesignatedPrimary", mutateAttributeName("designatedPrimary")).
addAttributeTransformer("haReplicationConfig", mutateAttributeName("context")).
addAttributeTransformer("bdbEnvironmentConfig", mutateAttributeName("context")));
}};
public ConfiguredObjectRecord upgrade(ConfiguredObjectRecord vhost)
{
Map<String, Object> attributes = vhost.getAttributes();
String type = (String) attributes.get("type");
AttributesTransformer nodeAttributeTransformer = null;
if ("STANDARD".equalsIgnoreCase(type))
{
if (attributes.containsKey("configStoreType"))
{
throw new IllegalConfigurationException("Auto-upgrade of virtual host " + attributes.get("name")
+ " with split configuration and message store is not supported."
+ " Configuration store type is " + attributes.get("configStoreType") + " and message store type is "
+ attributes.get("storeType"));
}
else
{
type = (String) attributes.get("storeType");
}
}
if (type == null)
{
throw new IllegalConfigurationException("Cannot auto-upgrade virtual host with attributes: " + attributes);
}
type = getVirtualHostNodeType(type);
nodeAttributeTransformer = _messageStoreToNodeTransformers.get(type);
if (nodeAttributeTransformer == null)
{
throw new IllegalConfigurationException("Don't know how to perform an upgrade from version for virtualhost type " + type);
}
Map<String, Object> nodeAttributes = nodeAttributeTransformer.upgrade(attributes);
nodeAttributes.put("type", type);
return new ConfiguredObjectRecordImpl(vhost.getId(), "VirtualHostNode", nodeAttributes, vhost.getParents());
}
private String getVirtualHostNodeType(String type)
{
for (String t : _messageStoreToNodeTransformers.keySet())
{
if (type.equalsIgnoreCase(t))
{
return t;
}
}
return null;
}
}
private static class AttributesTransformer
{
private final Map<String, List<AttributeTransformer>> _transformers = new HashMap<String, List<AttributeTransformer>>();
public AttributesTransformer addAttributeTransformer(String string, AttributeTransformer... attributeTransformers)
{
_transformers.put(string, Arrays.asList(attributeTransformers));
return this;
}
public Map<String, Object> upgrade(Map<String, Object> attributes)
{
Map<String, Object> settings = new HashMap<>();
for (Map.Entry<String, List<AttributeTransformer>> entry : _transformers.entrySet())
{
String attributeName = entry.getKey();
if (attributes.containsKey(attributeName))
{
Object attributeValue = attributes.get(attributeName);
MutableEntry newEntry = new MutableEntry(attributeName, attributeValue);
List<AttributeTransformer> transformers = entry.getValue();
for (AttributeTransformer attributeTransformer : transformers)
{
newEntry = attributeTransformer.transform(newEntry);
if (newEntry == null)
{
break;
}
}
if (newEntry != null)
{
if (settings.get(newEntry.getKey()) instanceof Map && newEntry.getValue() instanceof Map)
{
final Map newMap = (Map)newEntry.getValue();
final Map mergedMap = new HashMap((Map) settings.get(newEntry.getKey()));
mergedMap.putAll(newMap);
settings.put(newEntry.getKey(), mergedMap);
}
else
{
settings.put(newEntry.getKey(), newEntry.getValue());
}
}
}
}
return settings;
}
}
private static AttributeTransformer copyAttribute()
{
return CopyAttribute.INSTANCE;
}
private static AttributeTransformer mutateAttributeName(String newName)
{
return new MutateAttributeName(newName);
}
private static AttributeTransformer addContextVar(String newName)
{
return new AddContextVar(newName);
}
private static interface AttributeTransformer
{
MutableEntry transform(MutableEntry entry);
}
private static class CopyAttribute implements AttributeTransformer
{
private static final CopyAttribute INSTANCE = new CopyAttribute();
private CopyAttribute()
{
}
@Override
public MutableEntry transform(MutableEntry entry)
{
return entry;
}
}
private static class AddContextVar implements AttributeTransformer
{
private final String _newName;
public AddContextVar(String newName)
{
_newName = newName;
}
@Override
public MutableEntry transform(MutableEntry entry)
{
return new MutableEntry("context", Collections.singletonMap(_newName, entry.getValue()));
}
}
private static class MutateAttributeName implements AttributeTransformer
{
private final String _newName;
public MutateAttributeName(String newName)
{
_newName = newName;
}
@Override
public MutableEntry transform(MutableEntry entry)
{
entry.setKey(_newName);
return entry;
}
}
private static class MutableEntry
{
private String _key;
private Object _value;
public MutableEntry(String key, Object value)
{
_key = key;
_value = value;
}
public String getKey()
{
return _key;
}
public void setKey(String key)
{
_key = key;
}
public Object getValue()
{
return _value;
}
}
private static ConfiguredObjectRecord createVirtualHostsRecordsFromBrokerRecordForModel_1_x(ConfiguredObjectRecord brokerRecord, StoreUpgraderPhase upgrader)
{
Map<String, Object> attributes = brokerRecord.getAttributes();
if (attributes.containsKey(VIRTUALHOSTS) && attributes.get(VIRTUALHOSTS) instanceof Collection)
{
Collection<?> virtualHosts = (Collection<?>)attributes.get(VIRTUALHOSTS);
for (Object virtualHost: virtualHosts)
{
if (virtualHost instanceof Map)
{
Map<String, Object> virtualHostAttributes = (Map)virtualHost;
if (virtualHostAttributes.containsKey("configPath"))
{
throw new IllegalConfigurationException("Auto-upgrade of virtual host " + attributes.get("name")
+ " having XML configuration is not supported. Virtual host configuration file is " + attributes.get("configPath"));
}
virtualHostAttributes = new HashMap<>(virtualHostAttributes);
Object nameAttribute = virtualHostAttributes.get("name");
Object idAttribute = virtualHostAttributes.remove("id");
UUID id;
if (idAttribute == null)
{
id = UUID.randomUUID();
}
else
{
if (idAttribute instanceof String)
{
id = UUID.fromString((String)idAttribute);
}
else if (idAttribute instanceof UUID)
{
id = (UUID)idAttribute;
}
else
{
throw new IllegalConfigurationException("Illegal ID value '" + idAttribute + "' for virtual host " + nameAttribute);
}
}
ConfiguredObjectRecord nodeRecord = new ConfiguredObjectRecordImpl(id, "VirtualHost", virtualHostAttributes, Collections.singletonMap("Broker", brokerRecord.getId()));
upgrader.getUpdateMap().put(nodeRecord.getId(), nodeRecord);
upgrader.configuredObject(nodeRecord);
}
}
attributes = new HashMap<>(attributes);
attributes.remove(VIRTUALHOSTS);
brokerRecord = new ConfiguredObjectRecordImpl(brokerRecord.getId(), brokerRecord.getType(), attributes, brokerRecord.getParents());
upgrader.getUpdateMap().put(brokerRecord.getId(), brokerRecord);
}
return brokerRecord;
}
@Override
public Broker<?> upgradeAndRecover(List<ConfiguredObjectRecord> records)
{
final DurableConfigurationStore store = _systemConfig.getConfigurationStore();
List<ConfiguredObjectRecord> upgradedRecords = upgrade(store, records);
new GenericRecoverer(_systemConfig).recover(upgradedRecords, false);
final StoreConfigurationChangeListener configChangeListener = new StoreConfigurationChangeListener(store);
applyRecursively(_systemConfig.getContainer(Broker.class), new RecursiveAction<ConfiguredObject<?>>()
{
@Override
public void performAction(final ConfiguredObject<?> object)
{
object.addChangeListener(configChangeListener);
}
@Override
public boolean applyToChildren(ConfiguredObject<?> object)
{
return !object.managesChildStorage();
}
});
return _systemConfig.getContainer(Broker.class);
}
public List<ConfiguredObjectRecord> upgrade(final DurableConfigurationStore dcs,
final List<ConfiguredObjectRecord> records)
{
return upgrade(dcs, records, Broker.class.getSimpleName(), Broker.MODEL_VERSION);
}
}