blob: 9b3f13921e86e35bb43652138ca8bc677c2ddfa9 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store.berkeleydb.replication;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.ReplicationNode;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.replication.ReplicationGroupListener;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.test.utils.TestFileUtils;
import org.apache.qpid.util.FileUtils;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.Durability;
import com.sleepycat.je.Environment;
import com.sleepycat.je.rep.InsufficientReplicasException;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicatedEnvironment.State;
import com.sleepycat.je.rep.ReplicationConfig;
import com.sleepycat.je.rep.StateChangeEvent;
import com.sleepycat.je.rep.StateChangeListener;
public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
{
private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort();
private static final int LISTENER_TIMEOUT = 5;
private static final int WAIT_STATE_CHANGE_TIMEOUT = 30;
private static final String TEST_GROUP_NAME = "testGroupName";
private static final String TEST_NODE_NAME = "testNodeName";
private static final String TEST_NODE_HOST_PORT = "localhost:" + TEST_NODE_PORT;
private static final String TEST_NODE_HELPER_HOST_PORT = TEST_NODE_HOST_PORT;
private static final String TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY").toString();
private static final boolean TEST_DESIGNATED_PRIMARY = false;
private static final boolean TEST_COALESCING_SYNC = true;
private static final int TEST_PRIORITY = 10;
private static final int TEST_ELECTABLE_GROUP_OVERRIDE = 0;
private File _storePath;
private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>();
private VirtualHost _virtualHost = mock(VirtualHost.class);
private RemoteReplicationNodeFactory _remoteReplicationNodeFactory = new DefaultRemoteReplicationNodeFactory(_virtualHost);
public void setUp() throws Exception
{
super.setUp();
TaskExecutor taskExecutor = mock(TaskExecutor.class);
when(taskExecutor.isTaskExecutorThread()).thenReturn(true);
when(_virtualHost.getTaskExecutor()).thenReturn(taskExecutor);
_storePath = TestFileUtils.createTestDirectory("bdb", true);
when(_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_INTERVAL)).thenReturn(100L);
setTestSystemProperty(ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, "100");
}
@Override
public void tearDown() throws Exception
{
try
{
for (EnvironmentFacade ef : _nodes.values())
{
ef.close();
}
}
finally
{
try
{
if (_storePath != null)
{
FileUtils.delete(_storePath, true);
}
}
finally
{
super.tearDown();
}
}
}
public void testEnvironmentFacade() throws Exception
{
EnvironmentFacade ef = createMaster();
assertNotNull("Environment should not be null", ef);
Environment e = ef.getEnvironment();
assertTrue("Environment is not valid", e.isValid());
}
public void testClose() throws Exception
{
EnvironmentFacade ef = createMaster();
ef.close();
Environment e = ef.getEnvironment();
assertNull("Environment should be null after facade close", e);
}
public void testTransferMasterToSelf() throws Exception
{
final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1);
final CountDownLatch firstNodeMasterStateLatch = new CountDownLatch(1);
StateChangeListener stateChangeListener = new StateChangeListener(){
@Override
public void stateChange(StateChangeEvent event) throws RuntimeException
{
ReplicatedEnvironment.State state = event.getState();
if (state == ReplicatedEnvironment.State.REPLICA)
{
firstNodeReplicaStateLatch.countDown();
}
if (state == ReplicatedEnvironment.State.MASTER)
{
firstNodeMasterStateLatch.countDown();
}
}
};
ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener());
assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS));
int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
String node1NodeHostPort = "localhost:" + replica1Port;
ReplicatedEnvironmentFacade secondNode = addReplica(TEST_NODE_NAME + "_1", node1NodeHostPort);
assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState());
int replica2Port = getNextAvailable(replica1Port + 1);
String node2NodeHostPort = "localhost:" + replica2Port;
final CountDownLatch replicaStateLatch = new CountDownLatch(1);
final CountDownLatch masterStateLatch = new CountDownLatch(1);
StateChangeListener testStateChangeListener = new StateChangeListener()
{
@Override
public void stateChange(StateChangeEvent event) throws RuntimeException
{
ReplicatedEnvironment.State state = event.getState();
if (state == ReplicatedEnvironment.State.REPLICA)
{
replicaStateLatch.countDown();
}
if (state == ReplicatedEnvironment.State.MASTER)
{
masterStateLatch.countDown();
}
}
};
ReplicatedEnvironmentFacade thirdNode = addNode(TEST_NODE_NAME + "_2", node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, new NoopReplicationGroupListener());
assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS));
assertEquals(3, thirdNode.getNumberOfElectableGroupMembers());
thirdNode.transferMasterToSelfAsynchronously();
assertTrue("Environment did not become a master", masterStateLatch.await(10, TimeUnit.SECONDS));
assertTrue("First node environment did not become a replica", firstNodeReplicaStateLatch.await(10, TimeUnit.SECONDS));
assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState());
}
public void testTransferMasterAnotherNode() throws Exception
{
final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1);
final CountDownLatch firstNodeMasterStateLatch = new CountDownLatch(1);
StateChangeListener stateChangeListener = new StateChangeListener(){
@Override
public void stateChange(StateChangeEvent event) throws RuntimeException
{
ReplicatedEnvironment.State state = event.getState();
if (state == ReplicatedEnvironment.State.REPLICA)
{
firstNodeReplicaStateLatch.countDown();
}
if (state == ReplicatedEnvironment.State.MASTER)
{
firstNodeMasterStateLatch.countDown();
}
}
};
ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener());
assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS));
int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
String node1NodeHostPort = "localhost:" + replica1Port;
ReplicatedEnvironmentFacade secondNode = addReplica(TEST_NODE_NAME + "_1", node1NodeHostPort);
assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState());
int replica2Port = getNextAvailable(replica1Port + 1);
String node2NodeHostPort = "localhost:" + replica2Port;
final CountDownLatch replicaStateLatch = new CountDownLatch(1);
final CountDownLatch masterStateLatch = new CountDownLatch(1);
StateChangeListener testStateChangeListener = new StateChangeListener()
{
@Override
public void stateChange(StateChangeEvent event) throws RuntimeException
{
ReplicatedEnvironment.State state = event.getState();
if (state == ReplicatedEnvironment.State.REPLICA)
{
replicaStateLatch.countDown();
}
if (state == ReplicatedEnvironment.State.MASTER)
{
masterStateLatch.countDown();
}
}
};
String thirdNodeName = TEST_NODE_NAME + "_2";
ReplicatedEnvironmentFacade thirdNode = addNode(thirdNodeName, node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, new NoopReplicationGroupListener());
assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS));
assertEquals(3, thirdNode.getNumberOfElectableGroupMembers());
firstNode.transferMasterAsynchronously(thirdNodeName);
assertTrue("Environment did not become a master", masterStateLatch.await(10, TimeUnit.SECONDS));
assertTrue("First node environment did not become a replica", firstNodeReplicaStateLatch.await(10, TimeUnit.SECONDS));
assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState());
}
public void testOpenDatabases() throws Exception
{
EnvironmentFacade ef = createMaster();
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setTransactional(true);
dbConfig.setAllowCreate(true);
ef.openDatabases(dbConfig, "test1", "test2");
Database test1 = ef.getOpenDatabase("test1");
Database test2 = ef.getOpenDatabase("test2");
assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName());
}
public void testGetOpenDatabaseForNonExistingDatabase() throws Exception
{
EnvironmentFacade ef = createMaster();
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setTransactional(true);
dbConfig.setAllowCreate(true);
ef.openDatabases(dbConfig, "test1");
Database test1 = ef.getOpenDatabase("test1");
assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
try
{
ef.getOpenDatabase("test2");
fail("An exception should be thrown for the non existing database");
}
catch(IllegalArgumentException e)
{
assertEquals("Unexpected exception message", "Database with name 'test2' has never been requested to be opened", e.getMessage());
}
}
public void testGetGroupName() throws Exception
{
assertEquals("Unexpected group name", TEST_GROUP_NAME, createMaster().getGroupName());
}
public void testGetNodeName() throws Exception
{
assertEquals("Unexpected group name", TEST_NODE_NAME, createMaster().getNodeName());
}
public void testLastKnownReplicationTransactionId() throws Exception
{
ReplicatedEnvironmentFacade master = createMaster();
long lastKnownReplicationTransactionId = master.getLastKnownReplicationTransactionId();
assertTrue("Unexpected LastKnownReplicationTransactionId " + lastKnownReplicationTransactionId, lastKnownReplicationTransactionId > 0);
}
public void testGetNodeHostPort() throws Exception
{
assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, createMaster().getHostPort());
}
public void testGetHelperHostPort() throws Exception
{
assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, createMaster().getHelperHostPort());
}
public void testGetDurability() throws Exception
{
assertEquals("Unexpected durability", TEST_DURABILITY.toString(), createMaster().getDurability());
}
public void testIsCoalescingSync() throws Exception
{
assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, createMaster().isCoalescingSync());
}
public void testGetNodeState() throws Exception
{
assertEquals("Unexpected state", State.MASTER.name(), createMaster().getNodeState());
}
public void testPriority() throws Exception
{
ReplicatedEnvironmentFacade facade = createMaster();
assertEquals("Unexpected priority", TEST_PRIORITY, facade.getPriority());
Future<Void> future = facade.setPriority(TEST_PRIORITY + 1);
future.get(5, TimeUnit.SECONDS);
assertEquals("Unexpected priority after change", TEST_PRIORITY + 1, facade.getPriority());
}
public void testDesignatedPrimary() throws Exception
{
ReplicatedEnvironmentFacade master = createMaster();
assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary());
Future<Void> future = master.setDesignatedPrimary(!TEST_DESIGNATED_PRIMARY);
future.get(5, TimeUnit.SECONDS);
assertEquals("Unexpected designated primary after change", !TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary());
}
public void testElectableGroupSizeOverride() throws Exception
{
ReplicatedEnvironmentFacade facade = createMaster();
assertEquals("Unexpected Electable Group Size Override", TEST_ELECTABLE_GROUP_OVERRIDE, facade.getElectableGroupSizeOverride());
Future<Void> future = facade.setElectableGroupSizeOverride(TEST_ELECTABLE_GROUP_OVERRIDE + 1);
future.get(5, TimeUnit.SECONDS);
assertEquals("Unexpected Electable Group Size Override after change", TEST_ELECTABLE_GROUP_OVERRIDE + 1, facade.getElectableGroupSizeOverride());
}
public void testReplicationGroupListenerHearsAboutExistingRemoteReplicationNodes() throws Exception
{
ReplicatedEnvironmentFacade master = createMaster();
String nodeName2 = TEST_NODE_NAME + "_2";
String host = "localhost";
int port = getNextAvailable(TEST_NODE_PORT + 1);
String node2NodeHostPort = host + ":" + port;
final AtomicInteger invocationCount = new AtomicInteger();
final CountDownLatch nodeRecoveryLatch = new CountDownLatch(1);
ReplicationGroupListener listener = new NoopReplicationGroupListener()
{
@Override
public void onReplicationNodeRecovered(ReplicationNode node)
{
nodeRecoveryLatch.countDown();
invocationCount.incrementAndGet();
}
};
addReplica(nodeName2, node2NodeHostPort, listener);
assertEquals("Unexpected number of nodes", 2, master.getNumberOfElectableGroupMembers());
assertTrue("Listener not fired within timeout", nodeRecoveryLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
assertEquals("Unexpected number of listener invocations", 1, invocationCount.get());
}
public void testReplicationGroupListenerHearsNodeAdded() throws Exception
{
final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
final AtomicInteger invocationCount = new AtomicInteger();
ReplicationGroupListener listener = new NoopReplicationGroupListener()
{
@Override
public void onReplicationNodeAddedToGroup(ReplicationNode node)
{
invocationCount.getAndIncrement();
nodeAddedLatch.countDown();
}
};
TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener);
assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
assertEquals("Unexpected number of nodes at start of test", 1, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
String node2Name = TEST_NODE_NAME + "_2";
String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
addReplica(node2Name, node2NodeHostPort);
assertTrue("Listener not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
assertEquals("Unexpected number of nodes", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
assertEquals("Unexpected number of listener invocations", 1, invocationCount.get());
}
public void testReplicationGroupListenerHearsNodeRemoved() throws Exception
{
final CountDownLatch nodeDeletedLatch = new CountDownLatch(1);
final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
final AtomicInteger invocationCount = new AtomicInteger();
ReplicationGroupListener listener = new NoopReplicationGroupListener()
{
@Override
public void onReplicationNodeRecovered(ReplicationNode node)
{
nodeAddedLatch.countDown();
}
@Override
public void onReplicationNodeAddedToGroup(ReplicationNode node)
{
nodeAddedLatch.countDown();
}
@Override
public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
{
invocationCount.getAndIncrement();
nodeDeletedLatch.countDown();
}
};
TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener);
assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
String node2Name = TEST_NODE_NAME + "_2";
String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
addReplica(node2Name, node2NodeHostPort);
assertEquals("Unexpected number of nodes at start of test", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
// Need to await the listener hearing the addition of the node to the model.
assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
// Now remove the node and ensure we hear the event
replicatedEnvironmentFacade.removeNodeFromGroup(node2Name);
assertTrue("Node delete not fired within timeout", nodeDeletedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
assertEquals("Unexpected number of nodes after node removal", 1, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
assertEquals("Unexpected number of listener invocations", 1, invocationCount.get());
}
public void testMasterHearsRemoteNodeRoles() throws Exception
{
final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
final AtomicReference<ReplicationNode> nodeRef = new AtomicReference<ReplicationNode>();
ReplicationGroupListener listener = new NoopReplicationGroupListener()
{
@Override
public void onReplicationNodeAddedToGroup(ReplicationNode node)
{
nodeRef.set(node);
nodeAddedLatch.countDown();
}
};
TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener);
assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
String node2Name = TEST_NODE_NAME + "_2";
String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
addReplica(node2Name, node2NodeHostPort);
assertEquals("Unexpected number of nodes at start of test", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
RemoteReplicationNode remoteNode = (RemoteReplicationNode)nodeRef.get();
assertEquals("Unexpcted node name", node2Name, remoteNode.getName());
// Need to poll to await the remote node updating itself
long timeout = System.currentTimeMillis() + 5000;
while(!State.REPLICA.name().equals(remoteNode.getAttribute(ReplicationNode.ROLE)) && System.currentTimeMillis() < timeout)
{
Thread.sleep(200);
}
assertEquals("Unexpcted node role (after waiting)", State.REPLICA.name(), remoteNode.getAttribute(ReplicationNode.ROLE));
assertNotNull("Replica node " + ReplicationNode.JOIN_TIME + " attribute is not set", remoteNode.getAttribute(ReplicationNode.JOIN_TIME));
assertNotNull("Replica node " + ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID + " attribute is not set", remoteNode.getAttribute(ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID));
}
public void testRemoveNodeFromGroup() throws Exception
{
ReplicatedEnvironmentFacade environmentFacade = createMaster();
String node2Name = TEST_NODE_NAME + "_2";
String node2NodeHostPort = "localhost:" + getNextAvailable(TEST_NODE_PORT + 1);
ReplicatedEnvironmentFacade ref2 = addReplica(node2Name, node2NodeHostPort);
assertEquals("Unexpected group members count", 2, environmentFacade.getNumberOfElectableGroupMembers());
ref2.close();
environmentFacade.removeNodeFromGroup(node2Name);
assertEquals("Unexpected group members count", 1, environmentFacade.getNumberOfElectableGroupMembers());
}
public void testEnvironmentRestartOnInsufficientReplicas() throws Exception
{
ReplicatedEnvironmentFacade master = createMaster();
int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
String replica1NodeName = TEST_NODE_NAME + "_1";
String replica1NodeHostPort = "localhost:" + replica1Port;
ReplicatedEnvironmentFacade replica1 = addReplica(replica1NodeName, replica1NodeHostPort);
int replica2Port = getNextAvailable(replica1Port + 1);
String replica2NodeName = TEST_NODE_NAME + "_2";
String replica2NodeHostPort = "localhost:" + replica2Port;
ReplicatedEnvironmentFacade replica2 = addReplica(replica2NodeName, replica2NodeHostPort);
String databaseName = "test";
DatabaseConfig dbConfig = createDatabase(master, databaseName);
// close replicas
replica1.close();
replica2.close();
Environment e = master.getEnvironment();
Database db = master.getOpenDatabase(databaseName);
try
{
master.openDatabases(dbConfig, "test2");
fail("Opening of new database without quorum should fail");
}
catch(InsufficientReplicasException ex)
{
master.handleDatabaseException(null, ex);
}
replica1 = addReplica(replica1NodeName, replica1NodeHostPort);
replica2 = addReplica(replica2NodeName, replica2NodeHostPort);
// Need to poll to await the remote node updating itself
long timeout = System.currentTimeMillis() + 5000;
while(!(State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ) && System.currentTimeMillis() < timeout)
{
Thread.sleep(200);
}
assertTrue("The node could not rejoin the cluster. State is " + master.getNodeState(),
State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) );
Environment e2 = master.getEnvironment();
assertNotSame("Environment has not been restarted", e2, e);
Database db1 = master.getOpenDatabase(databaseName);
assertNotSame("Database should be the re-created", db1, db);
}
public void testEnvironmentAutomaticallyRestartsAndBecomesUnknownOnInsufficientReplicas() throws Exception
{
final CountDownLatch masterLatch = new CountDownLatch(1);
final AtomicInteger masterStateChangeCount = new AtomicInteger();
final CountDownLatch unknownLatch = new CountDownLatch(1);
final AtomicInteger unknownStateChangeCount = new AtomicInteger();
StateChangeListener stateChangeListener = new StateChangeListener()
{
@Override
public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
{
if (stateChangeEvent.getState() == State.MASTER)
{
masterStateChangeCount.incrementAndGet();
masterLatch.countDown();
}
else if (stateChangeEvent.getState() == State.UNKNOWN)
{
unknownStateChangeCount.incrementAndGet();
unknownLatch.countDown();
}
}
};
addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener());
assertTrue("Master was not started", masterLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
String node1NodeHostPort = "localhost:" + replica1Port;
int replica2Port = getNextAvailable(replica1Port + 1);
String node2NodeHostPort = "localhost:" + replica2Port;
ReplicatedEnvironmentFacade replica1 = addReplica(TEST_NODE_NAME + "_1", node1NodeHostPort);
ReplicatedEnvironmentFacade replica2 = addReplica(TEST_NODE_NAME + "_2", node2NodeHostPort);
// close replicas
replica1.close();
replica2.close();
assertTrue("Environment should be recreated and go into unknown state",
unknownLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
assertEquals("Node made master an unexpected number of times", 1, masterStateChangeCount.get());
assertEquals("Node made unknown an unexpected number of times", 1, unknownStateChangeCount.get());
}
public void testEnvironmentFacadeDetectsRemovalOfRemoteNode() throws Exception
{
final CountDownLatch nodeRemovedLatch = new CountDownLatch(1);
final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
final AtomicReference<ReplicationNode> addedNodeRef = new AtomicReference<ReplicationNode>();
final AtomicReference<ReplicationNode> removedNodeRef = new AtomicReference<ReplicationNode>();
ReplicationGroupListener listener = new NoopReplicationGroupListener()
{
@Override
public void onReplicationNodeAddedToGroup(ReplicationNode node)
{
if (addedNodeRef.compareAndSet(null, node))
{
nodeAddedLatch.countDown();
}
}
@Override
public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
{
removedNodeRef.set(node);
nodeRemovedLatch.countDown();
}
};
TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
final ReplicatedEnvironmentFacade masterEnvironment = addNode(State.MASTER, stateChangeListener, listener);
assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
masterEnvironment.setDesignatedPrimary(true);
int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
String node1NodeHostPort = "localhost:" + replica1Port;
String replicaName = TEST_NODE_NAME + "_1";
addReplica(replicaName, node1NodeHostPort);
assertTrue("Node should be added", nodeAddedLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
ReplicationNode node = addedNodeRef.get();
assertEquals("Unexpected node name", replicaName, node.getName());
// Need to poll to await the remote node updating itself
long timeout = System.currentTimeMillis() + 5000;
while(!State.REPLICA.name().equals(node.getAttribute(ReplicationNode.ROLE)) && System.currentTimeMillis() < timeout)
{
Thread.sleep(200);
}
assertEquals("Unexpected node role", State.REPLICA.name(), node.getAttribute(ReplicationNode.ROLE));
// removing remote node
node.setDesiredState(node.getActualState(), org.apache.qpid.server.model.State.DELETED);
assertTrue("Node deleting is undetected by the environment facade", nodeRemovedLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
assertEquals("Unexpected node is deleted", node, removedNodeRef.get());
}
public void testCloseStateTransitions() throws Exception
{
ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster();
assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState());
replicatedEnvironmentFacade.close();
assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState());
}
private ReplicatedEnvironmentFacade createMaster() throws Exception
{
TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
ReplicatedEnvironmentFacade env = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener());
assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
return env;
}
private ReplicatedEnvironmentFacade addReplica(String nodeName, String nodeHostPort) throws Exception
{
return addReplica(nodeName, nodeHostPort, new NoopReplicationGroupListener());
}
private ReplicatedEnvironmentFacade addReplica(String nodeName, String nodeHostPort, ReplicationGroupListener replicationGroupListener)
throws Exception
{
TestStateChangeListener testStateChangeListener = new TestStateChangeListener(State.REPLICA);
ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, replicationGroupListener);
assertTrue("Replica " + nodeName + " was not started", testStateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
return replicaEnvironmentFacade;
}
private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary,
State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener)
{
ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary);
ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config, _remoteReplicationNodeFactory);
ref.setReplicationGroupListener(replicationGroupListener);
ref.setStateChangeListener(stateChangeListener);
_nodes.put(nodeName, ref);
return ref;
}
private ReplicatedEnvironmentFacade addNode(State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener groupChangeListener)
{
return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener, groupChangeListener);
}
private DatabaseConfig createDatabase(ReplicatedEnvironmentFacade environmentFacade, String databaseName) throws AMQStoreException
{
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setTransactional(true);
dbConfig.setAllowCreate(true);
environmentFacade.openDatabases(dbConfig, databaseName);
return dbConfig;
}
private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary)
{
ReplicatedEnvironmentConfiguration node = mock(ReplicatedEnvironmentConfiguration.class);
when(node.getName()).thenReturn(nodeName);
when(node.getHostPort()).thenReturn(nodeHostPort);
when(node.isDesignatedPrimary()).thenReturn(designatedPrimary);
when(node.getQuorumOverride()).thenReturn(TEST_ELECTABLE_GROUP_OVERRIDE);
when(node.getPriority()).thenReturn(TEST_PRIORITY);
when(node.getGroupName()).thenReturn(TEST_GROUP_NAME);
when(node.getHelperHostPort()).thenReturn(TEST_NODE_HELPER_HOST_PORT);
when(node.getDurability()).thenReturn(TEST_DURABILITY);
when(node.isCoalescingSync()).thenReturn(TEST_COALESCING_SYNC);
Map<String, String> repConfig = new HashMap<String, String>();
repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s");
repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
when(node.getReplicationParameters()).thenReturn(repConfig);
when(node.getStorePath()).thenReturn(new File(_storePath, nodeName).getAbsolutePath());
return node;
}
}