blob: ff47ed958d5837545a425ef0e247eee7e0d9a462 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store.berkeleydb;
import static com.sleepycat.je.rep.ReplicatedEnvironment.State.DETACHED;
import static com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER;
import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA;
import static com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import javax.jms.Connection;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import org.apache.log4j.Logger;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import com.sleepycat.je.EnvironmentFailureException;
/**
* System test verifying the ability to control a cluster via the Management API.
*
* @see HAClusterBlackboxTest
*/
public class HAClusterManagementTest extends QpidBrokerTestCase
{
protected static final Logger LOGGER = Logger.getLogger(HAClusterManagementTest.class);
private static final Set<String> NON_MASTER_STATES = new HashSet<String>(Arrays.asList(REPLICA.toString(), DETACHED.toString(), UNKNOWN.toString()));;
private static final String VIRTUAL_HOST = "test";
private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST);
private static final int NUMBER_OF_NODES = 4;
private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
private final JMXTestUtils _jmxUtils = new JMXTestUtils(this);
private ConnectionURL _brokerFailoverUrl;
@Override
protected void setUp() throws Exception
{
_brokerType = BrokerType.SPAWNED;
_clusterCreator.configureClusterNodes();
_brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes();
_clusterCreator.startCluster();
super.setUp();
}
@Override
protected void tearDown() throws Exception
{
try
{
_jmxUtils.close();
}
finally
{
super.tearDown();
}
}
@Override
public void startBroker() throws Exception
{
// Don't start default broker provided by QBTC.
}
public void testReadonlyMBeanAttributes() throws Exception
{
final int brokerPortNumber = getBrokerPortNumbers().iterator().next();
final int bdbPortNumber = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumber);
ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber);
assertEquals("Unexpected store group name", _clusterCreator.getGroupName(), storeBean.getGroupName());
assertEquals("Unexpected store node name", _clusterCreator.getNodeNameForNodeAt(bdbPortNumber), storeBean.getNodeName());
assertEquals("Unexpected store node host port",_clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber), storeBean.getNodeHostPort());
assertEquals("Unexpected store helper host port", _clusterCreator.getHelperHostPort(), storeBean.getHelperHostPort());
// As we have chosen an arbitrary broker from the cluster, we cannot predict its state
assertNotNull("Store state must not be null", storeBean.getNodeState());
}
public void testStateOfActiveBrokerIsMaster() throws Exception
{
final Connection activeConnection = getConnection(_brokerFailoverUrl);
final int activeBrokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(activeConnection);
ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(activeBrokerPortNumber);
assertEquals("Unexpected store state", MASTER.toString(), storeBean.getNodeState());
}
public void testStateOfNonActiveBrokerIsNotMaster() throws Exception
{
final Connection activeConnection = getConnection(_brokerFailoverUrl);
final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection);
ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(inactiveBrokerPortNumber);
final String nodeState = storeBean.getNodeState();
assertTrue("Unexpected store state : " + nodeState, NON_MASTER_STATES.contains(nodeState));
}
public void testGroupMembers() throws Exception
{
final int brokerPortNumber = getBrokerPortNumbers().iterator().next();
ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber);
awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES);
final TabularData groupMembers = storeBean.getAllNodesInGroup();
assertNotNull(groupMembers);
for(int bdbPortNumber : _clusterCreator.getBdbPortNumbers())
{
final String nodeName = _clusterCreator.getNodeNameForNodeAt(bdbPortNumber);
final String nodeHostPort = _clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber);
CompositeData row = groupMembers.get(new Object[] {nodeName});
assertNotNull("Table does not contain row for node name " + nodeName, row);
assertEquals(nodeHostPort, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT));
}
}
public void testRemoveNodeFromGroup() throws Exception
{
final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
final int brokerPortNumberToMakeObservation = brokerPortNumberIterator.next();
final int brokerPortNumberToBeRemoved = brokerPortNumberIterator.next();
final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToMakeObservation);
awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES);
final String removedNodeName = _clusterCreator.getNodeNameForNodeAt(_clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeRemoved));
_clusterCreator.stopNode(brokerPortNumberToBeRemoved);
storeBean.removeNodeFromGroup(removedNodeName);
final int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size();
assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES - 1,numberOfDataRowsAfterRemoval);
}
/**
* Updates the address of a node.
*
* If the broker (node) can subsequently start without error then the update was a success, hence no need for an explicit
* assert.
*
* @see #testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() for converse case
*/
public void testUpdateAddress() throws Exception
{
final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
final int brokerPortNumberToPerformUpdate = brokerPortNumberIterator.next();
final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next();
final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToPerformUpdate);
_clusterCreator.stopNode(brokerPortNumberToBeMoved);
final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved);
final int newBdbPort = getNextAvailable(oldBdbPort + 1);
storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), _clusterCreator.getIpAddressOfBrokerHost(), newBdbPort);
_clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort);
_clusterCreator.startNode(brokerPortNumberToBeMoved);
}
/**
* @see #testUpdateAddress()
*/
public void testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() throws Exception
{
final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next();
_clusterCreator.stopNode(brokerPortNumberToBeMoved);
final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved);
final int newBdbPort = getNextAvailable(oldBdbPort + 1);
// now deliberately don't call updateAddress
_clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort);
try
{
_clusterCreator.startNode(brokerPortNumberToBeMoved);
fail("Exception not thrown");
}
catch(RuntimeException rte)
{
//check cause was BDBs EnvironmentFailureException
assertTrue(rte.getMessage().contains(EnvironmentFailureException.class.getName()));
// PASS
}
}
public void testVirtualHostOperationsDeniedForNonMasterNode() throws Exception
{
final Connection activeConnection = getConnection(_brokerFailoverUrl);
final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection);
ManagedBroker inactiveBroker = getManagedBrokerBeanForNodeAtBrokerPort(inactiveBrokerPortNumber);
try
{
inactiveBroker.createNewQueue(getTestQueueName(), null, true);
fail("Exception not thrown");
}
catch (Exception e)
{
String message = e.getMessage();
assertEquals(message, "The virtual hosts state of INITIALISING does not permit this operation.");
}
try
{
inactiveBroker.createNewExchange(getName(), "direct", true);
fail("Exception not thrown");
}
catch (Exception e)
{
String message = e.getMessage();
assertEquals(message, "The virtual hosts state of INITIALISING does not permit this operation.");
}
}
private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort(final int brokerPortNumber) throws Exception
{
_jmxUtils.open(brokerPortNumber);
return _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY);
}
private ManagedBroker getManagedBrokerBeanForNodeAtBrokerPort(final int brokerPortNumber) throws Exception
{
_jmxUtils.open(brokerPortNumber);
return _jmxUtils.getManagedBroker(VIRTUAL_HOST);
}
private void awaitAllNodesJoiningGroup(ManagedBDBHAMessageStore storeBean, int expectedNumberOfNodes) throws Exception
{
long totalTimeWaited = 0l;
long waitInterval = 100l;
long maxWaitTime = 10000;
int currentNumberOfNodes = storeBean.getAllNodesInGroup().size();
while (expectedNumberOfNodes > currentNumberOfNodes || totalTimeWaited > maxWaitTime)
{
LOGGER.debug("Still awaiting nodes to join group; expecting "
+ expectedNumberOfNodes + " node(s) but only have " + currentNumberOfNodes
+ " after " + totalTimeWaited + " ms.");
totalTimeWaited += waitInterval;
Thread.sleep(waitInterval);
currentNumberOfNodes = storeBean.getAllNodesInGroup().size();
}
assertEquals("Unexpected number of nodes in group after " + totalTimeWaited + " ms",
expectedNumberOfNodes ,currentNumberOfNodes);
}
}