QPID-8546:[Broker-J] Use special durability for non-sync commits in BDB HA
This closes #99
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 94d6fb1..84e1190 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -315,7 +315,7 @@
}
}
- void removeMessage(long messageId, boolean sync) throws StoreException
+ void removeMessage(long messageId) throws StoreException
{
boolean complete = false;
Transaction tx = null;
@@ -351,7 +351,7 @@
getLogger().debug("Deleted content for message {}", messageId);
- getEnvironmentFacade().commit(tx, sync);
+ getEnvironmentFacade().commitNoSync(tx);
complete = true;
tx = null;
@@ -789,17 +789,16 @@
*
* @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason.
*/
- private void commitTranImpl(final Transaction tx, boolean syncCommit) throws StoreException
+ private void commitTranImpl(final Transaction tx) throws StoreException
{
if (tx == null)
{
throw new StoreException("Fatal internal error: transactional is null at commitTran");
}
- getEnvironmentFacade().commit(tx, syncCommit);
+ getEnvironmentFacade().commit(tx);
- getLogger().debug("commitTranImpl completed {} transaction {}",
- syncCommit ? "synchronous" : "asynchronous", tx);
+ getLogger().debug("commitTranImpl completed {} transaction synchronous", tx);
}
@@ -1201,7 +1200,7 @@
throw getEnvironmentFacade().handleDatabaseException("failed to begin transaction", e);
}
store(txn);
- getEnvironmentFacade().commit(txn, false);
+ getEnvironmentFacade().commitAsync(txn, false);
}
}
@@ -1214,7 +1213,7 @@
_messages.remove(this);
if(stored())
{
- removeMessage(_messageId, false);
+ removeMessage(_messageId);
storedSizeChangeOccurred(-getContentSize());
}
if (!_messageDeleteListeners.isEmpty())
@@ -1378,7 +1377,7 @@
{
checkMessageStoreOpen();
doPreCommitActions();
- AbstractBDBMessageStore.this.commitTranImpl(_txn, true);
+ AbstractBDBMessageStore.this.commitTranImpl(_txn);
doPostCommitActions();
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
}
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
index fcf6d78..bf5e9b7 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
@@ -67,7 +67,7 @@
Transaction beginTransaction(TransactionConfig transactionConfig);
- void commit(Transaction tx, boolean sync);
+ void commit(Transaction tx);
<X> ListenableFuture<X> commitAsync(Transaction tx, X val);
RuntimeException handleDatabaseException(String contextMessage, RuntimeException e);
@@ -98,4 +98,6 @@
Map<String,Object> getDatabaseStatistics(String database, boolean reset);
void deleteDatabase(String databaseName);
+
+ void commitNoSync(final Transaction tx);
}
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
index 271c54c..3b55d24 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
@@ -166,7 +166,12 @@
}
@Override
- public void commit(com.sleepycat.je.Transaction tx, boolean syncCommit)
+ public void commit(Transaction tx)
+ {
+ commitInternal(tx, true);
+ }
+
+ private void commitInternal(final Transaction tx, final boolean syncCommit)
{
try
{
@@ -184,6 +189,12 @@
}
@Override
+ public void commitNoSync(final Transaction tx)
+ {
+ commitInternal(tx, false);
+ }
+
+ @Override
public <X> ListenableFuture<X> commitAsync(final Transaction tx, final X val)
{
try
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 7b15c44..3e201ba 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -93,6 +93,7 @@
public static final String ENVIRONMENT_RESTART_RETRY_LIMIT_PROPERTY_NAME = "qpid.bdb.ha.environment_restart_retry_limit";
public static final String EXECUTOR_SHUTDOWN_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.executor_shutdown_timeout";
public static final String DISABLE_COALESCING_COMMITTER_PROPERTY_NAME = "qpid.bdb.ha.disable_coalescing_committer";
+ public static final String NO_SYNC_TX_DURABILITY_PROPERTY_NAME = "qpid.bdb.ha.noSyncTxDurablity";
private static final Logger LOGGER = LoggerFactory.getLogger(ReplicatedEnvironmentFacade.class);
@@ -103,6 +104,8 @@
private static final int DEFAULT_ENVIRONMENT_RESTART_RETRY_LIMIT = 3;
private static final int DEFAULT_EXECUTOR_SHUTDOWN_TIMEOUT = 5000;
private static final boolean DEFAULT_DISABLE_COALESCING_COMMITTER = false;
+ private static final String DEFAULT_NO_SYNC_TX_DURABILITY_PROPERTY_NAME = "NO_SYNC,NO_SYNC,NONE";
+ private static final String DEFAULT_SYNC_TX_DURABILITY_PROPERTY_NAME = "SYNC,NO_SYNC,NONE";
/** Length of time allowed for a master transfer to complete before the operation will timeout */
private final int _masterTransferTimeout;
@@ -139,6 +142,8 @@
private final boolean _disableCoalescingCommiter;
+ private final Durability _noSyncTxDurability;
+
private final int _logHandlerCleanerProtectedFilesLimit;
static final SyncPolicy LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.SYNC;
@@ -284,6 +289,7 @@
_stateChangeExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new DaemonThreadFactory("StateChange-" + _prettyGroupNodeName)));
_groupChangeExecutor = new ScheduledThreadPoolExecutor(2, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
_disableCoalescingCommiter = configuration.getFacadeParameter(Boolean.class,DISABLE_COALESCING_COMMITTER_PROPERTY_NAME, DEFAULT_DISABLE_COALESCING_COMMITTER);
+ _noSyncTxDurability = Durability.parse(configuration.getFacadeParameter(String.class, NO_SYNC_TX_DURABILITY_PROPERTY_NAME, getDefaultDurability(_disableCoalescingCommiter)));
// create environment in a separate thread to avoid renaming of the current thread by JE
EnvHomeRegistry.getInstance().registerHome(_environmentDirectory);
@@ -318,6 +324,18 @@
}
}
+ private String getDefaultDurability(final boolean disableCoalescingCommiter)
+ {
+ if (disableCoalescingCommiter)
+ {
+ return DEFAULT_SYNC_TX_DURABILITY_PROPERTY_NAME;
+ }
+ else
+ {
+ return DEFAULT_NO_SYNC_TX_DURABILITY_PROPERTY_NAME;
+ }
+ }
+
@Override
public Transaction beginTransaction(TransactionConfig transactionConfig)
{
@@ -325,40 +343,48 @@
}
@Override
- public void commit(final Transaction tx, boolean syncCommit)
+ public void commit(final Transaction tx)
+ {
+ commitInternal(tx, _realMessageStoreDurability);
+
+ if (_coalescingCommiter != null && _realMessageStoreDurability.getLocalSync() == SyncPolicy.NO_SYNC
+ && _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC)
+ {
+ _coalescingCommiter.commit(tx, true);
+ }
+
+ }
+
+ private void commitInternal(final Transaction tx, final Durability realMessageStoreDurability)
{
try
{
// Using commit() instead of commitNoSync() for the HA store to allow
// the HA durability configuration to influence resulting behaviour.
- tx.commit(_realMessageStoreDurability);
+ tx.commit(realMessageStoreDurability);
}
catch (DatabaseException de)
{
throw handleDatabaseException("Got DatabaseException on commit, closing environment", de);
}
+ }
+
+ @Override
+ public void commitNoSync(final Transaction tx)
+ {
+ commitInternal(tx, _noSyncTxDurability);
if (_coalescingCommiter != null && _realMessageStoreDurability.getLocalSync() == SyncPolicy.NO_SYNC
- && _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC)
+ && _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC)
{
- _coalescingCommiter.commit(tx, syncCommit);
+ _coalescingCommiter.commit(tx, false);
}
-
}
@Override
public <X> ListenableFuture<X> commitAsync(final Transaction tx, final X val)
{
- try
- {
- // Using commit() instead of commitNoSync() for the HA store to allow
- // the HA durability configuration to influence resulting behaviour.
- tx.commit(_realMessageStoreDurability);
- }
- catch (DatabaseException de)
- {
- throw handleDatabaseException("Got DatabaseException on commit, closing environment", de);
- }
+ commitInternal(tx, _realMessageStoreDurability);
if (_coalescingCommiter != null && _realMessageStoreDurability.getLocalSync() == SyncPolicy.NO_SYNC
&& _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC)
diff --git a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index d3fea6e..8b03750 100644
--- a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -116,7 +116,7 @@
StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore);
long messageid_0_8 = storedMessage_0_8.getMessageNumber();
- bdbStore.removeMessage(messageid_0_8, true);
+ bdbStore.removeMessage(messageid_0_8);
//verify the removal using the BDB store implementation methods directly
try
diff --git a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index 90c15f7..9094f02 100644
--- a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -21,11 +21,11 @@
package org.apache.qpid.server.store.berkeleydb.replication;
import static org.apache.qpid.server.store.berkeleydb.EnvironmentFacade.JUL_LOGGER_LEVEL_OVERRIDE;
-import static org.apache.qpid.server.store.berkeleydb.EnvironmentFacade
- .LOG_HANDLER_CLEANER_PROTECTED_FILES_LIMIT_PROPERTY_NAME;
+import static org.apache.qpid.server.store.berkeleydb.EnvironmentFacade.LOG_HANDLER_CLEANER_PROTECTED_FILES_LIMIT_PROPERTY_NAME;
import static org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade.*;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -37,8 +37,8 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -69,6 +69,8 @@
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Durability;
import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
import com.sleepycat.je.rep.NoConsistencyRequiredPolicy;
@@ -1196,18 +1198,172 @@
masterListener.awaitForStateChange(State.MASTER, _timeout, TimeUnit.SECONDS));
}
+ @Test
+ public void testNodeCommitNoSyncWithCoalescing() throws Exception
+ {
+ DatabaseConfig createConfig = new DatabaseConfig();
+ createConfig.setAllowCreate(true);
+ createConfig.setTransactional(true);
+
+ TestStateChangeListener masterListener = new TestStateChangeListener();
+ ReplicatedEnvironmentFacade node1 =
+ addNodeWithDurability(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, masterListener, new NoopReplicationGroupListener(), false,"NO_SYNC,NO_SYNC,NONE");
+ assertTrue("Environment was not created", masterListener.awaitForStateChange(State.MASTER,
+ _timeout, TimeUnit.SECONDS));
+
+ String replicaNodeHostPort = "localhost:" + _portHelper.getNextAvailable();
+ String replicaName = TEST_NODE_NAME + 1;
+ ReplicatedEnvironmentFacade node2 =
+ createReplica(replicaName, replicaNodeHostPort, new NoopReplicationGroupListener());
+
+ Database db = node1.openDatabase("mydb", createConfig);
+
+ int key = 1;
+ String data = "value";
+ // Put a record (using commitNoSync)
+ TransactionConfig transactionConfig = addTestKeyValueWithCommitNoSync(node1, db, key, data);
+ db.close();
+
+ node1.close();
+ node2.close();
+
+ LOGGER.debug("RESTARTING " + TEST_NODE_NAME);
+
+ node1 = addNodeWithDurability(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, masterListener, new NoopReplicationGroupListener(), false,"NO_SYNC,NO_SYNC,NONE");
+ boolean awaitForStateChange = masterListener.awaitForStateChange(State.MASTER,
+ _timeout, TimeUnit.SECONDS);
+ LOGGER.debug("RESTARTING " + replicaName);
+ TestStateChangeListener node2StateChangeListener = new TestStateChangeListener();
+ node2 = addNode(replicaName,
+ replicaNodeHostPort,
+ false,
+ node2StateChangeListener,
+ new NoopReplicationGroupListener());
+ db = node1.openDatabase("mydb", DatabaseConfig.DEFAULT);
+ byte[] resultData = getTestKeyValue(node1, db, key, transactionConfig);
+ DatabaseEntry dbData = getDatabaseEntry(data);
+ assertArrayEquals(resultData, dbData.getData());
+ assertEquals("value", StringBinding.entryToString(dbData));
+
+ db.close();
+
+ LOGGER.debug("CLOSING");
+ node1.close();
+ node2.close();
+ }
+
+ @Test
+ public void testNodeCommitSyncWithoutCoalescing() throws Exception
+ {
+ DatabaseConfig createConfig = new DatabaseConfig();
+ createConfig.setAllowCreate(true);
+ createConfig.setTransactional(true);
+
+ TestStateChangeListener masterListener = new TestStateChangeListener();
+ ReplicatedEnvironmentFacade node1 =
+ addNodeWithDurability(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, masterListener, new NoopReplicationGroupListener(),true,"SYNC,NO_SYNC,NONE");
+
+ assertTrue("Environment was not created", masterListener.awaitForStateChange(State.MASTER,
+ _timeout, TimeUnit.SECONDS));
+
+ String replicaNodeHostPort = "localhost:" + _portHelper.getNextAvailable();
+ String replicaName = TEST_NODE_NAME + 1;
+ ReplicatedEnvironmentFacade node2 =
+ createReplica(replicaName, replicaNodeHostPort, new NoopReplicationGroupListener());
+
+ Database db = node1.openDatabase("mydb", createConfig);
+
+ int key = 1;
+ String data = "value";
+ // Put a record (using commitNoSync)
+ TransactionConfig transactionConfig = addTestKeyValueWithCommitNoSync(node1, db, key, data);
+ db.close();
+
+ node1.close();
+ node2.close();
+
+ LOGGER.debug("RESTARTING " + TEST_NODE_NAME);
+
+ node1 = addNodeWithDurability(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, masterListener, new NoopReplicationGroupListener(),true,"SYNC,NO_SYNC,NONE");
+ boolean awaitForStateChange = masterListener.awaitForStateChange(State.MASTER,
+ _timeout, TimeUnit.SECONDS);
+ LOGGER.debug("RESTARTING " + replicaName);
+ TestStateChangeListener node2StateChangeListener = new TestStateChangeListener();
+ node2 = addNode(replicaName,
+ replicaNodeHostPort,
+ false,
+ node2StateChangeListener,
+ new NoopReplicationGroupListener());
+ db = node1.openDatabase("mydb", DatabaseConfig.DEFAULT);
+ byte[] resultData = getTestKeyValue(node1, db, key, transactionConfig);
+ DatabaseEntry dbData = getDatabaseEntry(data);
+ assertArrayEquals(resultData, dbData.getData());
+ assertEquals("value", StringBinding.entryToString(dbData));
+
+ db.close();
+
+ LOGGER.debug("CLOSING");
+ node1.close();
+ node2.close();
+ }
+
+ private DatabaseEntry getDatabaseEntry(final String data)
+ {
+ DatabaseEntry dbData = new DatabaseEntry();
+ StringBinding.stringToEntry(data, dbData);
+ return dbData;
+ }
+ private DatabaseEntry getDatabaseEntry(final int data)
+ {
+ DatabaseEntry dbData = new DatabaseEntry();
+ IntegerBinding.intToEntry(data, dbData);
+ return dbData;
+ }
+
+ private byte[] getTestKeyValue(final ReplicatedEnvironmentFacade node1,
+ final Database db,
+ final int keyValue,
+ final TransactionConfig transactionConfig)
+ {
+ Transaction txn = node1.beginTransaction(transactionConfig);
+
+ DatabaseEntry key = getDatabaseEntry(keyValue);
+ DatabaseEntry result = new DatabaseEntry();
+ OperationStatus status = db.get(txn, key, result, LockMode.READ_UNCOMMITTED);
+ txn.commit();
+ byte[] resultData = new byte[0];
+ if (status == OperationStatus.SUCCESS)
+ {
+ resultData = result.getData();
+ }
+ return resultData;
+ }
+
+ private TransactionConfig addTestKeyValueWithCommitNoSync(final ReplicatedEnvironmentFacade node1,
+ final Database db,
+ final int keyValue, final String dataValue)
+ {
+ DatabaseEntry key = getDatabaseEntry(keyValue);
+ DatabaseEntry data = getDatabaseEntry(dataValue);
+ TransactionConfig transactionConfig = new TransactionConfig();
+ transactionConfig.setDurability(node1.getRealMessageStoreDurability());
+
+ Transaction txn = node1.beginTransaction(null);
+ db.put(txn, key, data);
+ node1.commitNoSync(txn);
+ return transactionConfig;
+ }
+
+
private void putRecord(final ReplicatedEnvironmentFacade master, final Database db, final int keyValue,
final String dataValue)
{
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
+ DatabaseEntry key = getDatabaseEntry(keyValue);
+ DatabaseEntry data = getDatabaseEntry(dataValue);
TransactionConfig transactionConfig = new TransactionConfig();
transactionConfig.setDurability(master.getRealMessageStoreDurability());
Transaction txn = master.beginTransaction(transactionConfig);
- IntegerBinding.intToEntry(keyValue, key);
- StringBinding.stringToEntry(dataValue, data);
-
db.put(txn, key, data);
txn.commit();
}
@@ -1293,6 +1449,16 @@
return addNode(nodeName,nodeHostPort,designatedPrimary,stateChangeListener,replicationGroupListener,false);
}
+ private ReplicatedEnvironmentFacade addNodeWithDurability(String nodeName, String nodeHostPort, boolean designatedPrimary,
+ StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener, boolean disableCoalescing, String durability)
+ {
+ ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary,disableCoalescing);
+ when(config.getFacadeParameter(eq(String.class),
+ eq(NO_SYNC_TX_DURABILITY_PROPERTY_NAME),
+ anyString())).thenReturn(durability);
+ return createReplicatedEnvironmentFacade(nodeName, stateChangeListener, replicationGroupListener, config);
+
+ }
private ReplicatedEnvironmentFacade createReplicatedEnvironmentFacade(String nodeName, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener, ReplicatedEnvironmentConfiguration config) {
ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config);
ref.setStateChangeListener(stateChangeListener);