QPID-7763: [Java Broker] Flow to disk if allocated direct memory exceeds broker wide broker.flowToDiskThreshold
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 86a8d09..533d6e2 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -371,6 +371,9 @@
 
     ScheduledFuture<?> scheduleTask(long delay, final TimeUnit unit, Runnable task);
 
+    @DerivedAttribute(description = "Threshold direct memory size (in bytes) at which the Broker will start flowing incoming messages to disk.")
+    long getFlowToDiskThreshold();
+
     @DerivedAttribute(description = "Threshold direct memory size (in bytes) at which the Broker will start considering to compact sparse buffers. Set to -1 to disable. See also " + MEMORY_OCCUPANCY_THRESHOLD)
     long getCompactMemoryThreshold();
 
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
index 0a28e2b..c5c4aea 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
@@ -152,6 +152,7 @@
     private long _compactMemoryInterval;
     private double _memoryOccupancyThreshold;
     private long _memoryCompactionIncrement;
+    private long _flowToDiskThreshold;
 
     @ManagedObjectFactoryConstructor
     public BrokerImpl(Map<String, Object> attributes,
@@ -518,8 +519,7 @@
     @Override
     public synchronized void assignTargetSizes()
     {
-        long totalTarget = getContextValue(Long.class, BROKER_FLOW_TO_DISK_THRESHOLD);
-        LOGGER.debug("Assigning target sizes based on total target {}", totalTarget);
+        LOGGER.debug("Assigning target sizes based on total target {}", _flowToDiskThreshold);
         long totalSize = 0l;
         Collection<VirtualHostNode<?>> vhns = getVirtualHostNodes();
         Map<QueueManagingVirtualHost<?>, Long> vhs = new HashMap<>();
@@ -535,20 +535,20 @@
             }
         }
 
-        if (totalSize > totalTarget && !_totalMessageSizeExceedThresholdReported)
+        if (totalSize > _flowToDiskThreshold && !_totalMessageSizeExceedThresholdReported)
         {
-            _eventLogger.message(BrokerMessages.FLOW_TO_DISK_ACTIVE(totalSize / 1024, totalTarget / 1024));
+            _eventLogger.message(BrokerMessages.FLOW_TO_DISK_ACTIVE(totalSize / 1024, _flowToDiskThreshold / 1024));
             _totalMessageSizeExceedThresholdReported = true;
             _totalMessageSizeWithinThresholdReported = false;
         }
-        else if (totalSize <= totalTarget && !_totalMessageSizeWithinThresholdReported)
+        else if (totalSize <= _flowToDiskThreshold && !_totalMessageSizeWithinThresholdReported)
         {
-            _eventLogger.message(BrokerMessages.FLOW_TO_DISK_INACTIVE(totalSize / 1024, totalTarget / 1024));
+            _eventLogger.message(BrokerMessages.FLOW_TO_DISK_INACTIVE(totalSize / 1024, _flowToDiskThreshold / 1024));
             _totalMessageSizeWithinThresholdReported = true;
             _totalMessageSizeExceedThresholdReported = false;
         }
 
-        final long proportionalShare = (long) ((double) totalTarget / (double) vhs.size());
+        final long proportionalShare = (long) ((double) _flowToDiskThreshold / (double) vhs.size());
         for (Map.Entry<QueueManagingVirtualHost<?>, Long> entry : vhs.entrySet())
         {
             long virtualHostTotalQueueSize = entry.getValue();
@@ -559,7 +559,7 @@
             }
             else
             {
-                long queueSizeBasedShare = (totalTarget * virtualHostTotalQueueSize) / (2 * totalSize);
+                long queueSizeBasedShare = (_flowToDiskThreshold * virtualHostTotalQueueSize) / (2 * totalSize);
                 size = queueSizeBasedShare + (proportionalShare / 2);
             }
 
@@ -594,6 +594,7 @@
         long heapMemory = Runtime.getRuntime().maxMemory();
         getEventLogger().message(BrokerMessages.MAX_MEMORY(heapMemory, directMemory));
 
+        _flowToDiskThreshold = getContextValue(Long.class, BROKER_FLOW_TO_DISK_THRESHOLD);
         _compactMemoryThreshold = getContextValue(Long.class, Broker.COMPACT_MEMORY_THRESHOLD);
         _compactMemoryInterval = getContextValue(Long.class, Broker.COMPACT_MEMORY_INTERVAL);
         _memoryOccupancyThreshold = getContextValue(Double.class, Broker.MEMORY_OCCUPANCY_THRESHOLD);
@@ -854,6 +855,12 @@
     }
 
     @Override
+    public long getFlowToDiskThreshold()
+    {
+        return _flowToDiskThreshold;
+    }
+
+    @Override
     public long getNumberOfActivePooledBuffers()
     {
         return QpidByteBuffer.getNumberOfActivePooledBuffers();
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 92a994d..2a46c82 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -65,6 +65,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.bytebuffer.QpidByteBufferInputStream;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.configuration.updater.Task;
@@ -265,6 +266,7 @@
     private AdvanceConsumersTask _queueHouseKeepingTask;
     private volatile int _bindingCount;
     private volatile OverflowPolicyHandler _overflowPolicyHandler;
+    private long _flowToDiskThreshold;
 
     private interface HoldMethod
     {
@@ -469,6 +471,7 @@
         }
 
         _mimeTypeToFileExtension = getContextValue(Map.class, MAP_OF_STRING_STRING, MIME_TYPE_TO_FILE_EXTENSION);
+        _flowToDiskThreshold = getAncestor(Broker.class).getFlowToDiskThreshold();
 
         if(_defaultFilters != null)
         {
@@ -3332,7 +3335,9 @@
 
         void flowToDiskIfNecessary(StoredMessage<?> storedMessage, long estimatedQueueSize, final long targetQueueSize)
         {
-            if ((estimatedQueueSize > targetQueueSize) && storedMessage.isInMemory())
+            if ((estimatedQueueSize > targetQueueSize
+                 || QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold)
+                && storedMessage.isInMemory())
             {
                 storedMessage.flowToDisk();
             }
@@ -3348,7 +3353,8 @@
 
         void reportFlowToDiskStatusIfNecessary(final long estimatedQueueSize, final long targetQueueSize)
         {
-            if (estimatedQueueSize > targetQueueSize)
+            if (estimatedQueueSize > targetQueueSize
+                || QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold)
             {
                 reportFlowToDiskActiveIfNecessary(estimatedQueueSize, targetQueueSize);
             }