| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.server.store.berkeleydb.replication; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import javax.servlet.http.HttpServletResponse; |
| |
| import com.sleepycat.je.Durability; |
| import com.sleepycat.je.EnvironmentConfig; |
| import com.sleepycat.je.rep.NoConsistencyRequiredPolicy; |
| import com.sleepycat.je.rep.ReplicatedEnvironment; |
| import com.sleepycat.je.rep.ReplicationConfig; |
| |
| import org.apache.qpid.server.management.plugin.servlet.rest.AbstractServlet; |
| import org.apache.qpid.server.model.RemoteReplicationNode; |
| import org.apache.qpid.server.model.State; |
| import org.apache.qpid.server.model.VirtualHost; |
| import org.apache.qpid.server.model.VirtualHostNode; |
| import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost; |
| import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; |
| import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode; |
| import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; |
| import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNodeImpl; |
| import org.apache.qpid.systest.rest.Asserts; |
| import org.apache.qpid.systest.rest.QpidRestTestCase; |
| import org.apache.qpid.test.utils.TestBrokerConfiguration; |
| |
| public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase |
| { |
| private static final String NODE1 = "node1"; |
| private static final String NODE2 = "node2"; |
| private static final String NODE3 = "node3"; |
| |
| private int _node1HaPort; |
| private int _node2HaPort; |
| private int _node3HaPort; |
| |
| private String _hostName; |
| private String _baseNodeRestUrl; |
| |
| @Override |
| public void setUp() throws Exception |
| { |
| setTestSystemProperty(ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, "1000"); |
| setTestSystemProperty(ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_TIMEOUT_PROPERTY_NAME, "1000"); |
| |
| super.setUp(); |
| _hostName = getTestName(); |
| _baseNodeRestUrl = "virtualhostnode/"; |
| |
| _node1HaPort = findFreePort(); |
| _node2HaPort = getNextAvailable(_node1HaPort + 1); |
| _node3HaPort = getNextAvailable(_node2HaPort + 1); |
| |
| |
| } |
| |
| @Override |
| protected void customizeConfiguration() throws Exception |
| { |
| super.customizeConfiguration(); |
| TestBrokerConfiguration config = getDefaultBrokerConfiguration(); |
| config.removeObjectConfiguration(VirtualHostNode.class, TEST2_VIRTUALHOST); |
| config.removeObjectConfiguration(VirtualHostNode.class, TEST3_VIRTUALHOST); |
| } |
| |
| public void testCreate3NodeGroup() throws Exception |
| { |
| createHANode(NODE1, _node1HaPort, _node1HaPort); |
| assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1); |
| createHANode(NODE2, _node2HaPort, _node1HaPort); |
| assertNode(NODE2, _node2HaPort, _node1HaPort, NODE1); |
| createHANode(NODE3, _node3HaPort, _node1HaPort); |
| assertNode(NODE3, _node3HaPort, _node1HaPort, NODE1); |
| assertRemoteNodes(NODE1, NODE2, NODE3); |
| } |
| |
| public void testMutateStateOfOneNode() throws Exception |
| { |
| createHANode(NODE1, _node1HaPort, _node1HaPort); |
| createHANode(NODE2, _node2HaPort, _node1HaPort); |
| createHANode(NODE3, _node3HaPort, _node1HaPort); |
| |
| String node1Url = _baseNodeRestUrl + NODE1; |
| String node2Url = _baseNodeRestUrl + NODE2; |
| String node3Url = _baseNodeRestUrl + NODE3; |
| |
| assertActualAndDesiredStates(node1Url, "ACTIVE", "ACTIVE"); |
| assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE"); |
| assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE"); |
| |
| // verify that remote nodes for node1 are created and their state is set to ACTIVE |
| _restTestHelper.waitForAttributeChanged("remotereplicationnode/" + NODE2 + "/" + NODE1, |
| BDBHARemoteReplicationNode.STATE, "ACTIVE"); |
| _restTestHelper.waitForAttributeChanged("remotereplicationnode/" + NODE3 + "/" + NODE1, |
| BDBHARemoteReplicationNode.STATE, "ACTIVE"); |
| |
| mutateDesiredState(node1Url, "STOPPED"); |
| |
| assertActualAndDesiredStates(node1Url, "STOPPED", "STOPPED"); |
| assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE"); |
| assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE"); |
| |
| // verify that remote node state fro node1 is changed to UNAVAILABLE |
| _restTestHelper.waitForAttributeChanged("remotereplicationnode/" + NODE2 + "/" + NODE1, |
| BDBHARemoteReplicationNode.STATE, "UNAVAILABLE"); |
| _restTestHelper.waitForAttributeChanged("remotereplicationnode/" + NODE3 + "/" + NODE1, |
| BDBHARemoteReplicationNode.STATE, "UNAVAILABLE"); |
| |
| List<Map<String, Object>> remoteNodes = getRestTestHelper().getJsonAsList("remotereplicationnode/" + NODE2); |
| assertEquals("Unexpected number of remote nodes on " + NODE2, 2, remoteNodes.size()); |
| |
| Map<String, Object> remoteNode1 = findRemoteNodeByName(remoteNodes, NODE1); |
| |
| assertEquals("Node 1 observed from node 2 is in the wrong state", |
| "UNAVAILABLE", remoteNode1.get(BDBHARemoteReplicationNode.STATE)); |
| assertEquals("Node 1 observed from node 2 has the wrong role", |
| "UNREACHABLE", remoteNode1.get(BDBHARemoteReplicationNode.ROLE)); |
| |
| } |
| |
| public void testNewMasterElectedWhenVirtualHostIsStopped() throws Exception |
| { |
| createHANode(NODE1, _node1HaPort, _node1HaPort); |
| createHANode(NODE2, _node2HaPort, _node1HaPort); |
| createHANode(NODE3, _node3HaPort, _node1HaPort); |
| |
| String node1Url = _baseNodeRestUrl + NODE1; |
| String node2Url = _baseNodeRestUrl + NODE2; |
| String node3Url = _baseNodeRestUrl + NODE3; |
| |
| assertActualAndDesiredStates(node1Url, "ACTIVE", "ACTIVE"); |
| assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE"); |
| assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE"); |
| |
| // Put virtualhost in STOPPED state |
| String virtualHostRestUrl = "virtualhost/" + NODE1 + "/" + _hostName; |
| assertActualAndDesiredStates(virtualHostRestUrl, "ACTIVE", "ACTIVE"); |
| mutateDesiredState(virtualHostRestUrl, "STOPPED"); |
| assertActualAndDesiredStates(virtualHostRestUrl, "STOPPED", "STOPPED"); |
| |
| // Now stop node 1 to cause an election between nodes 2 & 3 |
| mutateDesiredState(node1Url, "STOPPED"); |
| assertActualAndDesiredStates(node1Url, "STOPPED", "STOPPED"); |
| |
| Map<String, Object> newMasterData = awaitNewMaster(node2Url, node3Url); |
| |
| //Check the virtual host of the new master is in the stopped state |
| String newMasterVirtualHostRestUrl = "virtualhost/" + newMasterData.get(BDBHAVirtualHostNode.NAME) + "/" + _hostName; |
| assertActualAndDesiredStates(newMasterVirtualHostRestUrl, "STOPPED", "STOPPED"); |
| } |
| |
| public void testDeleteReplicaNode() throws Exception |
| { |
| createHANode(NODE1, _node1HaPort, _node1HaPort); |
| createHANode(NODE2, _node2HaPort, _node1HaPort); |
| createHANode(NODE3, _node3HaPort, _node1HaPort); |
| |
| assertRemoteNodes(NODE1, NODE2, NODE3); |
| |
| List<Map<String,Object>> data = getRestTestHelper().getJsonAsList("remotereplicationnode/" + NODE1); |
| assertEquals("Unexpected number of remote nodes on " + NODE1, 2, data.size()); |
| |
| getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE2, "DELETE", HttpServletResponse.SC_OK); |
| |
| int counter = 0; |
| while (data.size() != 1 && counter<50) |
| { |
| data = getRestTestHelper().getJsonAsList("remotereplicationnode/" + NODE1); |
| if (data.size() != 1) |
| { |
| Thread.sleep(100l); |
| } |
| counter++; |
| } |
| assertEquals("Unexpected number of remote nodes on " + NODE1, 1, data.size()); |
| } |
| |
| public void testDeleteMasterNode() throws Exception |
| { |
| createHANode(NODE1, _node1HaPort, _node1HaPort); |
| createHANode(NODE2, _node2HaPort, _node1HaPort); |
| createHANode(NODE3, _node3HaPort, _node1HaPort); |
| |
| assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1); |
| assertRemoteNodes(NODE1, NODE2, NODE3); |
| |
| // change priority to ensure that Node2 becomes a master |
| getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE2, |
| "PUT", |
| Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PRIORITY, 100), |
| HttpServletResponse.SC_OK); |
| |
| List<Map<String,Object>> data = getRestTestHelper().getJsonAsList("remotereplicationnode/" + NODE2); |
| assertEquals("Unexpected number of remote nodes on " + NODE2, 2, data.size()); |
| |
| // delete master |
| getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE1, "DELETE", HttpServletResponse.SC_OK); |
| |
| // wait for new master |
| _restTestHelper.waitForAttributeChanged(_baseNodeRestUrl + NODE2 + "?depth=0", BDBHAVirtualHostNode.ROLE, "MASTER"); |
| |
| // delete remote node |
| getRestTestHelper().submitRequest("remotereplicationnode/" + NODE2 + "/" + NODE1, "DELETE", HttpServletResponse.SC_OK); |
| |
| int counter = 0; |
| while (data.size() != 1 && counter<50) |
| { |
| data = getRestTestHelper().getJsonAsList("remotereplicationnode/" + NODE2); |
| if (data.size() != 1) |
| { |
| Thread.sleep(100l); |
| } |
| counter++; |
| } |
| assertEquals("Unexpected number of remote nodes on " + NODE2, 1, data.size()); |
| } |
| |
| public void testIntruderBDBHAVHNNotAllowedToConnect() throws Exception |
| { |
| createHANode(NODE1, _node1HaPort, _node1HaPort); |
| assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1); |
| |
| // add permitted node |
| Map<String, Object> node3Data = createNodeAttributeMap(NODE3, _node3HaPort, _node1HaPort); |
| getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE3, "PUT", node3Data, HttpServletResponse.SC_CREATED); |
| assertNode(NODE3, _node3HaPort, _node1HaPort, NODE1); |
| assertRemoteNodes(NODE1, NODE3); |
| |
| int intruderPort = getNextAvailable(_node3HaPort + 1); |
| |
| // try to add not permitted node |
| Map<String, Object> nodeData = createNodeAttributeMap(NODE2, intruderPort, _node1HaPort); |
| getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE2, "PUT", nodeData, AbstractServlet.SC_UNPROCESSABLE_ENTITY); |
| |
| assertRemoteNodes(NODE1, NODE3); |
| } |
| |
| public void testIntruderProtection() throws Exception |
| { |
| createHANode(NODE1, _node1HaPort, _node1HaPort); |
| assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1); |
| |
| Map<String,Object> nodeData = getRestTestHelper().getJsonAsMap(_baseNodeRestUrl + NODE1); |
| String node1StorePath = (String)nodeData.get(BDBHAVirtualHostNode.STORE_PATH); |
| long transactionId = ((Number)nodeData.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID)).longValue(); |
| |
| // add permitted node |
| Map<String, Object> node3Data = createNodeAttributeMap(NODE3, _node3HaPort, _node1HaPort); |
| getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE3, "PUT", node3Data, HttpServletResponse.SC_CREATED); |
| assertNode(NODE3, _node3HaPort, _node1HaPort, NODE1); |
| assertRemoteNodes(NODE1, NODE3); |
| |
| // Ensure PINGDB is created |
| // in order to exclude hanging of environment |
| // when environment.close is called whilst PINGDB is created. |
| // On node joining, a record is updated in PINGDB |
| // if lastTransactionId is incremented then node ping task was executed |
| int counter = 0; |
| long newTransactionId = transactionId; |
| while(newTransactionId == transactionId && counter<50) |
| { |
| nodeData = getRestTestHelper().getJsonAsMap(_baseNodeRestUrl + NODE1); |
| newTransactionId = ((Number)nodeData.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID)).longValue(); |
| if (newTransactionId != transactionId) |
| { |
| break; |
| } |
| counter++; |
| Thread.sleep(100L); |
| } |
| |
| //connect intruder node |
| String nodeName = NODE2; |
| String nodeHostPort = "localhost:" + getNextAvailable(_node3HaPort + 1); |
| File environmentPathFile = new File(node1StorePath, nodeName); |
| environmentPathFile.mkdirs(); |
| ReplicationConfig replicationConfig = new ReplicationConfig((String)nodeData.get(BDBHAVirtualHostNode.GROUP_NAME), nodeName, nodeHostPort); |
| replicationConfig.setHelperHosts((String)nodeData.get(BDBHAVirtualHostNode.ADDRESS)); |
| replicationConfig.setConsistencyPolicy(NoConsistencyRequiredPolicy.NO_CONSISTENCY); |
| EnvironmentConfig envConfig = new EnvironmentConfig(); |
| envConfig.setAllowCreate(true); |
| envConfig.setTransactional(true); |
| envConfig.setDurability(Durability.parse((String)nodeData.get(BDBHAVirtualHostNode.DURABILITY))); |
| |
| ReplicatedEnvironment intruder = null; |
| try |
| { |
| intruder = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig); |
| } |
| finally |
| { |
| if (intruder != null) |
| { |
| intruder.close(); |
| } |
| } |
| |
| _restTestHelper.waitForAttributeChanged(_baseNodeRestUrl + NODE1, VirtualHostNode.STATE, State.ERRORED.name()); |
| _restTestHelper.waitForAttributeChanged(_baseNodeRestUrl + NODE3, VirtualHostNode.STATE, State.ERRORED.name()); |
| } |
| |
| private void createHANode(String nodeName, int nodePort, int helperPort) throws Exception |
| { |
| Map<String, Object> nodeData = createNodeAttributeMap(nodeName, nodePort, helperPort); |
| |
| getRestTestHelper().submitRequest(_baseNodeRestUrl + nodeName, "PUT", nodeData, HttpServletResponse.SC_CREATED); |
| String hostExpectedState = nodePort == helperPort ? State.ACTIVE.name(): State.UNAVAILABLE.name(); |
| _restTestHelper.waitForAttributeChanged("virtualhost/" + nodeName + "/" + _hostName, BDBHAVirtualHost.STATE, hostExpectedState); |
| } |
| |
| private Map<String, Object> createNodeAttributeMap(String nodeName, int nodePort, int helperPort) throws Exception |
| { |
| Map<String, Object> nodeData = new HashMap<>(); |
| nodeData.put(BDBHAVirtualHostNode.NAME, nodeName); |
| nodeData.put(BDBHAVirtualHostNode.TYPE, BDBHAVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE); |
| nodeData.put(BDBHAVirtualHostNode.GROUP_NAME, _hostName); |
| nodeData.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + nodePort); |
| nodeData.put(BDBHAVirtualHostNode.HELPER_ADDRESS, "localhost:" + helperPort); |
| if (nodePort != helperPort) |
| { |
| nodeData.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, NODE1); |
| } |
| |
| Map<String,String> context = new HashMap<>(); |
| nodeData.put(BDBHAVirtualHostNode.CONTEXT, context); |
| if (nodePort == helperPort) |
| { |
| nodeData.put(BDBHAVirtualHostNode.PERMITTED_NODES, GroupCreator.getPermittedNodes("localhost", _node1HaPort, _node2HaPort, _node3HaPort)); |
| } |
| String bluePrint = GroupCreator.getBlueprint(); |
| context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, bluePrint); |
| return nodeData; |
| } |
| |
| private void assertNode(String nodeName, int nodePort, int nodeHelperPort, String masterNode) throws Exception |
| { |
| boolean isMaster = nodeName.equals(masterNode); |
| String expectedRole = isMaster? "MASTER" : "REPLICA"; |
| _restTestHelper.waitForAttributeChanged(_baseNodeRestUrl + nodeName + "?depth=0", BDBHAVirtualHostNode.ROLE, expectedRole); |
| |
| Map<String, Object> nodeData = getRestTestHelper().getJsonAsMap(_baseNodeRestUrl + nodeName + "?depth=0"); |
| assertEquals("Unexpected name", nodeName, nodeData.get(BDBHAVirtualHostNode.NAME)); |
| assertEquals("Unexpected type", BDBHAVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE, nodeData.get(BDBHAVirtualHostNode.TYPE)); |
| assertEquals("Unexpected address", "localhost:" + nodePort, nodeData.get(BDBHAVirtualHostNode.ADDRESS)); |
| assertEquals("Unexpected helper address", "localhost:" + nodeHelperPort, nodeData.get(BDBHAVirtualHostNode.HELPER_ADDRESS)); |
| assertEquals("Unexpected group name", _hostName, nodeData.get(BDBHAVirtualHostNode.GROUP_NAME)); |
| assertEquals("Unexpected role", expectedRole, nodeData.get(BDBHAVirtualHostNode.ROLE)); |
| |
| Integer lastKnownTransactionId = (Integer) nodeData.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID); |
| assertNotNull("Unexpected lastKnownReplicationId", lastKnownTransactionId); |
| assertTrue("Unexpected lastKnownReplicationId " + lastKnownTransactionId, lastKnownTransactionId > 0); |
| |
| Long joinTime = (Long) nodeData.get(BDBHAVirtualHostNode.JOIN_TIME); |
| assertNotNull("Unexpected joinTime", joinTime); |
| assertTrue("Unexpected joinTime " + joinTime, joinTime > 0); |
| |
| if (isMaster) |
| { |
| _restTestHelper.waitForAttributeChanged("virtualhost/" + masterNode + "/" + _hostName + "?depth=0", VirtualHost.STATE, State.ACTIVE.name()); |
| } |
| |
| } |
| |
| private void assertRemoteNodes(String masterNode, String... replicaNodes) throws Exception |
| { |
| List<String> clusterNodes = new ArrayList<>(Arrays.asList(replicaNodes)); |
| clusterNodes.add(masterNode); |
| |
| for (String clusterNodeName : clusterNodes) |
| { |
| List<String> remotes = new ArrayList<>(clusterNodes); |
| remotes.remove(clusterNodeName); |
| for (String remote : remotes) |
| { |
| String remoteUrl = "remotereplicationnode/" + clusterNodeName + "/" + remote; |
| String desiredNodeState = remote.equals(masterNode) ? "MASTER" : "REPLICA"; |
| _restTestHelper.waitForAttributeChanged(remoteUrl, |
| node -> desiredNodeState.equals(node.get( |
| BDBHARemoteReplicationNode.ROLE)) |
| && (Integer) node.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID) > 0 |
| && ((Number) node.get(BDBHAVirtualHostNode.JOIN_TIME)).longValue() > 0L); |
| } |
| } |
| } |
| |
| private void assertActualAndDesiredStates(final String restUrl, |
| final String expectedDesiredState, |
| final String expectedActualState) throws IOException |
| { |
| Map<String, Object> objectData = getRestTestHelper().getJsonAsMap(restUrl); |
| Asserts.assertActualAndDesiredState(expectedDesiredState, expectedActualState, objectData); |
| } |
| |
| private void mutateDesiredState(final String restUrl, final String newState) throws IOException |
| { |
| Map<String, Object> newAttributes = new HashMap<>(); |
| newAttributes.put(VirtualHostNode.DESIRED_STATE, newState); |
| |
| getRestTestHelper().submitRequest(restUrl, "PUT", newAttributes, HttpServletResponse.SC_OK); |
| } |
| |
| private Map<String, Object> findRemoteNodeByName(final List<Map<String, Object>> remoteNodes, final String nodeName) |
| { |
| Map<String, Object> foundNode = null; |
| for (Map<String, Object> remoteNode : remoteNodes) |
| { |
| if (nodeName.equals(remoteNode.get(RemoteReplicationNode.NAME))) |
| { |
| foundNode = remoteNode; |
| break; |
| } |
| } |
| assertNotNull("Could not find node with name " + nodeName + " amongst remote nodes."); |
| return foundNode; |
| } |
| |
| private Map<String, Object> awaitNewMaster(final String... nodeUrls) |
| throws IOException, InterruptedException |
| { |
| Map<String, Object> newMasterData = null; |
| int counter = 0; |
| while (newMasterData == null && counter < 50) |
| { |
| for(String nodeUrl: nodeUrls) |
| { |
| Map<String, Object> nodeData = getRestTestHelper().getJsonAsMap(nodeUrl); |
| if ("MASTER".equals(nodeData.get(BDBHAVirtualHostNode.ROLE))) |
| { |
| newMasterData = nodeData; |
| break; |
| } |
| } |
| if (newMasterData == null) |
| { |
| Thread.sleep(100L); |
| counter++; |
| } |
| } |
| assertNotNull("Could not find new master", newMasterData); |
| return newMasterData; |
| } |
| |
| |
| } |