QPID-7763: [Java Broker] Avoid allocating large non-pooled direct byte buffers in WebSocketProvider, MemoryMessageStore, and message inflation
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
index fe5db24..203b009 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
@@ -655,6 +655,24 @@
         }
     }
 
+    public static Collection<QpidByteBuffer> asQpidByteBuffers(final byte[] data)
+    {
+        return asQpidByteBuffers(data, 0, data.length);
+    }
+
+    public static Collection<QpidByteBuffer> asQpidByteBuffers(final byte[] data, final int offset, final int length)
+    {
+        try (QpidByteBufferOutputStream outputStream = new QpidByteBufferOutputStream(true, getPooledBufferSize()))
+        {
+            outputStream.write(data, offset, length);
+            return outputStream.fetchAccumulatedBuffers();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("unexpected Error converting array to QpidByteBuffers", e);
+        }
+    }
+
     public static SSLEngineResult encryptSSL(SSLEngine engine,
                                              final Collection<QpidByteBuffer> buffers,
                                              QpidByteBuffer dest) throws SSLException
@@ -700,10 +718,7 @@
             int read;
             while ((read = gzipInputStream.read(buf)) != -1)
             {
-                QpidByteBuffer output = isDirect ? allocateDirect(read) : allocate(read);
-                output.put(buf, 0, read);
-                output.flip();
-                uncompressedBuffers.add(output);
+                uncompressedBuffers.addAll(asQpidByteBuffers(buf, 0, read));
             }
             return uncompressedBuffers;
         }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index a2a7837..dc54fbc 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -21,8 +21,12 @@
 
 package org.apache.qpid.server.store;
 
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 
@@ -30,7 +34,7 @@
 {
     private final long _messageNumber;
     private final int _contentSize;
-    private volatile QpidByteBuffer _content;
+    private final Queue<QpidByteBuffer> _content = new LinkedList<>();
     private volatile T _metaData;
 
     public StoredMemoryMessage(long messageNumber, T metaData)
@@ -47,41 +51,12 @@
 
     public synchronized void addContent(QpidByteBuffer src)
     {
-        if(_content == null)
-        {
-            _content = src.slice();
-            _content.position(_content.limit());
-        }
-        else
-        {
-            if(_content.remaining() >= src.remaining())
-            {
-                _content.putCopyOf(src);
-            }
-            else
-            {
-                final int contentSize = getContentSize();
-                int size = (contentSize < _content.position() + src.remaining())
-                        ? _content.position() + src.remaining()
-                        : contentSize;
-                QpidByteBuffer oldContent = _content;
-                oldContent.flip();
-                _content = QpidByteBuffer.allocateDirect(size);
-                _content.put(oldContent);
-                _content.putCopyOf(src);
-                oldContent.dispose();
-            }
-
-        }
+        _content.add(src.slice());
     }
 
     @Override
     public synchronized StoredMessage<T> allContentAdded()
     {
-        if(_content != null)
-        {
-            _content.flip();
-        }
         return this;
     }
 
@@ -89,11 +64,44 @@
     @Override
     public synchronized Collection<QpidByteBuffer> getContent(int offset, int length)
     {
-        if(_content == null)
+        Collection<QpidByteBuffer> content = new ArrayList<>(_content.size());
+        int pos = 0;
+        for (QpidByteBuffer buf : _content)
         {
-            return Collections.emptyList();
+            if (length > 0)
+            {
+                int bufRemaining = buf.remaining();
+                if (pos + bufRemaining <= offset)
+                {
+                    pos += bufRemaining;
+                }
+                else if (pos >= offset)
+                {
+                    buf = buf.duplicate();
+                    if (bufRemaining <= length)
+                    {
+                        length -= bufRemaining;
+                    }
+                    else
+                    {
+                        buf.limit(length);
+                        length = 0;
+                    }
+                    content.add(buf);
+                    pos += buf.remaining();
+                }
+                else
+                {
+                    int offsetInBuf = offset - pos;
+                    int limit = length < bufRemaining - offsetInBuf ? length : bufRemaining - offsetInBuf;
+                    final QpidByteBuffer bufView = buf.view(offsetInBuf, limit);
+                    content.add(bufView);
+                    length -= limit;
+                    pos += limit + offsetInBuf;
+                }
+            }
         }
-        return Collections.singleton(_content.view(offset, length));
+        return content;
     }
 
     @Override
@@ -113,8 +121,11 @@
         _metaData = null;
         if (_content != null)
         {
-            _content.dispose();
-            _content = null;
+            for (QpidByteBuffer content : _content)
+            {
+                content.dispose();
+            }
+            _content.clear();
         }
     }
 
@@ -134,6 +145,13 @@
     public synchronized void reallocate(final long smallestAllowedBufferId)
     {
         _metaData.reallocate(smallestAllowedBufferId);
-        _content = QpidByteBuffer.reallocateIfNecessary(smallestAllowedBufferId, _content);
+        List<QpidByteBuffer> newContent = new ArrayList<>(_content.size());
+        for (Iterator<QpidByteBuffer> iterator = _content.iterator(); iterator.hasNext(); )
+        {
+            final QpidByteBuffer buffer = iterator.next();
+            newContent.add(QpidByteBuffer.reallocateIfNecessary(smallestAllowedBufferId, buffer));
+            iterator.remove();
+        }
+        _content.addAll(newContent);
     }
 }
diff --git a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
index b923f51..d96c1d2 100644
--- a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
+++ b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
@@ -293,11 +293,11 @@
                         iter.next().run();
                     }
 
-                    QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(length);
-                    buffer.put(data, offset, length);
-                    buffer.flip();
-                    _protocolEngine.received(buffer);
-                    buffer.dispose();
+                    for (QpidByteBuffer qpidByteBuffer : QpidByteBuffer.asQpidByteBuffers(data, offset, length))
+                    {
+                        _protocolEngine.received(qpidByteBuffer);
+                        qpidByteBuffer.dispose();
+                    }
 
                     _connectionWrapper.doWrite();
                 }