blob: 4473b40af6ccfffc2b36cfabf70111f8df0a7342 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store.berkeleydb.replication;
import static org.apache.qpid.systests.admin.SpawnBrokerAdmin.SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.sleepycat.je.rep.ReplicationConfig;
import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNodeImpl;
import org.apache.qpid.systests.admin.BrokerAdminException;
import org.apache.qpid.systests.admin.SpawnBrokerAdmin;
import org.apache.qpid.test.utils.PortHelper;
import org.apache.qpid.tests.utils.BrokerAdmin;
public class GroupBrokerAdmin
{
private static final int WAIT_LIMIT = Integer.getInteger("qpid.test.ha.await", 10000);
private static final String AMQP_NODE_TYPE = "org.apache.qpid.VirtualHostNode";
private static final String AMQP_REMOTE_NODE_TYPE = "org.apache.qpid.server.model.RemoteReplicationNode";
private static final String ROLE_MASTER = "MASTER";
private static final String ROLE_REPLICA = "REPLICA";
private static final String NODE_TYPE = "BDB_HA";
private static final String HOST = "127.0.0.1";
private GroupMember[] _members;
private ListeningExecutorService _executorService;
private SpawnBrokerAdmin[] _brokers;
private String _groupName;
public void beforeTestClass(final Class testClass)
{
GroupConfig runBrokerAdmin = (GroupConfig) testClass.getAnnotation(GroupConfig.class);
int numberOfNodes = runBrokerAdmin == null ? 2 : runBrokerAdmin.numberOfNodes();
_groupName = runBrokerAdmin == null ? "test-ha" : runBrokerAdmin.groupName();
_executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numberOfNodes));
_brokers = Stream.generate(SpawnBrokerAdmin::new).limit(numberOfNodes).toArray(SpawnBrokerAdmin[]::new);
boolean started = false;
try
{
int startupTimeout = Integer.getInteger(SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME, 30000);
awaitFuture(startupTimeout, invokeParallel(Arrays.stream(_brokers).map(a -> (Callable<Void>) () -> {
a.beforeTestClass(testClass);
return null;
}).collect(Collectors.toList())));
started = true;
}
finally
{
if (!started)
{
for (SpawnBrokerAdmin a : _brokers)
{
a.afterTestClass(testClass);
}
_executorService.shutdown();
_brokers = null;
_executorService = null;
}
}
}
public void beforeTestMethod(final Class testClass, final Method method)
{
_members = initializeGroupData(_groupName, _brokers);
GroupMember first = _members[0];
first.getAdmin().beforeTestMethod(_members[0].getName(), NODE_TYPE, _members[0].getNodeAttributes());
awaitNodeRoleReplicaOrMaster(first);
ListenableFuture<Void> f;
if (_members.length > 2)
{
f = invokeParallel(Arrays.stream(_members).skip(1).map(m -> (Callable<Void>) () -> {
m.getAdmin().beforeTestMethod(m.getName(), NODE_TYPE, m.getNodeAttributes());
return null;
}).collect(Collectors.toList()));
}
else
{
for (int i = 1; i < _members.length; i++)
{
_members[i].getAdmin()
.beforeTestMethod(_members[i].getName(), NODE_TYPE, _members[i].getNodeAttributes());
}
f = Futures.immediateFuture(null);
}
awaitFuture(WAIT_LIMIT, f);
awaitAllTransitionIntoReplicaOrMaster();
}
public void afterTestMethod(final Class testClass, final Method method)
{
if (_members != null)
{
awaitFuture(WAIT_LIMIT, invokeParallel(Arrays.stream(_members).map(m -> (Callable<Void>) () -> {
m.getAdmin().afterTestMethod(testClass, method);
return null;
}).collect(Collectors.toList())));
_members = null;
}
}
public void afterTestClass(final Class testClass)
{
try
{
if (_brokers != null)
{
awaitFuture(WAIT_LIMIT, invokeParallel(Arrays.stream(_brokers).map(m -> (Callable<Void>) () -> {
m.afterTestClass(testClass);
return null;
}).collect(Collectors.toList())));
_brokers = null;
}
}
finally
{
if (_executorService != null)
{
_executorService.shutdown();
}
}
}
public ListenableFuture<Void> restart()
{
awaitFuture(WAIT_LIMIT, invokeParallel(Arrays.stream(_members).map(m -> (Callable<Void>) () -> {
m.getAdmin().restart();
return null;
}).collect(Collectors.toList())));
awaitAllTransitionIntoReplicaOrMaster();
return Futures.immediateFuture(null);
}
public void stop()
{
awaitFuture(WAIT_LIMIT, invokeParallel(Arrays.stream(_members).map(m -> (Callable<Void>) () -> {
m.getAdmin().stop();
return null;
}).collect(Collectors.toList())));
}
public void start()
{
awaitFuture(WAIT_LIMIT, invokeParallel(Arrays.stream(_members).map(m -> (Callable<Void>) () -> {
m.getAdmin().start();
return null;
}).collect(Collectors.toList())));
awaitAllTransitionIntoReplicaOrMaster();
}
public void startNode(final int amqpPort)
{
GroupMember member = getMemberByAmqpPort(amqpPort);
member.getAdmin().start();
awaitNodeRole(amqpPort, ROLE_MASTER, ROLE_REPLICA);
}
public void stopNode(final int amqpPort)
{
GroupMember member = getMemberByAmqpPort(amqpPort);
member.getAdmin().stop();
}
public int[] getGroupAmqpPorts()
{
int[] ports = new int[_members.length];
int i = 0;
for (GroupMember m : _members)
{
ports[i++] = m.getAmqpPort();
}
return ports;
}
public int[] getBdbPorts()
{
int[] ports = new int[_members.length];
int i = 0;
for (GroupMember m : _members)
{
ports[i++] = m.getBdbPort();
}
return ports;
}
public int getAmqpPort(final int... exclude)
{
Set<Integer> excluded = Arrays.stream(exclude).boxed().collect(Collectors.toSet());
return Arrays.stream(_members)
.map(GroupMember::getAmqpPort)
.filter(p -> !excluded.contains(p))
.findFirst()
.orElseThrow(() -> new BrokerAdminException("Amqp Port not found"));
}
public String getHost()
{
return HOST;
}
public Map<Integer, String> groupThreadDumps()
{
Map<Integer, String> threadDumps = new HashMap<>();
for (GroupMember m : _members)
{
threadDumps.put(m._amqpPort, m.getAdmin().dumpThreads());
}
return threadDumps;
}
public String getHelperHostPort()
{
return HOST + ":" + _members[0].getBdbPort();
}
public Map<String, Object> getNodeAttributes(final int amqpPort)
{
GroupMember member = getMemberByAmqpPort(amqpPort);
return member.getAdmin().getAttributes(true, member.getName(), AMQP_NODE_TYPE);
}
public void setNodeAttributes(final int amqpPort, final Map<String, Object> attributes)
{
GroupMember member = getMemberByAmqpPort(amqpPort);
member.getAdmin().update(true, member.getName(), AMQP_NODE_TYPE, attributes);
}
public void setDesignatedPrimary(int brokerPort, boolean designatedPrimary)
{
setNodeAttributes(brokerPort, Collections.singletonMap(BDBHAVirtualHostNode.DESIGNATED_PRIMARY,
String.valueOf(designatedPrimary)));
}
public void awaitNodeRole(final int amqpPort, String... role)
{
awaitNodeToAttainAttributeValue(amqpPort, BDBHAVirtualHostNode.ROLE, (Object[]) role);
}
public Object awaitNodeToAttainAttributeValue(final int amqpPort,
final String attributeName,
final Object... attributeValue)
{
GroupMember member = getMemberByAmqpPort(amqpPort);
return member.getAdmin().awaitAttributeValue(WAIT_LIMIT,
true,
member.getName(),
AMQP_NODE_TYPE,
attributeName,
attributeValue);
}
public Map<String, Object> getRemoteNodeAttributes(final int amqpPort, final int remoteAmqpPort)
{
GroupMember member = getMemberByAmqpPort(amqpPort);
GroupMember member2 = getMemberByAmqpPort(remoteAmqpPort);
return member.getAdmin()
.getAttributes(true,
member.getName() + "/" + member2.getName(),
AMQP_REMOTE_NODE_TYPE);
}
public void setRemoteNodeAttributes(final int amqpPort,
final int remoteAmqpPort,
final Map<String, Object> attributes)
{
GroupMember member = getMemberByAmqpPort(amqpPort);
GroupMember member2 = getMemberByAmqpPort(remoteAmqpPort);
member.getAdmin()
.update(true,
member.getName() + "/" + member2.getName(),
AMQP_REMOTE_NODE_TYPE,
attributes);
}
public void awaitRemoteNodeRole(final int amqpPort, final int remoteAmqpPort, final String... role)
{
GroupMember member = getMemberByAmqpPort(amqpPort);
GroupMember member2 = getMemberByAmqpPort(remoteAmqpPort);
member.getAdmin().awaitAttributeValue(WAIT_LIMIT,
true,
member.getName() + "/" + member2.getName(),
AMQP_REMOTE_NODE_TYPE,
BDBHARemoteReplicationNode.ROLE,
ROLE_REPLICA,
ROLE_MASTER);
}
private SpawnBrokerAdmin getNodeAdmin(final int amqpPort)
{
GroupMember member = getMemberByAmqpPort(amqpPort);
return member.getAdmin();
}
private GroupMember[] initializeGroupData(final String groupName, final SpawnBrokerAdmin[] admins)
{
PortHelper helper = new PortHelper();
int[] ports = new int[admins.length];
String[] addresses = new String[admins.length];
int port = -1;
for (int i = 0; i < admins.length; i++)
{
port = i == 0 ? helper.getNextAvailable() : helper.getNextAvailable(port + 1);
addresses[i] = HOST + ":" + port;
ports[i] = port;
}
Map<String, String> context = new HashMap<>();
context.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
context.put(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES, "0");
context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, "{\"type\":\"BDB_HA\"}");
String permitted = objectToJson(addresses);
String contextAsString = objectToJson(context);
GroupMember[] members = new GroupMember[admins.length];
for (int i = 0; i < admins.length; i++)
{
String nodeName = "node-" + ports[i];
Map<String, Object> nodeAttributes = new HashMap<>();
nodeAttributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
nodeAttributes.put(BDBHAVirtualHostNode.NAME, nodeName);
nodeAttributes.put(BDBHAVirtualHostNode.ADDRESS, addresses[i]);
nodeAttributes.put(BDBHAVirtualHostNode.TYPE, BDBHAVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE);
nodeAttributes.put(BDBHAVirtualHostNode.DEFAULT_VIRTUAL_HOST_NODE, true);
nodeAttributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, addresses[0]);
if (i > 0)
{
nodeAttributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, "node-" + ports[0]);
}
nodeAttributes.put(BDBHAVirtualHostNode.PERMITTED_NODES, permitted);
nodeAttributes.put(BDBHAVirtualHostNode.CONTEXT, contextAsString);
members[i] = new GroupMember(nodeName,
admins[i].getBrokerAddress(BrokerAdmin.PortType.AMQP).getPort(),
port,
admins[i],
nodeAttributes);
}
return members;
}
GroupMember getMemberByAmqpPort(final int amqpPort)
{
return Arrays.stream(_members)
.filter(m -> m.getAmqpPort() == amqpPort)
.findFirst()
.orElseThrow(() -> new BrokerAdminException(
String.format("Could not find node by amqp port %d", amqpPort)));
}
private <T> ListenableFuture<T> invokeParallel(Collection<Callable<T>> tasks)
{
try
{
@SuppressWarnings("unchecked")
List<ListenableFuture<T>> futures = (List) _executorService.invokeAll(tasks);
ListenableFuture<List<T>> combinedFuture = Futures.allAsList(futures);
return Futures.transform(combinedFuture, input -> null, _executorService);
}
catch (InterruptedException e)
{
Thread.interrupted();
return Futures.immediateFailedFuture(e);
}
}
private <T> void awaitFuture(long waitLimit, ListenableFuture<T> future)
{
try
{
future.get(waitLimit, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
Thread.interrupted();
throw new BrokerAdminException("Interrupted", e);
}
catch (ExecutionException e)
{
throw new BrokerAdminException("Operation failed", e.getCause());
}
catch (TimeoutException e)
{
throw new BrokerAdminException("Timeout");
}
}
private void awaitAllTransitionIntoReplicaOrMaster()
{
Map<String, Object> roles = new ConcurrentHashMap<>();
awaitFuture(WAIT_LIMIT, invokeParallel(Arrays.stream(_members).map(m -> (Callable<Void>) () -> {
Object role = awaitNodeRoleReplicaOrMaster(m);
roles.put(m.getName(), String.valueOf(role));
return null;
}).collect(Collectors.toList())));
if (roles.values().stream().noneMatch(role -> ROLE_MASTER.equals(role) || ROLE_REPLICA.equals(role)))
{
throw new BrokerAdminException("Unexpected node roles " + Joiner.on(", ").withKeyValueSeparator(" -> ")
.join(roles));
}
}
private Object awaitNodeRoleReplicaOrMaster(final GroupMember m)
{
return m.getAdmin().awaitAttributeValue(WAIT_LIMIT,
true,
m.getName(),
AMQP_NODE_TYPE,
BDBHAVirtualHostNode.ROLE,
ROLE_REPLICA,
ROLE_MASTER);
}
private String objectToJson(final Object object)
{
try
{
return new ObjectMapper().writeValueAsString(object);
}
catch (JsonProcessingException e)
{
throw new BrokerAdminException("Cannot convert object to json", e);
}
}
private class GroupMember
{
private final Map<String, Object> _nodeAttributes;
private final SpawnBrokerAdmin _admin;
private final int _bdbPort;
private final int _amqpPort;
private final String _name;
private GroupMember(final String name,
final int amqpPort,
final int bdbPort,
final SpawnBrokerAdmin admin,
final Map<String, Object> nodeAttributes)
{
_name = name;
_admin = admin;
_bdbPort = bdbPort;
_amqpPort = amqpPort;
_nodeAttributes = nodeAttributes;
}
private String getName()
{
return _name;
}
private SpawnBrokerAdmin getAdmin()
{
return _admin;
}
private int getBdbPort()
{
return _bdbPort;
}
private int getAmqpPort()
{
return _amqpPort;
}
private Map<String, Object> getNodeAttributes()
{
return _nodeAttributes;
}
@Override
public String toString()
{
return "GroupMember{" +
"name='" + _name + '\'' +
",bdbPort=" + _bdbPort +
",amqpPort=" + _amqpPort +
'}';
}
}
}