blob: 48b14e0bff9173f26792dacbaf7a3273a9c8f207 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store.berkeleydb;
import static org.mockito.Mockito.when;
import java.io.File;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import com.sleepycat.je.Durability;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.rep.NoConsistencyRequiredPolicy;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicationConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.RemoteReplicationNode;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.berkeleydb.replication.DatabasePinger;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost;
import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNodeImpl;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNodeTestHelper;
import org.apache.qpid.server.virtualhostnode.berkeleydb.NodeRole;
import org.apache.qpid.test.utils.PortHelper;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.test.utils.TestFileUtils;
import org.apache.qpid.util.FileUtils;
public class BDBHAVirtualHostNodeTest extends QpidTestCase
{
private final static Logger LOGGER = LoggerFactory.getLogger(BDBHAVirtualHostNodeTest.class);
private BDBHAVirtualHostNodeTestHelper _helper;
private PortHelper _portHelper = new PortHelper();
@Override
protected void setUp() throws Exception
{
super.setUp();
_helper = new BDBHAVirtualHostNodeTestHelper(getTestName());
}
@Override
protected void tearDown() throws Exception
{
try
{
_helper.tearDown();
}
finally
{
super.tearDown();
}
_portHelper.waitUntilAllocatedPortsAreFree();
}
public void testCreateAndActivateVirtualHostNode() throws Exception
{
int node1PortNumber = _portHelper.getNextAvailable();
String helperAddress = "localhost:" + node1PortNumber;
String groupName = "group";
String nodeName = "node1";
Map<String, Object> attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber);
String messageStorePath = (String)attributes.get(BDBHAVirtualHostNode.STORE_PATH);
String repStreamTimeout = "2 h";
Map<String,String> context = (Map<String,String>)attributes.get(BDBHAVirtualHostNode.CONTEXT);
context.put(ReplicationConfig.REP_STREAM_TIMEOUT, repStreamTimeout);
context.put(EnvironmentConfig.ENV_IS_TRANSACTIONAL, "false");
try
{
_helper.createHaVHN(attributes);
fail("Exception was not thrown.");
}
catch (RuntimeException e)
{
assertTrue("Unexpected Exception being thrown.", e.getCause() instanceof IllegalArgumentException);
}
context.put(EnvironmentConfig.ENV_IS_TRANSACTIONAL, "true");
BDBHAVirtualHostNode<?> node = _helper.createHaVHN(attributes);
node.start();
_helper.assertNodeRole(node, NodeRole.MASTER, NodeRole.REPLICA);
assertEquals("Unexpected node state", State.ACTIVE, node.getState());
DurableConfigurationStore store = node.getConfigurationStore();
assertNotNull(store);
BDBConfigurationStore bdbConfigurationStore = (BDBConfigurationStore) store;
ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) bdbConfigurationStore.getEnvironmentFacade();
assertEquals(nodeName, environmentFacade.getNodeName());
assertEquals(groupName, environmentFacade.getGroupName());
assertEquals(helperAddress, environmentFacade.getHostPort());
assertEquals(helperAddress, environmentFacade.getHelperHostPort());
assertEquals("SYNC,NO_SYNC,SIMPLE_MAJORITY", environmentFacade.getMessageStoreDurability().toString());
_helper.awaitForVirtualhost(node);
VirtualHost<?> virtualHost = node.getVirtualHost();
assertNotNull("Virtual host child was not added", virtualHost);
assertEquals("Unexpected virtual host name", groupName, virtualHost.getName());
assertEquals("Unexpected virtual host store", bdbConfigurationStore.getMessageStore(), virtualHost.getMessageStore());
assertEquals("Unexpected virtual host state", State.ACTIVE, virtualHost.getState());
node.stop();
assertEquals("Unexpected state returned after stop", State.STOPPED, node.getState());
assertEquals("Unexpected state", State.STOPPED, node.getState());
assertNull("Virtual host is not destroyed", node.getVirtualHost());
node.delete();
assertEquals("Unexpected state returned after delete", State.DELETED, node.getState());
assertEquals("Unexpected state", State.DELETED, node.getState());
assertFalse("Store still exists " + messageStorePath, new File(messageStorePath).exists());
}
public void testMutableAttributes() throws Exception
{
int node1PortNumber = _portHelper.getNextAvailable();
String helperAddress = "localhost:" + node1PortNumber;
String groupName = "group";
String nodeName = "node1";
Map<String, Object> attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber);
BDBHAVirtualHostNode<?> node = _helper.createAndStartHaVHN(attributes);
assertEquals("Unexpected node priority value before mutation", 1, node.getPriority());
assertFalse("Unexpected designated primary value before mutation", node.isDesignatedPrimary());
assertEquals("Unexpected electable group override value before mutation", 0, node.getQuorumOverride());
Map<String, Object> update = new HashMap<>();
update.put(BDBHAVirtualHostNode.PRIORITY, 2);
update.put(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, true);
update.put(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 1);
node.setAttributes(update);
assertEquals("Unexpected node priority value after mutation", 2, node.getPriority());
assertTrue("Unexpected designated primary value after mutation", node.isDesignatedPrimary());
assertEquals("Unexpected electable group override value after mutation", 1, node.getQuorumOverride());
assertNotNull("Join time should be set", node.getJoinTime());
assertNotNull("Last known replication transaction id should be set", node.getLastKnownReplicationTransactionId());
}
public void testMutableAttributesAfterMajorityLost() throws Exception
{
int node1PortNumber = _portHelper.getNextAvailable();
int node2PortNumber = _portHelper.getNextAvailable();
int node3PortNumber = _portHelper.getNextAvailable();
String helperAddress = "localhost:" + node1PortNumber;
String groupName = "group";
String nodeName = "node1";
Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber, node2PortNumber, node3PortNumber);
BDBHAVirtualHostNode<?> node1 = _helper.createAndStartHaVHN(node1Attributes);
Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName);
BDBHAVirtualHostNode<?> node2 = _helper.createAndStartHaVHN(node2Attributes);
Map<String, Object> node3Attributes = _helper.createNodeAttributes("node3", groupName, "localhost:" + node3PortNumber, helperAddress, nodeName);
BDBHAVirtualHostNode<?> node3 = _helper.createAndStartHaVHN(node3Attributes);
assertEquals("Unexpected node priority value before mutation", 1, node1.getPriority());
assertFalse("Unexpected designated primary value before mutation", node1.isDesignatedPrimary());
assertEquals("Unexpected electable group override value before mutation", 0, node1.getQuorumOverride());
node2.close();
node3.close();
_helper.assertNodeRole(node1, NodeRole.DETACHED);
Map<String, Object> attributes = new HashMap<>();
attributes.put(BDBHAVirtualHostNode.PRIORITY, 200);
attributes.put(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, true);
attributes.put(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 1);
node1.setAttributes(attributes);
_helper.awaitForVirtualhost(node1);
assertEquals("Unexpected node priority value after mutation", 200, node1.getPriority());
assertTrue("Unexpected designated primary value after mutation", node1.isDesignatedPrimary());
assertEquals("Unexpected electable group override value after mutation", 1, node1.getQuorumOverride());
}
public void testTransferMasterToSelf() throws Exception
{
int node1PortNumber = _portHelper.getNextAvailable();
int node2PortNumber = _portHelper.getNextAvailable();
int node3PortNumber = _portHelper.getNextAvailable();
String helperAddress = "localhost:" + node1PortNumber;
String groupName = "group";
String nodeName = "node1";
Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber, node2PortNumber, node3PortNumber);
_helper.createAndStartHaVHN(node1Attributes);
Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName);
_helper.createAndStartHaVHN(node2Attributes);
Map<String, Object> node3Attributes = _helper.createNodeAttributes("node3", groupName, "localhost:" + node3PortNumber, helperAddress, nodeName);
_helper.createAndStartHaVHN(node3Attributes);
BDBHAVirtualHostNode<?> replica = _helper.awaitAndFindNodeInRole(NodeRole.REPLICA);
replica.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, NodeRole.MASTER));
_helper.assertNodeRole(replica, NodeRole.MASTER);
}
public void testTransferMasterToRemoteReplica() throws Exception
{
int node1PortNumber = _portHelper.getNextAvailable();
int node2PortNumber = _portHelper.getNextAvailable();
int node3PortNumber = _portHelper.getNextAvailable();
String helperAddress = "localhost:" + node1PortNumber;
String groupName = "group";
String nodeName = "node1";
Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress,
helperAddress, nodeName, node1PortNumber, node2PortNumber, node3PortNumber);
BDBHAVirtualHostNode<?> node1 = _helper.createAndStartHaVHN(node1Attributes);
final AtomicReference<RemoteReplicationNode<?>> lastSeenReplica = new AtomicReference<>();
final CountDownLatch remoteNodeLatch = new CountDownLatch(2);
node1.addChangeListener(new AbstractConfigurationChangeListener()
{
@Override
public void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child)
{
if (child instanceof RemoteReplicationNode)
{
remoteNodeLatch.countDown();
lastSeenReplica.set((RemoteReplicationNode<?>)child);
}
}
});
Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName);
BDBHAVirtualHostNode<?> node2 = _helper.createAndStartHaVHN(node2Attributes);
Map<String, Object> node3Attributes = _helper.createNodeAttributes("node3", groupName, "localhost:" + node3PortNumber, helperAddress, nodeName);
BDBHAVirtualHostNode<?> node3 = _helper.createAndStartHaVHN(node3Attributes);
assertTrue("Replication nodes have not been seen during 5s", remoteNodeLatch.await(5, TimeUnit.SECONDS));
BDBHARemoteReplicationNodeImpl replicaRemoteNode = (BDBHARemoteReplicationNodeImpl)lastSeenReplica.get();
_helper.awaitForAttributeChange(replicaRemoteNode, BDBHARemoteReplicationNodeImpl.ROLE, NodeRole.REPLICA);
replicaRemoteNode.setAttributes(Collections.<String,Object>singletonMap(BDBHARemoteReplicationNode.ROLE, NodeRole.MASTER));
BDBHAVirtualHostNode<?> replica = replicaRemoteNode.getName().equals(node2.getName())? node2 : node3;
_helper.assertNodeRole(replica, NodeRole.MASTER);
}
public void testMutatingRoleWhenNotReplica_IsDisallowed() throws Exception
{
int nodePortNumber = _portHelper.getNextAvailable();
String helperAddress = "localhost:" + nodePortNumber;
String groupName = "group";
String nodeName = "node1";
Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, nodePortNumber);
BDBHAVirtualHostNode<?> node = _helper.createAndStartHaVHN(node1Attributes);
_helper.assertNodeRole(node, NodeRole.MASTER);
try
{
node.setAttributes(Collections.<String,Object>singletonMap(BDBHAVirtualHostNode.ROLE, NodeRole.REPLICA));
fail("Role mutation should fail");
}
catch(IllegalStateException e)
{
// PASS
}
}
public void testRemoveReplicaNode() throws Exception
{
int node1PortNumber = _portHelper.getNextAvailable();
int node2PortNumber = _portHelper.getNextAvailable();
int node3PortNumber = _portHelper.getNextAvailable();
String helperAddress = "localhost:" + node1PortNumber;
String groupName = "group";
String nodeName = "node1";
assertTrue(_portHelper.isPortAvailable(node1PortNumber));
Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber, node2PortNumber, node3PortNumber);
_helper.createAndStartHaVHN(node1Attributes);
assertTrue(_portHelper.isPortAvailable(node2PortNumber));
Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName);
_helper.createAndStartHaVHN(node2Attributes);
assertTrue(_portHelper.isPortAvailable(node3PortNumber));
Map<String, Object> node3Attributes = _helper.createNodeAttributes("node3", groupName, "localhost:" + node3PortNumber, helperAddress, nodeName);
_helper.createAndStartHaVHN(node3Attributes);
BDBHAVirtualHostNode<?> master = _helper.awaitAndFindNodeInRole(NodeRole.MASTER);
_helper.awaitRemoteNodes(master, 2);
BDBHAVirtualHostNode<?> replica = _helper.awaitAndFindNodeInRole(NodeRole.REPLICA);
_helper.awaitRemoteNodes(replica, 2);
assertNotNull("Remote node " + replica.getName() + " is not found", _helper.findRemoteNode(master, replica.getName()));
replica.delete();
_helper.awaitRemoteNodes(master, 1);
assertNull("Remote node " + replica.getName() + " is not found", _helper.findRemoteNode(master, replica.getName()));
}
public void testSetSynchronizationPolicyAttributesOnVirtualHost() throws Exception
{
int node1PortNumber = _portHelper.getNextAvailable();
String helperAddress = "localhost:" + node1PortNumber;
String groupName = "group";
String nodeName = "node1";
Map<String, Object> nodeAttributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber);
BDBHAVirtualHostNode<?> node = _helper.createHaVHN(nodeAttributes);
node.start();
_helper.assertNodeRole(node, NodeRole.MASTER, NodeRole.REPLICA);
assertEquals("Unexpected node state", State.ACTIVE, node.getState());
_helper.awaitForVirtualhost(node);
BDBHAVirtualHostImpl virtualHost = (BDBHAVirtualHostImpl)node.getVirtualHost();
assertNotNull("Virtual host is not created", virtualHost);
_helper.awaitForAttributeChange(virtualHost, BDBHAVirtualHostImpl.COALESCING_SYNC, true);
assertEquals("Unexpected local transaction synchronization policy", "SYNC", virtualHost.getLocalTransactionSynchronizationPolicy());
assertEquals("Unexpected remote transaction synchronization policy", "NO_SYNC", virtualHost.getRemoteTransactionSynchronizationPolicy());
assertTrue("CoalescingSync is not ON", virtualHost.isCoalescingSync());
Map<String, Object> virtualHostAttributes = new HashMap<String,Object>();
virtualHostAttributes.put(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "WRITE_NO_SYNC");
virtualHostAttributes.put(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "SYNC");
virtualHost.setAttributes(virtualHostAttributes);
virtualHost.stop();
virtualHost.start();
assertEquals("Unexpected local transaction synchronization policy", "WRITE_NO_SYNC", virtualHost.getLocalTransactionSynchronizationPolicy());
assertEquals("Unexpected remote transaction synchronization policy", "SYNC", virtualHost.getRemoteTransactionSynchronizationPolicy());
assertFalse("CoalescingSync is not OFF", virtualHost.isCoalescingSync());
try
{
virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "INVALID"));
fail("Invalid synchronization policy is set");
}
catch(IllegalArgumentException e)
{
//pass
}
try
{
virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "INVALID"));
fail("Invalid synchronization policy is set");
}
catch(IllegalArgumentException e)
{
//pass
}
}
public void testNotPermittedNodeIsNotAllowedToConnect() throws Exception
{
int node1PortNumber = _portHelper.getNextAvailable();
int node2PortNumber = _portHelper.getNextAvailable();
int node3PortNumber = _portHelper.getNextAvailable();
String helperAddress = "localhost:" + node1PortNumber;
String groupName = "group";
String nodeName = "node1";
Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber, node2PortNumber);
BDBHAVirtualHostNode<?> node1 = _helper.createAndStartHaVHN(node1Attributes);
Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName);
BDBHAVirtualHostNode<?> node2 = _helper.createAndStartHaVHN(node2Attributes);
Map<String, Object> node3Attributes = _helper.createNodeAttributes("node3", groupName, "localhost:" + node3PortNumber, helperAddress, nodeName);
try
{
_helper.createHaVHN(node3Attributes);
fail("The VHN should not be permitted to join the group");
}
catch(IllegalConfigurationException e)
{
assertEquals("Unexpected exception message", String.format("Node from '%s' is not permitted!", "localhost:" + node3PortNumber), e.getMessage());
}
}
public void testCurrentNodeCannotBeRemovedFromPermittedNodeList() throws Exception
{
int node1PortNumber = _portHelper.getNextAvailable();
int node2PortNumber = _portHelper.getNextAvailable();
int node3PortNumber = _portHelper.getNextAvailable();
String node1Address = "localhost:" + node1PortNumber;
String node2Address = "localhost:" + node2PortNumber;
String node3Address = "localhost:" + node3PortNumber;
String groupName = "group";
String node1Name = "node1";
Map<String, Object> node1Attributes = _helper.createNodeAttributes(node1Name, groupName, node1Address, node1Address, node1Name, node1PortNumber, node2PortNumber, node3PortNumber);
BDBHAVirtualHostNode<?> node1 = _helper.createAndStartHaVHN(node1Attributes);
Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, node2Address, node1Address, node1Name);
BDBHAVirtualHostNode<?> node2 = _helper.createAndStartHaVHN(node2Attributes);
Map<String, Object> node3Attributes = _helper.createNodeAttributes("node3", groupName, node3Address, node1Address, node1Name);
BDBHAVirtualHostNode<?> node3 = _helper.createAndStartHaVHN(node3Attributes);
_helper.awaitRemoteNodes(node1, 2);
// Create new "proposed" permitted nodes list with a current node missing
List<String> amendedPermittedNodes = new ArrayList<String>();
amendedPermittedNodes.add(node1Address);
amendedPermittedNodes.add(node2Address);
// Try to update the permitted nodes attributes using the new list
try
{
node1.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, amendedPermittedNodes));
fail("Operation to remove current group node from permitted nodes should have failed");
}
catch(IllegalArgumentException e)
{
assertEquals("Unexpected exception message", String.format("The current group node '%s' cannot be removed from '%s' as its already a group member", node3Address, BDBHAVirtualHostNode.PERMITTED_NODES), e.getMessage());
}
}
public void testPermittedNodesAttributeModificationConditions() throws Exception
{
int node1PortNumber = _portHelper.getNextAvailable();
int node2PortNumber = _portHelper.getNextAvailable();
int node3PortNumber = _portHelper.getNextAvailable();
int node4PortNumber = _portHelper.getNextAvailable();
int node5PortNumber = _portHelper.getNextAvailable();
String node1Address = "localhost:" + node1PortNumber;
String node2Address = "localhost:" + node2PortNumber;
String node3Address = "localhost:" + node3PortNumber;
String node4Address = "localhost:" + node4PortNumber;
String node5Address = "localhost:" + node5PortNumber;
String groupName = "group";
String node1Name = "node1";
Map<String, Object> node1Attributes = _helper.createNodeAttributes(node1Name, groupName, node1Address, node1Address, node1Name, node1PortNumber, node2PortNumber, node3PortNumber);
BDBHAVirtualHostNode<?> node1 = _helper.createAndStartHaVHN(node1Attributes);
Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, node2Address, node1Address, node1Name);
BDBHAVirtualHostNode<?> node2 = _helper.createAndStartHaVHN(node2Attributes);
Map<String, Object> node3Attributes = _helper.createNodeAttributes("node3", groupName, node3Address, node1Address, node1Name);
BDBHAVirtualHostNode<?> node3 = _helper.createAndStartHaVHN(node3Attributes);
_helper.awaitRemoteNodes(node1, 2);
// Create new "proposed" permitted nodes list for update
List<String> amendedPermittedNodes = new ArrayList<String>();
amendedPermittedNodes.add(node1Address);
amendedPermittedNodes.add(node2Address);
amendedPermittedNodes.add(node3Address);
amendedPermittedNodes.add(node4Address);
// Try to update the permitted nodes attributes using the new list on REPLICA - should fail
BDBHAVirtualHostNode<?> nonMasterNode = _helper.findNodeInRole(NodeRole.REPLICA);
try
{
nonMasterNode.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, amendedPermittedNodes));
fail("Operation to update permitted nodes should have failed from non MASTER node");
}
catch(IllegalArgumentException e)
{
assertEquals("Unexpected exception message", String.format("Attribute '%s' can only be set on '%s' node or node in '%s' or '%s' state", BDBHAVirtualHostNode.PERMITTED_NODES, NodeRole.MASTER, State.STOPPED, State.ERRORED), e.getMessage());
}
// Try to update the permitted nodes attributes using the new list on MASTER - should succeed
BDBHAVirtualHostNode<?> masterNode = _helper.findNodeInRole(NodeRole.MASTER);
masterNode.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, amendedPermittedNodes));
// Try to update the permitted nodes attributes using the new list on a STOPPED node - should succeed
nonMasterNode.stop();
amendedPermittedNodes.add(node5Address);
nonMasterNode.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, amendedPermittedNodes));
}
public void testNodeCannotStartWithIntruder() throws Exception
{
int nodePortNumber = _portHelper.getNextAvailable();
int intruderPortNumber = _portHelper.getNextAvailable();
String helperAddress = "localhost:" + nodePortNumber;
String groupName = "group";
String nodeName = "node";
Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, nodePortNumber);
BDBHAVirtualHostNode<?> node = _helper.createAndStartHaVHN(node1Attributes);
final CountDownLatch stopLatch = new CountDownLatch(1);
ConfigurationChangeListener listener = new AbstractConfigurationChangeListener()
{
@Override
public void stateChanged(ConfiguredObject<?> object, State oldState, State newState)
{
if (newState == State.ERRORED)
{
stopLatch.countDown();
}
}
};
node.addChangeListener(listener);
File environmentPathFile = new File(_helper.getMessageStorePath() + File.separator + "intruder");
Durability durability = Durability.parse((String) node1Attributes.get(BDBHAVirtualHostNode.DURABILITY));
joinIntruder(intruderPortNumber, "intruder", groupName, helperAddress, durability, environmentPathFile);
assertTrue("Intruder protection was not triggered during expected timeout", stopLatch.await(10, TimeUnit.SECONDS));
final CountDownLatch stateChangeLatch = new CountDownLatch(1);
final CountDownLatch roleChangeLatch = new CountDownLatch(1);
node.addChangeListener(new AbstractConfigurationChangeListener()
{
@Override
public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
{
if (newState == State.ERRORED)
{
stateChangeLatch.countDown();
}
}
@Override
public void attributeSet(final ConfiguredObject<?> object,
final String attributeName,
final Object oldAttributeValue,
final Object newAttributeValue)
{
if (BDBHAVirtualHostNode.ROLE.equals(attributeName) && NodeRole.DETACHED.equals(NodeRole.DETACHED))
{
roleChangeLatch.countDown();
}
}
});
// Try top re start the ERRORED node and ensure exception is thrown
try
{
node.start();
fail("Restart of node should have thrown exception");
}
catch (IllegalStateException ise)
{
assertEquals("Unexpected exception when restarting node post intruder detection",
"Intruder node detected: " + "localhost:" + intruderPortNumber, ise.getMessage());
}
// verify that intruder detection is triggered after restart and environment is closed
assertTrue("Node state was not set to ERRORED", stateChangeLatch.await(10, TimeUnit.SECONDS));
assertTrue("Node role was not set to DETACHED", roleChangeLatch.await(10, TimeUnit.SECONDS));
}
public void testIntruderProtectionInManagementMode() throws Exception
{
int nodePortNumber = _portHelper.getNextAvailable();
int intruderPortNumber = _portHelper.getNextAvailable();
String helperAddress = "localhost:" + nodePortNumber;
String groupName = "group";
String nodeName = "node";
Map<String, Object> nodeAttributes = _helper.createNodeAttributes(nodeName,
groupName,
helperAddress,
helperAddress,
nodeName,
nodePortNumber);
BDBHAVirtualHostNode<?> node = _helper.createAndStartHaVHN(nodeAttributes);
final CountDownLatch stopLatch = new CountDownLatch(1);
ConfigurationChangeListener listener = new AbstractConfigurationChangeListener()
{
@Override
public void stateChanged(ConfiguredObject<?> object, State oldState, State newState)
{
if (newState == State.ERRORED)
{
stopLatch.countDown();
}
}
};
node.addChangeListener(listener);
File environmentPathFile = new File(_helper.getMessageStorePath() + File.separator + "intruder");
Durability durability = Durability.parse((String) nodeAttributes.get(BDBHAVirtualHostNode.DURABILITY));
joinIntruder(intruderPortNumber, "intruder", groupName, helperAddress, durability, environmentPathFile);
LOGGER.debug("Permitted and intruder nodes are created");
assertTrue("Intruder protection was not triggered during expected timeout",
stopLatch.await(10, TimeUnit.SECONDS));
LOGGER.debug("Master node transited into ERRORED state due to intruder protection");
when(_helper.getBroker().isManagementMode()).thenReturn(true);
LOGGER.debug("Starting node in management mode");
final CountDownLatch stateChangeLatch = new CountDownLatch(1);
final CountDownLatch roleChangeLatch = new CountDownLatch(1);
node.addChangeListener(new AbstractConfigurationChangeListener()
{
@Override
public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
{
if (newState == State.ERRORED)
{
stateChangeLatch.countDown();
}
}
@Override
public void attributeSet(final ConfiguredObject<?> object,
final String attributeName,
final Object oldAttributeValue,
final Object newAttributeValue)
{
if (BDBHAVirtualHostNode.ROLE.equals(attributeName) && NodeRole.DETACHED.equals(NodeRole.DETACHED))
{
roleChangeLatch.countDown();
}
}
});
node.start();
LOGGER.debug("Node is started");
// verify that intruder detection is triggered after restart and environment is closed
assertTrue("Node state was not set to ERRORED", stateChangeLatch.await(10, TimeUnit.SECONDS));
assertTrue("Node role was not set to DETACHED", roleChangeLatch.await(10, TimeUnit.SECONDS));
}
public void testPermittedNodesChangedOnReplicaNodeOnlyOnceAfterBeingChangedOnMaster() throws Exception
{
int node1PortNumber = _portHelper.getNextAvailable();
int node2PortNumber = _portHelper.getNextAvailable();
String helperAddress = "localhost:" + node1PortNumber;
String groupName = "group";
String nodeName = "node1";
Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber, node2PortNumber);
BDBHAVirtualHostNode<?> node1 = _helper.createAndStartHaVHN(node1Attributes);
Map<String, Object> node2Attributes = _helper.createNodeAttributes("node2", groupName, "localhost:" + node2PortNumber, helperAddress, nodeName);
node2Attributes.put(BDBHAVirtualHostNode.PRIORITY, 0);
BDBHAVirtualHostNode<?> node2 = _helper.createAndStartHaVHN(node2Attributes);
assertEquals("Unexpected role", NodeRole.REPLICA, node2.getRole());
_helper.awaitRemoteNodes(node2, 1);
BDBHARemoteReplicationNode<?> remote = _helper.findRemoteNode(node2, node1.getName());
final AtomicInteger permittedNodesChangeCounter = new AtomicInteger();
final CountDownLatch _permittedNodesLatch = new CountDownLatch(1);
node2.addChangeListener(new AbstractConfigurationChangeListener()
{
@Override
public void attributeSet(ConfiguredObject<?> object, String attributeName, Object oldAttributeValue, Object newAttributeValue)
{
if (attributeName.equals(BDBHAVirtualHostNode.PERMITTED_NODES))
{
permittedNodesChangeCounter.incrementAndGet();
_permittedNodesLatch.countDown();
}
}
});
List<String> permittedNodes = new ArrayList<>(node1.getPermittedNodes());
permittedNodes.add("localhost:5000");
node1.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, permittedNodes));
assertTrue("Permitted nodes were not changed on Replica", _permittedNodesLatch.await(10, TimeUnit.SECONDS));
assertEquals("Not the same permitted nodes", new HashSet<>(node1.getPermittedNodes()), new HashSet<>(node2.getPermittedNodes()));
assertEquals("Unexpected counter of changes permitted nodes", 1, permittedNodesChangeCounter.get());
// change the order of permitted nodes
Collections.swap(permittedNodes, 0, 2);
node1.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, permittedNodes));
// make sure that node2 onNodeState was invoked by performing transaction on master and making sure that it was replicated
performTransactionAndAwaitForRemoteNodeToGetAware(node1, remote);
// perform transaction second time because permitted nodes are changed after last transaction id
performTransactionAndAwaitForRemoteNodeToGetAware(node1, remote);
assertEquals("Unexpected counter of changes permitted nodes", 1, permittedNodesChangeCounter.get());
}
private void performTransactionAndAwaitForRemoteNodeToGetAware(BDBHAVirtualHostNode<?> node1, BDBHARemoteReplicationNode<?> remote) throws InterruptedException
{
new DatabasePinger().pingDb(((BDBConfigurationStore)node1.getConfigurationStore()).getEnvironmentFacade());
int waitCounter = 100;
while ( remote.getLastKnownReplicationTransactionId() != node1.getLastKnownReplicationTransactionId() && (waitCounter--) != 0)
{
Thread.sleep(100l);
}
assertEquals("Last transaction was not replicated", new Long(remote.getLastKnownReplicationTransactionId()), node1.getLastKnownReplicationTransactionId() );
}
public void testIntruderConnected() throws Exception
{
int node1PortNumber = _portHelper.getNextAvailable();
int node2PortNumber = _portHelper.getNextAvailable();
String helperAddress = "localhost:" + node1PortNumber;
String groupName = "group";
String nodeName = "node1";
Map<String, Object> node1Attributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber);
BDBHAVirtualHostNode<?> node1 = _helper.createAndStartHaVHN(node1Attributes);
final CountDownLatch stopLatch = new CountDownLatch(1);
ConfigurationChangeListener listener = new AbstractConfigurationChangeListener()
{
@Override
public void stateChanged(ConfiguredObject<?> object, State oldState, State newState)
{
if (newState == State.ERRORED)
{
stopLatch.countDown();
}
}
};
node1.addChangeListener(listener);
String node2Name = "node2";
File environmentPathFile = new File(_helper.getMessageStorePath() + File.separator + node2Name);
Durability durability = Durability.parse((String) node1Attributes.get(BDBHAVirtualHostNode.DURABILITY));
joinIntruder(node2PortNumber, node2Name, groupName, helperAddress, durability, environmentPathFile);
assertTrue("Intruder protection was not triggered during expected timeout", stopLatch.await(20, TimeUnit.SECONDS));
}
private void joinIntruder(final int nodePortNumber,
final String nodeName,
final String groupName,
final String helperAddress,
final Durability durability,
final File environmentPathFile)
{
environmentPathFile.mkdirs();
ReplicationConfig replicationConfig = new ReplicationConfig(groupName, nodeName, "localhost:" + nodePortNumber );
replicationConfig.setNodePriority(0);
replicationConfig.setHelperHosts(helperAddress);
replicationConfig.setConsistencyPolicy(NoConsistencyRequiredPolicy.NO_CONSISTENCY);
EnvironmentConfig envConfig = new EnvironmentConfig();
envConfig.setAllowCreate(true);
envConfig.setTransactional(true);
envConfig.setDurability(durability);
ReplicatedEnvironment intruder = null;
String originalThreadName = Thread.currentThread().getName();
try
{
intruder = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
}
finally
{
try
{
if (intruder != null)
{
intruder.close();
}
}
finally
{
Thread.currentThread().setName(originalThreadName);
}
}
}
public void testValidateOnCreateForNonExistingHelperNode() throws Exception
{
int node1PortNumber = findFreePort();
int node2PortNumber = getNextAvailable(node1PortNumber + 1);
Map<String, Object> attributes = _helper.createNodeAttributes("node1", "group", "localhost:" + node1PortNumber,
"localhost:" + node2PortNumber, "node2", node1PortNumber, node1PortNumber, node2PortNumber);
try
{
_helper.createAndStartHaVHN(attributes);
fail("Node creation should fail because of invalid helper address");
}
catch(IllegalConfigurationException e)
{
assertEquals("Unexpected exception on connection to non-existing helper address",
String.format("Cannot connect to existing node '%s' at '%s'", "node2", "localhost:" + node2PortNumber), e.getMessage());
}
}
public void testValidateOnCreateForAlreadyBoundAddress() throws Exception
{
try(ServerSocket serverSocket = new ServerSocket())
{
serverSocket.setReuseAddress(true);
serverSocket.bind(new InetSocketAddress("localhost", 0));
int node1PortNumber = serverSocket.getLocalPort();
Map<String, Object> attributes = _helper.createNodeAttributes("node1", "group", "localhost:" + node1PortNumber,
"localhost:" + node1PortNumber, "node2", node1PortNumber, node1PortNumber);
try
{
_helper.createAndStartHaVHN(attributes);
fail("Node creation should fail because of invalid address");
}
catch(IllegalConfigurationException e)
{
assertEquals("Unexpected exception on attempt to create node with already bound address",
String.format("Cannot bind to address '%s'. Address is already in use.", "localhost:" + node1PortNumber), e.getMessage());
}
}
}
public void testValidateOnCreateForInvalidStorePath() throws Exception
{
int node1PortNumber = 0;
File storeBaseFolder = TestFileUtils.createTestDirectory();
File file = new File(storeBaseFolder, getTestName());
file.createNewFile();
File storePath = new File(file, "test");
try
{
Map<String, Object> attributes = _helper.createNodeAttributes("node1", "group", "localhost:" + node1PortNumber,
"localhost:" + node1PortNumber, "node2", node1PortNumber, node1PortNumber);
attributes.put(BDBHAVirtualHostNode.STORE_PATH, storePath.getAbsoluteFile());
try
{
_helper.createAndStartHaVHN(attributes);
fail("Node creation should fail because of invalid store path");
}
catch (IllegalConfigurationException e)
{
assertEquals("Unexpected exception on attempt to create environment in invalid location",
String.format("Store path '%s' is not a folder", storePath.getAbsoluteFile()), e.getMessage());
}
}
finally
{
FileUtils.delete(storeBaseFolder, true);
}
}
}