QPID-7763: [Java Broker] Flow to disk if allocated direct memory exceeds broker wide broker.flowToDiskThreshold
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 86a8d09..533d6e2 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -371,6 +371,9 @@
ScheduledFuture<?> scheduleTask(long delay, final TimeUnit unit, Runnable task);
+ @DerivedAttribute(description = "Threshold direct memory size (in bytes) at which the Broker will start flowing incoming messages to disk.")
+ long getFlowToDiskThreshold();
+
@DerivedAttribute(description = "Threshold direct memory size (in bytes) at which the Broker will start considering to compact sparse buffers. Set to -1 to disable. See also " + MEMORY_OCCUPANCY_THRESHOLD)
long getCompactMemoryThreshold();
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
index 0a28e2b..c5c4aea 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
@@ -152,6 +152,7 @@
private long _compactMemoryInterval;
private double _memoryOccupancyThreshold;
private long _memoryCompactionIncrement;
+ private long _flowToDiskThreshold;
@ManagedObjectFactoryConstructor
public BrokerImpl(Map<String, Object> attributes,
@@ -518,8 +519,7 @@
@Override
public synchronized void assignTargetSizes()
{
- long totalTarget = getContextValue(Long.class, BROKER_FLOW_TO_DISK_THRESHOLD);
- LOGGER.debug("Assigning target sizes based on total target {}", totalTarget);
+ LOGGER.debug("Assigning target sizes based on total target {}", _flowToDiskThreshold);
long totalSize = 0l;
Collection<VirtualHostNode<?>> vhns = getVirtualHostNodes();
Map<QueueManagingVirtualHost<?>, Long> vhs = new HashMap<>();
@@ -535,20 +535,20 @@
}
}
- if (totalSize > totalTarget && !_totalMessageSizeExceedThresholdReported)
+ if (totalSize > _flowToDiskThreshold && !_totalMessageSizeExceedThresholdReported)
{
- _eventLogger.message(BrokerMessages.FLOW_TO_DISK_ACTIVE(totalSize / 1024, totalTarget / 1024));
+ _eventLogger.message(BrokerMessages.FLOW_TO_DISK_ACTIVE(totalSize / 1024, _flowToDiskThreshold / 1024));
_totalMessageSizeExceedThresholdReported = true;
_totalMessageSizeWithinThresholdReported = false;
}
- else if (totalSize <= totalTarget && !_totalMessageSizeWithinThresholdReported)
+ else if (totalSize <= _flowToDiskThreshold && !_totalMessageSizeWithinThresholdReported)
{
- _eventLogger.message(BrokerMessages.FLOW_TO_DISK_INACTIVE(totalSize / 1024, totalTarget / 1024));
+ _eventLogger.message(BrokerMessages.FLOW_TO_DISK_INACTIVE(totalSize / 1024, _flowToDiskThreshold / 1024));
_totalMessageSizeWithinThresholdReported = true;
_totalMessageSizeExceedThresholdReported = false;
}
- final long proportionalShare = (long) ((double) totalTarget / (double) vhs.size());
+ final long proportionalShare = (long) ((double) _flowToDiskThreshold / (double) vhs.size());
for (Map.Entry<QueueManagingVirtualHost<?>, Long> entry : vhs.entrySet())
{
long virtualHostTotalQueueSize = entry.getValue();
@@ -559,7 +559,7 @@
}
else
{
- long queueSizeBasedShare = (totalTarget * virtualHostTotalQueueSize) / (2 * totalSize);
+ long queueSizeBasedShare = (_flowToDiskThreshold * virtualHostTotalQueueSize) / (2 * totalSize);
size = queueSizeBasedShare + (proportionalShare / 2);
}
@@ -594,6 +594,7 @@
long heapMemory = Runtime.getRuntime().maxMemory();
getEventLogger().message(BrokerMessages.MAX_MEMORY(heapMemory, directMemory));
+ _flowToDiskThreshold = getContextValue(Long.class, BROKER_FLOW_TO_DISK_THRESHOLD);
_compactMemoryThreshold = getContextValue(Long.class, Broker.COMPACT_MEMORY_THRESHOLD);
_compactMemoryInterval = getContextValue(Long.class, Broker.COMPACT_MEMORY_INTERVAL);
_memoryOccupancyThreshold = getContextValue(Double.class, Broker.MEMORY_OCCUPANCY_THRESHOLD);
@@ -854,6 +855,12 @@
}
@Override
+ public long getFlowToDiskThreshold()
+ {
+ return _flowToDiskThreshold;
+ }
+
+ @Override
public long getNumberOfActivePooledBuffers()
{
return QpidByteBuffer.getNumberOfActivePooledBuffers();
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 92a994d..2a46c82 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -65,6 +65,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.bytebuffer.QpidByteBufferInputStream;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.Task;
@@ -265,6 +266,7 @@
private AdvanceConsumersTask _queueHouseKeepingTask;
private volatile int _bindingCount;
private volatile OverflowPolicyHandler _overflowPolicyHandler;
+ private long _flowToDiskThreshold;
private interface HoldMethod
{
@@ -469,6 +471,7 @@
}
_mimeTypeToFileExtension = getContextValue(Map.class, MAP_OF_STRING_STRING, MIME_TYPE_TO_FILE_EXTENSION);
+ _flowToDiskThreshold = getAncestor(Broker.class).getFlowToDiskThreshold();
if(_defaultFilters != null)
{
@@ -3332,7 +3335,9 @@
void flowToDiskIfNecessary(StoredMessage<?> storedMessage, long estimatedQueueSize, final long targetQueueSize)
{
- if ((estimatedQueueSize > targetQueueSize) && storedMessage.isInMemory())
+ if ((estimatedQueueSize > targetQueueSize
+ || QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold)
+ && storedMessage.isInMemory())
{
storedMessage.flowToDisk();
}
@@ -3348,7 +3353,8 @@
void reportFlowToDiskStatusIfNecessary(final long estimatedQueueSize, final long targetQueueSize)
{
- if (estimatedQueueSize > targetQueueSize)
+ if (estimatedQueueSize > targetQueueSize
+ || QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold)
{
reportFlowToDiskActiveIfNecessary(estimatedQueueSize, targetQueueSize);
}