PROTON-1767 Allow for the transport buffer to use a duplicate
Instead of forcing a read-only buffer as the head or output buffer,
allow the transport to be configured to expose a simple duplicate of the
output buffer for users who know the risks and can benefit from more
direct access to the bytes.
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index d7fbb10..0f969c8 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -112,13 +112,14 @@
private Open _open;
private SaslImpl _sasl;
private SslImpl _ssl;
- private final Ref<ProtocolTracer> _protocolTracer = new Ref(null);
+ private final Ref<ProtocolTracer> _protocolTracer = new Ref<>(null);
private TransportResult _lastTransportResult = TransportResultFactory.ok();
private boolean _init;
private boolean _processingStarted;
private boolean _emitFlowEventOnSend = true;
+ private boolean _useReadOnlyOutputBuffer = true;
private FrameHandler _frameHandler = this;
private boolean _head_closed = false;
@@ -175,7 +176,7 @@
_init = true;
_frameParser = new FrameParser(_frameHandler , _decoder, _maxFrameSize);
_inputProcessor = _frameParser;
- _outputProcessor = new TransportOutputAdaptor(this, _maxFrameSize);
+ _outputProcessor = new TransportOutputAdaptor(this, _maxFrameSize, isUseReadOnlyOutputBuffer());
}
}
@@ -1746,6 +1747,18 @@
return _emitFlowEventOnSend;
}
+ @Override
+ public void setUseReadOnlyOutputBuffer(boolean value)
+ {
+ this._useReadOnlyOutputBuffer = value;
+ }
+
+ @Override
+ public boolean isUseReadOnlyOutputBuffer()
+ {
+ return _useReadOnlyOutputBuffer;
+ }
+
// From TransportInternal
@Override
public void addTransportLayer(TransportLayer layer)
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportInternal.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportInternal.java
index 73b4d44..f270ef6 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportInternal.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportInternal.java
@@ -37,4 +37,9 @@
* @throws IllegalStateException if processing has already started.
*/
void addTransportLayer(TransportLayer layer) throws IllegalStateException;
+
+ void setUseReadOnlyOutputBuffer(boolean value);
+
+ boolean isUseReadOnlyOutputBuffer();
+
}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java
index 2c43bfe..c427a9a 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java
@@ -35,11 +35,13 @@
private ByteBuffer _head = null;
private boolean _output_done = false;
private boolean _head_closed = false;
+ private boolean _readOnlyHead = true;
- TransportOutputAdaptor(TransportOutputWriter transportOutputWriter, int maxFrameSize)
+ TransportOutputAdaptor(TransportOutputWriter transportOutputWriter, int maxFrameSize, boolean readOnlyHead)
{
_transportOutputWriter = transportOutputWriter;
_maxFrameSize = maxFrameSize > 0 ? maxFrameSize : 4*1024;
+ _readOnlyHead = readOnlyHead;
}
@Override
@@ -104,7 +106,11 @@
private void init_buffers() {
_outputBuffer = newWriteableBuffer(_maxFrameSize);
- _head = _outputBuffer.asReadOnlyBuffer();
+ if (_readOnlyHead) {
+ _head = _outputBuffer.asReadOnlyBuffer();
+ } else {
+ _head = _outputBuffer.duplicate();
+ }
_head.limit(0);
}
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index b474802..ddff6b3 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -164,6 +164,73 @@
}
@Test
+ public void testOutputBufferIsReadOnly()
+ {
+ doTestTransportBufferReadability(true, false);
+ }
+
+ @Test
+ public void testOutputBufferNotReadOnlyWhenConfigured()
+ {
+ doTestTransportBufferReadability(false, false);
+ }
+
+ @Test
+ public void testHeadIsReadOnly()
+ {
+ doTestTransportBufferReadability(true, true);
+ }
+
+ @Test
+ public void testHeadNotReadOnlyWhenConfigured()
+ {
+ doTestTransportBufferReadability(false, true);
+ }
+
+ private void doTestTransportBufferReadability(boolean readOnly, boolean headOrOutput)
+ {
+ TransportImpl transport = new TransportImpl();
+
+ // Default should be Read-Only
+ if (!readOnly) {
+ transport.setUseReadOnlyOutputBuffer(readOnly);
+ }
+
+ final ByteBuffer outputBuffer;
+ if (headOrOutput) {
+ outputBuffer = transport.head();
+ } else {
+ outputBuffer = transport.getOutputBuffer();
+ }
+
+ assertTrue(outputBuffer.hasRemaining());
+ if (readOnly) {
+ assertTrue(outputBuffer.isReadOnly());
+ } else {
+ assertFalse(outputBuffer.isReadOnly());
+ }
+
+ byte[] outputBytes = new byte[outputBuffer.remaining()];
+ outputBuffer.get(outputBytes);
+
+ transport.outputConsumed();
+
+ final ByteBuffer emptyBuffer;
+ if (headOrOutput) {
+ emptyBuffer = transport.head();
+ } else {
+ emptyBuffer = transport.getOutputBuffer();
+ }
+
+ assertFalse(emptyBuffer.hasRemaining());
+ if (readOnly) {
+ assertTrue(emptyBuffer.isReadOnly());
+ } else {
+ assertFalse(emptyBuffer.isReadOnly());
+ }
+ }
+
+ @Test
public void testTransportInitiallyHandlesFrames()
{
assertTrue(_transport.isHandlingFrames());
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptorTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptorTest.java
index 19c2f7b..6e26527 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptorTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptorTest.java
@@ -22,6 +22,7 @@
import static org.apache.qpid.proton.engine.impl.TransportTestHelper.assertByteArrayContentEquals;
import static org.apache.qpid.proton.engine.impl.TransportTestHelper.assertByteBufferContentEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -34,7 +35,7 @@
public class TransportOutputAdaptorTest
{
private final CannedTransportOutputWriter _transportOutputWriter = new CannedTransportOutputWriter();
- private final TransportOutput _transportOutput = new TransportOutputAdaptor(_transportOutputWriter, 1024);
+ private final TransportOutput _transportOutput = new TransportOutputAdaptor(_transportOutputWriter, 1024, true);
@Test
public void testThatOutputBufferIsReadOnly()
@@ -43,6 +44,15 @@
}
@Test
+ public void testThatOutputBufferCanBeMadeNotReadOnly()
+ {
+ final TransportOutput _transportOutput =
+ new TransportOutputAdaptor(_transportOutputWriter, 1024, false);
+
+ assertFalse(_transportOutput.head().isReadOnly());
+ }
+
+ @Test
public void testGetOutputBuffer_containsCorrectBytes()
{
byte[] testBytes = "testbytes".getBytes();