blob: 4c2fa910f54da77961cd126dce701b456d43e2c4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store.berkeleydb;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.url.URLSyntaxException;
public class HATestClusterCreator
{
protected static final Logger LOGGER = Logger.getLogger(HATestClusterCreator.class);
private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''";
private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'";
private static final int FAILOVER_CYCLECOUNT = 10;
private static final int FAILOVER_RETRIES = 1;
private static final int FAILOVER_CONNECTDELAY = 1000;
private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''";
private static final String SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d'";
private static final int RETRIES = 60;
private static final int CONNECTDELAY = 75;
private final QpidBrokerTestCase _testcase;
private final Map<Integer, Integer> _brokerPortToBdbPortMap = new HashMap<Integer, Integer>();
private final Map<Integer, BrokerConfigHolder> _brokerConfigurations = new TreeMap<Integer, BrokerConfigHolder>();
private final String _virtualHostName;
private final String _vhostStoreConfigKeyPrefix;
private final String _ipAddressOfBroker;
private final String _groupName ;
private final int _numberOfNodes;
private int _bdbHelperPort;
private int _primaryBrokerPort;
public HATestClusterCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes)
{
_testcase = testcase;
_virtualHostName = virtualHostName;
_groupName = "group" + _testcase.getName();
_ipAddressOfBroker = getIpAddressOfBrokerHost();
_numberOfNodes = numberOfNodes;
_vhostStoreConfigKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + ".store.";
_bdbHelperPort = 0;
}
public void configureClusterNodes() throws Exception
{
int brokerPort = _testcase.findFreePort();
for (int i = 0; i < _numberOfNodes; i++)
{
int bdbPort = _testcase.getNextAvailable(brokerPort + 1);
_brokerPortToBdbPortMap.put(brokerPort, bdbPort);
LOGGER.debug("Cluster broker port " + brokerPort + ", bdb replication port " + bdbPort);
if (_bdbHelperPort == 0)
{
_bdbHelperPort = bdbPort;
}
configureClusterNode(brokerPort, bdbPort);
TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort);
brokerConfiguration.addJmxManagementConfiguration();
collectConfig(brokerPort, brokerConfiguration, _testcase.getTestVirtualhosts());
brokerPort = _testcase.getNextAvailable(bdbPort + 1);
}
}
public void setDesignatedPrimaryOnFirstBroker(boolean designatedPrimary) throws Exception
{
if (_numberOfNodes != 2)
{
throw new IllegalArgumentException("Only two nodes groups have the concept of primary");
}
final Entry<Integer, BrokerConfigHolder> brokerConfigEntry = _brokerConfigurations.entrySet().iterator().next();
final String configKey = getConfigKey("highAvailability.designatedPrimary");
brokerConfigEntry.getValue().getTestVirtualhosts().setProperty(configKey, Boolean.toString(designatedPrimary));
_primaryBrokerPort = brokerConfigEntry.getKey();
}
/**
* @param configKeySuffix "highAvailability.designatedPrimary", for example
* @return "virtualhost.test.store.highAvailability.designatedPrimary", for example
*/
private String getConfigKey(String configKeySuffix)
{
final String configKey = StringUtils.substringAfter(_vhostStoreConfigKeyPrefix + configKeySuffix, "virtualhosts.");
return configKey;
}
public void startNode(final int brokerPortNumber) throws Exception
{
final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber);
_testcase.setTestVirtualhosts(brokerConfigHolder.getTestVirtualhosts());
_testcase.startBroker(brokerPortNumber);
}
public void startCluster() throws Exception
{
for (final Integer brokerPortNumber : _brokerConfigurations.keySet())
{
startNode(brokerPortNumber);
}
}
public void startClusterParallel() throws Exception
{
final ExecutorService executor = Executors.newFixedThreadPool(_brokerConfigurations.size());
try
{
List<Future<Object>> brokers = new CopyOnWriteArrayList<Future<Object>>();
for (final Integer brokerPortNumber : _brokerConfigurations.keySet())
{
final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber);
Future<Object> future = executor.submit(new Callable<Object>()
{
public Object call()
{
try
{
_testcase.startBroker(brokerPortNumber, brokerConfigHolder.getTestConfiguration(),
brokerConfigHolder.getTestVirtualhosts());
return "OK";
}
catch (Exception e)
{
return e;
}
}
});
brokers.add(future);
}
for (Future<Object> future : brokers)
{
Object result = future.get(30, TimeUnit.SECONDS);
LOGGER.debug("Node startup result:" + result);
if (result instanceof Exception)
{
throw (Exception) result;
}
else if (!"OK".equals(result))
{
throw new Exception("One of the cluster nodes is not started");
}
}
}
catch (Exception e)
{
stopCluster();
throw e;
}
finally
{
executor.shutdown();
}
}
public void stopNode(final int brokerPortNumber)
{
_testcase.killBroker(brokerPortNumber);
}
public void stopCluster() throws Exception
{
for (final Integer brokerPortNumber : _brokerConfigurations.keySet())
{
try
{
stopNode(brokerPortNumber);
}
catch(Exception e)
{
LOGGER.warn("Failed to stop node on port:" + brokerPortNumber);
}
}
}
public int getBrokerPortNumberFromConnection(Connection connection)
{
final AMQConnection amqConnection = (AMQConnection)connection;
return amqConnection.getActiveBrokerDetails().getPort();
}
public int getPortNumberOfAnInactiveBroker(final Connection activeConnection)
{
final Set<Integer> allBrokerPorts = _testcase.getBrokerPortNumbers();
LOGGER.debug("Broker ports:" + allBrokerPorts);
final int activeBrokerPort = getBrokerPortNumberFromConnection(activeConnection);
allBrokerPorts.remove(activeBrokerPort);
LOGGER.debug("Broker ports:" + allBrokerPorts);
final int inactiveBrokerPort = allBrokerPorts.iterator().next();
return inactiveBrokerPort;
}
public int getBdbPortForBrokerPort(final int brokerPortNumber)
{
return _brokerPortToBdbPortMap.get(brokerPortNumber);
}
public Set<Integer> getBdbPortNumbers()
{
return new HashSet<Integer>(_brokerPortToBdbPortMap.values());
}
public AMQConnectionURL getConnectionUrlForAllClusterNodes() throws Exception
{
final StringBuilder brokerList = new StringBuilder();
for(Iterator<Integer> itr = _brokerPortToBdbPortMap.keySet().iterator(); itr.hasNext(); )
{
int brokerPortNumber = itr.next();
brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, FAILOVER_CONNECTDELAY, FAILOVER_RETRIES));
if (itr.hasNext())
{
brokerList.append(";");
}
}
return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, FAILOVER_CYCLECOUNT));
}
public AMQConnectionURL getConnectionUrlForSingleNodeWithoutRetry(final int brokerPortNumber) throws URLSyntaxException
{
return getConnectionUrlForSingleNode(brokerPortNumber, false);
}
public AMQConnectionURL getConnectionUrlForSingleNodeWithRetry(final int brokerPortNumber) throws URLSyntaxException
{
return getConnectionUrlForSingleNode(brokerPortNumber, true);
}
private AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber, boolean retryAllowed) throws URLSyntaxException
{
final String url;
if (retryAllowed)
{
url = String.format(SINGLE_BROKER_URL_WITH_RETRY_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES);
}
else
{
url = String.format(SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT, _virtualHostName, brokerPortNumber);
}
return new AMQConnectionURL(url);
}
public String getGroupName()
{
return _groupName;
}
public String getNodeNameForNodeAt(final int bdbPort)
{
return "node" + _testcase.getName() + bdbPort;
}
public String getNodeHostPortForNodeAt(final int bdbPort)
{
return _ipAddressOfBroker + ":" + bdbPort;
}
public String getHelperHostPort()
{
if (_bdbHelperPort == 0)
{
throw new IllegalStateException("Helper port not yet assigned.");
}
return _ipAddressOfBroker + ":" + _bdbHelperPort;
}
public void setHelperHostPort(int bdbHelperPort)
{
_bdbHelperPort = bdbHelperPort;
}
public int getBrokerPortNumberOfPrimary()
{
if (_numberOfNodes != 2)
{
throw new IllegalArgumentException("Only two nodes groups have the concept of primary");
}
return _primaryBrokerPort;
}
public int getBrokerPortNumberOfSecondaryNode()
{
final Set<Integer> portNumbers = getBrokerPortNumbersForNodes();
portNumbers.remove(getBrokerPortNumberOfPrimary());
return portNumbers.iterator().next();
}
public Set<Integer> getBrokerPortNumbersForNodes()
{
return new HashSet<Integer>(_brokerConfigurations.keySet());
}
private void configureClusterNode(final int brokerPort, final int bdbPort) throws Exception
{
final String nodeName = getNodeNameForNodeAt(bdbPort);
_testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore");
_testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.groupName", _groupName);
_testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.nodeName", nodeName);
_testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort));
_testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort());
}
public String getIpAddressOfBrokerHost()
{
String brokerHost = _testcase.getBroker().getHost();
try
{
return InetAddress.getByName(brokerHost).getHostAddress();
}
catch (UnknownHostException e)
{
throw new RuntimeException("Could not determine IP address of host : " + brokerHost, e);
}
}
private void collectConfig(final int brokerPortNumber, TestBrokerConfiguration testConfiguration, XMLConfiguration testVirtualhosts)
{
_brokerConfigurations.put(brokerPortNumber, new BrokerConfigHolder(testConfiguration,
(XMLConfiguration) testVirtualhosts.clone()));
}
public class BrokerConfigHolder
{
private final TestBrokerConfiguration _testConfiguration;
private final XMLConfiguration _testVirtualhosts;
public BrokerConfigHolder(TestBrokerConfiguration testConfiguration, XMLConfiguration testVirtualhosts)
{
_testConfiguration = testConfiguration;
_testVirtualhosts = testVirtualhosts;
}
public TestBrokerConfiguration getTestConfiguration()
{
return _testConfiguration;
}
public XMLConfiguration getTestVirtualhosts()
{
return _testVirtualhosts;
}
}
public void modifyClusterNodeBdbAddress(int brokerPortNumberToBeMoved, int newBdbPort)
{
final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumberToBeMoved);
final XMLConfiguration virtualHostConfig = brokerConfigHolder.getTestVirtualhosts();
final String configKey = getConfigKey("highAvailability.nodeHostPort");
final String oldBdbHostPort = virtualHostConfig.getString(configKey);
final String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":");
final String oldHost = oldHostAndPort[0];
final String newBdbHostPort = oldHost + ":" + newBdbPort;
virtualHostConfig.setProperty(configKey, newBdbHostPort);
collectConfig(brokerPortNumberToBeMoved, brokerConfigHolder.getTestConfiguration(), virtualHostConfig);
}
public String getStoreConfigKeyPrefix()
{
return _vhostStoreConfigKeyPrefix;
}
}