QPID-8546:[Broker-J] Use special durability for non-sync commits in BDB HA

This closes #99
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 94d6fb1..84e1190 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -315,7 +315,7 @@
         }
     }
 
-    void removeMessage(long messageId, boolean sync) throws StoreException
+    void removeMessage(long messageId) throws StoreException
     {
         boolean complete = false;
         Transaction tx = null;
@@ -351,7 +351,7 @@
 
                     getLogger().debug("Deleted content for message {}", messageId);
 
-                    getEnvironmentFacade().commit(tx, sync);
+                    getEnvironmentFacade().commitNoSync(tx);
 
                     complete = true;
                     tx = null;
@@ -789,17 +789,16 @@
      *
      * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason.
      */
-    private void commitTranImpl(final Transaction tx, boolean syncCommit) throws StoreException
+    private void commitTranImpl(final Transaction tx) throws StoreException
     {
         if (tx == null)
         {
             throw new StoreException("Fatal internal error: transactional is null at commitTran");
         }
 
-        getEnvironmentFacade().commit(tx, syncCommit);
+        getEnvironmentFacade().commit(tx);
 
-        getLogger().debug("commitTranImpl completed {} transaction {}",
-                          syncCommit ? "synchronous" : "asynchronous", tx);
+        getLogger().debug("commitTranImpl completed {} transaction synchronous", tx);
 
 
     }
@@ -1201,7 +1200,7 @@
                         throw getEnvironmentFacade().handleDatabaseException("failed to begin transaction", e);
                     }
                     store(txn);
-                    getEnvironmentFacade().commit(txn, false);
+                    getEnvironmentFacade().commitAsync(txn, false);
 
                 }
             }
@@ -1214,7 +1213,7 @@
             _messages.remove(this);
             if(stored())
             {
-                removeMessage(_messageId, false);
+                removeMessage(_messageId);
                 storedSizeChangeOccurred(-getContentSize());
             }
             if (!_messageDeleteListeners.isEmpty())
@@ -1378,7 +1377,7 @@
         {
             checkMessageStoreOpen();
             doPreCommitActions();
-            AbstractBDBMessageStore.this.commitTranImpl(_txn, true);
+            AbstractBDBMessageStore.this.commitTranImpl(_txn);
             doPostCommitActions();
             AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
         }
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
index fcf6d78..bf5e9b7 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
@@ -67,7 +67,7 @@
 
     Transaction beginTransaction(TransactionConfig transactionConfig);
 
-    void commit(Transaction tx, boolean sync);
+    void commit(Transaction tx);
     <X> ListenableFuture<X> commitAsync(Transaction tx, X val);
 
     RuntimeException handleDatabaseException(String contextMessage, RuntimeException e);
@@ -98,4 +98,6 @@
     Map<String,Object> getDatabaseStatistics(String database, boolean reset);
 
     void deleteDatabase(String databaseName);
+
+    void commitNoSync(final Transaction tx);
 }
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
index 271c54c..3b55d24 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
@@ -166,7 +166,12 @@
     }
 
     @Override
-    public void commit(com.sleepycat.je.Transaction tx, boolean syncCommit)
+    public void commit(Transaction tx)
+    {
+        commitInternal(tx, true);
+    }
+
+    private void commitInternal(final Transaction tx, final boolean syncCommit)
     {
         try
         {
@@ -184,6 +189,12 @@
     }
 
     @Override
+    public void commitNoSync(final Transaction tx)
+    {
+        commitInternal(tx, false);
+    }
+
+    @Override
     public <X> ListenableFuture<X> commitAsync(final Transaction tx, final X val)
     {
         try
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 7b15c44..3e201ba 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -93,6 +93,7 @@
     public static final String ENVIRONMENT_RESTART_RETRY_LIMIT_PROPERTY_NAME = "qpid.bdb.ha.environment_restart_retry_limit";
     public static final String EXECUTOR_SHUTDOWN_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.executor_shutdown_timeout";
     public static final String DISABLE_COALESCING_COMMITTER_PROPERTY_NAME = "qpid.bdb.ha.disable_coalescing_committer";
+    public static final String NO_SYNC_TX_DURABILITY_PROPERTY_NAME = "qpid.bdb.ha.noSyncTxDurablity";
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ReplicatedEnvironmentFacade.class);
 
@@ -103,6 +104,8 @@
     private static final int DEFAULT_ENVIRONMENT_RESTART_RETRY_LIMIT = 3;
     private static final int DEFAULT_EXECUTOR_SHUTDOWN_TIMEOUT = 5000;
     private static final boolean DEFAULT_DISABLE_COALESCING_COMMITTER = false;
+    private static final String DEFAULT_NO_SYNC_TX_DURABILITY_PROPERTY_NAME = "NO_SYNC,NO_SYNC,NONE";
+    private static final String DEFAULT_SYNC_TX_DURABILITY_PROPERTY_NAME = "SYNC,NO_SYNC,NONE";
 
     /** Length of time allowed for a master transfer to complete before the operation will timeout */
     private final int _masterTransferTimeout;
@@ -139,6 +142,8 @@
 
     private final boolean _disableCoalescingCommiter;
 
+    private final Durability _noSyncTxDurability;
+
     private final int _logHandlerCleanerProtectedFilesLimit;
 
     static final SyncPolicy LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.SYNC;
@@ -284,6 +289,7 @@
         _stateChangeExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new DaemonThreadFactory("StateChange-" + _prettyGroupNodeName)));
         _groupChangeExecutor = new ScheduledThreadPoolExecutor(2, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
         _disableCoalescingCommiter = configuration.getFacadeParameter(Boolean.class,DISABLE_COALESCING_COMMITTER_PROPERTY_NAME, DEFAULT_DISABLE_COALESCING_COMMITTER);
+        _noSyncTxDurability = Durability.parse(configuration.getFacadeParameter(String.class, NO_SYNC_TX_DURABILITY_PROPERTY_NAME, getDefaultDurability(_disableCoalescingCommiter)));
 
         // create environment in a separate thread to avoid renaming of the current thread by JE
         EnvHomeRegistry.getInstance().registerHome(_environmentDirectory);
@@ -318,6 +324,18 @@
         }
     }
 
+    private String getDefaultDurability(final boolean disableCoalescingCommiter)
+    {
+        if (disableCoalescingCommiter)
+        {
+            return DEFAULT_SYNC_TX_DURABILITY_PROPERTY_NAME;
+        }
+        else
+        {
+            return DEFAULT_NO_SYNC_TX_DURABILITY_PROPERTY_NAME;
+        }
+    }
+
     @Override
     public Transaction beginTransaction(TransactionConfig transactionConfig)
     {
@@ -325,40 +343,48 @@
     }
 
     @Override
-    public void commit(final Transaction tx, boolean syncCommit)
+    public void commit(final Transaction tx)
+    {
+        commitInternal(tx, _realMessageStoreDurability);
+
+        if (_coalescingCommiter != null && _realMessageStoreDurability.getLocalSync() == SyncPolicy.NO_SYNC
+                && _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC)
+        {
+            _coalescingCommiter.commit(tx, true);
+        }
+
+    }
+
+    private void commitInternal(final Transaction tx, final Durability realMessageStoreDurability)
     {
         try
         {
             // Using commit() instead of commitNoSync() for the HA store to allow
             // the HA durability configuration to influence resulting behaviour.
-            tx.commit(_realMessageStoreDurability);
+            tx.commit(realMessageStoreDurability);
         }
         catch (DatabaseException de)
         {
             throw handleDatabaseException("Got DatabaseException on commit, closing environment", de);
         }
+    }
+
+    @Override
+    public void commitNoSync(final Transaction tx)
+    {
+        commitInternal(tx, _noSyncTxDurability);
 
         if (_coalescingCommiter != null && _realMessageStoreDurability.getLocalSync() == SyncPolicy.NO_SYNC
-                && _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC)
+            && _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC)
         {
-            _coalescingCommiter.commit(tx, syncCommit);
+            _coalescingCommiter.commit(tx, false);
         }
-
     }
 
     @Override
     public <X> ListenableFuture<X> commitAsync(final Transaction tx, final X val)
     {
-        try
-        {
-            // Using commit() instead of commitNoSync() for the HA store to allow
-            // the HA durability configuration to influence resulting behaviour.
-            tx.commit(_realMessageStoreDurability);
-        }
-        catch (DatabaseException de)
-        {
-            throw handleDatabaseException("Got DatabaseException on commit, closing environment", de);
-        }
+        commitInternal(tx, _realMessageStoreDurability);
 
         if (_coalescingCommiter != null && _realMessageStoreDurability.getLocalSync() == SyncPolicy.NO_SYNC
             && _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC)
diff --git a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index d3fea6e..8b03750 100644
--- a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -116,7 +116,7 @@
         StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore);
         long messageid_0_8 = storedMessage_0_8.getMessageNumber();
 
-        bdbStore.removeMessage(messageid_0_8, true);
+        bdbStore.removeMessage(messageid_0_8);
 
         //verify the removal using the BDB store implementation methods directly
         try
diff --git a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index 90c15f7..9094f02 100644
--- a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -21,11 +21,11 @@
 package org.apache.qpid.server.store.berkeleydb.replication;
 
 import static org.apache.qpid.server.store.berkeleydb.EnvironmentFacade.JUL_LOGGER_LEVEL_OVERRIDE;
-import static org.apache.qpid.server.store.berkeleydb.EnvironmentFacade
-        .LOG_HANDLER_CLEANER_PROTECTED_FILES_LIMIT_PROPERTY_NAME;
+import static org.apache.qpid.server.store.berkeleydb.EnvironmentFacade.LOG_HANDLER_CLEANER_PROTECTED_FILES_LIMIT_PROPERTY_NAME;
 import static org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade.*;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -37,8 +37,8 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -69,6 +69,8 @@
 import com.sleepycat.je.DatabaseEntry;
 import com.sleepycat.je.Durability;
 import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
 import com.sleepycat.je.Transaction;
 import com.sleepycat.je.TransactionConfig;
 import com.sleepycat.je.rep.NoConsistencyRequiredPolicy;
@@ -1196,18 +1198,172 @@
                    masterListener.awaitForStateChange(State.MASTER, _timeout, TimeUnit.SECONDS));
     }
 
+    @Test
+    public void testNodeCommitNoSyncWithCoalescing() throws Exception
+    {
+        DatabaseConfig createConfig = new DatabaseConfig();
+        createConfig.setAllowCreate(true);
+        createConfig.setTransactional(true);
+
+        TestStateChangeListener masterListener = new TestStateChangeListener();
+        ReplicatedEnvironmentFacade node1 =
+                addNodeWithDurability(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, masterListener, new NoopReplicationGroupListener(), false,"NO_SYNC,NO_SYNC,NONE");
+        assertTrue("Environment was not created", masterListener.awaitForStateChange(State.MASTER,
+                                                                                     _timeout, TimeUnit.SECONDS));
+
+        String replicaNodeHostPort = "localhost:" + _portHelper.getNextAvailable();
+        String replicaName = TEST_NODE_NAME + 1;
+        ReplicatedEnvironmentFacade node2 =
+                createReplica(replicaName, replicaNodeHostPort, new NoopReplicationGroupListener());
+
+        Database db = node1.openDatabase("mydb", createConfig);
+
+        int key = 1;
+        String data = "value";
+        // Put a record (using commitNoSync)
+        TransactionConfig transactionConfig = addTestKeyValueWithCommitNoSync(node1, db, key, data);
+        db.close();
+
+        node1.close();
+        node2.close();
+
+        LOGGER.debug("RESTARTING " + TEST_NODE_NAME);
+
+        node1 = addNodeWithDurability(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, masterListener, new NoopReplicationGroupListener(), false,"NO_SYNC,NO_SYNC,NONE");
+        boolean awaitForStateChange = masterListener.awaitForStateChange(State.MASTER,
+                                                                         _timeout, TimeUnit.SECONDS);
+        LOGGER.debug("RESTARTING " + replicaName);
+        TestStateChangeListener node2StateChangeListener = new TestStateChangeListener();
+        node2 = addNode(replicaName,
+                        replicaNodeHostPort,
+                        false,
+                        node2StateChangeListener,
+                        new NoopReplicationGroupListener());
+        db = node1.openDatabase("mydb", DatabaseConfig.DEFAULT);
+        byte[] resultData = getTestKeyValue(node1, db, key, transactionConfig);
+        DatabaseEntry dbData = getDatabaseEntry(data);
+        assertArrayEquals(resultData, dbData.getData());
+        assertEquals("value", StringBinding.entryToString(dbData));
+
+        db.close();
+
+        LOGGER.debug("CLOSING");
+        node1.close();
+        node2.close();
+    }
+
+    @Test
+    public void testNodeCommitSyncWithoutCoalescing() throws Exception
+    {
+        DatabaseConfig createConfig = new DatabaseConfig();
+        createConfig.setAllowCreate(true);
+        createConfig.setTransactional(true);
+
+        TestStateChangeListener masterListener = new TestStateChangeListener();
+        ReplicatedEnvironmentFacade node1 =
+                addNodeWithDurability(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, masterListener, new NoopReplicationGroupListener(),true,"SYNC,NO_SYNC,NONE");
+
+        assertTrue("Environment was not created", masterListener.awaitForStateChange(State.MASTER,
+                                                                                     _timeout, TimeUnit.SECONDS));
+
+        String replicaNodeHostPort = "localhost:" + _portHelper.getNextAvailable();
+        String replicaName = TEST_NODE_NAME + 1;
+        ReplicatedEnvironmentFacade node2 =
+                createReplica(replicaName, replicaNodeHostPort, new NoopReplicationGroupListener());
+
+        Database db = node1.openDatabase("mydb", createConfig);
+
+        int key = 1;
+        String data = "value";
+        // Put a record (using commitNoSync)
+        TransactionConfig transactionConfig = addTestKeyValueWithCommitNoSync(node1, db, key, data);
+        db.close();
+
+        node1.close();
+        node2.close();
+
+        LOGGER.debug("RESTARTING " + TEST_NODE_NAME);
+
+        node1 = addNodeWithDurability(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, masterListener, new NoopReplicationGroupListener(),true,"SYNC,NO_SYNC,NONE");
+        boolean awaitForStateChange = masterListener.awaitForStateChange(State.MASTER,
+                                                                         _timeout, TimeUnit.SECONDS);
+        LOGGER.debug("RESTARTING " + replicaName);
+        TestStateChangeListener node2StateChangeListener = new TestStateChangeListener();
+        node2 = addNode(replicaName,
+                        replicaNodeHostPort,
+                        false,
+                        node2StateChangeListener,
+                        new NoopReplicationGroupListener());
+        db = node1.openDatabase("mydb", DatabaseConfig.DEFAULT);
+        byte[] resultData = getTestKeyValue(node1, db, key, transactionConfig);
+        DatabaseEntry dbData = getDatabaseEntry(data);
+        assertArrayEquals(resultData, dbData.getData());
+        assertEquals("value", StringBinding.entryToString(dbData));
+
+        db.close();
+
+        LOGGER.debug("CLOSING");
+        node1.close();
+        node2.close();
+    }
+
+    private DatabaseEntry getDatabaseEntry(final String data)
+    {
+        DatabaseEntry dbData = new DatabaseEntry();
+        StringBinding.stringToEntry(data, dbData);
+        return dbData;
+    }
+    private DatabaseEntry getDatabaseEntry(final int data)
+    {
+        DatabaseEntry dbData = new DatabaseEntry();
+        IntegerBinding.intToEntry(data, dbData);
+        return dbData;
+    }
+
+    private byte[] getTestKeyValue(final ReplicatedEnvironmentFacade node1,
+                                   final Database db,
+                                   final int keyValue,
+                                   final TransactionConfig transactionConfig)
+    {
+        Transaction txn = node1.beginTransaction(transactionConfig);
+
+        DatabaseEntry key = getDatabaseEntry(keyValue);
+        DatabaseEntry result = new DatabaseEntry();
+        OperationStatus status = db.get(txn, key, result, LockMode.READ_UNCOMMITTED);
+        txn.commit();
+        byte[] resultData = new byte[0];
+        if (status == OperationStatus.SUCCESS)
+        {
+            resultData = result.getData();
+        }
+        return resultData;
+    }
+
+    private TransactionConfig addTestKeyValueWithCommitNoSync(final ReplicatedEnvironmentFacade node1,
+                                                              final Database db,
+                                                              final int keyValue, final String dataValue)
+    {
+        DatabaseEntry key = getDatabaseEntry(keyValue);
+        DatabaseEntry data = getDatabaseEntry(dataValue);
+        TransactionConfig transactionConfig = new TransactionConfig();
+        transactionConfig.setDurability(node1.getRealMessageStoreDurability());
+
+        Transaction txn = node1.beginTransaction(null);
+        db.put(txn, key, data);
+        node1.commitNoSync(txn);
+        return transactionConfig;
+    }
+
+
     private void putRecord(final ReplicatedEnvironmentFacade master, final Database db, final int keyValue,
                            final String dataValue)
     {
-        DatabaseEntry key = new DatabaseEntry();
-        DatabaseEntry data = new DatabaseEntry();
+        DatabaseEntry key = getDatabaseEntry(keyValue);
+        DatabaseEntry data = getDatabaseEntry(dataValue);
 
         TransactionConfig transactionConfig = new TransactionConfig();
         transactionConfig.setDurability(master.getRealMessageStoreDurability());
         Transaction txn = master.beginTransaction(transactionConfig);
-        IntegerBinding.intToEntry(keyValue, key);
-        StringBinding.stringToEntry(dataValue, data);
-
         db.put(txn, key, data);
         txn.commit();
     }
@@ -1293,6 +1449,16 @@
        return addNode(nodeName,nodeHostPort,designatedPrimary,stateChangeListener,replicationGroupListener,false);
     }
 
+    private ReplicatedEnvironmentFacade addNodeWithDurability(String nodeName, String nodeHostPort, boolean designatedPrimary,
+                                                StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener, boolean disableCoalescing, String durability)
+    {
+        ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary,disableCoalescing);
+        when(config.getFacadeParameter(eq(String.class),
+                                     eq(NO_SYNC_TX_DURABILITY_PROPERTY_NAME),
+                                     anyString())).thenReturn(durability);
+        return createReplicatedEnvironmentFacade(nodeName, stateChangeListener, replicationGroupListener, config);
+
+    }
     private ReplicatedEnvironmentFacade createReplicatedEnvironmentFacade(String nodeName, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener, ReplicatedEnvironmentConfiguration config) {
         ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config);
         ref.setStateChangeListener(stateChangeListener);