QPID-7763: [Java Broker] Avoid allocating large non-pooled direct byte buffers in WebSocketProvider, MemoryMessageStore, and message inflation
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
index fe5db24..203b009 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
@@ -655,6 +655,24 @@
}
}
+ public static Collection<QpidByteBuffer> asQpidByteBuffers(final byte[] data)
+ {
+ return asQpidByteBuffers(data, 0, data.length);
+ }
+
+ public static Collection<QpidByteBuffer> asQpidByteBuffers(final byte[] data, final int offset, final int length)
+ {
+ try (QpidByteBufferOutputStream outputStream = new QpidByteBufferOutputStream(true, getPooledBufferSize()))
+ {
+ outputStream.write(data, offset, length);
+ return outputStream.fetchAccumulatedBuffers();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("unexpected Error converting array to QpidByteBuffers", e);
+ }
+ }
+
public static SSLEngineResult encryptSSL(SSLEngine engine,
final Collection<QpidByteBuffer> buffers,
QpidByteBuffer dest) throws SSLException
@@ -700,10 +718,7 @@
int read;
while ((read = gzipInputStream.read(buf)) != -1)
{
- QpidByteBuffer output = isDirect ? allocateDirect(read) : allocate(read);
- output.put(buf, 0, read);
- output.flip();
- uncompressedBuffers.add(output);
+ uncompressedBuffers.addAll(asQpidByteBuffers(buf, 0, read));
}
return uncompressedBuffers;
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index a2a7837..dc54fbc 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -21,8 +21,12 @@
package org.apache.qpid.server.store;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
@@ -30,7 +34,7 @@
{
private final long _messageNumber;
private final int _contentSize;
- private volatile QpidByteBuffer _content;
+ private final Queue<QpidByteBuffer> _content = new LinkedList<>();
private volatile T _metaData;
public StoredMemoryMessage(long messageNumber, T metaData)
@@ -47,41 +51,12 @@
public synchronized void addContent(QpidByteBuffer src)
{
- if(_content == null)
- {
- _content = src.slice();
- _content.position(_content.limit());
- }
- else
- {
- if(_content.remaining() >= src.remaining())
- {
- _content.putCopyOf(src);
- }
- else
- {
- final int contentSize = getContentSize();
- int size = (contentSize < _content.position() + src.remaining())
- ? _content.position() + src.remaining()
- : contentSize;
- QpidByteBuffer oldContent = _content;
- oldContent.flip();
- _content = QpidByteBuffer.allocateDirect(size);
- _content.put(oldContent);
- _content.putCopyOf(src);
- oldContent.dispose();
- }
-
- }
+ _content.add(src.slice());
}
@Override
public synchronized StoredMessage<T> allContentAdded()
{
- if(_content != null)
- {
- _content.flip();
- }
return this;
}
@@ -89,11 +64,44 @@
@Override
public synchronized Collection<QpidByteBuffer> getContent(int offset, int length)
{
- if(_content == null)
+ Collection<QpidByteBuffer> content = new ArrayList<>(_content.size());
+ int pos = 0;
+ for (QpidByteBuffer buf : _content)
{
- return Collections.emptyList();
+ if (length > 0)
+ {
+ int bufRemaining = buf.remaining();
+ if (pos + bufRemaining <= offset)
+ {
+ pos += bufRemaining;
+ }
+ else if (pos >= offset)
+ {
+ buf = buf.duplicate();
+ if (bufRemaining <= length)
+ {
+ length -= bufRemaining;
+ }
+ else
+ {
+ buf.limit(length);
+ length = 0;
+ }
+ content.add(buf);
+ pos += buf.remaining();
+ }
+ else
+ {
+ int offsetInBuf = offset - pos;
+ int limit = length < bufRemaining - offsetInBuf ? length : bufRemaining - offsetInBuf;
+ final QpidByteBuffer bufView = buf.view(offsetInBuf, limit);
+ content.add(bufView);
+ length -= limit;
+ pos += limit + offsetInBuf;
+ }
+ }
}
- return Collections.singleton(_content.view(offset, length));
+ return content;
}
@Override
@@ -113,8 +121,11 @@
_metaData = null;
if (_content != null)
{
- _content.dispose();
- _content = null;
+ for (QpidByteBuffer content : _content)
+ {
+ content.dispose();
+ }
+ _content.clear();
}
}
@@ -134,6 +145,13 @@
public synchronized void reallocate(final long smallestAllowedBufferId)
{
_metaData.reallocate(smallestAllowedBufferId);
- _content = QpidByteBuffer.reallocateIfNecessary(smallestAllowedBufferId, _content);
+ List<QpidByteBuffer> newContent = new ArrayList<>(_content.size());
+ for (Iterator<QpidByteBuffer> iterator = _content.iterator(); iterator.hasNext(); )
+ {
+ final QpidByteBuffer buffer = iterator.next();
+ newContent.add(QpidByteBuffer.reallocateIfNecessary(smallestAllowedBufferId, buffer));
+ iterator.remove();
+ }
+ _content.addAll(newContent);
}
}
diff --git a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
index b923f51..d96c1d2 100644
--- a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
+++ b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
@@ -293,11 +293,11 @@
iter.next().run();
}
- QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(length);
- buffer.put(data, offset, length);
- buffer.flip();
- _protocolEngine.received(buffer);
- buffer.dispose();
+ for (QpidByteBuffer qpidByteBuffer : QpidByteBuffer.asQpidByteBuffers(data, offset, length))
+ {
+ _protocolEngine.received(qpidByteBuffer);
+ qpidByteBuffer.dispose();
+ }
_connectionWrapper.doWrite();
}