PROTON-1948 Refactor FrameWriter with more performant buffer
Refactor the FrameWriter to use a more performant ReadableBuffer wrapper
around a byte array that auto grows as an ecode requires instead of
reallocating a ByteBuffer and copying past contents and then performing
a new encode any time space is exhausted. Removing the layer of buffer
abstractions leads to an increase in overall performance under normal
operations as well.
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
index 13d481c..b76dc5c 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.proton.engine.impl;
-import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import org.apache.qpid.proton.amqp.Binary;
@@ -28,225 +27,153 @@
import org.apache.qpid.proton.amqp.transport.FrameBody;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.ReadableBuffer;
-import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.framing.TransportFrame;
/**
- * FrameWriter
- *
+ * Writes Frames to an internal buffer for later processing by the transport.
*/
-class FrameWriter
-{
+class FrameWriter {
+
+ static final int DEFAULT_FRAME_BUFFER_FULL_MARK = 64 * 1024;
+ static final int FRAME_HEADER_SIZE = 8;
static final byte AMQP_FRAME_TYPE = 0;
- static final byte SASL_FRAME_TYPE = (byte) 1;
+ static final byte SASL_FRAME_TYPE = 1;
- private EncoderImpl _encoder;
- private ByteBuffer _bbuf;
- private WritableBuffer _buffer;
- private int _maxFrameSize;
- private byte _frameType;
- final private Ref<ProtocolTracer> _protocolTracer;
- private TransportImpl _transport;
+ private final TransportImpl transport;
+ private final EncoderImpl encoder;
+ private final FrameWriterBuffer frameBuffer = new FrameWriterBuffer();
- private int _frameStart = 0;
- private int _payloadStart;
- private int _performativeSize;
- private long _framesOutput = 0;
+ // Configuration of this Frame Writer
+ private int maxFrameSize;
+ private final byte frameType;
+ private int frameBufferMaxBytes = DEFAULT_FRAME_BUFFER_FULL_MARK;
- FrameWriter(EncoderImpl encoder, int maxFrameSize, byte frameType,
- Ref<ProtocolTracer> protocolTracer, TransportImpl transport)
- {
- _encoder = encoder;
- _bbuf = ByteBuffer.allocate(1024);
- _buffer = new WritableBuffer.ByteBufferWrapper(_bbuf);
- _encoder.setByteBuffer(_buffer);
- _maxFrameSize = maxFrameSize;
- _frameType = frameType;
- _protocolTracer = protocolTracer;
- _transport = transport;
+ // State of current write operation, reset on start of each new write
+ private int frameStart;
+
+ // Frame Writer metrics
+ private long framesOutput;
+
+ FrameWriter(EncoderImpl encoder, int maxFrameSize, byte frameType, TransportImpl transport) {
+ this.encoder = encoder;
+ this.maxFrameSize = maxFrameSize;
+ this.frameType = frameType;
+ this.transport = transport;
+
+ encoder.setByteBuffer(frameBuffer);
}
- void setMaxFrameSize(int maxFrameSize)
- {
- _maxFrameSize = maxFrameSize;
+ boolean isFull() {
+ return frameBuffer.position() > frameBufferMaxBytes;
}
- private void grow()
- {
- grow(_bbuf.capacity()); // Double current capacity
+ int readBytes(ByteBuffer dst) {
+ return frameBuffer.transferTo(dst);
}
- private void grow(int amount)
- {
- ByteBuffer old = _bbuf;
- _bbuf = ByteBuffer.allocate(old.capacity() + amount);
- _buffer = new WritableBuffer.ByteBufferWrapper(_bbuf);
- old.flip();
- _bbuf.put(old);
- _encoder.setByteBuffer(_buffer);
+ long getFramesOutput() {
+ return framesOutput;
}
- void writeHeader(byte[] header)
- {
- _buffer.put(header, 0, header.length);
+ void setMaxFrameSize(int maxFrameSize) {
+ this.maxFrameSize = maxFrameSize;
}
- private void startFrame()
- {
- _frameStart = _buffer.position();
+ void setFrameWriterMaxBytes(int maxBytes) {
+ this.frameBufferMaxBytes = maxBytes;
}
- private void writePerformative(Object frameBody, ReadableBuffer payload, Runnable onPayloadTooLarge)
- {
- while (_buffer.remaining() < 8) {
- grow();
+ int getFrameWriterMaxBytes() {
+ return frameBufferMaxBytes;
+ }
+
+ void writeHeader(byte[] header) {
+ frameBuffer.put(header, 0, header.length);
+ }
+
+ void writeFrame(Object frameBody) {
+ writeFrame(0, frameBody, null, null);
+ }
+
+ void writeFrame(int channel, Object frameBody, ReadableBuffer payload, Runnable onPayloadTooLarge) {
+ frameStart = frameBuffer.position();
+
+ final int performativeSize = writePerformative(frameBody, payload, onPayloadTooLarge);
+ final int capacity = maxFrameSize > 0 ? maxFrameSize - performativeSize : Integer.MAX_VALUE;
+ final int payloadSize = Math.min(payload == null ? 0 : payload.remaining(), capacity);
+
+ if (transport.isFrameTracingEnabled()) {
+ logFrame(channel, frameBody, payload, payloadSize);
}
- while (true)
- {
- try
- {
- _buffer.position(_frameStart + 8);
- if (frameBody != null) _encoder.writeObject(frameBody);
-
- _payloadStart = _buffer.position();
- _performativeSize = _payloadStart - _frameStart;
-
- if (onPayloadTooLarge == null)
- {
- break;
- }
-
- if (_maxFrameSize > 0 && payload != null && (payload.remaining() + _performativeSize) > _maxFrameSize)
- {
- onPayloadTooLarge.run();
- onPayloadTooLarge = null;
- }
- else
- {
- break;
- }
- }
- catch (BufferOverflowException | IndexOutOfBoundsException e)
- {
- grow();
- }
- }
- }
-
- private void endFrame(int channel)
- {
- int frameSize = _buffer.position() - _frameStart;
- int limit = _buffer.position();
- _buffer.position(_frameStart);
- _buffer.putInt(frameSize);
- _buffer.put((byte) 2);
- _buffer.put(_frameType);
- _buffer.putShort((short) channel);
- _buffer.position(limit);
- }
-
- void writeFrame(int channel, Object frameBody, ReadableBuffer payload,
- Runnable onPayloadTooLarge)
- {
- startFrame();
-
- writePerformative(frameBody, payload, onPayloadTooLarge);
-
- int capacity;
- if (_maxFrameSize > 0) {
- capacity = _maxFrameSize - _performativeSize;
- } else {
- capacity = Integer.MAX_VALUE;
- }
- int payloadSize = Math.min(payload == null ? 0 : payload.remaining(), capacity);
-
- ProtocolTracer tracer = _protocolTracer == null ? null : _protocolTracer.get();
- if (tracer != null || _transport.isTraceFramesEnabled())
- {
- logFrame(tracer, channel, frameBody, payload, payloadSize);
- }
-
- if(payloadSize > 0)
- {
- while (_buffer.remaining() < payloadSize)
- {
- grow(payloadSize - _buffer.remaining());
- }
-
+ if (payloadSize > 0) {
int oldLimit = payload.limit();
payload.limit(payload.position() + payloadSize);
- _buffer.put(payload);
+ frameBuffer.put(payload);
payload.limit(oldLimit);
}
endFrame(channel);
- _framesOutput += 1;
+ framesOutput++;
}
- private void logFrame(ProtocolTracer tracer, int channel, Object frameBody, ReadableBuffer payload, int payloadSize)
- {
- if (_frameType == AMQP_FRAME_TYPE)
- {
+ private int writePerformative(Object frameBody, ReadableBuffer payload, Runnable onPayloadTooLarge) {
+ frameBuffer.position(frameStart + FRAME_HEADER_SIZE);
+
+ if (frameBody != null) {
+ encoder.writeObject(frameBody);
+ }
+
+ int performativeSize = frameBuffer.position() - frameStart;
+
+ if (onPayloadTooLarge != null && maxFrameSize > 0 && payload != null && (payload.remaining() + performativeSize) > maxFrameSize) {
+ // Next iteration will re-encode the frame body again with updates from the <payload-to-large>
+ // handler and then we can move onto the body portion.
+ onPayloadTooLarge.run();
+ performativeSize = writePerformative(frameBody, payload, null);
+ }
+
+ return performativeSize;
+ }
+
+ private void endFrame(int channel) {
+ int frameSize = frameBuffer.position() - frameStart;
+ int originalPosition = frameBuffer.position();
+
+ frameBuffer.position(frameStart);
+ frameBuffer.putInt(frameSize);
+ frameBuffer.put((byte) 2);
+ frameBuffer.put(frameType);
+ frameBuffer.putShort((short) channel);
+ frameBuffer.position(originalPosition);
+ }
+
+ private void logFrame(int channel, Object frameBody, ReadableBuffer payload, int payloadSize) {
+ if (frameType == AMQP_FRAME_TYPE) {
ReadableBuffer originalPayload = null;
- if (payload!=null)
- {
+ if (payload != null) {
originalPayload = payload.slice();
originalPayload.limit(payloadSize);
}
Binary payloadBin = Binary.create(originalPayload);
FrameBody body = null;
- if (frameBody == null)
- {
+ if (frameBody == null) {
body = EmptyFrame.INSTANCE;
- }
- else
- {
+ } else {
body = (FrameBody) frameBody;
}
TransportFrame frame = new TransportFrame(channel, body, payloadBin);
- _transport.log(TransportImpl.OUTGOING, frame);
+ transport.log(TransportImpl.OUTGOING, frame);
- if (tracer != null)
- {
+ ProtocolTracer tracer = transport.getProtocolTracer();
+ if (tracer != null) {
tracer.sentFrame(frame);
}
}
}
-
- void writeFrame(Object frameBody)
- {
- writeFrame(0, frameBody, null, null);
- }
-
- boolean isFull() {
- // XXX: this should probably be tunable
- return _bbuf.position() > 64*1024;
- }
-
- int readBytes(ByteBuffer dst)
- {
- ByteBuffer src = _bbuf.duplicate();
- src.flip();
-
- int size = Math.min(src.remaining(), dst.remaining());
- int limit = src.limit();
- src.limit(size);
- dst.put(src);
- src.limit(limit);
- _bbuf.rewind();
- _bbuf.put(src);
-
- return size;
- }
-
- long getFramesOutput()
- {
- return _framesOutput;
- }
}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriterBuffer.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriterBuffer.java
new file mode 100644
index 0000000..e028c19
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriterBuffer.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.engine.impl;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+public class FrameWriterBuffer implements WritableBuffer {
+
+ public final static int DEFAULT_CAPACITY = 1024;
+
+ byte array[];
+ int position;
+
+ /**
+ * Creates a new WritableBuffer with default capacity.
+ */
+ public FrameWriterBuffer() {
+ this(DEFAULT_CAPACITY);
+ }
+
+ /**
+ * Create a new WritableBuffer with the given capacity.
+ *
+ * @param capacity
+ * the inital capacity to allocate for this buffer.
+ */
+ public FrameWriterBuffer(int capacity) {
+ this.array = new byte[capacity];
+ }
+
+ public byte[] array() {
+ return array;
+ }
+
+ public int arrayOffset() {
+ return 0;
+ }
+
+ @Override
+ public void put(byte b) {
+ ensureRemaining(Byte.BYTES);
+ array[position++] = b;
+ }
+
+ @Override
+ public void putShort(short value) {
+ ensureRemaining(Short.BYTES);
+ array[position++] = (byte)(value >>> 8);
+ array[position++] = (byte)(value >>> 0);
+ }
+
+ @Override
+ public void putInt(int value) {
+ ensureRemaining(Integer.BYTES);
+ array[position++] = (byte)(value >>> 24);
+ array[position++] = (byte)(value >>> 16);
+ array[position++] = (byte)(value >>> 8);
+ array[position++] = (byte)(value >>> 0);
+ }
+
+ @Override
+ public void putLong(long value) {
+ ensureRemaining(Long.BYTES);
+ array[position++] = (byte)(value >>> 56);
+ array[position++] = (byte)(value >>> 48);
+ array[position++] = (byte)(value >>> 40);
+ array[position++] = (byte)(value >>> 32);
+ array[position++] = (byte)(value >>> 24);
+ array[position++] = (byte)(value >>> 16);
+ array[position++] = (byte)(value >>> 8);
+ array[position++] = (byte)(value >>> 0);
+ }
+
+ @Override
+ public void putFloat(float value) {
+ putInt(Float.floatToRawIntBits(value));
+ }
+
+ @Override
+ public void putDouble(double value) {
+ putLong(Double.doubleToRawLongBits(value));
+ }
+
+ @Override
+ public void put(byte[] src, int offset, int length) {
+ if (length == 0) {
+ return;
+ }
+
+ ensureRemaining(length);
+ System.arraycopy(src, offset, array, position, length);
+ position += length;
+ }
+
+ @Override
+ public void put(ByteBuffer payload) {
+ final int toCopy = payload.remaining();
+ ensureRemaining(toCopy);
+
+ if (payload.hasArray()) {
+ System.arraycopy(payload.array(), payload.arrayOffset() + payload.position(), array, position, toCopy);
+ payload.position(payload.position() + toCopy);
+ } else {
+ payload.get(array, position, toCopy);
+ }
+
+ position += toCopy;
+ }
+
+ @Override
+ public void put(ReadableBuffer payload) {
+ final int toCopy = payload.remaining();
+ ensureRemaining(toCopy);
+
+ if (payload.hasArray()) {
+ System.arraycopy(payload.array(), payload.arrayOffset() + payload.position(), array, position, toCopy);
+ payload.position(payload.position() + toCopy);
+ } else {
+ payload.get(array, position, toCopy);
+ }
+
+ position += toCopy;
+ }
+
+ @Override
+ public boolean hasRemaining() {
+ return position < Integer.MAX_VALUE;
+ }
+
+ @Override
+ public int remaining() {
+ return Integer.MAX_VALUE - position;
+ }
+
+ /**
+ * Ensures the the buffer has at least the requiredRemaining space specified.
+ * <p>
+ * The internal buffer will be doubled if the requested capacity is less than that
+ * amount or the buffer will be expanded to the full new requiredRemaining value.
+ *
+ * @param requiredRemaining
+ * the minimum remaining bytes needed to meet the next write operation.
+ */
+ @Override
+ public void ensureRemaining(int requiredRemaining) {
+ if (requiredRemaining > array.length - position) {
+ byte newBuffer[] = new byte[Math.max(array.length << 1, requiredRemaining + position)];
+ System.arraycopy(array, 0, newBuffer, 0, array.length);
+ array = newBuffer;
+ }
+ }
+
+ @Override
+ public int position() {
+ return position;
+ }
+
+ @Override
+ public void position(int position) {
+ if (position < 0) {
+ throw new IllegalArgumentException("Requested new buffer position cannot be negative");
+ }
+
+ if (position > array.length) {
+ ensureRemaining(position - array.length);
+ }
+
+ this.position = position;
+ }
+
+ @Override
+ public int limit() {
+ return Integer.MAX_VALUE;
+ }
+
+ /**
+ * Copy bytes from this buffer into the target buffer and compacts this buffer.
+ * <p>
+ * Copy either all bytes written into this buffer (start to current position) or
+ * as many as will fit if the target capacity is less that the bytes written. Bytes
+ * not read from this buffer are moved to the front of the buffer and the position is
+ * reset to the end of the copied region.
+ *
+ * @param target
+ * The array to move bytes to from those written into this buffer.
+ *
+ * @return the number of bytes transfered to the target buffer.
+ */
+ public int transferTo(ByteBuffer target) {
+ int size = Math.min(position, target.remaining());
+
+ if (size == 0) {
+ return 0;
+ }
+
+ if (target.hasArray()) {
+ System.arraycopy(array, 0, target.array(), target.arrayOffset() + target.position(), size);
+ target.position(target.position() + size);
+ } else {
+ target.put(array, 0, size);
+ }
+
+ // Compact any remaining data to the front of the array so that new writes can reuse
+ // space previously allocated and not extend the array if possible.
+ if (size != position) {
+ int remainder = position - size;
+ System.arraycopy(array, size, array, 0, remainder);
+ position = remainder; // ensure we are at end of unread chunk
+ } else {
+ position = 0; // reset to empty state.
+ }
+
+ return size;
+ }
+}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
index cee798f..ec7bffc 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
@@ -99,7 +99,7 @@
AMQPDefinedTypes.registerAllTypes(_decoder,_encoder);
_frameParser = new SaslFrameParser(this, _decoder, maxFrameSize);
- _frameWriter = new FrameWriter(_encoder, maxFrameSize, FrameWriter.SASL_FRAME_TYPE, null, _transport);
+ _frameWriter = new FrameWriter(_encoder, maxFrameSize, FrameWriter.SASL_FRAME_TYPE, _transport);
}
void fail() {
@@ -498,10 +498,12 @@
_allowSkip = allowSkip;
}
+ @Override
public TransportWrapper wrap(final TransportInput input, final TransportOutput output)
{
return new SaslSniffer(new SwitchingSaslTransportWrapper(input, output),
new PlainTransportWrapper(output, input)) {
+ @Override
protected boolean isDeterminationMade() {
if (_role == Role.SERVER && _allowSkip) {
return super.isDeterminationMade();
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 3ec6ef4..8947fa3 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -167,7 +167,6 @@
_maxFrameSize = maxFrameSize;
_frameWriter = new FrameWriter(_encoder, _remoteMaxFrameSize,
FrameWriter.AMQP_FRAME_TYPE,
- _protocolTracer,
this);
}
@@ -1719,6 +1718,11 @@
}
}
+ boolean isFrameTracingEnabled()
+ {
+ return (_levels & TRACE_FRM) != 0 || _protocolTracer.get() != null;
+ }
+
boolean isTraceFramesEnabled()
{
return (_levels & TRACE_FRM) != 0;
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameWriterBufferTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameWriterBufferTest.java
new file mode 100644
index 0000000..2f08c71
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameWriterBufferTest.java
@@ -0,0 +1,723 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.engine.impl;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.junit.Test;
+
+/**
+ * Test behavior of the FrameWriterBuffer implementation
+ */
+public class FrameWriterBufferTest {
+
+ //----- Test newly create buffer behaviors -------------------------------//
+
+ @Test
+ public void testDefaultCtor() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer();
+
+ assertEquals(FrameWriterBuffer.DEFAULT_CAPACITY, buffer.array().length);
+ assertEquals(0, buffer.arrayOffset());
+ assertEquals(Integer.MAX_VALUE, buffer.remaining());
+ assertEquals(0, buffer.position());
+ assertEquals(Integer.MAX_VALUE, buffer.limit());
+ assertTrue(buffer.hasRemaining());
+ }
+
+ @Test
+ public void testCreateBufferWithGivenSize() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(255);
+
+ assertEquals(255, buffer.array().length);
+ assertEquals(0, buffer.arrayOffset());
+ assertEquals(Integer.MAX_VALUE, buffer.remaining());
+ assertEquals(0, buffer.position());
+ assertEquals(Integer.MAX_VALUE, buffer.limit());
+ assertTrue(buffer.hasRemaining());
+ }
+
+ //----- Test hasRemaining ------------------------------------------------//
+
+ @Test
+ public void testHasRemaining() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+ for (int i = 0; i < 42; ++i) {
+ assertTrue(buffer.hasRemaining());
+ buffer.put((byte) 127);
+ assertTrue(buffer.hasRemaining());
+ }
+ }
+
+ //----- Test remaining ---------------------------------------------------//
+
+ @Test
+ public void testRemaining() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+ assertEquals(Integer.MAX_VALUE, buffer.remaining());
+ buffer.put((byte) 127);
+ assertEquals(Integer.MAX_VALUE - 1, buffer.remaining());
+ buffer.put((byte) 128);
+ assertEquals(Integer.MAX_VALUE - 2, buffer.remaining());
+ }
+
+ @Test
+ public void testRemainingResetsWhenDataConsumed() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+ assertEquals(Integer.MAX_VALUE, buffer.remaining());
+ buffer.put((byte) 127);
+ assertEquals(Integer.MAX_VALUE - 1, buffer.remaining());
+ buffer.put((byte) 128);
+ assertEquals(Integer.MAX_VALUE - 2, buffer.remaining());
+
+ ByteBuffer target = ByteBuffer.allocate(1);
+ buffer.transferTo(target);
+ assertEquals(Integer.MAX_VALUE - 1, buffer.remaining());
+ target.clear();
+ buffer.transferTo(target);
+ assertEquals(Integer.MAX_VALUE, buffer.remaining());
+ }
+
+ //----- Test position handling -------------------------------------------//
+
+ @Test
+ public void testPositionWithOneArrayAppended() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer();
+
+ buffer.put(new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, 0, 10);
+
+ assertEquals(Integer.MAX_VALUE, buffer.limit());
+ assertEquals(10, buffer.position());
+
+ buffer.position(5);
+ assertEquals(5, buffer.position());
+
+ buffer.position(6);
+ assertEquals(6, buffer.position());
+
+ buffer.position(10);
+ assertEquals(10, buffer.position());
+
+ try {
+ buffer.position(11);
+ } catch (IllegalArgumentException e) {
+ fail("Should not throw a IllegalArgumentException");
+ }
+ }
+
+ @Test
+ public void testPositionMovedBeyondCurrentArraySize() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+ byte[] data = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
+ buffer.put(data, 0, data.length);
+
+ assertEquals(data.length, buffer.array().length);
+
+ try {
+ buffer.position(15);
+ } catch (IllegalArgumentException e) {
+ fail("Should not throw a IllegalArgumentException");
+ }
+
+ // Size should have doubled.
+ assertEquals(20, buffer.array().length);
+
+ // Check written bytes are same
+ for (int i = 0; i < data.length; ++i) {
+ assertEquals(data[i], buffer.array()[i]);
+ }
+ }
+
+ @Test
+ public void testPositionMovedBeyondDoubleTheCurrentCapacity() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+ byte[] data = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
+ buffer.put(data, 0, data.length);
+
+ assertEquals(data.length, buffer.array().length);
+
+ try {
+ buffer.position(30);
+ } catch (IllegalArgumentException e) {
+ fail("Should not throw a IllegalArgumentException");
+ }
+
+ // Size should have expanded to meet requested size.
+ assertEquals(30, buffer.array().length);
+
+ // Check written bytes are same
+ for (int i = 0; i < data.length; ++i) {
+ assertEquals(data[i], buffer.array()[i]);
+ }
+ }
+
+ @Test
+ public void testPositionEnforcesPreconditions() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer();
+
+ try {
+ buffer.position(-1);
+ fail("Should throw a IllegalArgumentException");
+ } catch (IllegalArgumentException e) {}
+ }
+
+ //----- Test put array ---------------------------------------------------//
+
+ @Test
+ public void testPutByteArray() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+ byte[] data = new byte[] { 127, (byte) 128 };
+
+ assertEquals(0, buffer.position());
+ buffer.put(data, 0, data.length);
+ assertEquals(2, buffer.position());
+ buffer.put(data, 0, data.length);
+ assertEquals(4, buffer.position());
+
+ assertEquals(Integer.MAX_VALUE - 4, buffer.remaining());
+ }
+
+ @Test
+ public void testPutByteArrayWithZeroLengthDoesNotExpandBuffer() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(0);
+
+ byte[] data = new byte[] { 127, (byte) 128 };
+
+ assertEquals(0, buffer.array().length);
+ buffer.put(data, 0, 0);
+ assertEquals(0, buffer.array().length);
+ }
+
+ @Test
+ public void testPutByteArrayPastExistingCapacity() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(6);
+
+ byte[] data = new byte[] { 127, (byte) 128 };
+
+ buffer.put(data, 0, data.length);
+ buffer.put(data, 0, data.length);
+ buffer.put(data, 0, data.length);
+
+ assertEquals(6, buffer.array().length);
+ assertEquals(6, buffer.position());
+
+ // Should prompt a resize of the array
+ buffer.put(data, 0, data.length);
+
+ assertEquals(12, buffer.array().length);
+ assertEquals(8, buffer.position());
+ }
+
+ @Test
+ public void testPutByteArrayLargerThanDefaultExpansionSize() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(6);
+
+ byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23};
+
+ buffer.put(data, 0, data.length);
+
+ // Should use the size needed instead of double current size
+ assertEquals(data.length, buffer.array().length);
+ assertEquals(data.length, buffer.position());
+ }
+
+ //----- Test put ByteBuffer ----------------------------------------------//
+
+ @Test
+ public void testPutByteBuffer() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+ byte[] data = new byte[] { 127, (byte) 128 };
+ ByteBuffer byteBuffer = ByteBuffer.wrap(data);
+
+ assertEquals(0, buffer.position());
+ buffer.put(byteBuffer.duplicate());
+ assertEquals(2, buffer.position());
+ buffer.put(byteBuffer.duplicate());
+ assertEquals(4, buffer.position());
+
+ assertEquals(Integer.MAX_VALUE - 4, buffer.remaining());
+ }
+
+ @Test
+ public void testPutByteBufferWithoutArrayAccess() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+ byte[] data = new byte[] { 127, (byte) 128 };
+ ByteBuffer byteBuffer = ByteBuffer.wrap(data).asReadOnlyBuffer();
+
+ assertEquals(0, buffer.position());
+ buffer.put(byteBuffer.duplicate());
+ assertEquals(2, buffer.position());
+ buffer.put(byteBuffer.duplicate());
+ assertEquals(4, buffer.position());
+
+ assertEquals(Integer.MAX_VALUE - 4, buffer.remaining());
+ }
+
+ @Test
+ public void testPutByteBufferPastExistingCapacity() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(6);
+
+ byte[] data = new byte[] { 127, (byte) 128 };
+ ByteBuffer byteBuffer = ByteBuffer.wrap(data);
+
+ buffer.put(byteBuffer.duplicate());
+ buffer.put(byteBuffer.duplicate());
+ buffer.put(byteBuffer.duplicate());
+
+ assertEquals(6, buffer.array().length);
+ assertEquals(6, buffer.position());
+
+ // Should prompt a resize of the array
+ buffer.put(byteBuffer);
+
+ assertEquals(12, buffer.array().length);
+ assertEquals(8, buffer.position());
+ }
+
+ @Test
+ public void testPutByteBufferLargerThanDefaultExpansionSize() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(6);
+
+ byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23};
+ ByteBuffer byteBuffer = ByteBuffer.wrap(data);
+
+ buffer.put(byteBuffer);
+
+ // Should use the size needed instead of double current size
+ assertEquals(data.length, buffer.array().length);
+ assertEquals(data.length, buffer.position());
+ }
+
+ //----- Test put ReadableBuffer --------------------------------------------//
+
+ @Test
+ public void testPutReadableBuffer() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+ byte[] data = new byte[] { 127, (byte) 128 };
+ ReadableBuffer readable = ReadableBuffer.ByteBufferReader.wrap(data);
+
+ assertEquals(0, buffer.position());
+ buffer.put(readable.duplicate());
+ assertEquals(2, buffer.position());
+ buffer.put(readable.duplicate());
+ assertEquals(4, buffer.position());
+
+ assertEquals(Integer.MAX_VALUE - 4, buffer.remaining());
+ }
+
+ @Test
+ public void testPutReadableBufferWithoutArrayAccess() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+ byte[] data = new byte[] { 127, (byte) 128 };
+ ByteBuffer byteBuffer = ByteBuffer.wrap(data);
+ ReadableBuffer readable = ReadableBuffer.ByteBufferReader.wrap(byteBuffer.asReadOnlyBuffer());
+
+ assertEquals(0, buffer.position());
+ buffer.put(readable.duplicate());
+ assertEquals(2, buffer.position());
+ buffer.put(readable.duplicate());
+ assertEquals(4, buffer.position());
+
+ assertEquals(Integer.MAX_VALUE - 4, buffer.remaining());
+ }
+
+ @Test
+ public void testPutReadableBufferPastExistingCapacity() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(6);
+
+ byte[] data = new byte[] { 127, (byte) 128 };
+ ReadableBuffer readable = ReadableBuffer.ByteBufferReader.wrap(data);
+
+ buffer.put(readable.duplicate());
+ buffer.put(readable.duplicate());
+ buffer.put(readable.duplicate());
+
+ assertEquals(6, buffer.array().length);
+ assertEquals(6, buffer.position());
+
+ // Should prompt a resize of the array
+ buffer.put(readable.duplicate());
+
+ assertEquals(12, buffer.array().length);
+ assertEquals(8, buffer.position());
+ }
+
+ @Test
+ public void testPutReadableBufferLargerThanDefaultExpansionSize() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(6);
+
+ byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23};
+ ReadableBuffer readable = ReadableBuffer.ByteBufferReader.wrap(data);
+
+ buffer.put(readable);
+
+ // Should use the size needed instead of double current size
+ assertEquals(data.length, buffer.array().length);
+ assertEquals(data.length, buffer.position());
+ }
+
+ //----- Test put byte ----------------------------------------------------//
+
+ @Test
+ public void testPutByte() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+ assertEquals(0, buffer.position());
+ buffer.put((byte) 127);
+ assertEquals(1, buffer.position());
+ buffer.put((byte) 128);
+ assertEquals(2, buffer.position());
+
+ assertEquals(Integer.MAX_VALUE - 2, buffer.remaining());
+ }
+
+ @Test
+ public void testPutBytePastExistingCapacity() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(5);
+
+ buffer.put((byte) 127);
+ buffer.put((byte) 128);
+ buffer.put((byte) 127);
+ buffer.put((byte) 128);
+ buffer.put((byte) 128);
+
+ assertEquals(5, buffer.array().length);
+ assertEquals(5, buffer.position());
+
+ // Should prompt a resize of the array
+ buffer.put((byte) 127);
+
+ assertEquals(10, buffer.array().length);
+ assertEquals(6, buffer.position());
+ }
+
+ //----- Test put short ---------------------------------------------------//
+
+ @Test
+ public void testPutShort() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+ assertEquals(0, buffer.position());
+ buffer.putShort((short) 127);
+ assertEquals(2, buffer.position());
+ buffer.putShort((short) 128);
+ assertEquals(4, buffer.position());
+
+ assertEquals(Integer.MAX_VALUE - 4, buffer.remaining());
+ }
+
+ @Test
+ public void testPutShortPastExistingCapacity() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(6);
+
+ buffer.putShort((short) 128);
+ buffer.putShort((short) 127);
+ buffer.putShort((short) 128);
+
+ assertEquals(6, buffer.array().length);
+ assertEquals(6, buffer.position());
+
+ // Should prompt a resize of the array
+ buffer.putShort((short) 127);
+
+ assertEquals(12, buffer.array().length);
+ assertEquals(8, buffer.position());
+ }
+
+ //----- Test put int -----------------------------------------------------//
+
+ @Test
+ public void testPutInt() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+ assertEquals(0, buffer.position());
+ buffer.putInt(127);
+ assertEquals(4, buffer.position());
+ buffer.putInt(128);
+ assertEquals(8, buffer.position());
+
+ assertEquals(Integer.MAX_VALUE - 8, buffer.remaining());
+ }
+
+ @Test
+ public void testPutIntPastExistingCapacity() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(12);
+
+ buffer.putInt(127);
+ buffer.putInt(128);
+ buffer.putInt(127);
+
+ assertEquals(12, buffer.array().length);
+ assertEquals(12, buffer.position());
+
+ // Should prompt a resize of the array
+ buffer.putInt(128);
+
+ assertEquals(24, buffer.array().length);
+ assertEquals(16, buffer.position());
+ }
+
+ //----- Test put long ----------------------------------------------------//
+
+ @Test
+ public void testPutLong() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(20);
+
+ assertEquals(0, buffer.position());
+ buffer.putLong(127);
+ assertEquals(8, buffer.position());
+ buffer.putLong(128);
+ assertEquals(16, buffer.position());
+
+ assertEquals(Integer.MAX_VALUE - 16, buffer.remaining());
+ }
+
+ @Test
+ public void testPutLongPastExistingCapacity() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(24);
+
+ buffer.putLong(127);
+ buffer.putLong(128);
+ buffer.putLong(127);
+
+ assertEquals(24, buffer.array().length);
+ assertEquals(24, buffer.position());
+
+ // Should prompt a resize of the array
+ buffer.putLong(128);
+
+ assertEquals(48, buffer.array().length);
+ assertEquals(32, buffer.position());
+ }
+
+ //----- Test put float ---------------------------------------------------//
+
+ @Test
+ public void testPutFloat() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(8);
+
+ assertEquals(0, buffer.position());
+ buffer.putFloat(127);
+ assertEquals(4, buffer.position());
+ buffer.putFloat(128);
+ assertEquals(8, buffer.position());
+
+ assertEquals(Integer.MAX_VALUE - 8, buffer.remaining());
+ }
+
+ @Test
+ public void testPutFloatPastExistingCapacity() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(12);
+
+ buffer.putFloat(127);
+ buffer.putFloat(128);
+ buffer.putFloat(127);
+
+ assertEquals(12, buffer.array().length);
+ assertEquals(12, buffer.position());
+
+ // Should prompt a resize of the array
+ buffer.putFloat(128);
+
+ assertEquals(24, buffer.array().length);
+ assertEquals(16, buffer.position());
+ }
+
+ //----- Test put double --------------------------------------------------//
+
+ @Test
+ public void testPutDouble() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(20);
+
+ assertEquals(0, buffer.position());
+ buffer.putDouble(127);
+ assertEquals(8, buffer.position());
+ buffer.putDouble(128);
+ assertEquals(16, buffer.position());
+
+ assertEquals(Integer.MAX_VALUE - 16, buffer.remaining());
+ }
+
+ @Test
+ public void testPutDoublePastExistingCapacity() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(24);
+
+ buffer.putDouble(127);
+ buffer.putDouble(128);
+ buffer.putDouble(127);
+
+ assertEquals(24, buffer.array().length);
+ assertEquals(24, buffer.position());
+
+ // Should prompt a resize of the array
+ buffer.putDouble(128);
+
+ assertEquals(48, buffer.array().length);
+ assertEquals(32, buffer.position());
+ }
+
+ //----- Test transferTo --------------------------------------------------//
+
+ @Test
+ public void testTrasnferHandlesZeroSizedReadRequest() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(6);
+
+ byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23};
+ ByteBuffer target = ByteBuffer.allocate(0);
+
+ buffer.put(data, 0, data.length);
+
+ // Should use the size needed instead of double current size
+ assertEquals(data.length, buffer.array().length);
+ assertEquals(data.length, buffer.position());
+
+ buffer.transferTo(target);
+
+ assertEquals(data.length, buffer.position());
+
+ // Should now be nothing to transfer
+ target.clear();
+
+ buffer.transferTo(target);
+
+ assertEquals(data.length, buffer.position());
+ assertEquals(0, target.position());
+ }
+
+ @Test
+ public void testTrasnferFullBufferToTarget() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(6);
+
+ byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23};
+ ByteBuffer target = ByteBuffer.allocate(data.length);
+
+ buffer.put(data, 0, data.length);
+
+ // Should use the size needed instead of double current size
+ assertEquals(data.length, buffer.array().length);
+ assertEquals(data.length, buffer.position());
+
+ buffer.transferTo(target);
+
+ assertEquals(0, buffer.position());
+
+ assertArrayEquals(data, target.array());
+ }
+
+ @Test
+ public void testTrasnferFullBufferToTargetWithoutArrayAccess() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(6);
+
+ byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23};
+ ByteBuffer target = ByteBuffer.allocateDirect(data.length);
+
+ buffer.put(data, 0, data.length);
+
+ // Should use the size needed instead of double current size
+ assertEquals(data.length, buffer.array().length);
+ assertEquals(data.length, buffer.position());
+
+ buffer.transferTo(target);
+
+ assertEquals(0, buffer.position());
+
+ // Check contents in target are correct which requires getting the data out first.
+ target.flip();
+ byte[] targetPayload = new byte[target.remaining()];
+ target.get(targetPayload);
+ assertArrayEquals(data, targetPayload);
+ }
+
+ @Test
+ public void testTrasnferPartialBufferToTarget() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(6);
+
+ byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23};
+ ByteBuffer target = ByteBuffer.allocate(data.length);
+
+ buffer.put(data, 0, data.length);
+
+ // Should use the size needed instead of double current size
+ assertEquals(data.length, buffer.array().length);
+ assertEquals(data.length, buffer.position());
+
+ // First Half
+ target.limit(data.length / 2);
+ buffer.transferTo(target);
+
+ // The buffer should have compacted
+ assertEquals(data.length / 2, buffer.position());
+
+ // Second half
+ target.limit(target.capacity());
+ target.position(data.length / 2);
+ buffer.transferTo(target);
+
+ // Buffer contents should have been consumed
+ assertEquals(0, buffer.arrayOffset());
+ assertEquals(0, buffer.position());
+
+ assertArrayEquals(data, target.array());
+ }
+
+ @Test
+ public void testTrasnferPartialBufferThenRequestMoreThanIsRemaining() {
+ FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+ byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+
+ buffer.put(data, 0, data.length);
+
+ // Should use the size needed instead of double current size
+ assertEquals(data.length, buffer.array().length);
+ assertEquals(data.length, buffer.position());
+
+ // First Half transfered which should lead to compaction
+ ByteBuffer target = ByteBuffer.allocate(5);
+ buffer.transferTo(target);
+ assertEquals(data.length / 2, buffer.position());
+
+ // Now the buffer should compact
+ buffer.ensureRemaining(10);
+ assertEquals(data.length / 2, buffer.position());
+
+ // Second half
+ target = ByteBuffer.allocate(5);
+ buffer.transferTo(target);
+
+ // Buffer contents should have been consumed
+ assertEquals(0, buffer.position());
+
+ assertArrayEquals(new byte[] {5, 6, 7, 8, 9}, target.array());
+ }
+}
\ No newline at end of file
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameWriterTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameWriterTest.java
new file mode 100644
index 0000000..636c7ac
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameWriterTest.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.engine.impl;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.Transfer;
+import org.apache.qpid.proton.codec.AMQPDefinedTypes;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.apache.qpid.proton.framing.TransportFrame;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+/**
+ * Tests for the FrameWriter implementation
+ */
+public class FrameWriterTest {
+
+ private final TransportImpl transport = new TransportImpl();
+
+ private final DecoderImpl decoder = new DecoderImpl();
+ private final EncoderImpl encoder = new EncoderImpl(decoder);
+
+ private ReadableBuffer bigPayload;
+ private ByteBuffer buffer;
+
+ @Before
+ public void setUp() {
+ AMQPDefinedTypes.registerAllTypes(decoder, encoder);
+
+ buffer = ByteBuffer.allocate(16384);
+
+ encoder.setByteBuffer(buffer);
+ decoder.setByteBuffer(buffer);
+
+ Random random = new Random(System.currentTimeMillis());
+ bigPayload = ReadableBuffer.ByteBufferReader.allocate(4096);
+ for (int i = 0; i < bigPayload.remaining(); ++i) {
+ bigPayload.array()[i] = (byte) random.nextInt(127);
+ }
+ }
+
+ @Test
+ public void testFrameWrittenToBuffer() {
+ Transfer transfer = createTransfer();
+ FrameWriter framer = new FrameWriter(encoder, Integer.MAX_VALUE, (byte) 0, transport);
+
+ framer.writeFrame(transfer);
+ assertNotEquals(0, framer.readBytes(buffer));
+
+ buffer.flip();
+ buffer.position(8); // Remove the Frame Header
+
+ Object decoded = decoder.readObject();
+ assertNotNull(decoded);
+ assertTrue(decoded instanceof Transfer);
+ Transfer result = (Transfer) decoded;
+
+ assertEquals(UnsignedInteger.ONE, result.getHandle());
+ assertEquals(UnsignedInteger.ZERO, result.getMessageFormat());
+ assertEquals(UnsignedInteger.valueOf(127), result.getDeliveryId());
+ }
+
+ @Test
+ public void testFrameWrittenToBufferWithLargePayloadAndMaxFrameSizeInvokesHandlerOnce() {
+ Transfer transfer = createTransfer();
+ FrameWriter framer = new FrameWriter(encoder, 2048, (byte) 0, transport);
+
+ final AtomicInteger toLargeCallbackCount = new AtomicInteger();
+
+ framer.writeFrame(0, transfer, bigPayload, new Runnable() {
+
+ @Override
+ public void run() {
+ toLargeCallbackCount.incrementAndGet();
+ }
+ });
+
+ // Should have read some data during this encode.
+ assertNotEquals(0, framer.readBytes(buffer));
+
+ buffer.flip();
+ byte[] header = new byte[FrameWriter.FRAME_HEADER_SIZE];
+ buffer.get(header);
+
+ ReadableBuffer headerReader = ReadableBuffer.ByteBufferReader.wrap(header);
+ int size = headerReader.getInt();
+ assertEquals(2048, size);
+
+ // Should have only asked once for the handler to respond to the to large paylod.
+ assertEquals(1, toLargeCallbackCount.get());
+ }
+
+ @Test
+ public void testFrameWrittenToBufferWithLargePayloadAndNoMaxFrameSize() {
+ Transfer transfer = createTransfer();
+ FrameWriter framer = new FrameWriter(encoder, Integer.MAX_VALUE, (byte) 0, transport);
+
+ framer.writeFrame(0, transfer, bigPayload, new PartialTransferHandler(transfer));
+ assertNotEquals(0, framer.readBytes(buffer));
+
+ buffer.flip();
+
+ byte[] header = new byte[FrameWriter.FRAME_HEADER_SIZE];
+ buffer.get(header);
+
+ ReadableBuffer headerReader = ReadableBuffer.ByteBufferReader.wrap(header);
+ int size = headerReader.getInt();
+ assertTrue(size > 4096);
+
+ Object decoded = decoder.readObject();
+ assertNotNull(decoded);
+ assertTrue(decoded instanceof Transfer);
+
+ // Check for our payload
+ assertTrue(buffer.hasRemaining());
+ assertEquals(4096, buffer.remaining());
+
+ byte[] payload = new byte[4096];
+ buffer.get(payload);
+
+ assertArrayEquals(bigPayload.array(), payload);
+ }
+
+ @Test
+ public void testFrameWrittenToBufferWithLargePayloadAndMaxFrameSize() {
+ Transfer transfer = createTransfer();
+ FrameWriter framer = new FrameWriter(encoder, 2048, (byte) 0, transport);
+
+ int payloadSize = 0;
+
+ // Should take three write and three reads to get it all and the Transfer should
+ // indicate that there is more data to come after the first two writes
+ for (int i = 1; i <= 3; ++i) {
+ transfer.setMore(false);
+ framer.writeFrame(0, transfer, bigPayload, new PartialTransferHandler(transfer));
+
+ ByteBuffer intermediate = ByteBuffer.allocate(4096);
+ int bytesRead = framer.readBytes(intermediate);
+ intermediate.flip();
+
+ // Read the Frame Header
+ byte[] header = new byte[FrameWriter.FRAME_HEADER_SIZE];
+ intermediate.get(header);
+ ReadableBuffer headerReader = ReadableBuffer.ByteBufferReader.wrap(header);
+ int frameSize = headerReader.getInt();
+
+ if (i < 3) {
+ assertTrue(transfer.getMore());
+ assertEquals(2048, bytesRead);
+ assertEquals(2048, frameSize);
+ } else {
+ assertFalse(transfer.getMore());
+ assertTrue(bytesRead < 2048);
+ assertTrue(frameSize < 2048);
+ }
+
+ decoder.setBuffer(ReadableBuffer.ByteBufferReader.wrap(intermediate));
+ Object decoded = decoder.readObject();
+ assertNotNull(decoded);
+ assertTrue(decoded instanceof Transfer);
+
+ // Trim the Frame header and Transfer encoding size and store off actual payload
+ payloadSize += bytesRead - intermediate.position();
+
+ // Accumulate the data minus the frame headers
+ buffer.put(intermediate);
+ }
+
+ assertEquals(3, framer.getFramesOutput());
+
+ buffer.rewind();
+ buffer.limit(payloadSize);
+
+ // Check for our payload
+ assertTrue(buffer.hasRemaining());
+ assertEquals(4096, buffer.remaining());
+
+ byte[] payload = new byte[4096];
+ buffer.get(payload);
+
+ assertArrayEquals(bigPayload.array(), payload);
+ }
+
+ @Test
+ public void testWriteEmptyFrame() {
+ FrameWriter framer = new FrameWriter(encoder, Integer.MAX_VALUE, (byte) 1, transport);
+
+ framer.writeFrame(16, null, null, null);
+
+ ByteBuffer headerBuffer = ByteBuffer.allocate(16);
+ framer.readBytes(headerBuffer);
+
+ assertEquals(FrameWriter.FRAME_HEADER_SIZE, headerBuffer.position());
+
+ headerBuffer.flip();
+
+ // Size, offset, Frame type, channel
+ assertEquals(FrameWriter.FRAME_HEADER_SIZE, headerBuffer.getInt());
+ assertEquals(2, headerBuffer.get());
+ assertEquals(1, headerBuffer.get());
+ assertEquals(16, headerBuffer.getShort());
+ }
+
+ @Test
+ public void testFrameWriterReportsFullBasedOnConfiguration() {
+ Transfer transfer = createTransfer();
+ FrameWriter framer = new FrameWriter(encoder, Integer.MAX_VALUE, (byte) 0, transport);
+
+ framer.writeFrame(0, transfer, bigPayload, new PartialTransferHandler(transfer));
+
+ assertEquals(FrameWriter.DEFAULT_FRAME_BUFFER_FULL_MARK ,framer.getFrameWriterMaxBytes());
+ assertFalse(framer.isFull());
+ framer.setFrameWriterMaxBytes(2048);
+ assertTrue(framer.isFull());
+ assertEquals(2048 ,framer.getFrameWriterMaxBytes());
+ framer.setFrameWriterMaxBytes(16384);
+ assertFalse(framer.isFull());
+ assertEquals(16384, framer.getFrameWriterMaxBytes());
+ }
+
+ @Test
+ public void testFrameWriterLogsFramesToTracer() {
+ FrameWriterProtocolTracer tracer = new FrameWriterProtocolTracer();
+ transport.setProtocolTracer(tracer);
+
+ Transfer transfer = createTransfer();
+ FrameWriter framer = new FrameWriter(encoder, Integer.MAX_VALUE, (byte) 0, transport);
+
+ framer.writeFrame(16, transfer, bigPayload, new PartialTransferHandler(transfer));
+
+ assertNotNull(tracer.getSentFrame());
+ TransportFrame sentFrame = tracer.getSentFrame();
+
+ assertEquals(16, sentFrame.getChannel());
+ assertTrue(sentFrame.getBody() instanceof Transfer);
+
+ Binary payload = sentFrame.getPayload();
+
+ assertEquals(bigPayload.capacity(), payload.getLength());
+ }
+
+ @Test
+ public void testFrameWriterLogsFramesToSystem() {
+ transport.trace(2);
+ TransportImpl spy = Mockito.spy(transport);
+
+ Transfer transfer = createTransfer();
+ FrameWriter framer = new FrameWriter(encoder, Integer.MAX_VALUE, (byte) 0, spy);
+
+ framer.writeFrame(16, transfer, bigPayload, new PartialTransferHandler(transfer));
+
+ ArgumentCaptor<TransportFrame> frameCatcher = ArgumentCaptor.forClass(TransportFrame.class);
+ Mockito.verify(spy).log(Mockito.anyString(), frameCatcher.capture());
+
+ assertEquals(16, frameCatcher.getValue().getChannel());
+ assertTrue(frameCatcher.getValue().getBody() instanceof Transfer);
+
+ Binary payload = frameCatcher.getValue().getPayload();
+
+ assertEquals(bigPayload.capacity(), payload.getLength());
+ }
+
+ @Test
+ public void testWriteHeader() {
+ FrameWriter framer = new FrameWriter(encoder, Integer.MAX_VALUE, (byte) 1, transport);
+ byte[] header = new byte[] {0, 0, 0, 9, 3, 4, 0, 12};
+ framer.writeHeader(header);
+
+ ByteBuffer headerBuffer = ByteBuffer.allocate(16);
+ framer.readBytes(headerBuffer);
+
+ assertEquals(FrameWriter.FRAME_HEADER_SIZE, headerBuffer.position());
+
+ headerBuffer.flip();
+
+ // Size, offset, Frame type, channel
+ assertEquals(FrameWriter.FRAME_HEADER_SIZE + 1, headerBuffer.getInt());
+ assertEquals(3, headerBuffer.get());
+ assertEquals(4, headerBuffer.get());
+ assertEquals(12, headerBuffer.getShort());
+ }
+
+ private Transfer createTransfer() {
+ Transfer transfer = new Transfer();
+ transfer.setHandle(UnsignedInteger.ONE);
+ transfer.setDeliveryTag(new Binary(new byte[] {0, 1}));
+ transfer.setMessageFormat(UnsignedInteger.ZERO);
+ transfer.setDeliveryId(UnsignedInteger.valueOf(127));
+ transfer.setAborted(false);
+ transfer.setBatchable(false);
+ transfer.setRcvSettleMode(ReceiverSettleMode.SECOND);
+
+ return transfer;
+ }
+
+ private static final class FrameWriterProtocolTracer implements ProtocolTracer {
+
+ private TransportFrame sentFrame;
+
+ public TransportFrame getSentFrame() {
+ return sentFrame;
+ }
+
+ @Override
+ public void receivedFrame(TransportFrame transportFrame) {
+ }
+
+ @Override
+ public void sentFrame(TransportFrame transportFrame) {
+ sentFrame = transportFrame;
+ }
+ }
+
+ private static final class PartialTransferHandler implements Runnable {
+ private Transfer transfer;
+
+ public PartialTransferHandler(Transfer transfer) {
+ this.transfer = transfer;
+ }
+
+ @Override
+ public void run() {
+ transfer.setMore(true);
+ }
+ }
+}
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DeliveryTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DeliveryTest.java
index 69db7c1..a923353 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DeliveryTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DeliveryTest.java
@@ -29,6 +29,7 @@
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import java.util.Random;
import java.util.logging.Logger;
import org.apache.qpid.proton.Proton;
@@ -70,14 +71,12 @@
getServer().connection = Proton.connection();
getServer().transport.bind(getServer().connection);
-
LOGGER.fine(bold("======== About to open connections"));
getClient().connection.open();
getServer().connection.open();
doOutputInputCycle();
-
LOGGER.fine(bold("======== About to open sessions"));
getClient().session = getClient().connection.session();
getClient().session.open();
@@ -93,7 +92,6 @@
pumpServerToClient();
assertEndpointState(getClient().session, ACTIVE, ACTIVE);
-
LOGGER.fine(bold("======== About to create reciever"));
getClient().source = new Source();
@@ -116,7 +114,6 @@
pumpClientToServer();
-
LOGGER.fine(bold("======== About to set up implicitly created sender"));
getServer().sender = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
@@ -141,7 +138,6 @@
pumpClientToServer();
-
LOGGER.fine(bold("======== About to create messages and send to the client"));
sendMessageToClient("delivery1", "Msg1", null); // Don't set it, so it should be defaulted
@@ -165,6 +161,130 @@
assertEquals("Unexpected message format", UnsignedInteger.MAX_VALUE.intValue(), clientDelivery4.getMessageFormat());
}
+ @Test
+ public void testSendReceiveLargeMessage() throws Exception
+ {
+ doTestSendReceiveLargeMessage(-1, -1);
+ }
+
+ @Test
+ public void testSendReceiveLargeMessageWithFrameSizeLimits() throws Exception
+ {
+ doTestSendReceiveLargeMessage(128 * 1024, 128 * 1024);
+ }
+
+ private void doTestSendReceiveLargeMessage(int maxFrameSize, int maxOutboundFrameSize) throws Exception
+ {
+ LOGGER.fine(bold("======== About to create transports"));
+
+ getClient().transport = Proton.transport();
+ if (maxFrameSize > 0) {
+ getClient().transport.setMaxFrameSize(maxFrameSize);
+ }
+ if (maxOutboundFrameSize > 0) {
+ getClient().transport.setOutboundFrameSizeLimit(maxOutboundFrameSize);
+ }
+ ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX);
+
+ getServer().transport = Proton.transport();
+ if (maxFrameSize > 0) {
+ getServer().transport.setMaxFrameSize(maxFrameSize);
+ }
+ if (maxOutboundFrameSize > 0) {
+ getServer().transport.setOutboundFrameSizeLimit(maxOutboundFrameSize);
+ }
+ ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX);
+
+ doOutputInputCycle();
+
+ getClient().connection = Proton.connection();
+ getClient().transport.bind(getClient().connection);
+
+ getServer().connection = Proton.connection();
+ getServer().transport.bind(getServer().connection);
+
+ LOGGER.fine(bold("======== About to open connections"));
+ getClient().connection.open();
+ getServer().connection.open();
+
+ doOutputInputCycle();
+
+ LOGGER.fine(bold("======== About to open sessions"));
+ getClient().session = getClient().connection.session();
+ getClient().session.open();
+
+ pumpClientToServer();
+
+ getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
+ assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE);
+
+ getServer().session.open();
+ assertEndpointState(getServer().session, ACTIVE, ACTIVE);
+
+ pumpServerToClient();
+ assertEndpointState(getClient().session, ACTIVE, ACTIVE);
+
+ LOGGER.fine(bold("======== About to create reciever"));
+
+ getClient().source = new Source();
+ getClient().source.setAddress(_sourceAddress);
+
+ getClient().target = new Target();
+ getClient().target.setAddress(null);
+
+ getClient().receiver = getClient().session.receiver("link1");
+ getClient().receiver.setTarget(getClient().target);
+ getClient().receiver.setSource(getClient().source);
+
+ getClient().receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+ getClient().receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+
+ assertEndpointState(getClient().receiver, UNINITIALIZED, UNINITIALIZED);
+
+ getClient().receiver.open();
+ assertEndpointState(getClient().receiver, ACTIVE, UNINITIALIZED);
+
+ pumpClientToServer();
+
+ LOGGER.fine(bold("======== About to set up implicitly created sender"));
+
+ getServer().sender = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
+
+ getServer().sender.setReceiverSettleMode(getServer().sender.getRemoteReceiverSettleMode());
+ getServer().sender.setSenderSettleMode(getServer().sender.getRemoteSenderSettleMode());
+
+ org.apache.qpid.proton.amqp.transport.Source serverRemoteSource = getServer().sender.getRemoteSource();
+ getServer().sender.setSource(serverRemoteSource);
+
+ assertEndpointState(getServer().sender, UNINITIALIZED, ACTIVE);
+ getServer().sender.open();
+
+ assertEndpointState(getServer().sender, ACTIVE, ACTIVE);
+
+ pumpServerToClient();
+
+ assertEndpointState(getClient().receiver, ACTIVE, ACTIVE);
+
+ getClient().receiver.flow(1);
+
+ pumpClientToServer();
+
+ String messageContent = createMessageContent(256 * 1024);
+
+ LOGGER.fine(bold("======== About to create messages and send to the client"));
+
+ sendMessageToClient("delivery1", messageContent, 0); // Explicitly set it to the default
+
+ pumpAllServerPendingToClient();
+
+ LOGGER.fine(bold("======== About to process the messages on the client"));
+
+ Delivery clientDelivery = receiveMessageFromServer("delivery1", messageContent);
+
+ // Verify the message format is as expected
+ assertEquals("Unexpected message format", 0, clientDelivery.getMessageFormat());
+ }
+
private Delivery receiveMessageFromServer(String deliveryTag, String messageContent)
{
Delivery delivery = getClient().connection.getWorkHead();
@@ -179,6 +299,7 @@
assertFalse(delivery.isPartial());
assertTrue(delivery.isReadable());
+ final int BUFFER_SIZE = messageContent.length() * 4;
byte[] received = new byte[BUFFER_SIZE];
int len = getClient().receiver.recv(received, 0, BUFFER_SIZE);
@@ -196,6 +317,17 @@
return delivery;
}
+ private static String createMessageContent(int length) {
+ Random rand = new Random(System.currentTimeMillis());
+
+ byte[] payload = new byte[length];
+ for (int i = 0; i < length; i++) {
+ payload[i] = (byte) (64 + 1 + rand.nextInt(9));
+ }
+
+ return new String(payload, StandardCharsets.UTF_8);
+ }
+
private Delivery sendMessageToClient(String deliveryTag, String messageContent, Integer messageFormat)
{
byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8);
@@ -203,10 +335,11 @@
Message m = Proton.message();
m.setBody(new AmqpValue(messageContent));
+ final int BUFFER_SIZE = messageContent.length() * 4;
byte[] encoded = new byte[BUFFER_SIZE];
int len = m.encode(encoded, 0, BUFFER_SIZE);
- assertTrue("given array was too small", len < BUFFER_SIZE);
+ assertTrue("given array was too small", len <= BUFFER_SIZE);
Delivery serverDelivery = getServer().sender.delivery(tag);
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java
index 6868496..a17b9e0 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java
@@ -82,6 +82,31 @@
getServer().transport.outputConsumed();
}
+ protected void pumpAllServerPendingToClient()
+ {
+ while (getServer().transport.pending() != 0)
+ {
+ ByteBuffer serverBuffer = getServer().transport.getOutputBuffer();
+
+ if (shouldLogPumpedBytes())
+ {
+ getTestLoggingHelper().prettyPrint(" <<<" + TestLoggingHelper.SERVER_PREFIX + " ", serverBuffer);
+ }
+ assertTrue("Server expected to produce some output", serverBuffer.hasRemaining());
+
+ ByteBuffer clientBuffer = getClient().transport.getInputBuffer();
+
+ clientBuffer.put(serverBuffer);
+
+ assertEquals("Client expected to consume all server's output", 0, serverBuffer.remaining());
+
+ getClient().transport.processInput().checkIsOk();
+ getServer().transport.outputConsumed();
+ }
+
+ assertEquals("Client expected to consume all server's pending work", 0, getServer().transport.pending());
+ }
+
protected void pumpClientToServer()
{
ByteBuffer clientBuffer = getClient().transport.getOutputBuffer();
diff --git a/tests/performance-jmh/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriterBenchmark.java b/tests/performance-jmh/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriterBenchmark.java
index e91f945..69920fa 100644
--- a/tests/performance-jmh/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriterBenchmark.java
+++ b/tests/performance-jmh/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriterBenchmark.java
@@ -27,8 +27,6 @@
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.ReadableBuffer;
-import org.apache.qpid.proton.engine.impl.FrameWriter;
-import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Measurement;
@@ -90,7 +88,7 @@
AMQPDefinedTypes.registerAllTypes(decoder, encoder);
transport = (TransportImpl) Proton.transport();
- frameWriter = new FrameWriter(encoder, 16 * 1024, (byte) 0, null, transport);
+ frameWriter = new FrameWriter(encoder, 16 * 1024, (byte) 0, transport);
transfer = new Transfer();
transfer.setDeliveryId(UnsignedInteger.ONE);