QPID-8547: [Broker-J] Configurable parameters for CoalescingCommiter
This closes #101
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
index 3dceae4..a4d082d 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
@@ -40,9 +40,9 @@
{
private final CommitThread _commitThread;
- public CoalescingCommiter(String name, EnvironmentFacade environmentFacade)
+ public CoalescingCommiter(String name, int commiterNotifyThreshold, long commiterWaitTimeout, EnvironmentFacade environmentFacade)
{
- _commitThread = new CommitThread("Commit-Thread-" + name, environmentFacade);
+ _commitThread = new CommitThread("Commit-Thread-" + name, commiterNotifyThreshold, commiterWaitTimeout, environmentFacade);
}
@Override
@@ -134,8 +134,9 @@
private static class CommitThread extends Thread
{
private static final Logger LOGGER = LoggerFactory.getLogger(CommitThread.class);
- private static final int JOB_QUEUE_NOTIFY_THRESHOLD = 8;
+ private final int _jobQueueNotifyThreshold;
+ private final long _commiterWaitTimeout;
private final AtomicBoolean _stopped = new AtomicBoolean(false);
private final Queue<CommitThreadJob> _jobQueue = new ConcurrentLinkedQueue<>();
private final Object _lock = new Object();
@@ -143,9 +144,11 @@
private final List<CommitThreadJob> _inProcessJobs = new ArrayList<>(256);
- public CommitThread(String name, EnvironmentFacade environmentFacade)
+ public CommitThread(String name, int commiterNotifyThreshold, long commiterWaitTimeout, EnvironmentFacade environmentFacade)
{
super(name);
+ this._jobQueueNotifyThreshold = commiterNotifyThreshold;
+ this._commiterWaitTimeout = commiterWaitTimeout;
_environmentFacade = environmentFacade;
}
@@ -170,7 +173,7 @@
{
// Periodically wake up and check, just in case we
// missed a notification. Don't want to lock the broker hard.
- _lock.wait(500);
+ _lock.wait(_commiterWaitTimeout);
}
catch (InterruptedException e)
{
@@ -248,7 +251,7 @@
throw new IllegalStateException("Commit thread is stopped");
}
_jobQueue.add(commit);
- if(sync || _jobQueue.size() >= JOB_QUEUE_NOTIFY_THRESHOLD)
+ if(sync || _jobQueue.size() >= _jobQueueNotifyThreshold)
{
synchronized (_lock)
{
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
index 3b55d24..59070a1 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
@@ -32,6 +32,7 @@
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.util.concurrent.ListenableFuture;
+
import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
@@ -45,14 +46,15 @@
import com.sleepycat.je.SequenceConfig;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.berkeleydb.logging.Slf4jLoggingHandler;
+import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
+import org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost;
public class StandardEnvironmentFacade implements EnvironmentFacade
{
@@ -154,7 +156,17 @@
}
}
- _committer = new CoalescingCommiter(name, this);
+ final int commiterNotifyThreshold = configuration.getFacadeParameter(
+ Integer.class,
+ BDBVirtualHost.QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD,
+ BDBVirtualHost.DEFAULT_QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD
+ );
+ final long commiterWaitTimeout = configuration.getFacadeParameter(
+ Long.class,
+ BDBVirtualHost.QPID_BROKER_BDB_COMMITER_WAIT_TIMEOUT,
+ BDBVirtualHost.DEFAULT_QPID_BROKER_BDB_COMMITER_WAIT_TIMEOUT
+ );
+ _committer = new CoalescingCommiter(name, commiterNotifyThreshold, commiterWaitTimeout, this);
_committer.start();
}
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 3e201ba..e3a891f 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -23,8 +23,10 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
+
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
+
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -51,10 +53,49 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import com.sleepycat.je.*;
+
+import com.sleepycat.je.CheckpointConfig;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.DbInternal;
+import com.sleepycat.je.Durability;
import com.sleepycat.je.Durability.ReplicaAckPolicy;
import com.sleepycat.je.Durability.SyncPolicy;
-import com.sleepycat.je.rep.*;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.EnvironmentFailureException;
+import com.sleepycat.je.EnvironmentMutableConfig;
+import com.sleepycat.je.ExceptionEvent;
+import com.sleepycat.je.LogWriteException;
+import com.sleepycat.je.Sequence;
+import com.sleepycat.je.SequenceConfig;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.TransactionConfig;
+import com.sleepycat.je.rep.AppStateMonitor;
+import com.sleepycat.je.rep.InsufficientAcksException;
+import com.sleepycat.je.rep.InsufficientLogException;
+import com.sleepycat.je.rep.InsufficientReplicasException;
+import com.sleepycat.je.rep.MasterStateException;
+import com.sleepycat.je.rep.MemberNotFoundException;
+import com.sleepycat.je.rep.NetworkRestore;
+import com.sleepycat.je.rep.NetworkRestoreConfig;
+import com.sleepycat.je.rep.NoConsistencyRequiredPolicy;
+import com.sleepycat.je.rep.NodeState;
+import com.sleepycat.je.rep.NodeType;
+import com.sleepycat.je.rep.RepInternal;
+import com.sleepycat.je.rep.ReplicaConsistencyException;
+import com.sleepycat.je.rep.ReplicaWriteException;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+import com.sleepycat.je.rep.ReplicationGroup;
+import com.sleepycat.je.rep.ReplicationMutableConfig;
+import com.sleepycat.je.rep.ReplicationNode;
+import com.sleepycat.je.rep.RestartRequiredException;
+import com.sleepycat.je.rep.RollbackException;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+import com.sleepycat.je.rep.UnknownMasterException;
import com.sleepycat.je.rep.impl.node.NameIdPair;
import com.sleepycat.je.rep.util.DbPing;
import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
@@ -64,6 +105,7 @@
import com.sleepycat.je.rep.vlsn.VLSNRange;
import com.sleepycat.je.utilint.PropUtil;
import com.sleepycat.je.utilint.VLSN;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,6 +125,7 @@
import org.apache.qpid.server.util.ExternalServiceException;
import org.apache.qpid.server.util.ExternalServiceTimeoutException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost;
public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener
{
@@ -1722,7 +1765,17 @@
if (!_disableCoalescingCommiter && localTransactionSynchronizationPolicy == LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)
{
localTransactionSynchronizationPolicy = SyncPolicy.NO_SYNC;
- _coalescingCommiter = new CoalescingCommiter(_configuration.getGroupName(), this);
+ final int commiterNotifyThreshold = _configuration.getFacadeParameter(
+ Integer.class,
+ BDBVirtualHost.QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD,
+ BDBVirtualHost.DEFAULT_QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD
+ );
+ final long commiterWaitTimeout = _configuration.getFacadeParameter(
+ Long.class,
+ BDBVirtualHost.QPID_BROKER_BDB_COMMITER_WAIT_TIMEOUT,
+ BDBVirtualHost.DEFAULT_QPID_BROKER_BDB_COMMITER_WAIT_TIMEOUT
+ );
+ _coalescingCommiter = new CoalescingCommiter(_configuration.getGroupName(), commiterNotifyThreshold, commiterWaitTimeout, this);
_coalescingCommiter.start();
}
_realMessageStoreDurability = new Durability(localTransactionSynchronizationPolicy, remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy);
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java b/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java
index 9b44503..08db3bc 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java
@@ -37,11 +37,21 @@
long BDB_MIN_CACHE_SIZE = 10*1024*1024;
String QPID_BROKER_BDB_TOTAL_CACHE_SIZE = "qpid.broker.bdbTotalCacheSize";
+ String QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD = "qpid.broker.bdbCommiterNotifyThreshold";
+ String QPID_BROKER_BDB_COMMITER_WAIT_TIMEOUT = "qpid.broker.bdbCommiterWaitTimeout";
// Default the JE cache to 5% of total memory, but no less than 10Mb
@ManagedContextDefault(name= QPID_BROKER_BDB_TOTAL_CACHE_SIZE)
long DEFAULT_JE_CACHE_SIZE = Math.max(BDB_MIN_CACHE_SIZE, Runtime.getRuntime().maxMemory()/20l);
+ @SuppressWarnings("unused")
+ @ManagedContextDefault(name = QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD, description = "Threshold for amount of messages triggering BDB log flush to the disk")
+ int DEFAULT_QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD = 8;
+
+ @SuppressWarnings("unused")
+ @ManagedContextDefault(name = QPID_BROKER_BDB_COMMITER_WAIT_TIMEOUT, description = "Timeout for BDB log flush to the disk")
+ long DEFAULT_QPID_BROKER_BDB_COMMITER_WAIT_TIMEOUT = 500L;
+
@Override
@ManagedAttribute(mandatory = true, defaultValue = "${qpid.work_dir}${file.separator}${this:name}${file.separator}messages")
String getStorePath();
diff --git a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java
index 4ac18c2..f7968b8 100644
--- a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java
+++ b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java
@@ -53,7 +53,7 @@
assumeThat(getVirtualHostNodeStoreType(), is(equalTo(VirtualHostNodeStoreType.BDB)));
_environmentFacade = mock(EnvironmentFacade.class);
- _coalescingCommitter = new CoalescingCommiter("Test", _environmentFacade);
+ _coalescingCommitter = new CoalescingCommiter("Test", 8, 500, _environmentFacade);
_coalescingCommitter.start();
}
@@ -119,4 +119,4 @@
verify(_environmentFacade, times(2)).flushLog();
verify(_environmentFacade, times(1)).flushLogFailed(testFailure);
}
-}
\ No newline at end of file
+}
diff --git a/doc/java-broker/src/docbkx/Java-Broker-High-Availability.xml b/doc/java-broker/src/docbkx/Java-Broker-High-Availability.xml
index 121e4be..ff5d7d2 100644
--- a/doc/java-broker/src/docbkx/Java-Broker-High-Availability.xml
+++ b/doc/java-broker/src/docbkx/Java-Broker-High-Availability.xml
@@ -294,7 +294,10 @@
<para><emphasis>NO_SYNC</emphasis>. The node immediately sends the acknowledgement. The
transaction will be written and OS level buffers flushed as some point later. NO_SYNC
offers the highest performance but the lowest durability level. This synchronization
- policy is sometimes known as <emphasis>commit to the network</emphasis>.</para>
+ policy is sometimes known as <emphasis>commit to the network</emphasis>. Flushing
+ behavior can be influenced by virtual host context parameters "qpid.broker.bdbCommiterNotifyThreshold"
+ (defines threshold for amount of messages triggering BDB log flush to the disk) and
+ "qpid.broker.bdbCommiterWaitTimeout" (defines timeout for BDB log flush to the disk).</para>
</listitem>
</itemizedlist></para>
<para>It is possible to assign a one policy to the master and a different policy to the