blob: 90047a95fbe6791a40c383e78091e909165ace8b [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.virtualhostnode.berkeleydb;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.sleepycat.je.rep.ReplicationConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.BrokerTestHelper;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.RemoteReplicationNode;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
import org.apache.qpid.server.store.UnresolvedConfiguredObject;
import org.apache.qpid.server.util.FileUtils;
import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl;
import org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost;
import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode;
import org.apache.qpid.test.utils.UnitTestBase;
/**
* Helper class to make the tests of BDB HA Virtual Host Nodes simpler and more concise.
*/
public class BDBHAVirtualHostNodeTestHelper
{
private static final Logger LOGGER = LoggerFactory.getLogger(BDBHAVirtualHostNodeTestHelper.class);
private final String _testName;
private final Broker<?> _broker;
private final File _bdbStorePath;
private final TaskExecutor _taskExecutor;
private final ConfiguredObjectFactory _objectFactory = BrokerModel.getInstance().getObjectFactory();
private final Set<BDBHAVirtualHostNode<?>> _nodes = new HashSet<>();
private final int _numberOfSleeps;
private final int _sleepInterval;
private final int _waitForVirtualhostInterval;
public BDBHAVirtualHostNodeTestHelper(String testName) throws Exception
{
_testName = testName;
_broker = BrokerTestHelper.createBrokerMock();
_taskExecutor = new TaskExecutorImpl();
_taskExecutor.start();
when(_broker.getTaskExecutor()).thenReturn(_taskExecutor);
when(_broker.getChildExecutor()).thenReturn(_taskExecutor);
when(_broker.getContextValue(Long.class, BDBVirtualHost.QPID_BROKER_BDB_TOTAL_CACHE_SIZE)).thenReturn(BDBVirtualHost.BDB_MIN_CACHE_SIZE);
_bdbStorePath = new File(UnitTestBase.TMP_FOLDER, _testName + "." + System.currentTimeMillis());
_bdbStorePath.deleteOnExit();
_numberOfSleeps = Integer.getInteger("BDBHAVirtualHostNodeTestHelper.numberOfSleepsOnChangeWaiting", 50);
_sleepInterval = Integer.getInteger("BDBHAVirtualHostNodeTestHelper.sleepIntervalOnChangeWaiting", 100);
_waitForVirtualhostInterval = Integer.getInteger("BDBHAVirtualHostNodeTestHelper.waitForVirtualhostInterval", 30000);
}
public void tearDown() throws Exception
{
try
{
Exception firstException = null;
for (VirtualHostNode<?> node : _nodes)
{
try
{
node.delete();
}
catch(Exception e)
{
if (firstException != null)
{
firstException = e;
}
}
}
if (firstException != null)
{
throw firstException;
}
}
finally
{
if (_taskExecutor != null)
{
_taskExecutor.stopImmediately();
}
if (_bdbStorePath != null)
{
LOGGER.debug("Deleting '{}'", _bdbStorePath);
FileUtils.delete(_bdbStorePath, true);
}
}
}
public BDBHARemoteReplicationNode<?> findRemoteNode(BDBHAVirtualHostNode<?> node, String name)
{
for (RemoteReplicationNode<?> remoteNode : node.getRemoteReplicationNodes())
{
if (remoteNode.getName().equals(name))
{
return (BDBHARemoteReplicationNode<?>)remoteNode;
}
}
return null;
}
public void awaitRemoteNodes(BDBHAVirtualHostNode<?> node, int expectedNodeNumber) throws InterruptedException
{
int counter = 0;
@SuppressWarnings("rawtypes")
Collection<? extends RemoteReplicationNode> remoteNodes = null;
do
{
remoteNodes = node.getRemoteReplicationNodes();
if (counter > 0)
{
Thread.sleep(_sleepInterval);
}
counter++;
}
// TODO: 30 seconds is quite a lot to wait, we need to reduce this limit
while(remoteNodes.size() != expectedNodeNumber && counter<100);
assertEquals("Unexpected node number", expectedNodeNumber, node.getRemoteReplicationNodes().size());
}
public void awaitForAttributeChange(ConfiguredObject<?> object, String name, Object expectedValue) throws InterruptedException
{
int awaitCounter = 0;
while(!object.equals(object.getAttribute(name)) && awaitCounter < _numberOfSleeps)
{
Thread.sleep(100);
awaitCounter++;
}
assertEquals("Unexpected attribute " + name + " on " + object, expectedValue, object.getAttribute(name) );
}
public BDBHAVirtualHostNode<?> awaitAndFindNodeInRole(NodeRole desiredRole) throws InterruptedException
{
BDBHAVirtualHostNode<?> replica = null;
int findReplicaCount = 0;
while(replica == null)
{
replica = findNodeInRole(desiredRole);
if (replica == null)
{
Thread.sleep(_sleepInterval);
}
if (findReplicaCount > _numberOfSleeps)
{
fail("Could not find a node in role " + desiredRole);
}
findReplicaCount++;
}
return replica;
}
public BDBHAVirtualHostNode<?> findNodeInRole(NodeRole role)
{
for (BDBHAVirtualHostNode<?> node : _nodes)
{
if (role == node.getRole())
{
return node;
}
}
return null;
}
public BDBHAVirtualHostNode<?> createHaVHN(Map<String, Object> attributes)
{
@SuppressWarnings("unchecked")
BDBHAVirtualHostNode<?> node = (BDBHAVirtualHostNode<?>) _objectFactory.create(VirtualHostNode.class, attributes, _broker);
_nodes.add(node);
return node;
}
public BDBHAVirtualHostNode<?> recoverHaVHN(UUID id, Map<String, Object> attributes)
{
Map<String,UUID> parents = new HashMap<>();
parents.put(Broker.class.getSimpleName(),_broker.getId());
ConfiguredObjectRecordImpl record = new ConfiguredObjectRecordImpl(id, VirtualHostNode.class.getSimpleName(), attributes, parents );
@SuppressWarnings("unchecked")
UnresolvedConfiguredObject<BDBHAVirtualHostNodeImpl> unresolved = _objectFactory.recover(record, _broker);
BDBHAVirtualHostNode<?> node = unresolved.resolve();
node.open();
_nodes.add(node);
return node;
}
public void assertNodeRole(BDBHAVirtualHostNode<?> node, NodeRole... roleName) throws InterruptedException
{
int iterationCounter = 0;
boolean inRole =false;
do
{
for (NodeRole role : roleName)
{
if (role == node.getRole())
{
inRole = true;
break;
}
}
if (!inRole)
{
Thread.sleep(_sleepInterval);
}
iterationCounter++;
}
while(!inRole && iterationCounter < _numberOfSleeps);
assertTrue("Node " + node.getName() + " did not transit into role " + Arrays.toString(roleName)
+ " Node role is " + node.getRole(), inRole);
}
public BDBHAVirtualHostNode<?> createAndStartHaVHN(Map<String, Object> attributes) throws InterruptedException
{
BDBHAVirtualHostNode<?> node = createHaVHN(attributes);
return startNodeAndWait(node);
}
public BDBHAVirtualHostNode<?> startNodeAndWait(BDBHAVirtualHostNode<?> node) throws InterruptedException
{
node.start();
assertNodeRole(node, NodeRole.MASTER, NodeRole.REPLICA);
assertEquals("Unexpected node state", State.ACTIVE, node.getState());
return node;
}
public String getMessageStorePath()
{
return _bdbStorePath.getAbsolutePath();
}
public Broker getBroker()
{
return _broker;
}
public Map<String, Object> createNodeAttributes(String nodeName, String groupName, String address,
String helperAddress, String helperNodeNode, int... ports)
throws Exception
{
Map<String, Object> node1Attributes = new HashMap<>();
node1Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
node1Attributes.put(BDBHAVirtualHostNode.TYPE, BDBHAVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE);
node1Attributes.put(BDBHAVirtualHostNode.NAME, nodeName);
node1Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
node1Attributes.put(BDBHAVirtualHostNode.ADDRESS, address);
node1Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, getMessageStorePath() + File.separator + nodeName);
if (address.equals(helperAddress))
{
node1Attributes.put(BDBHAVirtualHostNode.PERMITTED_NODES, getPermittedNodes(ports));
}
else
{
node1Attributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, helperNodeNode);
}
Map<String, String> context = new HashMap<>();
context.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s");
context.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
if (ports != null)
{
String bluePrint = getBlueprint();
node1Attributes.put(AbstractVirtualHostNode.VIRTUALHOST_INITIAL_CONFIGURATION, bluePrint);
}
node1Attributes.put(BDBHAVirtualHostNode.CONTEXT, context);
return node1Attributes;
}
public static String getBlueprint() throws Exception
{
Map<String,Object> bluePrint = new HashMap<>();
bluePrint.put(VirtualHost.TYPE, BDBHAVirtualHostImpl.VIRTUAL_HOST_TYPE);
StringWriter writer = new StringWriter();
ObjectMapper mapper = new ObjectMapper();
mapper.configure(SerializationFeature.INDENT_OUTPUT, true);
mapper.writeValue(writer, bluePrint);
return writer.toString();
}
public static List<String> getPermittedNodes(int[] ports)
{
List<String> permittedNodes = new ArrayList<>();
for (int port:ports)
{
permittedNodes.add("localhost:" + port);
}
return permittedNodes;
}
public void awaitForVirtualhost(final VirtualHostNode<?> node)
{
long endTime = System.currentTimeMillis() + _waitForVirtualhostInterval;
do
{
if(node.getVirtualHost() != null)
{
return;
}
try
{
Thread.sleep(_sleepInterval);
}
catch (InterruptedException e)
{
// ignore
}
}
while(System.currentTimeMillis() < endTime);
}
}