blob: 9094f021b382a25c993897b2effb113e99b182a5 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store.berkeleydb.replication;
import static org.apache.qpid.server.store.berkeleydb.EnvironmentFacade.JUL_LOGGER_LEVEL_OVERRIDE;
import static org.apache.qpid.server.store.berkeleydb.EnvironmentFacade.LOG_HANDLER_CLEANER_PROTECTED_FILES_LIMIT_PROPERTY_NAME;
import static org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade.*;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.SettableFuture;
import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.bind.tuple.StringBinding;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Durability;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
import com.sleepycat.je.rep.NoConsistencyRequiredPolicy;
import com.sleepycat.je.rep.NodeState;
import com.sleepycat.je.rep.ReplicaWriteException;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicatedEnvironment.State;
import com.sleepycat.je.rep.ReplicationConfig;
import com.sleepycat.je.rep.ReplicationNode;
import com.sleepycat.je.rep.StateChangeEvent;
import com.sleepycat.je.rep.StateChangeListener;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.FileUtils;
import org.apache.qpid.test.utils.PortHelper;
import org.apache.qpid.test.utils.TestFileUtils;
import org.apache.qpid.test.utils.UnitTestBase;
import org.apache.qpid.test.utils.VirtualHostNodeStoreType;
public class ReplicatedEnvironmentFacadeTest extends UnitTestBase
{
private static final Logger LOGGER = LoggerFactory.getLogger(ReplicatedEnvironmentFacadeTest.class);
private int _timeout = 30;
private final PortHelper _portHelper = new PortHelper();
private final String TEST_GROUP_NAME = "testGroupName";
private final String TEST_NODE_NAME = "testNodeName";
private final int TEST_NODE_PORT = _portHelper.getNextAvailable();
private final String TEST_NODE_HOST_PORT = "localhost:" + TEST_NODE_PORT;
private final String TEST_NODE_HELPER_HOST_PORT = TEST_NODE_HOST_PORT;
private final Durability TEST_DURABILITY = Durability.parse("SYNC,NO_SYNC,SIMPLE_MAJORITY");
private final boolean TEST_DESIGNATED_PRIMARY = false;
private final int TEST_PRIORITY = 1;
private final int TEST_ELECTABLE_GROUP_OVERRIDE = 0;
private File _storePath;
private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>();
private Thread.UncaughtExceptionHandler _defaultUncaughtExceptionHandler;
private CopyOnWriteArrayList<Throwable> _unhandledExceptions = new CopyOnWriteArrayList<>();
@Before
public void setUp() throws Exception
{
assumeThat(getVirtualHostNodeStoreType(), is(equalTo(VirtualHostNodeStoreType.BDB)));
_timeout = Integer.getInteger("ReplicatedEnvironmentFacadeTest.timeout", 30);
_defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler();
Thread.setDefaultUncaughtExceptionHandler((t, e) ->
{
LOGGER.error("Unhandled exception in thread " + t, e);
_unhandledExceptions.add(e);
});
_storePath = TestFileUtils.createTestDirectory("bdb", true);
setTestSystemProperty(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, "100");
}
@After
public void tearDown() throws Exception
{
try
{
for (EnvironmentFacade ef : _nodes.values())
{
ef.close();
}
}
finally
{
if (_defaultUncaughtExceptionHandler != null)
{
Thread.setDefaultUncaughtExceptionHandler(_defaultUncaughtExceptionHandler);
}
try
{
if (_storePath != null)
{
FileUtils.delete(_storePath, true);
}
}
finally
{
if (!_unhandledExceptions.isEmpty())
{
fail("Unhandled exception(s) detected:" + _unhandledExceptions);
}
}
}
_portHelper.waitUntilAllocatedPortsAreFree();
}
@Test
public void testClose() throws Exception
{
ReplicatedEnvironmentFacade ef = createMaster();
ef.close();
assertEquals("Unexpected state after close",
ReplicatedEnvironmentFacade.State.CLOSED,
ef.getFacadeState());
}
@Test
public void testOpenDatabaseReusesCachedHandle() throws Exception
{
DatabaseConfig createIfAbsentDbConfig = DatabaseConfig.DEFAULT.setAllowCreate(true);
EnvironmentFacade ef = createMaster();
Database handle1 = ef.openDatabase("myDatabase", createIfAbsentDbConfig);
assertNotNull(handle1);
Database handle2 = ef.openDatabase("myDatabase", createIfAbsentDbConfig);
assertSame("Database handle should be cached", handle1, handle2);
ef.closeDatabase("myDatabase");
Database handle3 = ef.openDatabase("myDatabase", createIfAbsentDbConfig);
assertNotSame("Expecting a new handle after database closure", handle1, handle3);
}
@Test
public void testOpenDatabaseWhenFacadeIsNotOpened() throws Exception
{
DatabaseConfig createIfAbsentDbConfig = DatabaseConfig.DEFAULT.setAllowCreate(true);
EnvironmentFacade ef = createMaster();
ef.close();
try
{
ef.openDatabase("myDatabase", createIfAbsentDbConfig );
fail("Database open should fail");
}
catch(ConnectionScopedRuntimeException e)
{
assertEquals("Unexpected exception", "Environment facade is not in opened state", e.getMessage());
}
}
@Test
public void testGetGroupName() throws Exception
{
assertEquals("Unexpected group name", TEST_GROUP_NAME, createMaster().getGroupName());
}
@Test
public void testGetNodeName() throws Exception
{
assertEquals("Unexpected group name", TEST_NODE_NAME, createMaster().getNodeName());
}
@Test
public void testLastKnownReplicationTransactionId() throws Exception
{
ReplicatedEnvironmentFacade master = createMaster();
long lastKnownReplicationTransactionId = master.getLastKnownReplicationTransactionId();
assertTrue("Unexpected LastKnownReplicationTransactionId " + lastKnownReplicationTransactionId,
lastKnownReplicationTransactionId > 0);
}
@Test
public void testGetNodeHostPort() throws Exception
{
assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, createMaster().getHostPort());
}
@Test
public void testGetHelperHostPort() throws Exception
{
assertEquals("Unexpected node helper host port",
TEST_NODE_HELPER_HOST_PORT,
createMaster().getHelperHostPort());
}
@Test
public void testSetMessageStoreDurability() throws Exception
{
ReplicatedEnvironmentFacade master = createMaster();
assertEquals("Unexpected message store durability",
new Durability(Durability.SyncPolicy.NO_SYNC, Durability.SyncPolicy.NO_SYNC, Durability.ReplicaAckPolicy.SIMPLE_MAJORITY),
master.getRealMessageStoreDurability());
assertEquals("Unexpected durability", TEST_DURABILITY, master.getMessageStoreDurability());
assertTrue("Unexpected coalescing sync", master.isCoalescingSync());
master.setMessageStoreDurability(Durability.SyncPolicy.WRITE_NO_SYNC, Durability.SyncPolicy.SYNC, Durability.ReplicaAckPolicy.ALL);
assertEquals("Unexpected message store durability",
new Durability(Durability.SyncPolicy.WRITE_NO_SYNC, Durability.SyncPolicy.SYNC, Durability.ReplicaAckPolicy.ALL),
master.getRealMessageStoreDurability());
assertFalse("Coalescing sync committer is still running", master.isCoalescingSync());
}
@Test
public void testSetMessageStoreDurabilityWithDisabledCoalescingSync() throws Exception
{
ReplicatedEnvironmentFacade master = createMaster(true);
assertEquals("Unexpected message store durability",
new Durability(Durability.SyncPolicy.NO_SYNC, Durability.SyncPolicy.NO_SYNC,
Durability.ReplicaAckPolicy.SIMPLE_MAJORITY),
master.getRealMessageStoreDurability());
assertTrue("Unexpected coalescing sync", master.isCoalescingSync());
master.setMessageStoreDurability(Durability.SyncPolicy.NO_SYNC,
Durability.SyncPolicy.WRITE_NO_SYNC,
Durability.ReplicaAckPolicy.ALL);
assertEquals("Unexpected message store durability",
new Durability(Durability.SyncPolicy.NO_SYNC,
Durability.SyncPolicy.WRITE_NO_SYNC,
Durability.ReplicaAckPolicy.ALL),
master.getRealMessageStoreDurability());
assertFalse("Coalescing sync committer is still running", master.isCoalescingSync());
}
@Test
public void testGetNodeState() throws Exception
{
assertEquals("Unexpected state", State.MASTER.name(), createMaster().getNodeState());
}
@Test
public void testPriority() throws Exception
{
final TestStateChangeListener masterListener = new TestStateChangeListener();
final ReplicationGroupListener masterGroupListener = new NoopReplicationGroupListener();
ReplicatedEnvironmentConfiguration masterConfig = createReplicatedEnvironmentConfiguration(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY);
ReplicatedEnvironmentFacade facade = createReplicatedEnvironmentFacade(TEST_NODE_NAME, masterListener, masterGroupListener, masterConfig);
assertTrue("Master was not created", masterListener.awaitForStateChange(State.MASTER,
_timeout, TimeUnit.SECONDS));
assertEquals("Unexpected priority", (long) TEST_PRIORITY, (long) facade.getPriority());
int newPriority = TEST_PRIORITY + 1;
when(masterConfig.getPriority()).thenReturn(newPriority);
Future<Void> future = facade.reapplyPriority();
future.get(_timeout, TimeUnit.SECONDS);
assertEquals("Unexpected priority after change", (long) newPriority, (long) facade.getPriority());
}
@Test
public void testDesignatedPrimary() throws Exception
{
final TestStateChangeListener masterListener = new TestStateChangeListener();
final ReplicationGroupListener masterGroupListener = new NoopReplicationGroupListener();
ReplicatedEnvironmentConfiguration masterConfig = createReplicatedEnvironmentConfiguration(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY);
ReplicatedEnvironmentFacade master = createReplicatedEnvironmentFacade(TEST_NODE_NAME, masterListener, masterGroupListener, masterConfig);
assertTrue("Master was not created", masterListener.awaitForStateChange(State.MASTER,
_timeout, TimeUnit.SECONDS));
assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary());
boolean newDesignatedPrimary = !TEST_DESIGNATED_PRIMARY;
when(masterConfig.isDesignatedPrimary()).thenReturn(newDesignatedPrimary);
Future<Void> future = master.reapplyDesignatedPrimary();
future.get(_timeout, TimeUnit.SECONDS);
assertEquals("Unexpected designated primary after change",
newDesignatedPrimary,
master.isDesignatedPrimary());
}
@Test
public void testElectableGroupSizeOverride() throws Exception
{
final TestStateChangeListener masterListener = new TestStateChangeListener();
final ReplicationGroupListener masterGroupListener = new NoopReplicationGroupListener();
ReplicatedEnvironmentConfiguration masterConfig = createReplicatedEnvironmentConfiguration(TEST_NODE_NAME, TEST_NODE_HOST_PORT, false);
ReplicatedEnvironmentFacade facade = createReplicatedEnvironmentFacade(TEST_NODE_NAME, masterListener, masterGroupListener, masterConfig);
assertTrue("Master was not created", masterListener.awaitForStateChange(State.MASTER,
_timeout, TimeUnit.SECONDS));
assertEquals("Unexpected Electable Group Size Override",
(long) TEST_ELECTABLE_GROUP_OVERRIDE,
(long) facade.getElectableGroupSizeOverride());
int newElectableGroupOverride = TEST_ELECTABLE_GROUP_OVERRIDE + 1;
when(masterConfig.getQuorumOverride()).thenReturn(newElectableGroupOverride);
Future<Void> future = facade.reapplyElectableGroupSizeOverride();
future.get(_timeout, TimeUnit.SECONDS);
assertEquals("Unexpected Electable Group Size Override after change",
(long) newElectableGroupOverride,
(long) facade.getElectableGroupSizeOverride());
}
@Test
public void testReplicationGroupListenerHearsAboutExistingRemoteReplicationNodes() throws Exception
{
ReplicatedEnvironmentFacade master = createMaster();
String nodeName2 = TEST_NODE_NAME + "_2";
String host = "localhost";
int port = _portHelper.getNextAvailable();
String node2NodeHostPort = host + ":" + port;
final AtomicInteger invocationCount = new AtomicInteger();
final CountDownLatch nodeRecoveryLatch = new CountDownLatch(1);
ReplicationGroupListener listener = new NoopReplicationGroupListener()
{
@Override
public void onReplicationNodeRecovered(ReplicationNode node)
{
nodeRecoveryLatch.countDown();
invocationCount.incrementAndGet();
}
};
createReplica(nodeName2, node2NodeHostPort, listener);
assertEquals("Unexpected number of nodes", (long) 2, (long) master.getNumberOfElectableGroupMembers());
assertTrue("Listener not fired within timeout", nodeRecoveryLatch.await(_timeout, TimeUnit.SECONDS));
assertEquals("Unexpected number of listener invocations", (long) 1, (long) invocationCount.get());
}
@Test
public void testReplicationGroupListenerHearsNodeAdded() throws Exception
{
final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
final AtomicInteger invocationCount = new AtomicInteger();
ReplicationGroupListener listener = new NoopReplicationGroupListener()
{
@Override
public void onReplicationNodeAddedToGroup(ReplicationNode node)
{
invocationCount.getAndIncrement();
nodeAddedLatch.countDown();
}
};
TestStateChangeListener stateChangeListener = new TestStateChangeListener();
ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(stateChangeListener, listener);
assertTrue("Master was not started", stateChangeListener.awaitForStateChange(State.MASTER,
_timeout, TimeUnit
.SECONDS));
assertEquals("Unexpected number of nodes at start of test",
(long) 1,
(long) replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
String node2Name = TEST_NODE_NAME + "_2";
String node2NodeHostPort = "localhost" + ":" + _portHelper.getNextAvailable();
replicatedEnvironmentFacade.setPermittedNodes(Arrays.asList(replicatedEnvironmentFacade.getHostPort(), node2NodeHostPort));
createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener());
assertTrue("Listener not fired within timeout", nodeAddedLatch.await(_timeout, TimeUnit.SECONDS));
assertEquals("Unexpected number of nodes",
(long) 2,
(long) replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
assertEquals("Unexpected number of listener invocations", (long) 1, (long) invocationCount.get());
}
@Test
public void testReplicationGroupListenerHearsNodeRemoved() throws Exception
{
final CountDownLatch nodeDeletedLatch = new CountDownLatch(1);
final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
final AtomicInteger invocationCount = new AtomicInteger();
ReplicationGroupListener listener = new NoopReplicationGroupListener()
{
@Override
public void onReplicationNodeRecovered(ReplicationNode node)
{
nodeAddedLatch.countDown();
}
@Override
public void onReplicationNodeAddedToGroup(ReplicationNode node)
{
nodeAddedLatch.countDown();
}
@Override
public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
{
invocationCount.getAndIncrement();
nodeDeletedLatch.countDown();
}
};
TestStateChangeListener stateChangeListener = new TestStateChangeListener();
ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(stateChangeListener, listener);
// Set the node to be primary so that the node will remain master even when the 2nd node is shutdown
replicatedEnvironmentFacade.reapplyDesignatedPrimary();
assertTrue("Master was not started", stateChangeListener.awaitForStateChange(State.MASTER,
_timeout, TimeUnit.SECONDS));
String node2Name = TEST_NODE_NAME + "_2";
String node2NodeHostPort = "localhost" + ":" + _portHelper.getNextAvailable();
replicatedEnvironmentFacade.setPermittedNodes(Arrays.asList(replicatedEnvironmentFacade.getHostPort(), node2NodeHostPort));
createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener());
assertEquals("Unexpected number of nodes at start of test",
(long) 2,
(long) replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
// Need to await the listener hearing the addition of the node to the model.
assertTrue("Node add not fired within timeout", nodeAddedLatch.await(_timeout, TimeUnit.SECONDS));
// Now remove the node and ensure we hear the event
replicatedEnvironmentFacade.removeNodeFromGroup(node2Name);
assertTrue("Node delete not fired within timeout", nodeDeletedLatch.await(_timeout, TimeUnit.SECONDS));
assertEquals("Unexpected number of nodes after node removal",
(long) 1,
(long) replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
assertEquals("Unexpected number of listener invocations", (long) 1, (long) invocationCount.get());
}
@Test
public void testMasterHearsRemoteNodeRoles() throws Exception
{
final String node2Name = TEST_NODE_NAME + "_2";
final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
final AtomicReference<ReplicationNode> nodeRef = new AtomicReference<ReplicationNode>();
final CountDownLatch stateLatch = new CountDownLatch(1);
final AtomicReference<NodeState> stateRef = new AtomicReference<NodeState>();
ReplicationGroupListener listener = new NoopReplicationGroupListener()
{
@Override
public void onReplicationNodeAddedToGroup(ReplicationNode node)
{
nodeRef.set(node);
nodeAddedLatch.countDown();
}
@Override
public void onNodeState(ReplicationNode node, NodeState nodeState)
{
if (node2Name.equals(node.getName()))
{
stateRef.set(nodeState);
stateLatch.countDown();
}
}
};
TestStateChangeListener stateChangeListener = new TestStateChangeListener();
ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(stateChangeListener, listener);
assertTrue("Master was not started", stateChangeListener.awaitForStateChange(State.MASTER,
_timeout, TimeUnit
.SECONDS));
String node2NodeHostPort = "localhost" + ":" + _portHelper.getNextAvailable();
replicatedEnvironmentFacade.setPermittedNodes(Arrays.asList(replicatedEnvironmentFacade.getHostPort(), node2NodeHostPort));
createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener());
assertEquals("Unexpected number of nodes at start of test",
(long) 2,
(long) replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
assertTrue("Node add not fired within timeout", nodeAddedLatch.await(_timeout, TimeUnit.SECONDS));
ReplicationNode remoteNode = (ReplicationNode)nodeRef.get();
assertEquals("Unexpected node name", node2Name, remoteNode.getName());
assertTrue("Node state not fired within timeout", stateLatch.await(_timeout, TimeUnit.SECONDS));
assertEquals("Unexpected node state", State.REPLICA, stateRef.get().getNodeState());
}
@Test
public void testRemoveNodeFromGroup() throws Exception
{
TestStateChangeListener stateChangeListener = new TestStateChangeListener();
ReplicatedEnvironmentFacade environmentFacade = addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, stateChangeListener, new NoopReplicationGroupListener());
assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(State.MASTER,
_timeout, TimeUnit.SECONDS));
String node2Name = TEST_NODE_NAME + "_2";
String node2NodeHostPort = "localhost:" + _portHelper.getNextAvailable();
ReplicatedEnvironmentFacade ref2 = createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener());
assertEquals("Unexpected group members count",
(long) 2,
(long) environmentFacade.getNumberOfElectableGroupMembers());
ref2.close();
environmentFacade.removeNodeFromGroup(node2Name);
assertEquals("Unexpected group members count",
(long) 1,
(long) environmentFacade.getNumberOfElectableGroupMembers());
}
@Test
public void testRemoveNodeFromGroupTwice() throws Exception
{
TestStateChangeListener stateChangeListener = new TestStateChangeListener();
ReplicatedEnvironmentFacade environmentFacade = addNode(TEST_NODE_NAME,
TEST_NODE_HOST_PORT,
true,
stateChangeListener,
new NoopReplicationGroupListener());
assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(State.MASTER,
_timeout, TimeUnit.SECONDS));
String node2Name = TEST_NODE_NAME + "_2";
String node2NodeHostPort = "localhost:" + _portHelper.getNextAvailable();
ReplicatedEnvironmentFacade ref2 =
createReplica(node2Name, node2NodeHostPort, new NoopReplicationGroupListener());
ref2.close();
environmentFacade.removeNodeFromGroup(node2Name);
try
{
environmentFacade.removeNodeFromGroup(node2Name);
fail("Exception is expected");
}
catch (IllegalArgumentException e)
{
// pass
}
}
@Test
public void testEnvironmentFacadeDetectsRemovalOfRemoteNode() throws Exception
{
final String replicaName = TEST_NODE_NAME + "_1";
final CountDownLatch nodeRemovedLatch = new CountDownLatch(1);
final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
final AtomicReference<ReplicationNode> addedNodeRef = new AtomicReference<ReplicationNode>();
final AtomicReference<ReplicationNode> removedNodeRef = new AtomicReference<ReplicationNode>();
final CountDownLatch stateLatch = new CountDownLatch(1);
final AtomicReference<NodeState> stateRef = new AtomicReference<NodeState>();
ReplicationGroupListener listener = new NoopReplicationGroupListener()
{
@Override
public void onReplicationNodeAddedToGroup(ReplicationNode node)
{
if (addedNodeRef.compareAndSet(null, node))
{
nodeAddedLatch.countDown();
}
}
@Override
public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
{
removedNodeRef.set(node);
nodeRemovedLatch.countDown();
}
@Override
public void onNodeState(ReplicationNode node, NodeState nodeState)
{
if (replicaName.equals(node.getName()))
{
stateRef.set(nodeState);
stateLatch.countDown();
}
}
};
TestStateChangeListener stateChangeListener = new TestStateChangeListener();
final ReplicatedEnvironmentFacade masterEnvironment = addNode(stateChangeListener, listener);
assertTrue("Master was not started", stateChangeListener.awaitForStateChange(State.MASTER,
_timeout, TimeUnit.SECONDS));
masterEnvironment.reapplyDesignatedPrimary();
int replica1Port = _portHelper.getNextAvailable();
String node1NodeHostPort = "localhost:" + replica1Port;
masterEnvironment.setPermittedNodes(Arrays.asList(masterEnvironment.getHostPort(), node1NodeHostPort));
ReplicatedEnvironmentFacade replica = createReplica(replicaName, node1NodeHostPort, new NoopReplicationGroupListener());
assertTrue("Node should be added", nodeAddedLatch.await(_timeout, TimeUnit.SECONDS));
ReplicationNode node = addedNodeRef.get();
assertEquals("Unexpected node name", replicaName, node.getName());
assertTrue("Node state was not heard", stateLatch.await(_timeout, TimeUnit.SECONDS));
assertEquals("Unexpected node role", State.REPLICA, stateRef.get().getNodeState());
assertEquals("Unexpected node name", replicaName, stateRef.get().getNodeName());
replica.close();
masterEnvironment.removeNodeFromGroup(node.getName());
assertTrue("Node deleting is undetected by the environment facade",
nodeRemovedLatch.await(_timeout, TimeUnit.SECONDS));
assertEquals("Unexpected node is deleted", node, removedNodeRef.get());
}
@Test
public void testCloseStateTransitions() throws Exception
{
ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster();
assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(),
ReplicatedEnvironmentFacade.State.OPEN,
replicatedEnvironmentFacade.getFacadeState());
replicatedEnvironmentFacade.close();
assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(),
ReplicatedEnvironmentFacade.State.CLOSED,
replicatedEnvironmentFacade.getFacadeState());
}
@Test
public void testEnvironmentAutomaticallyRestartsAndBecomesUnknownOnInsufficientReplicas() throws Exception
{
final CountDownLatch masterLatch = new CountDownLatch(1);
final CountDownLatch secondMasterLatch = new CountDownLatch(1);
final AtomicInteger masterStateChangeCount = new AtomicInteger();
final CountDownLatch unknownLatch = new CountDownLatch(1);
final AtomicInteger unknownStateChangeCount = new AtomicInteger();
StateChangeListener stateChangeListener = new StateChangeListener()
{
@Override
public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
{
if (stateChangeEvent.getState() == State.MASTER)
{
masterStateChangeCount.incrementAndGet();
if (masterLatch.getCount() == 1)
{
masterLatch.countDown();
}
else
{
secondMasterLatch.countDown();
}
}
else if (stateChangeEvent.getState() == State.UNKNOWN)
{
unknownStateChangeCount.incrementAndGet();
unknownLatch.countDown();
}
}
};
// make sure that node is re-elected as MASTER on second start-up
ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY);
when(config.getPriority()).thenReturn(2);
createReplicatedEnvironmentFacade(TEST_NODE_NAME, stateChangeListener, new NoopReplicationGroupListener(), config);
assertTrue("Master was not started", masterLatch.await(_timeout, TimeUnit.SECONDS));
int replica1Port = _portHelper.getNextAvailable();
String node1NodeHostPort = "localhost:" + replica1Port;
int replica2Port = _portHelper.getNextAvailable();
String node2NodeHostPort = "localhost:" + replica2Port;
ReplicatedEnvironmentFacade replica1 = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener());
ReplicatedEnvironmentFacade replica2 = createReplica(TEST_NODE_NAME + "_2", node2NodeHostPort, new NoopReplicationGroupListener());
// close replicas
replica1.close();
replica2.close();
assertTrue("Environment should be recreated and go into unknown state", unknownLatch.await(_timeout,
TimeUnit.SECONDS));
// bring back the cluster in order to make sure that no extra state transition happens between UNKNOWN and MASTER
createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener());
assertTrue("Master node did not resume", secondMasterLatch.await(_timeout, TimeUnit.SECONDS));
assertEquals("Node transited into Master state unexpected number of times",
(long) 2,
(long) masterStateChangeCount.get());
assertEquals("Node transited into Unknown state unexpected number of times",
(long) 1,
(long) unknownStateChangeCount.get());
}
@Test
public void testTransferMasterToSelf() throws Exception
{
final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1);
final CountDownLatch firstNodeMasterStateLatch = new CountDownLatch(1);
StateChangeListener stateChangeListener = new StateChangeListener(){
@Override
public void stateChange(StateChangeEvent event) throws RuntimeException
{
ReplicatedEnvironment.State state = event.getState();
if (state == ReplicatedEnvironment.State.REPLICA)
{
firstNodeReplicaStateLatch.countDown();
}
if (state == ReplicatedEnvironment.State.MASTER)
{
firstNodeMasterStateLatch.countDown();
}
}
};
ReplicatedEnvironmentFacade firstNode = addNode(stateChangeListener, new NoopReplicationGroupListener());
assertTrue("Environment did not become a master",
firstNodeMasterStateLatch.await(_timeout, TimeUnit.SECONDS));
int replica1Port = _portHelper.getNextAvailable();
String node1NodeHostPort = "localhost:" + replica1Port;
ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener());
assertEquals("Unexpected state", State.REPLICA.name(), secondNode.getNodeState());
int replica2Port = _portHelper.getNextAvailable();
String node2NodeHostPort = "localhost:" + replica2Port;
final CountDownLatch replicaStateLatch = new CountDownLatch(1);
final CountDownLatch masterStateLatch = new CountDownLatch(1);
StateChangeListener testStateChangeListener = new StateChangeListener()
{
@Override
public void stateChange(StateChangeEvent event) throws RuntimeException
{
ReplicatedEnvironment.State state = event.getState();
if (state == ReplicatedEnvironment.State.REPLICA)
{
replicaStateLatch.countDown();
}
if (state == ReplicatedEnvironment.State.MASTER)
{
masterStateLatch.countDown();
}
}
};
ReplicatedEnvironmentFacade thirdNode = addNode(TEST_NODE_NAME + "_2", node2NodeHostPort, TEST_DESIGNATED_PRIMARY,
testStateChangeListener, new NoopReplicationGroupListener());
assertTrue("Environment did not become a replica", replicaStateLatch.await(_timeout, TimeUnit.SECONDS));
assertEquals((long) 3, (long) thirdNode.getNumberOfElectableGroupMembers());
thirdNode.transferMasterToSelfAsynchronously();
assertTrue("Environment did not become a master", masterStateLatch.await(_timeout, TimeUnit.SECONDS));
assertTrue("First node environment did not become a replica", firstNodeReplicaStateLatch.await(
_timeout, TimeUnit.SECONDS));
assertEquals("Unexpected state", State.REPLICA.name(), firstNode.getNodeState());
}
@Test
public void testTransferMasterAnotherNode() throws Exception
{
final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1);
final CountDownLatch firstNodeMasterStateLatch = new CountDownLatch(1);
StateChangeListener stateChangeListener = new StateChangeListener(){
@Override
public void stateChange(StateChangeEvent event) throws RuntimeException
{
ReplicatedEnvironment.State state = event.getState();
if (state == ReplicatedEnvironment.State.REPLICA)
{
firstNodeReplicaStateLatch.countDown();
}
if (state == ReplicatedEnvironment.State.MASTER)
{
firstNodeMasterStateLatch.countDown();
}
}
};
ReplicatedEnvironmentFacade firstNode = addNode(stateChangeListener, new NoopReplicationGroupListener());
assertTrue("Environment did not become a master",
firstNodeMasterStateLatch.await(_timeout, TimeUnit.SECONDS));
int replica1Port = _portHelper.getNextAvailable();
String node1NodeHostPort = "localhost:" + replica1Port;
ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener());
assertEquals("Unexpected state", State.REPLICA.name(), secondNode.getNodeState());
int replica2Port = _portHelper.getNextAvailable();
String node2NodeHostPort = "localhost:" + replica2Port;
final CountDownLatch replicaStateLatch = new CountDownLatch(1);
final CountDownLatch masterStateLatch = new CountDownLatch(1);
StateChangeListener testStateChangeListener = new StateChangeListener()
{
@Override
public void stateChange(StateChangeEvent event) throws RuntimeException
{
ReplicatedEnvironment.State state = event.getState();
if (state == ReplicatedEnvironment.State.REPLICA)
{
replicaStateLatch.countDown();
}
if (state == ReplicatedEnvironment.State.MASTER)
{
masterStateLatch.countDown();
}
}
};
String thirdNodeName = TEST_NODE_NAME + "_2";
ReplicatedEnvironmentFacade thirdNode = addNode(thirdNodeName, node2NodeHostPort, TEST_DESIGNATED_PRIMARY,
testStateChangeListener, new NoopReplicationGroupListener());
assertTrue("Environment did not become a replica", replicaStateLatch.await(_timeout, TimeUnit.SECONDS));
assertEquals((long) 3, (long) thirdNode.getNumberOfElectableGroupMembers());
firstNode.transferMasterAsynchronously(thirdNodeName);
assertTrue("Environment did not become a master", masterStateLatch.await(_timeout, TimeUnit.SECONDS));
assertTrue("First node environment did not become a replica", firstNodeReplicaStateLatch.await(
_timeout, TimeUnit.SECONDS));
assertEquals("Unexpected state", State.REPLICA.name(), firstNode.getNodeState());
}
@Test
public void testBeginTransaction() throws Exception
{
ReplicatedEnvironmentFacade facade = createMaster();
Transaction txn = null;
try
{
TransactionConfig transactionConfig = new TransactionConfig();
transactionConfig.setDurability(facade.getRealMessageStoreDurability());
txn = facade.beginTransaction(transactionConfig);
assertNotNull("Transaction is not created", txn);
txn.commit();
txn = null;
}
finally
{
if (txn != null)
{
txn.abort();
}
}
}
@Test
public void testSetPermittedNodes() throws Exception
{
ReplicatedEnvironmentFacade firstNode = createMaster();
Set<String> permittedNodes = new HashSet<String>();
permittedNodes.add("localhost:" + TEST_NODE_PORT);
permittedNodes.add("localhost:" + _portHelper.getNextAvailable());
firstNode.setPermittedNodes(permittedNodes);
ReplicationNodeImpl replicationNode = new ReplicationNodeImpl(TEST_NODE_NAME, TEST_NODE_HOST_PORT);
NodeState nodeState = getRemoteNodeState(TEST_GROUP_NAME, replicationNode, 5000);
ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> settings = objectMapper.readValue(nodeState.getAppState(), Map.class);
Collection<String> appStatePermittedNodes = (Collection<String>)settings.get(PERMITTED_NODE_LIST);
assertEquals("Unexpected permitted nodes", permittedNodes, new HashSet<String>(appStatePermittedNodes));
}
@Test
public void testPermittedNodeIsAllowedToConnect() throws Exception
{
ReplicatedEnvironmentFacade firstNode = createMaster();
int replica1Port = _portHelper.getNextAvailable();
String node1NodeHostPort = "localhost:" + replica1Port;
Set<String> permittedNodes = new HashSet<String>();
permittedNodes.add("localhost:" + TEST_NODE_PORT);
permittedNodes.add(node1NodeHostPort);
firstNode.setPermittedNodes(permittedNodes);
ReplicatedEnvironmentConfiguration configuration = createReplicatedEnvironmentConfiguration(TEST_NODE_NAME + "_1", node1NodeHostPort, false);
when(configuration.getHelperNodeName()).thenReturn(TEST_NODE_NAME);
TestStateChangeListener stateChangeListener = new TestStateChangeListener();
ReplicatedEnvironmentFacade secondNode = createReplicatedEnvironmentFacade(TEST_NODE_NAME + "_1",
stateChangeListener, new NoopReplicationGroupListener(), configuration);
assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(State.REPLICA,
_timeout, TimeUnit.SECONDS));
assertEquals("Unexpected state", State.REPLICA.name(), secondNode.getNodeState());
}
@Test
public void testIntruderNodeIsDetected() throws Exception
{
final CountDownLatch intruderLatch = new CountDownLatch(1);
ReplicationGroupListener listener = new NoopReplicationGroupListener()
{
@Override
public boolean onIntruderNode(ReplicationNode node)
{
intruderLatch.countDown();
return true;
}
};
ReplicatedEnvironmentFacade firstNode = createMaster(listener);
int replica1Port = _portHelper.getNextAvailable();
String node1NodeHostPort = "localhost:" + replica1Port;
Set<String> permittedNodes = new HashSet<String>();
permittedNodes.add("localhost:" + TEST_NODE_PORT);
firstNode.setPermittedNodes(permittedNodes);
String nodeName = TEST_NODE_NAME + "_1";
createIntruder(nodeName, node1NodeHostPort);
assertTrue("Intruder node was not detected", intruderLatch.await(_timeout, TimeUnit.SECONDS));
}
@Test
public void testNodeRolledback() throws Exception
{
DatabaseConfig createConfig = new DatabaseConfig();
createConfig.setAllowCreate(true);
createConfig.setTransactional(true);
TestStateChangeListener masterListener = new TestStateChangeListener();
ReplicatedEnvironmentFacade node1 = addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, masterListener, new NoopReplicationGroupListener());
assertTrue("Environment was not created", masterListener.awaitForStateChange(State.MASTER,
_timeout, TimeUnit.SECONDS));
String replicaNodeHostPort = "localhost:" + _portHelper.getNextAvailable();
String replicaName = TEST_NODE_NAME + 1;
ReplicatedEnvironmentFacade node2 = createReplica(replicaName, replicaNodeHostPort, new NoopReplicationGroupListener());
Database db = node1.openDatabase("mydb", createConfig);
// Put a record (that will be replicated)
putRecord(node1, db, 1, "value1");
node2.close();
// Put a record (that will be only on node1 as node2 is now offline)
putRecord(node1, db, 2, "value2");
db.close();
// Stop node1
node1.close();
LOGGER.debug("RESTARTING " + replicaName);
// Restart the node2, making it primary so it becomes master
TestStateChangeListener node2StateChangeListener = new TestStateChangeListener();
node2 = addNode(replicaName, replicaNodeHostPort, true, node2StateChangeListener, new NoopReplicationGroupListener());
boolean awaitForStateChange = node2StateChangeListener.awaitForStateChange(State.MASTER,
_timeout, TimeUnit.SECONDS);
assertTrue(replicaName + " did not go into desired state; current actual state is "
+ node2StateChangeListener.getCurrentActualState(), awaitForStateChange);
db = node2.openDatabase("mydb", DatabaseConfig.DEFAULT);
// Do a transaction on node2. The two environments will have diverged
putRecord(node2, db, 3, "diverged");
LOGGER.debug("RESTARTING " + TEST_NODE_NAME);
// Now restart node1 and ensure that it realises it needs to rollback before it can rejoin.
TestStateChangeListener node1StateChangeListener = new TestStateChangeListener();
final CountDownLatch _replicaRolledback = new CountDownLatch(1);
node1 = addNode(node1StateChangeListener, new NoopReplicationGroupListener()
{
@Override
public void onNodeRolledback()
{
LOGGER.debug("onNodeRolledback in " + TEST_NODE_NAME);
_replicaRolledback.countDown();
}
});
assertTrue("Node 1 did not go into desired state", node1StateChangeListener.awaitForStateChange(State.REPLICA,
_timeout, TimeUnit.SECONDS));
assertTrue("Node 1 did not experience rollback within timeout",
_replicaRolledback.await(_timeout, TimeUnit.SECONDS));
// Finally do one more transaction through the master
putRecord(node2, db, 4, "value4");
db.close();
LOGGER.debug("CLOSING");
node1.close();
node2.close();
}
@Test
public void testReplicaTransactionBeginsImmediately() throws Exception
{
ReplicatedEnvironmentFacade master = createMaster();
String nodeName2 = TEST_NODE_NAME + "_2";
String host = "localhost";
int port = _portHelper.getNextAvailable();
String node2NodeHostPort = host + ":" + port;
final ReplicatedEnvironmentFacade replica = createReplica(nodeName2, node2NodeHostPort, new NoopReplicationGroupListener() );
// close the master
master.close();
// try to create a transaction in a separate thread
// and make sure that transaction is created immediately.
ExecutorService service = Executors.newSingleThreadExecutor();
try
{
Future<Transaction> future = service.submit(new Callable<Transaction>(){
@Override
public Transaction call() throws Exception
{
return replica.beginTransaction(null);
}
});
Transaction transaction = future.get(_timeout, TimeUnit.SECONDS);
assertNotNull("Transaction was not created during expected time", transaction);
transaction.abort();
}
finally
{
service.shutdown();
}
}
@Test
public void testReplicaWriteExceptionIsConvertedIntoConnectionScopedRuntimeException() throws Exception
{
ReplicatedEnvironmentFacade master = createMaster();
String nodeName2 = TEST_NODE_NAME + "_2";
String host = "localhost";
int port = _portHelper.getNextAvailable();
String node2NodeHostPort = host + ":" + port;
final ReplicatedEnvironmentFacade replica = createReplica(nodeName2, node2NodeHostPort, new NoopReplicationGroupListener() );
// close the master
master.close();
try
{
replica.openDatabase("test", DatabaseConfig.DEFAULT.setAllowCreate(true) );
fail("Replica write operation should fail");
}
catch(ReplicaWriteException e)
{
RuntimeException handledException = master.handleDatabaseException("test", e);
final boolean condition = handledException instanceof ConnectionScopedRuntimeException;
assertTrue("Unexpected exception", condition);
}
}
@Test
public void testSetElectableGroupSizeOverrideAfterMajorityLost() throws Exception
{
final SettableFuture<Boolean> majorityLost = SettableFuture.create();
final TestStateChangeListener masterListener = new TestStateChangeListener();
ReplicationGroupListener masterGroupListener = new NoopReplicationGroupListener()
{
@Override
public void onNoMajority()
{
majorityLost.set(true);
}
};
ReplicatedEnvironmentConfiguration masterConfig = createReplicatedEnvironmentConfiguration(TEST_NODE_NAME, TEST_NODE_HOST_PORT, false);
ReplicatedEnvironmentFacade master = createReplicatedEnvironmentFacade(TEST_NODE_NAME, masterListener, masterGroupListener, masterConfig);
assertTrue("Master was not created", masterListener.awaitForStateChange(State.MASTER,
_timeout, TimeUnit.SECONDS));
int replica1Port = _portHelper.getNextAvailable();
String node1NodeHostPort = "localhost:" + replica1Port;
int replica2Port = _portHelper.getNextAvailable();
String node2NodeHostPort = "localhost:" + replica2Port;
master.setPermittedNodes(Arrays.asList(master.getHostPort(), node1NodeHostPort, node2NodeHostPort));
ReplicatedEnvironmentFacade replica1 = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort, new NoopReplicationGroupListener());
ReplicatedEnvironmentFacade replica2 = createReplica(TEST_NODE_NAME + "_2", node2NodeHostPort, new NoopReplicationGroupListener());
replica1.close();
replica2.close();
assertTrue("Node that was master did not become detached after the replica closed",
masterListener.awaitForStateChange(State.DETACHED, _timeout, TimeUnit.SECONDS));
assertTrue("Majority lost is undetected", majorityLost.get(_timeout, TimeUnit.SECONDS));
assertEquals("Unexpected facade state",
ReplicatedEnvironmentFacade.State.RESTARTING,
master.getFacadeState());
when(masterConfig.getQuorumOverride()).thenReturn(1);
master.reapplyElectableGroupSizeOverride();
assertTrue(
"Master did not become available again following the application of the electable group override",
masterListener.awaitForStateChange(State.MASTER, _timeout, TimeUnit.SECONDS));
}
@Test
public void testSetDesignatedPrimaryAfterMajorityLost() throws Exception
{
final SettableFuture<Boolean> majorityLost = SettableFuture.create();
final TestStateChangeListener masterListener = new TestStateChangeListener();
final NoopReplicationGroupListener masterGroupListener = new NoopReplicationGroupListener()
{
@Override
public void onNoMajority()
{
super.onNoMajority();
majorityLost.set(true);
}
};
ReplicatedEnvironmentConfiguration masterConfig = createReplicatedEnvironmentConfiguration(TEST_NODE_NAME, TEST_NODE_HOST_PORT, false);
ReplicatedEnvironmentFacade master = createReplicatedEnvironmentFacade(TEST_NODE_NAME, masterListener, masterGroupListener, masterConfig);
assertTrue("Master was not created", masterListener.awaitForStateChange(State.MASTER,
_timeout, TimeUnit.SECONDS));
int replicaPort = _portHelper.getNextAvailable();
String replicaNodeHostPort = "localhost:" + replicaPort;
master.setPermittedNodes(Arrays.asList(master.getHostPort(), replicaNodeHostPort));
ReplicatedEnvironmentFacade replica1 = createReplica(TEST_NODE_NAME + "_1", replicaNodeHostPort, new NoopReplicationGroupListener());
replica1.close();
assertTrue("Node that was master did not become detached after the replica closed",
masterListener.awaitForStateChange(State.DETACHED, _timeout, TimeUnit.SECONDS));
assertTrue("Majority lost is undetected", majorityLost.get(_timeout, TimeUnit.SECONDS));
assertEquals("Unexpected facade state",
ReplicatedEnvironmentFacade.State.RESTARTING,
master.getFacadeState());
when(masterConfig.isDesignatedPrimary()).thenReturn(true);
master.reapplyDesignatedPrimary();
assertTrue("Master did not become available again following designated primary",
masterListener.awaitForStateChange(State.MASTER, _timeout, TimeUnit.SECONDS));
}
@Test
public void testNodeCommitNoSyncWithCoalescing() throws Exception
{
DatabaseConfig createConfig = new DatabaseConfig();
createConfig.setAllowCreate(true);
createConfig.setTransactional(true);
TestStateChangeListener masterListener = new TestStateChangeListener();
ReplicatedEnvironmentFacade node1 =
addNodeWithDurability(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, masterListener, new NoopReplicationGroupListener(), false,"NO_SYNC,NO_SYNC,NONE");
assertTrue("Environment was not created", masterListener.awaitForStateChange(State.MASTER,
_timeout, TimeUnit.SECONDS));
String replicaNodeHostPort = "localhost:" + _portHelper.getNextAvailable();
String replicaName = TEST_NODE_NAME + 1;
ReplicatedEnvironmentFacade node2 =
createReplica(replicaName, replicaNodeHostPort, new NoopReplicationGroupListener());
Database db = node1.openDatabase("mydb", createConfig);
int key = 1;
String data = "value";
// Put a record (using commitNoSync)
TransactionConfig transactionConfig = addTestKeyValueWithCommitNoSync(node1, db, key, data);
db.close();
node1.close();
node2.close();
LOGGER.debug("RESTARTING " + TEST_NODE_NAME);
node1 = addNodeWithDurability(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, masterListener, new NoopReplicationGroupListener(), false,"NO_SYNC,NO_SYNC,NONE");
boolean awaitForStateChange = masterListener.awaitForStateChange(State.MASTER,
_timeout, TimeUnit.SECONDS);
LOGGER.debug("RESTARTING " + replicaName);
TestStateChangeListener node2StateChangeListener = new TestStateChangeListener();
node2 = addNode(replicaName,
replicaNodeHostPort,
false,
node2StateChangeListener,
new NoopReplicationGroupListener());
db = node1.openDatabase("mydb", DatabaseConfig.DEFAULT);
byte[] resultData = getTestKeyValue(node1, db, key, transactionConfig);
DatabaseEntry dbData = getDatabaseEntry(data);
assertArrayEquals(resultData, dbData.getData());
assertEquals("value", StringBinding.entryToString(dbData));
db.close();
LOGGER.debug("CLOSING");
node1.close();
node2.close();
}
@Test
public void testNodeCommitSyncWithoutCoalescing() throws Exception
{
DatabaseConfig createConfig = new DatabaseConfig();
createConfig.setAllowCreate(true);
createConfig.setTransactional(true);
TestStateChangeListener masterListener = new TestStateChangeListener();
ReplicatedEnvironmentFacade node1 =
addNodeWithDurability(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, masterListener, new NoopReplicationGroupListener(),true,"SYNC,NO_SYNC,NONE");
assertTrue("Environment was not created", masterListener.awaitForStateChange(State.MASTER,
_timeout, TimeUnit.SECONDS));
String replicaNodeHostPort = "localhost:" + _portHelper.getNextAvailable();
String replicaName = TEST_NODE_NAME + 1;
ReplicatedEnvironmentFacade node2 =
createReplica(replicaName, replicaNodeHostPort, new NoopReplicationGroupListener());
Database db = node1.openDatabase("mydb", createConfig);
int key = 1;
String data = "value";
// Put a record (using commitNoSync)
TransactionConfig transactionConfig = addTestKeyValueWithCommitNoSync(node1, db, key, data);
db.close();
node1.close();
node2.close();
LOGGER.debug("RESTARTING " + TEST_NODE_NAME);
node1 = addNodeWithDurability(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, masterListener, new NoopReplicationGroupListener(),true,"SYNC,NO_SYNC,NONE");
boolean awaitForStateChange = masterListener.awaitForStateChange(State.MASTER,
_timeout, TimeUnit.SECONDS);
LOGGER.debug("RESTARTING " + replicaName);
TestStateChangeListener node2StateChangeListener = new TestStateChangeListener();
node2 = addNode(replicaName,
replicaNodeHostPort,
false,
node2StateChangeListener,
new NoopReplicationGroupListener());
db = node1.openDatabase("mydb", DatabaseConfig.DEFAULT);
byte[] resultData = getTestKeyValue(node1, db, key, transactionConfig);
DatabaseEntry dbData = getDatabaseEntry(data);
assertArrayEquals(resultData, dbData.getData());
assertEquals("value", StringBinding.entryToString(dbData));
db.close();
LOGGER.debug("CLOSING");
node1.close();
node2.close();
}
private DatabaseEntry getDatabaseEntry(final String data)
{
DatabaseEntry dbData = new DatabaseEntry();
StringBinding.stringToEntry(data, dbData);
return dbData;
}
private DatabaseEntry getDatabaseEntry(final int data)
{
DatabaseEntry dbData = new DatabaseEntry();
IntegerBinding.intToEntry(data, dbData);
return dbData;
}
private byte[] getTestKeyValue(final ReplicatedEnvironmentFacade node1,
final Database db,
final int keyValue,
final TransactionConfig transactionConfig)
{
Transaction txn = node1.beginTransaction(transactionConfig);
DatabaseEntry key = getDatabaseEntry(keyValue);
DatabaseEntry result = new DatabaseEntry();
OperationStatus status = db.get(txn, key, result, LockMode.READ_UNCOMMITTED);
txn.commit();
byte[] resultData = new byte[0];
if (status == OperationStatus.SUCCESS)
{
resultData = result.getData();
}
return resultData;
}
private TransactionConfig addTestKeyValueWithCommitNoSync(final ReplicatedEnvironmentFacade node1,
final Database db,
final int keyValue, final String dataValue)
{
DatabaseEntry key = getDatabaseEntry(keyValue);
DatabaseEntry data = getDatabaseEntry(dataValue);
TransactionConfig transactionConfig = new TransactionConfig();
transactionConfig.setDurability(node1.getRealMessageStoreDurability());
Transaction txn = node1.beginTransaction(null);
db.put(txn, key, data);
node1.commitNoSync(txn);
return transactionConfig;
}
private void putRecord(final ReplicatedEnvironmentFacade master, final Database db, final int keyValue,
final String dataValue)
{
DatabaseEntry key = getDatabaseEntry(keyValue);
DatabaseEntry data = getDatabaseEntry(dataValue);
TransactionConfig transactionConfig = new TransactionConfig();
transactionConfig.setDurability(master.getRealMessageStoreDurability());
Transaction txn = master.beginTransaction(transactionConfig);
db.put(txn, key, data);
txn.commit();
}
private void createIntruder(String nodeName, String node1NodeHostPort)
{
File environmentPathFile = new File(_storePath, nodeName);
environmentPathFile.mkdirs();
ReplicationConfig replicationConfig = new ReplicationConfig(TEST_GROUP_NAME, nodeName, node1NodeHostPort);
replicationConfig.setHelperHosts(TEST_NODE_HOST_PORT);
replicationConfig.setConsistencyPolicy(NoConsistencyRequiredPolicy.NO_CONSISTENCY);
EnvironmentConfig envConfig = new EnvironmentConfig();
envConfig.setAllowCreate(true);
envConfig.setTransactional(true);
envConfig.setDurability(TEST_DURABILITY);
ReplicatedEnvironment intruder = null;
try
{
intruder = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
}
finally
{
if (intruder != null)
{
intruder.close();
}
}
}
private ReplicatedEnvironmentFacade createMaster() throws Exception
{
return createMaster(false);
}
private ReplicatedEnvironmentFacade createMaster(final boolean disableCoalescing) throws Exception
{
return createMaster(new NoopReplicationGroupListener(),disableCoalescing);
}
private ReplicatedEnvironmentFacade createMaster(ReplicationGroupListener replicationGroupListener) throws Exception
{
return createMaster(replicationGroupListener,false);
}
private ReplicatedEnvironmentFacade createMaster(ReplicationGroupListener replicationGroupListener,final boolean disableCoalescing) throws Exception
{
TestStateChangeListener stateChangeListener = new TestStateChangeListener();
ReplicatedEnvironmentFacade env = addNode(stateChangeListener, replicationGroupListener);
assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(State.MASTER,
_timeout, TimeUnit.SECONDS));
return env;
}
private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort, ReplicationGroupListener replicationGroupListener) throws Exception
{
TestStateChangeListener testStateChangeListener = new TestStateChangeListener();
return createReplica(nodeName, nodeHostPort, testStateChangeListener, replicationGroupListener);
}
private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort,
TestStateChangeListener testStateChangeListener, ReplicationGroupListener replicationGroupListener)
throws Exception
{
ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY,
testStateChangeListener, replicationGroupListener);
boolean awaitForStateChange = testStateChangeListener.awaitForStateChange(State.REPLICA,
_timeout, TimeUnit.SECONDS);
assertTrue("Replica " + nodeName + " did not go into desired state; current actual state is " + testStateChangeListener.getCurrentActualState(),
awaitForStateChange);
return replicaEnvironmentFacade;
}
private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary,
StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener,final boolean disableCoalescing)
{
ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary,disableCoalescing);
return createReplicatedEnvironmentFacade(nodeName, stateChangeListener, replicationGroupListener, config);
}
private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary,
StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener)
{
return addNode(nodeName,nodeHostPort,designatedPrimary,stateChangeListener,replicationGroupListener,false);
}
private ReplicatedEnvironmentFacade addNodeWithDurability(String nodeName, String nodeHostPort, boolean designatedPrimary,
StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener, boolean disableCoalescing, String durability)
{
ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary,disableCoalescing);
when(config.getFacadeParameter(eq(String.class),
eq(NO_SYNC_TX_DURABILITY_PROPERTY_NAME),
anyString())).thenReturn(durability);
return createReplicatedEnvironmentFacade(nodeName, stateChangeListener, replicationGroupListener, config);
}
private ReplicatedEnvironmentFacade createReplicatedEnvironmentFacade(String nodeName, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener, ReplicatedEnvironmentConfiguration config) {
ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config);
ref.setStateChangeListener(stateChangeListener);
ref.setReplicationGroupListener(replicationGroupListener);
ref.setMessageStoreDurability(TEST_DURABILITY.getLocalSync(), TEST_DURABILITY.getReplicaSync(), TEST_DURABILITY.getReplicaAck());
_nodes.put(nodeName, ref);
return ref;
}
private ReplicatedEnvironmentFacade addNode(StateChangeListener stateChangeListener,
ReplicationGroupListener replicationGroupListener,final boolean diableCoalescing)
{
return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY,
stateChangeListener, replicationGroupListener);
}
private ReplicatedEnvironmentFacade addNode(StateChangeListener stateChangeListener,
ReplicationGroupListener replicationGroupListener)
{
return addNode(stateChangeListener,replicationGroupListener,false);
}
private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary)
{
return createReplicatedEnvironmentConfiguration(nodeName,nodeHostPort,designatedPrimary,false);
}
private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary,final boolean disableCoalescing)
{
ReplicatedEnvironmentConfiguration node = mock(ReplicatedEnvironmentConfiguration.class);
when(node.getName()).thenReturn(nodeName);
when(node.getHostPort()).thenReturn(nodeHostPort);
when(node.isDesignatedPrimary()).thenReturn(designatedPrimary);
when(node.getQuorumOverride()).thenReturn(TEST_ELECTABLE_GROUP_OVERRIDE);
when(node.getPriority()).thenReturn(TEST_PRIORITY);
when(node.getGroupName()).thenReturn(TEST_GROUP_NAME);
when(node.getHelperHostPort()).thenReturn(TEST_NODE_HELPER_HOST_PORT);
when(node.getHelperNodeName()).thenReturn(TEST_NODE_NAME);
when(node.getFacadeParameter(eq(Integer.class),
eq(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME),
anyInt())).thenReturn(60000);
when(node.getFacadeParameter(eq(Integer.class), eq(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME), anyInt())).thenReturn(
10000);
when(node.getFacadeParameter(eq(Integer.class),
eq(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME),
anyInt())).thenReturn(1000);
when(node.getFacadeParameter(eq(Integer.class),
eq(REMOTE_NODE_MONITOR_TIMEOUT_PROPERTY_NAME),
anyInt())).thenReturn(1000);
when(node.getFacadeParameter(eq(Integer.class),
eq(ENVIRONMENT_RESTART_RETRY_LIMIT_PROPERTY_NAME),
anyInt())).thenReturn(3);
when(node.getFacadeParameter(eq(Integer.class),
eq(EXECUTOR_SHUTDOWN_TIMEOUT_PROPERTY_NAME),
anyInt())).thenReturn(10000);
when(node.getFacadeParameter(eq(Integer.class),
eq(LOG_HANDLER_CLEANER_PROTECTED_FILES_LIMIT_PROPERTY_NAME),
anyInt())).thenReturn(0);
when(node.getFacadeParameter(eq(Map.class), any(), eq(JUL_LOGGER_LEVEL_OVERRIDE), any())).thenReturn(Collections.emptyMap());
when(node.getFacadeParameter(eq(Boolean.class),
eq(DISABLE_COALESCING_COMMITTER_PROPERTY_NAME),
anyBoolean())).thenReturn(disableCoalescing);
Map<String, String> repConfig = new HashMap<>();
repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s");
repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
when(node.getReplicationParameters()).thenReturn(repConfig);
when(node.getStorePath()).thenReturn(new File(_storePath, nodeName).getAbsolutePath());
return node;
}
class NoopReplicationGroupListener implements ReplicationGroupListener
{
@Override
public void onReplicationNodeAddedToGroup(ReplicationNode node)
{
}
@Override
public void onReplicationNodeRecovered(ReplicationNode node)
{
}
@Override
public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
{
}
@Override
public void onNodeState(ReplicationNode node, NodeState nodeState)
{
}
@Override
public boolean onIntruderNode(ReplicationNode node)
{
LOGGER.warn("Intruder node " + node);
return true;
}
@Override
public void onNoMajority()
{
}
@Override
public void onNodeRolledback()
{
}
@Override
public void onException(Exception e)
{
}
}
}