QPID-8547: [Broker-J] Configurable parameters for CoalescingCommiter

This closes #101
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
index 3dceae4..a4d082d 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
@@ -40,9 +40,9 @@
 {
     private final CommitThread _commitThread;
 
-    public CoalescingCommiter(String name, EnvironmentFacade environmentFacade)
+    public CoalescingCommiter(String name, int commiterNotifyThreshold, long commiterWaitTimeout, EnvironmentFacade environmentFacade)
     {
-        _commitThread = new CommitThread("Commit-Thread-" + name, environmentFacade);
+        _commitThread = new CommitThread("Commit-Thread-" + name, commiterNotifyThreshold, commiterWaitTimeout, environmentFacade);
     }
 
     @Override
@@ -134,8 +134,9 @@
     private static class CommitThread extends Thread
     {
         private static final Logger LOGGER = LoggerFactory.getLogger(CommitThread.class);
-        private static final int JOB_QUEUE_NOTIFY_THRESHOLD = 8;
 
+        private final int _jobQueueNotifyThreshold;
+        private final long _commiterWaitTimeout;
         private final AtomicBoolean _stopped = new AtomicBoolean(false);
         private final Queue<CommitThreadJob> _jobQueue = new ConcurrentLinkedQueue<>();
         private final Object _lock = new Object();
@@ -143,9 +144,11 @@
 
         private final List<CommitThreadJob> _inProcessJobs = new ArrayList<>(256);
 
-        public CommitThread(String name, EnvironmentFacade environmentFacade)
+        public CommitThread(String name, int commiterNotifyThreshold, long commiterWaitTimeout, EnvironmentFacade environmentFacade)
         {
             super(name);
+            this._jobQueueNotifyThreshold = commiterNotifyThreshold;
+            this._commiterWaitTimeout = commiterWaitTimeout;
             _environmentFacade = environmentFacade;
         }
 
@@ -170,7 +173,7 @@
                         {
                             // Periodically wake up and check, just in case we
                             // missed a notification. Don't want to lock the broker hard.
-                            _lock.wait(500);
+                            _lock.wait(_commiterWaitTimeout);
                         }
                         catch (InterruptedException e)
                         {
@@ -248,7 +251,7 @@
                 throw new IllegalStateException("Commit thread is stopped");
             }
             _jobQueue.add(commit);
-            if(sync || _jobQueue.size() >= JOB_QUEUE_NOTIFY_THRESHOLD)
+            if(sync || _jobQueue.size() >= _jobQueueNotifyThreshold)
             {
                 synchronized (_lock)
                 {
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
index 3b55d24..59070a1 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
@@ -32,6 +32,7 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.util.concurrent.ListenableFuture;
+
 import com.sleepycat.je.CheckpointConfig;
 import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseConfig;
@@ -45,14 +46,15 @@
 import com.sleepycat.je.SequenceConfig;
 import com.sleepycat.je.Transaction;
 import com.sleepycat.je.TransactionConfig;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.store.berkeleydb.logging.Slf4jLoggingHandler;
+import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
+import org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost;
 
 public class StandardEnvironmentFacade implements EnvironmentFacade
 {
@@ -154,7 +156,17 @@
             }
         }
 
-        _committer =  new CoalescingCommiter(name, this);
+        final int commiterNotifyThreshold = configuration.getFacadeParameter(
+                Integer.class,
+                BDBVirtualHost.QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD,
+                BDBVirtualHost.DEFAULT_QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD
+        );
+        final long commiterWaitTimeout = configuration.getFacadeParameter(
+                Long.class,
+                BDBVirtualHost.QPID_BROKER_BDB_COMMITER_WAIT_TIMEOUT,
+                BDBVirtualHost.DEFAULT_QPID_BROKER_BDB_COMMITER_WAIT_TIMEOUT
+        );
+        _committer =  new CoalescingCommiter(name, commiterNotifyThreshold, commiterWaitTimeout, this);
         _committer.start();
     }
 
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 3e201ba..e3a891f 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -23,8 +23,10 @@
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
+
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -51,10 +53,49 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.sleepycat.je.*;
+
+import com.sleepycat.je.CheckpointConfig;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.DbInternal;
+import com.sleepycat.je.Durability;
 import com.sleepycat.je.Durability.ReplicaAckPolicy;
 import com.sleepycat.je.Durability.SyncPolicy;
-import com.sleepycat.je.rep.*;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.EnvironmentFailureException;
+import com.sleepycat.je.EnvironmentMutableConfig;
+import com.sleepycat.je.ExceptionEvent;
+import com.sleepycat.je.LogWriteException;
+import com.sleepycat.je.Sequence;
+import com.sleepycat.je.SequenceConfig;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.TransactionConfig;
+import com.sleepycat.je.rep.AppStateMonitor;
+import com.sleepycat.je.rep.InsufficientAcksException;
+import com.sleepycat.je.rep.InsufficientLogException;
+import com.sleepycat.je.rep.InsufficientReplicasException;
+import com.sleepycat.je.rep.MasterStateException;
+import com.sleepycat.je.rep.MemberNotFoundException;
+import com.sleepycat.je.rep.NetworkRestore;
+import com.sleepycat.je.rep.NetworkRestoreConfig;
+import com.sleepycat.je.rep.NoConsistencyRequiredPolicy;
+import com.sleepycat.je.rep.NodeState;
+import com.sleepycat.je.rep.NodeType;
+import com.sleepycat.je.rep.RepInternal;
+import com.sleepycat.je.rep.ReplicaConsistencyException;
+import com.sleepycat.je.rep.ReplicaWriteException;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+import com.sleepycat.je.rep.ReplicationGroup;
+import com.sleepycat.je.rep.ReplicationMutableConfig;
+import com.sleepycat.je.rep.ReplicationNode;
+import com.sleepycat.je.rep.RestartRequiredException;
+import com.sleepycat.je.rep.RollbackException;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+import com.sleepycat.je.rep.UnknownMasterException;
 import com.sleepycat.je.rep.impl.node.NameIdPair;
 import com.sleepycat.je.rep.util.DbPing;
 import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
@@ -64,6 +105,7 @@
 import com.sleepycat.je.rep.vlsn.VLSNRange;
 import com.sleepycat.je.utilint.PropUtil;
 import com.sleepycat.je.utilint.VLSN;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,6 +125,7 @@
 import org.apache.qpid.server.util.ExternalServiceException;
 import org.apache.qpid.server.util.ExternalServiceTimeoutException;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost;
 
 public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener
 {
@@ -1722,7 +1765,17 @@
             if (!_disableCoalescingCommiter && localTransactionSynchronizationPolicy == LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)
             {
                 localTransactionSynchronizationPolicy = SyncPolicy.NO_SYNC;
-                _coalescingCommiter = new CoalescingCommiter(_configuration.getGroupName(), this);
+                final int commiterNotifyThreshold = _configuration.getFacadeParameter(
+                        Integer.class,
+                        BDBVirtualHost.QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD,
+                        BDBVirtualHost.DEFAULT_QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD
+                );
+                final long commiterWaitTimeout = _configuration.getFacadeParameter(
+                        Long.class,
+                        BDBVirtualHost.QPID_BROKER_BDB_COMMITER_WAIT_TIMEOUT,
+                        BDBVirtualHost.DEFAULT_QPID_BROKER_BDB_COMMITER_WAIT_TIMEOUT
+                );
+                _coalescingCommiter = new CoalescingCommiter(_configuration.getGroupName(), commiterNotifyThreshold, commiterWaitTimeout,  this);
                 _coalescingCommiter.start();
             }
             _realMessageStoreDurability = new Durability(localTransactionSynchronizationPolicy, remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy);
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java b/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java
index 9b44503..08db3bc 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java
@@ -37,11 +37,21 @@
 
     long BDB_MIN_CACHE_SIZE = 10*1024*1024;
     String QPID_BROKER_BDB_TOTAL_CACHE_SIZE = "qpid.broker.bdbTotalCacheSize";
+    String QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD = "qpid.broker.bdbCommiterNotifyThreshold";
+    String QPID_BROKER_BDB_COMMITER_WAIT_TIMEOUT = "qpid.broker.bdbCommiterWaitTimeout";
 
     // Default the JE cache to 5% of total memory, but no less than 10Mb
     @ManagedContextDefault(name= QPID_BROKER_BDB_TOTAL_CACHE_SIZE)
     long DEFAULT_JE_CACHE_SIZE = Math.max(BDB_MIN_CACHE_SIZE, Runtime.getRuntime().maxMemory()/20l);
 
+    @SuppressWarnings("unused")
+    @ManagedContextDefault(name = QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD, description = "Threshold for amount of messages triggering BDB log flush to the disk")
+    int DEFAULT_QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD = 8;
+
+    @SuppressWarnings("unused")
+    @ManagedContextDefault(name = QPID_BROKER_BDB_COMMITER_WAIT_TIMEOUT, description = "Timeout for BDB log flush to the disk")
+    long DEFAULT_QPID_BROKER_BDB_COMMITER_WAIT_TIMEOUT = 500L;
+
     @Override
     @ManagedAttribute(mandatory = true, defaultValue = "${qpid.work_dir}${file.separator}${this:name}${file.separator}messages")
     String getStorePath();
diff --git a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java
index 4ac18c2..f7968b8 100644
--- a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java
+++ b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java
@@ -53,7 +53,7 @@
         assumeThat(getVirtualHostNodeStoreType(), is(equalTo(VirtualHostNodeStoreType.BDB)));
 
         _environmentFacade = mock(EnvironmentFacade.class);
-        _coalescingCommitter = new CoalescingCommiter("Test", _environmentFacade);
+        _coalescingCommitter = new CoalescingCommiter("Test", 8, 500, _environmentFacade);
         _coalescingCommitter.start();
     }
 
@@ -119,4 +119,4 @@
         verify(_environmentFacade, times(2)).flushLog();
         verify(_environmentFacade, times(1)).flushLogFailed(testFailure);
     }
-}
\ No newline at end of file
+}
diff --git a/doc/java-broker/src/docbkx/Java-Broker-High-Availability.xml b/doc/java-broker/src/docbkx/Java-Broker-High-Availability.xml
index 121e4be..ff5d7d2 100644
--- a/doc/java-broker/src/docbkx/Java-Broker-High-Availability.xml
+++ b/doc/java-broker/src/docbkx/Java-Broker-High-Availability.xml
@@ -294,7 +294,10 @@
             <para><emphasis>NO_SYNC</emphasis>. The node immediately sends the acknowledgement. The
               transaction will be written and OS level buffers flushed as some point later. NO_SYNC
               offers the highest performance but the lowest durability level. This synchronization
-              policy is sometimes known as <emphasis>commit to the network</emphasis>.</para>
+              policy is sometimes known as <emphasis>commit to the network</emphasis>. Flushing
+              behavior can be influenced by virtual host context parameters "qpid.broker.bdbCommiterNotifyThreshold"
+              (defines threshold for amount of messages triggering BDB log flush to the disk) and
+              "qpid.broker.bdbCommiterWaitTimeout" (defines timeout for BDB log flush to the disk).</para>
           </listitem>
         </itemizedlist></para>
       <para>It is possible to assign a one policy to the master and a different policy to the