PROTON-1948 Refactor FrameWriter with more performant buffer

Refactor the FrameWriter to use a more performant ReadableBuffer wrapper
around a byte array that auto grows as an ecode requires instead of
reallocating a ByteBuffer and copying past contents and then performing
a new encode any time space is exhausted.  Removing the layer of buffer
abstractions leads to an increase in overall performance under normal
operations as well.
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
index 13d481c..b76dc5c 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.proton.engine.impl;
 
-import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 
 import org.apache.qpid.proton.amqp.Binary;
@@ -28,225 +27,153 @@
 import org.apache.qpid.proton.amqp.transport.FrameBody;
 import org.apache.qpid.proton.codec.EncoderImpl;
 import org.apache.qpid.proton.codec.ReadableBuffer;
-import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.framing.TransportFrame;
 
 /**
- * FrameWriter
- *
+ * Writes Frames to an internal buffer for later processing by the transport.
  */
-class FrameWriter
-{
+class FrameWriter {
+
+    static final int DEFAULT_FRAME_BUFFER_FULL_MARK = 64 * 1024;
+    static final int FRAME_HEADER_SIZE = 8;
 
     static final byte AMQP_FRAME_TYPE = 0;
-    static final byte SASL_FRAME_TYPE = (byte) 1;
+    static final byte SASL_FRAME_TYPE = 1;
 
-    private EncoderImpl _encoder;
-    private ByteBuffer _bbuf;
-    private WritableBuffer _buffer;
-    private int _maxFrameSize;
-    private byte _frameType;
-    final private Ref<ProtocolTracer> _protocolTracer;
-    private TransportImpl _transport;
+    private final TransportImpl transport;
+    private final EncoderImpl encoder;
+    private final FrameWriterBuffer frameBuffer = new FrameWriterBuffer();
 
-    private int _frameStart = 0;
-    private int _payloadStart;
-    private int _performativeSize;
-    private long _framesOutput = 0;
+    // Configuration of this Frame Writer
+    private int maxFrameSize;
+    private final byte frameType;
+    private int frameBufferMaxBytes = DEFAULT_FRAME_BUFFER_FULL_MARK;
 
-    FrameWriter(EncoderImpl encoder, int maxFrameSize, byte frameType,
-                Ref<ProtocolTracer> protocolTracer, TransportImpl transport)
-    {
-        _encoder = encoder;
-        _bbuf = ByteBuffer.allocate(1024);
-        _buffer = new WritableBuffer.ByteBufferWrapper(_bbuf);
-        _encoder.setByteBuffer(_buffer);
-        _maxFrameSize = maxFrameSize;
-        _frameType = frameType;
-        _protocolTracer = protocolTracer;
-        _transport = transport;
+    // State of current write operation, reset on start of each new write
+    private int frameStart;
+
+    // Frame Writer metrics
+    private long framesOutput;
+
+    FrameWriter(EncoderImpl encoder, int maxFrameSize, byte frameType, TransportImpl transport) {
+        this.encoder = encoder;
+        this.maxFrameSize = maxFrameSize;
+        this.frameType = frameType;
+        this.transport = transport;
+
+        encoder.setByteBuffer(frameBuffer);
     }
 
-    void setMaxFrameSize(int maxFrameSize)
-    {
-        _maxFrameSize = maxFrameSize;
+    boolean isFull() {
+        return frameBuffer.position() > frameBufferMaxBytes;
     }
 
-    private void grow()
-    {
-        grow(_bbuf.capacity());  // Double current capacity
+    int readBytes(ByteBuffer dst) {
+        return frameBuffer.transferTo(dst);
     }
 
-    private void grow(int amount)
-    {
-        ByteBuffer old = _bbuf;
-        _bbuf = ByteBuffer.allocate(old.capacity() + amount);
-        _buffer = new WritableBuffer.ByteBufferWrapper(_bbuf);
-        old.flip();
-        _bbuf.put(old);
-        _encoder.setByteBuffer(_buffer);
+    long getFramesOutput() {
+        return framesOutput;
     }
 
-    void writeHeader(byte[] header)
-    {
-        _buffer.put(header, 0, header.length);
+    void setMaxFrameSize(int maxFrameSize) {
+        this.maxFrameSize = maxFrameSize;
     }
 
-    private void startFrame()
-    {
-        _frameStart = _buffer.position();
+    void setFrameWriterMaxBytes(int maxBytes) {
+        this.frameBufferMaxBytes = maxBytes;
     }
 
-    private void writePerformative(Object frameBody, ReadableBuffer payload, Runnable onPayloadTooLarge)
-    {
-        while (_buffer.remaining() < 8) {
-            grow();
+    int getFrameWriterMaxBytes() {
+        return frameBufferMaxBytes;
+    }
+
+    void writeHeader(byte[] header) {
+        frameBuffer.put(header, 0, header.length);
+    }
+
+    void writeFrame(Object frameBody) {
+        writeFrame(0, frameBody, null, null);
+    }
+
+    void writeFrame(int channel, Object frameBody, ReadableBuffer payload, Runnable onPayloadTooLarge) {
+        frameStart = frameBuffer.position();
+
+        final int performativeSize = writePerformative(frameBody, payload, onPayloadTooLarge);
+        final int capacity = maxFrameSize > 0 ? maxFrameSize - performativeSize : Integer.MAX_VALUE;
+        final int payloadSize = Math.min(payload == null ? 0 : payload.remaining(), capacity);
+
+        if (transport.isFrameTracingEnabled()) {
+            logFrame(channel, frameBody, payload, payloadSize);
         }
 
-        while (true)
-        {
-            try
-            {
-                _buffer.position(_frameStart + 8);
-                if (frameBody != null) _encoder.writeObject(frameBody);
-
-                _payloadStart = _buffer.position();
-                _performativeSize = _payloadStart - _frameStart;
-
-                if (onPayloadTooLarge == null)
-                {
-                    break;
-                }
-
-                if (_maxFrameSize > 0 && payload != null && (payload.remaining() + _performativeSize) > _maxFrameSize)
-                {
-                    onPayloadTooLarge.run();
-                    onPayloadTooLarge = null;
-                }
-                else
-                {
-                    break;
-                }
-            }
-            catch (BufferOverflowException | IndexOutOfBoundsException e)
-            {
-                grow();
-            }
-        }
-    }
-
-    private void endFrame(int channel)
-    {
-        int frameSize = _buffer.position() - _frameStart;
-        int limit = _buffer.position();
-        _buffer.position(_frameStart);
-        _buffer.putInt(frameSize);
-        _buffer.put((byte) 2);
-        _buffer.put(_frameType);
-        _buffer.putShort((short) channel);
-        _buffer.position(limit);
-    }
-
-    void writeFrame(int channel, Object frameBody, ReadableBuffer payload,
-                    Runnable onPayloadTooLarge)
-    {
-        startFrame();
-
-        writePerformative(frameBody, payload, onPayloadTooLarge);
-
-        int capacity;
-        if (_maxFrameSize > 0) {
-            capacity = _maxFrameSize - _performativeSize;
-        } else {
-            capacity = Integer.MAX_VALUE;
-        }
-        int payloadSize = Math.min(payload == null ? 0 : payload.remaining(), capacity);
-
-        ProtocolTracer tracer = _protocolTracer == null ? null : _protocolTracer.get();
-        if (tracer != null || _transport.isTraceFramesEnabled())
-        {
-            logFrame(tracer, channel, frameBody, payload, payloadSize);
-        }
-
-        if(payloadSize > 0)
-        {
-            while (_buffer.remaining() < payloadSize)
-            {
-                grow(payloadSize - _buffer.remaining());
-            }
-
+        if (payloadSize > 0) {
             int oldLimit = payload.limit();
             payload.limit(payload.position() + payloadSize);
-            _buffer.put(payload);
+            frameBuffer.put(payload);
             payload.limit(oldLimit);
         }
 
         endFrame(channel);
 
-        _framesOutput += 1;
+        framesOutput++;
     }
 
-    private void logFrame(ProtocolTracer tracer, int channel, Object frameBody, ReadableBuffer payload, int payloadSize)
-    {
-        if (_frameType == AMQP_FRAME_TYPE)
-        {
+    private int writePerformative(Object frameBody, ReadableBuffer payload, Runnable onPayloadTooLarge) {
+        frameBuffer.position(frameStart + FRAME_HEADER_SIZE);
+
+        if (frameBody != null) {
+            encoder.writeObject(frameBody);
+        }
+
+        int performativeSize = frameBuffer.position() - frameStart;
+
+        if (onPayloadTooLarge != null && maxFrameSize > 0 && payload != null && (payload.remaining() + performativeSize) > maxFrameSize) {
+            // Next iteration will re-encode the frame body again with updates from the <payload-to-large>
+            // handler and then we can move onto the body portion.
+            onPayloadTooLarge.run();
+            performativeSize = writePerformative(frameBody, payload, null);
+        }
+
+        return performativeSize;
+    }
+
+    private void endFrame(int channel) {
+        int frameSize = frameBuffer.position() - frameStart;
+        int originalPosition = frameBuffer.position();
+
+        frameBuffer.position(frameStart);
+        frameBuffer.putInt(frameSize);
+        frameBuffer.put((byte) 2);
+        frameBuffer.put(frameType);
+        frameBuffer.putShort((short) channel);
+        frameBuffer.position(originalPosition);
+    }
+
+    private void logFrame(int channel, Object frameBody, ReadableBuffer payload, int payloadSize) {
+        if (frameType == AMQP_FRAME_TYPE) {
             ReadableBuffer originalPayload = null;
-            if (payload!=null)
-            {
+            if (payload != null) {
                 originalPayload = payload.slice();
                 originalPayload.limit(payloadSize);
             }
 
             Binary payloadBin = Binary.create(originalPayload);
             FrameBody body = null;
-            if (frameBody == null)
-            {
+            if (frameBody == null) {
                 body = EmptyFrame.INSTANCE;
-            }
-            else
-            {
+            } else {
                 body = (FrameBody) frameBody;
             }
 
             TransportFrame frame = new TransportFrame(channel, body, payloadBin);
 
-            _transport.log(TransportImpl.OUTGOING, frame);
+            transport.log(TransportImpl.OUTGOING, frame);
 
-            if (tracer != null)
-            {
+            ProtocolTracer tracer = transport.getProtocolTracer();
+            if (tracer != null) {
                 tracer.sentFrame(frame);
             }
         }
     }
-
-    void writeFrame(Object frameBody)
-    {
-        writeFrame(0, frameBody, null, null);
-    }
-
-    boolean isFull() {
-        // XXX: this should probably be tunable
-        return _bbuf.position() > 64*1024;
-    }
-
-    int readBytes(ByteBuffer dst)
-    {
-        ByteBuffer src = _bbuf.duplicate();
-        src.flip();
-
-        int size = Math.min(src.remaining(), dst.remaining());
-        int limit = src.limit();
-        src.limit(size);
-        dst.put(src);
-        src.limit(limit);
-        _bbuf.rewind();
-        _bbuf.put(src);
-
-        return size;
-    }
-
-    long getFramesOutput()
-    {
-        return _framesOutput;
-    }
 }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriterBuffer.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriterBuffer.java
new file mode 100644
index 0000000..e028c19
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriterBuffer.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.engine.impl;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+public class FrameWriterBuffer implements WritableBuffer {
+
+    public final static int DEFAULT_CAPACITY = 1024;
+
+    byte array[];
+    int position;
+
+   /**
+    * Creates a new WritableBuffer with default capacity.
+    */
+   public FrameWriterBuffer() {
+       this(DEFAULT_CAPACITY);
+   }
+
+    /**
+     * Create a new WritableBuffer with the given capacity.
+     *
+     * @param capacity
+     *      the inital capacity to allocate for this buffer.
+     */
+    public FrameWriterBuffer(int capacity) {
+        this.array = new byte[capacity];
+    }
+
+    public byte[] array() {
+        return array;
+    }
+
+    public int arrayOffset() {
+        return 0;
+    }
+
+    @Override
+    public void put(byte b) {
+        ensureRemaining(Byte.BYTES);
+        array[position++] = b;
+    }
+
+    @Override
+    public void putShort(short value) {
+        ensureRemaining(Short.BYTES);
+        array[position++] = (byte)(value >>> 8);
+        array[position++] = (byte)(value >>> 0);
+    }
+
+    @Override
+    public void putInt(int value) {
+        ensureRemaining(Integer.BYTES);
+        array[position++] = (byte)(value >>> 24);
+        array[position++] = (byte)(value >>> 16);
+        array[position++] = (byte)(value >>> 8);
+        array[position++] = (byte)(value >>> 0);
+    }
+
+    @Override
+    public void putLong(long value) {
+        ensureRemaining(Long.BYTES);
+        array[position++] = (byte)(value >>> 56);
+        array[position++] = (byte)(value >>> 48);
+        array[position++] = (byte)(value >>> 40);
+        array[position++] = (byte)(value >>> 32);
+        array[position++] = (byte)(value >>> 24);
+        array[position++] = (byte)(value >>> 16);
+        array[position++] = (byte)(value >>> 8);
+        array[position++] = (byte)(value >>> 0);
+    }
+
+    @Override
+    public void putFloat(float value) {
+        putInt(Float.floatToRawIntBits(value));
+    }
+
+    @Override
+    public void putDouble(double value) {
+        putLong(Double.doubleToRawLongBits(value));
+    }
+
+    @Override
+    public void put(byte[] src, int offset, int length) {
+        if (length == 0) {
+            return;
+        }
+
+        ensureRemaining(length);
+        System.arraycopy(src, offset, array, position, length);
+        position += length;
+    }
+
+    @Override
+    public void put(ByteBuffer payload) {
+        final int toCopy = payload.remaining();
+        ensureRemaining(toCopy);
+
+        if (payload.hasArray()) {
+            System.arraycopy(payload.array(), payload.arrayOffset() + payload.position(), array, position, toCopy);
+            payload.position(payload.position() + toCopy);
+        } else {
+            payload.get(array, position, toCopy);
+        }
+
+        position += toCopy;
+    }
+
+    @Override
+    public void put(ReadableBuffer payload) {
+        final int toCopy = payload.remaining();
+        ensureRemaining(toCopy);
+
+        if (payload.hasArray()) {
+            System.arraycopy(payload.array(), payload.arrayOffset() + payload.position(), array, position, toCopy);
+            payload.position(payload.position() + toCopy);
+        } else {
+            payload.get(array, position, toCopy);
+        }
+
+        position += toCopy;
+    }
+
+    @Override
+    public boolean hasRemaining() {
+        return position < Integer.MAX_VALUE;
+    }
+
+    @Override
+    public int remaining() {
+        return Integer.MAX_VALUE - position;
+    }
+
+    /**
+     * Ensures the the buffer has at least the requiredRemaining space specified.
+     * <p>
+     * The internal buffer will be doubled if the requested capacity is less than that
+     * amount or the buffer will be expanded to the full new requiredRemaining value.
+     *
+     * @param requiredRemaining
+     *      the minimum remaining bytes needed to meet the next write operation.
+     */
+    @Override
+    public void ensureRemaining(int requiredRemaining) {
+        if (requiredRemaining > array.length - position) {
+            byte newBuffer[] = new byte[Math.max(array.length << 1, requiredRemaining + position)];
+            System.arraycopy(array, 0, newBuffer, 0, array.length);
+            array = newBuffer;
+        }
+    }
+
+    @Override
+    public int position() {
+        return position;
+    }
+
+    @Override
+    public void position(int position) {
+        if (position < 0) {
+            throw new IllegalArgumentException("Requested new buffer position cannot be negative");
+        }
+
+        if (position > array.length) {
+            ensureRemaining(position - array.length);
+        }
+
+        this.position = position;
+    }
+
+    @Override
+    public int limit() {
+        return Integer.MAX_VALUE;
+    }
+
+    /**
+     * Copy bytes from this buffer into the target buffer and compacts this buffer.
+     * <p>
+     * Copy either all bytes written into this buffer (start to current position) or
+     * as many as will fit if the target capacity is less that the bytes written.  Bytes
+     * not read from this buffer are moved to the front of the buffer and the position is
+     * reset to the end of the copied region.
+     *
+     * @param target
+     *      The array to move bytes to from those written into this buffer.
+     *
+     * @return the number of bytes transfered to the target buffer.
+     */
+    public int transferTo(ByteBuffer target) {
+        int size = Math.min(position, target.remaining());
+
+        if (size == 0) {
+            return 0;
+        }
+
+        if (target.hasArray()) {
+            System.arraycopy(array, 0, target.array(), target.arrayOffset() + target.position(), size);
+            target.position(target.position() + size);
+        } else {
+            target.put(array, 0, size);
+        }
+
+        // Compact any remaining data to the front of the array so that new writes can reuse
+        // space previously allocated and not extend the array if possible.
+        if (size != position) {
+            int remainder = position - size;
+            System.arraycopy(array, size, array, 0, remainder);
+            position = remainder;  // ensure we are at end of unread chunk
+        } else {
+            position = 0; // reset to empty state.
+        }
+
+        return size;
+    }
+}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
index cee798f..ec7bffc 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
@@ -99,7 +99,7 @@
 
         AMQPDefinedTypes.registerAllTypes(_decoder,_encoder);
         _frameParser = new SaslFrameParser(this, _decoder, maxFrameSize);
-        _frameWriter = new FrameWriter(_encoder, maxFrameSize, FrameWriter.SASL_FRAME_TYPE, null, _transport);
+        _frameWriter = new FrameWriter(_encoder, maxFrameSize, FrameWriter.SASL_FRAME_TYPE, _transport);
     }
 
     void fail() {
@@ -498,10 +498,12 @@
         _allowSkip = allowSkip;
     }
 
+    @Override
     public TransportWrapper wrap(final TransportInput input, final TransportOutput output)
     {
         return new SaslSniffer(new SwitchingSaslTransportWrapper(input, output),
                                new PlainTransportWrapper(output, input)) {
+            @Override
             protected boolean isDeterminationMade() {
                 if (_role == Role.SERVER && _allowSkip) {
                     return super.isDeterminationMade();
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 3ec6ef4..8947fa3 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -167,7 +167,6 @@
         _maxFrameSize = maxFrameSize;
         _frameWriter = new FrameWriter(_encoder, _remoteMaxFrameSize,
                                        FrameWriter.AMQP_FRAME_TYPE,
-                                       _protocolTracer,
                                        this);
     }
 
@@ -1719,6 +1718,11 @@
         }
     }
 
+    boolean isFrameTracingEnabled()
+    {
+        return (_levels & TRACE_FRM) != 0 || _protocolTracer.get() != null;
+    }
+
     boolean isTraceFramesEnabled()
     {
         return (_levels & TRACE_FRM) != 0;
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameWriterBufferTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameWriterBufferTest.java
new file mode 100644
index 0000000..2f08c71
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameWriterBufferTest.java
@@ -0,0 +1,723 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.engine.impl;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.junit.Test;
+
+/**
+ * Test behavior of the FrameWriterBuffer implementation
+ */
+public class FrameWriterBufferTest {
+
+    //----- Test newly create buffer behaviors -------------------------------//
+
+    @Test
+    public void testDefaultCtor() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer();
+
+        assertEquals(FrameWriterBuffer.DEFAULT_CAPACITY, buffer.array().length);
+        assertEquals(0, buffer.arrayOffset());
+        assertEquals(Integer.MAX_VALUE, buffer.remaining());
+        assertEquals(0, buffer.position());
+        assertEquals(Integer.MAX_VALUE, buffer.limit());
+        assertTrue(buffer.hasRemaining());
+    }
+
+    @Test
+    public void testCreateBufferWithGivenSize() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(255);
+
+        assertEquals(255, buffer.array().length);
+        assertEquals(0, buffer.arrayOffset());
+        assertEquals(Integer.MAX_VALUE, buffer.remaining());
+        assertEquals(0, buffer.position());
+        assertEquals(Integer.MAX_VALUE, buffer.limit());
+        assertTrue(buffer.hasRemaining());
+    }
+
+    //----- Test hasRemaining ------------------------------------------------//
+
+    @Test
+    public void testHasRemaining() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+        for (int i = 0; i < 42; ++i) {
+            assertTrue(buffer.hasRemaining());
+            buffer.put((byte) 127);
+            assertTrue(buffer.hasRemaining());
+        }
+    }
+
+    //----- Test remaining ---------------------------------------------------//
+
+    @Test
+    public void testRemaining() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+        assertEquals(Integer.MAX_VALUE, buffer.remaining());
+        buffer.put((byte) 127);
+        assertEquals(Integer.MAX_VALUE - 1, buffer.remaining());
+        buffer.put((byte) 128);
+        assertEquals(Integer.MAX_VALUE - 2, buffer.remaining());
+    }
+
+    @Test
+    public void testRemainingResetsWhenDataConsumed() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+        assertEquals(Integer.MAX_VALUE, buffer.remaining());
+        buffer.put((byte) 127);
+        assertEquals(Integer.MAX_VALUE - 1, buffer.remaining());
+        buffer.put((byte) 128);
+        assertEquals(Integer.MAX_VALUE - 2, buffer.remaining());
+
+        ByteBuffer target = ByteBuffer.allocate(1);
+        buffer.transferTo(target);
+        assertEquals(Integer.MAX_VALUE - 1, buffer.remaining());
+        target.clear();
+        buffer.transferTo(target);
+        assertEquals(Integer.MAX_VALUE, buffer.remaining());
+    }
+
+    //----- Test position handling -------------------------------------------//
+
+    @Test
+    public void testPositionWithOneArrayAppended() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer();
+
+        buffer.put(new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, 0, 10);
+
+        assertEquals(Integer.MAX_VALUE, buffer.limit());
+        assertEquals(10, buffer.position());
+
+        buffer.position(5);
+        assertEquals(5, buffer.position());
+
+        buffer.position(6);
+        assertEquals(6, buffer.position());
+
+        buffer.position(10);
+        assertEquals(10, buffer.position());
+
+        try {
+            buffer.position(11);
+        } catch (IllegalArgumentException e) {
+            fail("Should not throw a IllegalArgumentException");
+        }
+    }
+
+    @Test
+    public void testPositionMovedBeyondCurrentArraySize() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+        byte[] data = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
+        buffer.put(data, 0, data.length);
+
+        assertEquals(data.length, buffer.array().length);
+
+        try {
+            buffer.position(15);
+        } catch (IllegalArgumentException e) {
+            fail("Should not throw a IllegalArgumentException");
+        }
+
+        // Size should have doubled.
+        assertEquals(20, buffer.array().length);
+
+        // Check written bytes are same
+        for (int i = 0; i < data.length; ++i) {
+            assertEquals(data[i], buffer.array()[i]);
+        }
+    }
+
+    @Test
+    public void testPositionMovedBeyondDoubleTheCurrentCapacity() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+        byte[] data = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
+        buffer.put(data, 0, data.length);
+
+        assertEquals(data.length, buffer.array().length);
+
+        try {
+            buffer.position(30);
+        } catch (IllegalArgumentException e) {
+            fail("Should not throw a IllegalArgumentException");
+        }
+
+        // Size should have expanded to meet requested size.
+        assertEquals(30, buffer.array().length);
+
+        // Check written bytes are same
+        for (int i = 0; i < data.length; ++i) {
+            assertEquals(data[i], buffer.array()[i]);
+        }
+    }
+
+    @Test
+    public void testPositionEnforcesPreconditions() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer();
+
+        try {
+            buffer.position(-1);
+            fail("Should throw a IllegalArgumentException");
+        } catch (IllegalArgumentException e) {}
+    }
+
+    //----- Test put array ---------------------------------------------------//
+
+    @Test
+    public void testPutByteArray() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+        byte[] data = new byte[] { 127, (byte) 128 };
+
+        assertEquals(0, buffer.position());
+        buffer.put(data, 0, data.length);
+        assertEquals(2, buffer.position());
+        buffer.put(data, 0, data.length);
+        assertEquals(4, buffer.position());
+
+        assertEquals(Integer.MAX_VALUE - 4, buffer.remaining());
+    }
+
+    @Test
+    public void testPutByteArrayWithZeroLengthDoesNotExpandBuffer() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(0);
+
+        byte[] data = new byte[] { 127, (byte) 128 };
+
+        assertEquals(0, buffer.array().length);
+        buffer.put(data, 0, 0);
+        assertEquals(0, buffer.array().length);
+    }
+
+    @Test
+    public void testPutByteArrayPastExistingCapacity() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(6);
+
+        byte[] data = new byte[] { 127, (byte) 128 };
+
+        buffer.put(data, 0, data.length);
+        buffer.put(data, 0, data.length);
+        buffer.put(data, 0, data.length);
+
+        assertEquals(6, buffer.array().length);
+        assertEquals(6, buffer.position());
+
+        // Should prompt a resize of the array
+        buffer.put(data, 0, data.length);
+
+        assertEquals(12, buffer.array().length);
+        assertEquals(8, buffer.position());
+    }
+
+    @Test
+    public void testPutByteArrayLargerThanDefaultExpansionSize() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(6);
+
+        byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23};
+
+        buffer.put(data, 0, data.length);
+
+        // Should use the size needed instead of double current size
+        assertEquals(data.length, buffer.array().length);
+        assertEquals(data.length, buffer.position());
+    }
+
+    //----- Test put ByteBuffer ----------------------------------------------//
+
+    @Test
+    public void testPutByteBuffer() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+        byte[] data = new byte[] { 127, (byte) 128 };
+        ByteBuffer byteBuffer = ByteBuffer.wrap(data);
+
+        assertEquals(0, buffer.position());
+        buffer.put(byteBuffer.duplicate());
+        assertEquals(2, buffer.position());
+        buffer.put(byteBuffer.duplicate());
+        assertEquals(4, buffer.position());
+
+        assertEquals(Integer.MAX_VALUE - 4, buffer.remaining());
+    }
+
+    @Test
+    public void testPutByteBufferWithoutArrayAccess() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+        byte[] data = new byte[] { 127, (byte) 128 };
+        ByteBuffer byteBuffer = ByteBuffer.wrap(data).asReadOnlyBuffer();
+
+        assertEquals(0, buffer.position());
+        buffer.put(byteBuffer.duplicate());
+        assertEquals(2, buffer.position());
+        buffer.put(byteBuffer.duplicate());
+        assertEquals(4, buffer.position());
+
+        assertEquals(Integer.MAX_VALUE - 4, buffer.remaining());
+    }
+
+    @Test
+    public void testPutByteBufferPastExistingCapacity() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(6);
+
+        byte[] data = new byte[] { 127, (byte) 128 };
+        ByteBuffer byteBuffer = ByteBuffer.wrap(data);
+
+        buffer.put(byteBuffer.duplicate());
+        buffer.put(byteBuffer.duplicate());
+        buffer.put(byteBuffer.duplicate());
+
+        assertEquals(6, buffer.array().length);
+        assertEquals(6, buffer.position());
+
+        // Should prompt a resize of the array
+        buffer.put(byteBuffer);
+
+        assertEquals(12, buffer.array().length);
+        assertEquals(8, buffer.position());
+    }
+
+    @Test
+    public void testPutByteBufferLargerThanDefaultExpansionSize() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(6);
+
+        byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23};
+        ByteBuffer byteBuffer = ByteBuffer.wrap(data);
+
+        buffer.put(byteBuffer);
+
+        // Should use the size needed instead of double current size
+        assertEquals(data.length, buffer.array().length);
+        assertEquals(data.length, buffer.position());
+    }
+
+    //----- Test put ReadableBuffer --------------------------------------------//
+
+    @Test
+    public void testPutReadableBuffer() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+        byte[] data = new byte[] { 127, (byte) 128 };
+        ReadableBuffer readable = ReadableBuffer.ByteBufferReader.wrap(data);
+
+        assertEquals(0, buffer.position());
+        buffer.put(readable.duplicate());
+        assertEquals(2, buffer.position());
+        buffer.put(readable.duplicate());
+        assertEquals(4, buffer.position());
+
+        assertEquals(Integer.MAX_VALUE - 4, buffer.remaining());
+    }
+
+    @Test
+    public void testPutReadableBufferWithoutArrayAccess() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+        byte[] data = new byte[] { 127, (byte) 128 };
+        ByteBuffer byteBuffer = ByteBuffer.wrap(data);
+        ReadableBuffer readable = ReadableBuffer.ByteBufferReader.wrap(byteBuffer.asReadOnlyBuffer());
+
+        assertEquals(0, buffer.position());
+        buffer.put(readable.duplicate());
+        assertEquals(2, buffer.position());
+        buffer.put(readable.duplicate());
+        assertEquals(4, buffer.position());
+
+        assertEquals(Integer.MAX_VALUE - 4, buffer.remaining());
+    }
+
+    @Test
+    public void testPutReadableBufferPastExistingCapacity() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(6);
+
+        byte[] data = new byte[] { 127, (byte) 128 };
+        ReadableBuffer readable = ReadableBuffer.ByteBufferReader.wrap(data);
+
+        buffer.put(readable.duplicate());
+        buffer.put(readable.duplicate());
+        buffer.put(readable.duplicate());
+
+        assertEquals(6, buffer.array().length);
+        assertEquals(6, buffer.position());
+
+        // Should prompt a resize of the array
+        buffer.put(readable.duplicate());
+
+        assertEquals(12, buffer.array().length);
+        assertEquals(8, buffer.position());
+    }
+
+    @Test
+    public void testPutReadableBufferLargerThanDefaultExpansionSize() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(6);
+
+        byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23};
+        ReadableBuffer readable = ReadableBuffer.ByteBufferReader.wrap(data);
+
+        buffer.put(readable);
+
+        // Should use the size needed instead of double current size
+        assertEquals(data.length, buffer.array().length);
+        assertEquals(data.length, buffer.position());
+    }
+
+    //----- Test put byte ----------------------------------------------------//
+
+    @Test
+    public void testPutByte() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+        assertEquals(0, buffer.position());
+        buffer.put((byte) 127);
+        assertEquals(1, buffer.position());
+        buffer.put((byte) 128);
+        assertEquals(2, buffer.position());
+
+        assertEquals(Integer.MAX_VALUE - 2, buffer.remaining());
+    }
+
+    @Test
+    public void testPutBytePastExistingCapacity() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(5);
+
+        buffer.put((byte) 127);
+        buffer.put((byte) 128);
+        buffer.put((byte) 127);
+        buffer.put((byte) 128);
+        buffer.put((byte) 128);
+
+        assertEquals(5, buffer.array().length);
+        assertEquals(5, buffer.position());
+
+        // Should prompt a resize of the array
+        buffer.put((byte) 127);
+
+        assertEquals(10, buffer.array().length);
+        assertEquals(6, buffer.position());
+    }
+
+    //----- Test put short ---------------------------------------------------//
+
+    @Test
+    public void testPutShort() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+        assertEquals(0, buffer.position());
+        buffer.putShort((short) 127);
+        assertEquals(2, buffer.position());
+        buffer.putShort((short) 128);
+        assertEquals(4, buffer.position());
+
+        assertEquals(Integer.MAX_VALUE - 4, buffer.remaining());
+    }
+
+    @Test
+    public void testPutShortPastExistingCapacity() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(6);
+
+        buffer.putShort((short) 128);
+        buffer.putShort((short) 127);
+        buffer.putShort((short) 128);
+
+        assertEquals(6, buffer.array().length);
+        assertEquals(6, buffer.position());
+
+        // Should prompt a resize of the array
+        buffer.putShort((short) 127);
+
+        assertEquals(12, buffer.array().length);
+        assertEquals(8, buffer.position());
+    }
+
+    //----- Test put int -----------------------------------------------------//
+
+    @Test
+    public void testPutInt() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+        assertEquals(0, buffer.position());
+        buffer.putInt(127);
+        assertEquals(4, buffer.position());
+        buffer.putInt(128);
+        assertEquals(8, buffer.position());
+
+        assertEquals(Integer.MAX_VALUE - 8, buffer.remaining());
+    }
+
+    @Test
+    public void testPutIntPastExistingCapacity() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(12);
+
+        buffer.putInt(127);
+        buffer.putInt(128);
+        buffer.putInt(127);
+
+        assertEquals(12, buffer.array().length);
+        assertEquals(12, buffer.position());
+
+        // Should prompt a resize of the array
+        buffer.putInt(128);
+
+        assertEquals(24, buffer.array().length);
+        assertEquals(16, buffer.position());
+    }
+
+    //----- Test put long ----------------------------------------------------//
+
+    @Test
+    public void testPutLong() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(20);
+
+        assertEquals(0, buffer.position());
+        buffer.putLong(127);
+        assertEquals(8, buffer.position());
+        buffer.putLong(128);
+        assertEquals(16, buffer.position());
+
+        assertEquals(Integer.MAX_VALUE - 16, buffer.remaining());
+    }
+
+    @Test
+    public void testPutLongPastExistingCapacity() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(24);
+
+        buffer.putLong(127);
+        buffer.putLong(128);
+        buffer.putLong(127);
+
+        assertEquals(24, buffer.array().length);
+        assertEquals(24, buffer.position());
+
+        // Should prompt a resize of the array
+        buffer.putLong(128);
+
+        assertEquals(48, buffer.array().length);
+        assertEquals(32, buffer.position());
+    }
+
+    //----- Test put float ---------------------------------------------------//
+
+    @Test
+    public void testPutFloat() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(8);
+
+        assertEquals(0, buffer.position());
+        buffer.putFloat(127);
+        assertEquals(4, buffer.position());
+        buffer.putFloat(128);
+        assertEquals(8, buffer.position());
+
+        assertEquals(Integer.MAX_VALUE - 8, buffer.remaining());
+    }
+
+    @Test
+    public void testPutFloatPastExistingCapacity() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(12);
+
+        buffer.putFloat(127);
+        buffer.putFloat(128);
+        buffer.putFloat(127);
+
+        assertEquals(12, buffer.array().length);
+        assertEquals(12, buffer.position());
+
+        // Should prompt a resize of the array
+        buffer.putFloat(128);
+
+        assertEquals(24, buffer.array().length);
+        assertEquals(16, buffer.position());
+    }
+
+    //----- Test put double --------------------------------------------------//
+
+    @Test
+    public void testPutDouble() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(20);
+
+        assertEquals(0, buffer.position());
+        buffer.putDouble(127);
+        assertEquals(8, buffer.position());
+        buffer.putDouble(128);
+        assertEquals(16, buffer.position());
+
+        assertEquals(Integer.MAX_VALUE - 16, buffer.remaining());
+    }
+
+    @Test
+    public void testPutDoublePastExistingCapacity() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(24);
+
+        buffer.putDouble(127);
+        buffer.putDouble(128);
+        buffer.putDouble(127);
+
+        assertEquals(24, buffer.array().length);
+        assertEquals(24, buffer.position());
+
+        // Should prompt a resize of the array
+        buffer.putDouble(128);
+
+        assertEquals(48, buffer.array().length);
+        assertEquals(32, buffer.position());
+    }
+
+    //----- Test transferTo --------------------------------------------------//
+
+    @Test
+    public void testTrasnferHandlesZeroSizedReadRequest() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(6);
+
+        byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23};
+        ByteBuffer target = ByteBuffer.allocate(0);
+
+        buffer.put(data, 0, data.length);
+
+        // Should use the size needed instead of double current size
+        assertEquals(data.length, buffer.array().length);
+        assertEquals(data.length, buffer.position());
+
+        buffer.transferTo(target);
+
+        assertEquals(data.length, buffer.position());
+
+        // Should now be nothing to transfer
+        target.clear();
+
+        buffer.transferTo(target);
+
+        assertEquals(data.length, buffer.position());
+        assertEquals(0, target.position());
+    }
+
+    @Test
+    public void testTrasnferFullBufferToTarget() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(6);
+
+        byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23};
+        ByteBuffer target = ByteBuffer.allocate(data.length);
+
+        buffer.put(data, 0, data.length);
+
+        // Should use the size needed instead of double current size
+        assertEquals(data.length, buffer.array().length);
+        assertEquals(data.length, buffer.position());
+
+        buffer.transferTo(target);
+
+        assertEquals(0, buffer.position());
+
+        assertArrayEquals(data, target.array());
+    }
+
+    @Test
+    public void testTrasnferFullBufferToTargetWithoutArrayAccess() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(6);
+
+        byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23};
+        ByteBuffer target = ByteBuffer.allocateDirect(data.length);
+
+        buffer.put(data, 0, data.length);
+
+        // Should use the size needed instead of double current size
+        assertEquals(data.length, buffer.array().length);
+        assertEquals(data.length, buffer.position());
+
+        buffer.transferTo(target);
+
+        assertEquals(0, buffer.position());
+
+        // Check contents in target are correct which requires getting the data out first.
+        target.flip();
+        byte[] targetPayload = new byte[target.remaining()];
+        target.get(targetPayload);
+        assertArrayEquals(data, targetPayload);
+    }
+
+    @Test
+    public void testTrasnferPartialBufferToTarget() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(6);
+
+        byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23};
+        ByteBuffer target = ByteBuffer.allocate(data.length);
+
+        buffer.put(data, 0, data.length);
+
+        // Should use the size needed instead of double current size
+        assertEquals(data.length, buffer.array().length);
+        assertEquals(data.length, buffer.position());
+
+        // First Half
+        target.limit(data.length / 2);
+        buffer.transferTo(target);
+
+        // The buffer should have compacted
+        assertEquals(data.length / 2, buffer.position());
+
+        // Second half
+        target.limit(target.capacity());
+        target.position(data.length / 2);
+        buffer.transferTo(target);
+
+        // Buffer contents should have been consumed
+        assertEquals(0, buffer.arrayOffset());
+        assertEquals(0, buffer.position());
+
+        assertArrayEquals(data, target.array());
+    }
+
+    @Test
+    public void testTrasnferPartialBufferThenRequestMoreThanIsRemaining() {
+        FrameWriterBuffer buffer = new FrameWriterBuffer(10);
+
+        byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+
+        buffer.put(data, 0, data.length);
+
+        // Should use the size needed instead of double current size
+        assertEquals(data.length, buffer.array().length);
+        assertEquals(data.length, buffer.position());
+
+        // First Half transfered which should lead to compaction
+        ByteBuffer target = ByteBuffer.allocate(5);
+        buffer.transferTo(target);
+        assertEquals(data.length / 2, buffer.position());
+
+        // Now the buffer should compact
+        buffer.ensureRemaining(10);
+        assertEquals(data.length / 2, buffer.position());
+
+        // Second half
+        target = ByteBuffer.allocate(5);
+        buffer.transferTo(target);
+
+        // Buffer contents should have been consumed
+        assertEquals(0, buffer.position());
+
+        assertArrayEquals(new byte[] {5, 6, 7, 8, 9}, target.array());
+    }
+}
\ No newline at end of file
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameWriterTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameWriterTest.java
new file mode 100644
index 0000000..636c7ac
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameWriterTest.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.engine.impl;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.Transfer;
+import org.apache.qpid.proton.codec.AMQPDefinedTypes;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.apache.qpid.proton.framing.TransportFrame;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+/**
+ * Tests for the FrameWriter implementation
+ */
+public class FrameWriterTest {
+
+    private final TransportImpl transport = new TransportImpl();
+
+    private final DecoderImpl decoder = new DecoderImpl();
+    private final EncoderImpl encoder = new EncoderImpl(decoder);
+
+    private ReadableBuffer bigPayload;
+    private ByteBuffer buffer;
+
+    @Before
+    public void setUp() {
+        AMQPDefinedTypes.registerAllTypes(decoder, encoder);
+
+        buffer = ByteBuffer.allocate(16384);
+
+        encoder.setByteBuffer(buffer);
+        decoder.setByteBuffer(buffer);
+
+        Random random = new Random(System.currentTimeMillis());
+        bigPayload = ReadableBuffer.ByteBufferReader.allocate(4096);
+        for (int i = 0; i < bigPayload.remaining(); ++i) {
+            bigPayload.array()[i] = (byte) random.nextInt(127);
+        }
+    }
+
+    @Test
+    public void testFrameWrittenToBuffer() {
+        Transfer transfer = createTransfer();
+        FrameWriter framer = new FrameWriter(encoder, Integer.MAX_VALUE, (byte) 0, transport);
+
+        framer.writeFrame(transfer);
+        assertNotEquals(0, framer.readBytes(buffer));
+
+        buffer.flip();
+        buffer.position(8); // Remove the Frame Header
+
+        Object decoded = decoder.readObject();
+        assertNotNull(decoded);
+        assertTrue(decoded instanceof Transfer);
+        Transfer result = (Transfer) decoded;
+
+        assertEquals(UnsignedInteger.ONE, result.getHandle());
+        assertEquals(UnsignedInteger.ZERO, result.getMessageFormat());
+        assertEquals(UnsignedInteger.valueOf(127), result.getDeliveryId());
+    }
+
+    @Test
+    public void testFrameWrittenToBufferWithLargePayloadAndMaxFrameSizeInvokesHandlerOnce() {
+        Transfer transfer = createTransfer();
+        FrameWriter framer = new FrameWriter(encoder, 2048, (byte) 0, transport);
+
+        final AtomicInteger toLargeCallbackCount = new AtomicInteger();
+
+        framer.writeFrame(0, transfer, bigPayload, new Runnable() {
+
+            @Override
+            public void run() {
+                toLargeCallbackCount.incrementAndGet();
+            }
+        });
+
+        // Should have read some data during this encode.
+        assertNotEquals(0, framer.readBytes(buffer));
+
+        buffer.flip();
+        byte[] header = new byte[FrameWriter.FRAME_HEADER_SIZE];
+        buffer.get(header);
+
+        ReadableBuffer headerReader = ReadableBuffer.ByteBufferReader.wrap(header);
+        int size = headerReader.getInt();
+        assertEquals(2048, size);
+
+        // Should have only asked once for the handler to respond to the to large paylod.
+        assertEquals(1, toLargeCallbackCount.get());
+    }
+
+    @Test
+    public void testFrameWrittenToBufferWithLargePayloadAndNoMaxFrameSize() {
+        Transfer transfer = createTransfer();
+        FrameWriter framer = new FrameWriter(encoder, Integer.MAX_VALUE, (byte) 0, transport);
+
+        framer.writeFrame(0, transfer, bigPayload, new PartialTransferHandler(transfer));
+        assertNotEquals(0, framer.readBytes(buffer));
+
+        buffer.flip();
+
+        byte[] header = new byte[FrameWriter.FRAME_HEADER_SIZE];
+        buffer.get(header);
+
+        ReadableBuffer headerReader = ReadableBuffer.ByteBufferReader.wrap(header);
+        int size = headerReader.getInt();
+        assertTrue(size > 4096);
+
+        Object decoded = decoder.readObject();
+        assertNotNull(decoded);
+        assertTrue(decoded instanceof Transfer);
+
+        // Check for our payload
+        assertTrue(buffer.hasRemaining());
+        assertEquals(4096, buffer.remaining());
+
+        byte[] payload = new byte[4096];
+        buffer.get(payload);
+
+        assertArrayEquals(bigPayload.array(), payload);
+    }
+
+    @Test
+    public void testFrameWrittenToBufferWithLargePayloadAndMaxFrameSize() {
+        Transfer transfer = createTransfer();
+        FrameWriter framer = new FrameWriter(encoder, 2048, (byte) 0, transport);
+
+        int payloadSize = 0;
+
+        // Should take three write and three reads to get it all and the Transfer should
+        // indicate that there is more data to come after the first two writes
+        for (int i = 1; i <= 3; ++i) {
+            transfer.setMore(false);
+            framer.writeFrame(0, transfer, bigPayload, new PartialTransferHandler(transfer));
+
+            ByteBuffer intermediate = ByteBuffer.allocate(4096);
+            int bytesRead = framer.readBytes(intermediate);
+            intermediate.flip();
+
+            // Read the Frame Header
+            byte[] header = new byte[FrameWriter.FRAME_HEADER_SIZE];
+            intermediate.get(header);
+            ReadableBuffer headerReader = ReadableBuffer.ByteBufferReader.wrap(header);
+            int frameSize = headerReader.getInt();
+
+            if (i < 3) {
+                assertTrue(transfer.getMore());
+                assertEquals(2048, bytesRead);
+                assertEquals(2048, frameSize);
+            } else {
+                assertFalse(transfer.getMore());
+                assertTrue(bytesRead < 2048);
+                assertTrue(frameSize < 2048);
+            }
+
+            decoder.setBuffer(ReadableBuffer.ByteBufferReader.wrap(intermediate));
+            Object decoded = decoder.readObject();
+            assertNotNull(decoded);
+            assertTrue(decoded instanceof Transfer);
+
+            // Trim the Frame header and Transfer encoding size and store off actual payload
+            payloadSize += bytesRead - intermediate.position();
+
+            // Accumulate the data minus the frame headers
+            buffer.put(intermediate);
+        }
+
+        assertEquals(3, framer.getFramesOutput());
+
+        buffer.rewind();
+        buffer.limit(payloadSize);
+
+        // Check for our payload
+        assertTrue(buffer.hasRemaining());
+        assertEquals(4096, buffer.remaining());
+
+        byte[] payload = new byte[4096];
+        buffer.get(payload);
+
+        assertArrayEquals(bigPayload.array(), payload);
+    }
+
+    @Test
+    public void testWriteEmptyFrame() {
+        FrameWriter framer = new FrameWriter(encoder, Integer.MAX_VALUE, (byte) 1, transport);
+
+        framer.writeFrame(16, null, null, null);
+
+        ByteBuffer headerBuffer = ByteBuffer.allocate(16);
+        framer.readBytes(headerBuffer);
+
+        assertEquals(FrameWriter.FRAME_HEADER_SIZE, headerBuffer.position());
+
+        headerBuffer.flip();
+
+        // Size, offset, Frame type, channel
+        assertEquals(FrameWriter.FRAME_HEADER_SIZE, headerBuffer.getInt());
+        assertEquals(2, headerBuffer.get());
+        assertEquals(1, headerBuffer.get());
+        assertEquals(16, headerBuffer.getShort());
+    }
+
+    @Test
+    public void testFrameWriterReportsFullBasedOnConfiguration() {
+        Transfer transfer = createTransfer();
+        FrameWriter framer = new FrameWriter(encoder, Integer.MAX_VALUE, (byte) 0, transport);
+
+        framer.writeFrame(0, transfer, bigPayload, new PartialTransferHandler(transfer));
+
+        assertEquals(FrameWriter.DEFAULT_FRAME_BUFFER_FULL_MARK ,framer.getFrameWriterMaxBytes());
+        assertFalse(framer.isFull());
+        framer.setFrameWriterMaxBytes(2048);
+        assertTrue(framer.isFull());
+        assertEquals(2048 ,framer.getFrameWriterMaxBytes());
+        framer.setFrameWriterMaxBytes(16384);
+        assertFalse(framer.isFull());
+        assertEquals(16384, framer.getFrameWriterMaxBytes());
+    }
+
+    @Test
+    public void testFrameWriterLogsFramesToTracer() {
+        FrameWriterProtocolTracer tracer = new FrameWriterProtocolTracer();
+        transport.setProtocolTracer(tracer);
+
+        Transfer transfer = createTransfer();
+        FrameWriter framer = new FrameWriter(encoder, Integer.MAX_VALUE, (byte) 0, transport);
+
+        framer.writeFrame(16, transfer, bigPayload, new PartialTransferHandler(transfer));
+
+        assertNotNull(tracer.getSentFrame());
+        TransportFrame sentFrame = tracer.getSentFrame();
+
+        assertEquals(16, sentFrame.getChannel());
+        assertTrue(sentFrame.getBody() instanceof Transfer);
+
+        Binary payload = sentFrame.getPayload();
+
+        assertEquals(bigPayload.capacity(), payload.getLength());
+    }
+
+    @Test
+    public void testFrameWriterLogsFramesToSystem() {
+        transport.trace(2);
+        TransportImpl spy = Mockito.spy(transport);
+
+        Transfer transfer = createTransfer();
+        FrameWriter framer = new FrameWriter(encoder, Integer.MAX_VALUE, (byte) 0, spy);
+
+        framer.writeFrame(16, transfer, bigPayload, new PartialTransferHandler(transfer));
+
+        ArgumentCaptor<TransportFrame> frameCatcher = ArgumentCaptor.forClass(TransportFrame.class);
+        Mockito.verify(spy).log(Mockito.anyString(), frameCatcher.capture());
+
+        assertEquals(16, frameCatcher.getValue().getChannel());
+        assertTrue(frameCatcher.getValue().getBody() instanceof Transfer);
+
+        Binary payload = frameCatcher.getValue().getPayload();
+
+        assertEquals(bigPayload.capacity(), payload.getLength());
+    }
+
+    @Test
+    public void testWriteHeader() {
+        FrameWriter framer = new FrameWriter(encoder, Integer.MAX_VALUE, (byte) 1, transport);
+        byte[] header = new byte[] {0, 0, 0, 9, 3, 4, 0, 12};
+        framer.writeHeader(header);
+
+        ByteBuffer headerBuffer = ByteBuffer.allocate(16);
+        framer.readBytes(headerBuffer);
+
+        assertEquals(FrameWriter.FRAME_HEADER_SIZE, headerBuffer.position());
+
+        headerBuffer.flip();
+
+        // Size, offset, Frame type, channel
+        assertEquals(FrameWriter.FRAME_HEADER_SIZE + 1, headerBuffer.getInt());
+        assertEquals(3, headerBuffer.get());
+        assertEquals(4, headerBuffer.get());
+        assertEquals(12, headerBuffer.getShort());
+    }
+
+    private Transfer createTransfer() {
+        Transfer transfer = new Transfer();
+        transfer.setHandle(UnsignedInteger.ONE);
+        transfer.setDeliveryTag(new Binary(new byte[] {0, 1}));
+        transfer.setMessageFormat(UnsignedInteger.ZERO);
+        transfer.setDeliveryId(UnsignedInteger.valueOf(127));
+        transfer.setAborted(false);
+        transfer.setBatchable(false);
+        transfer.setRcvSettleMode(ReceiverSettleMode.SECOND);
+
+        return transfer;
+    }
+
+    private static final class FrameWriterProtocolTracer implements ProtocolTracer {
+
+        private TransportFrame sentFrame;
+
+        public TransportFrame getSentFrame() {
+            return sentFrame;
+        }
+
+        @Override
+        public void receivedFrame(TransportFrame transportFrame) {
+        }
+
+        @Override
+        public void sentFrame(TransportFrame transportFrame) {
+            sentFrame = transportFrame;
+        }
+    }
+
+    private static final class PartialTransferHandler implements Runnable {
+        private Transfer transfer;
+
+        public PartialTransferHandler(Transfer transfer) {
+            this.transfer = transfer;
+        }
+
+        @Override
+        public void run() {
+            transfer.setMore(true);
+        }
+    }
+}
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DeliveryTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DeliveryTest.java
index 69db7c1..a923353 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DeliveryTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DeliveryTest.java
@@ -29,6 +29,7 @@
 
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.Random;
 import java.util.logging.Logger;
 
 import org.apache.qpid.proton.Proton;
@@ -70,14 +71,12 @@
         getServer().connection = Proton.connection();
         getServer().transport.bind(getServer().connection);
 
-
         LOGGER.fine(bold("======== About to open connections"));
         getClient().connection.open();
         getServer().connection.open();
 
         doOutputInputCycle();
 
-
         LOGGER.fine(bold("======== About to open sessions"));
         getClient().session = getClient().connection.session();
         getClient().session.open();
@@ -93,7 +92,6 @@
         pumpServerToClient();
         assertEndpointState(getClient().session, ACTIVE, ACTIVE);
 
-
         LOGGER.fine(bold("======== About to create reciever"));
 
         getClient().source = new Source();
@@ -116,7 +114,6 @@
 
         pumpClientToServer();
 
-
         LOGGER.fine(bold("======== About to set up implicitly created sender"));
 
         getServer().sender = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
@@ -141,7 +138,6 @@
 
         pumpClientToServer();
 
-
         LOGGER.fine(bold("======== About to create messages and send to the client"));
 
         sendMessageToClient("delivery1", "Msg1", null); // Don't set it, so it should be defaulted
@@ -165,6 +161,130 @@
         assertEquals("Unexpected message format", UnsignedInteger.MAX_VALUE.intValue(), clientDelivery4.getMessageFormat());
     }
 
+    @Test
+    public void testSendReceiveLargeMessage() throws Exception
+    {
+        doTestSendReceiveLargeMessage(-1, -1);
+    }
+
+    @Test
+    public void testSendReceiveLargeMessageWithFrameSizeLimits() throws Exception
+    {
+        doTestSendReceiveLargeMessage(128 * 1024, 128 * 1024);
+    }
+
+    private void doTestSendReceiveLargeMessage(int maxFrameSize, int maxOutboundFrameSize) throws Exception
+    {
+        LOGGER.fine(bold("======== About to create transports"));
+
+        getClient().transport = Proton.transport();
+        if (maxFrameSize > 0) {
+            getClient().transport.setMaxFrameSize(maxFrameSize);
+        }
+        if (maxOutboundFrameSize > 0) {
+            getClient().transport.setOutboundFrameSizeLimit(maxOutboundFrameSize);
+        }
+        ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX);
+
+        getServer().transport = Proton.transport();
+        if (maxFrameSize > 0) {
+            getServer().transport.setMaxFrameSize(maxFrameSize);
+        }
+        if (maxOutboundFrameSize > 0) {
+            getServer().transport.setOutboundFrameSizeLimit(maxOutboundFrameSize);
+        }
+        ProtocolTracerEnabler.setProtocolTracer(getServer().transport, "            " + TestLoggingHelper.SERVER_PREFIX);
+
+        doOutputInputCycle();
+
+        getClient().connection = Proton.connection();
+        getClient().transport.bind(getClient().connection);
+
+        getServer().connection = Proton.connection();
+        getServer().transport.bind(getServer().connection);
+
+        LOGGER.fine(bold("======== About to open connections"));
+        getClient().connection.open();
+        getServer().connection.open();
+
+        doOutputInputCycle();
+
+        LOGGER.fine(bold("======== About to open sessions"));
+        getClient().session = getClient().connection.session();
+        getClient().session.open();
+
+        pumpClientToServer();
+
+        getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
+        assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE);
+
+        getServer().session.open();
+        assertEndpointState(getServer().session, ACTIVE, ACTIVE);
+
+        pumpServerToClient();
+        assertEndpointState(getClient().session, ACTIVE, ACTIVE);
+
+        LOGGER.fine(bold("======== About to create reciever"));
+
+        getClient().source = new Source();
+        getClient().source.setAddress(_sourceAddress);
+
+        getClient().target = new Target();
+        getClient().target.setAddress(null);
+
+        getClient().receiver = getClient().session.receiver("link1");
+        getClient().receiver.setTarget(getClient().target);
+        getClient().receiver.setSource(getClient().source);
+
+        getClient().receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+        getClient().receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+
+        assertEndpointState(getClient().receiver, UNINITIALIZED, UNINITIALIZED);
+
+        getClient().receiver.open();
+        assertEndpointState(getClient().receiver, ACTIVE, UNINITIALIZED);
+
+        pumpClientToServer();
+
+        LOGGER.fine(bold("======== About to set up implicitly created sender"));
+
+        getServer().sender = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
+
+        getServer().sender.setReceiverSettleMode(getServer().sender.getRemoteReceiverSettleMode());
+        getServer().sender.setSenderSettleMode(getServer().sender.getRemoteSenderSettleMode());
+
+        org.apache.qpid.proton.amqp.transport.Source serverRemoteSource = getServer().sender.getRemoteSource();
+        getServer().sender.setSource(serverRemoteSource);
+
+        assertEndpointState(getServer().sender, UNINITIALIZED, ACTIVE);
+        getServer().sender.open();
+
+        assertEndpointState(getServer().sender, ACTIVE, ACTIVE);
+
+        pumpServerToClient();
+
+        assertEndpointState(getClient().receiver, ACTIVE, ACTIVE);
+
+        getClient().receiver.flow(1);
+
+        pumpClientToServer();
+
+        String messageContent = createMessageContent(256 * 1024);
+
+        LOGGER.fine(bold("======== About to create messages and send to the client"));
+
+        sendMessageToClient("delivery1", messageContent, 0); // Explicitly set it to the default
+
+        pumpAllServerPendingToClient();
+
+        LOGGER.fine(bold("======== About to process the messages on the client"));
+
+        Delivery clientDelivery = receiveMessageFromServer("delivery1", messageContent);
+
+        // Verify the message format is as expected
+        assertEquals("Unexpected message format", 0, clientDelivery.getMessageFormat());
+    }
+
     private Delivery receiveMessageFromServer(String deliveryTag, String messageContent)
     {
         Delivery delivery = getClient().connection.getWorkHead();
@@ -179,6 +299,7 @@
         assertFalse(delivery.isPartial());
         assertTrue(delivery.isReadable());
 
+        final int BUFFER_SIZE = messageContent.length() * 4;
         byte[] received = new byte[BUFFER_SIZE];
         int len = getClient().receiver.recv(received, 0, BUFFER_SIZE);
 
@@ -196,6 +317,17 @@
         return delivery;
     }
 
+    private static String createMessageContent(int length) {
+        Random rand = new Random(System.currentTimeMillis());
+
+        byte[] payload = new byte[length];
+        for (int i = 0; i < length; i++) {
+            payload[i] = (byte) (64 + 1 + rand.nextInt(9));
+        }
+
+        return new String(payload, StandardCharsets.UTF_8);
+    }
+
     private Delivery sendMessageToClient(String deliveryTag, String messageContent, Integer messageFormat)
     {
         byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8);
@@ -203,10 +335,11 @@
         Message m = Proton.message();
         m.setBody(new AmqpValue(messageContent));
 
+        final int BUFFER_SIZE = messageContent.length() * 4;
         byte[] encoded = new byte[BUFFER_SIZE];
         int len = m.encode(encoded, 0, BUFFER_SIZE);
 
-        assertTrue("given array was too small", len < BUFFER_SIZE);
+        assertTrue("given array was too small", len <= BUFFER_SIZE);
 
         Delivery serverDelivery = getServer().sender.delivery(tag);
 
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java
index 6868496..a17b9e0 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java
@@ -82,6 +82,31 @@
         getServer().transport.outputConsumed();
     }
 
+    protected void pumpAllServerPendingToClient()
+    {
+        while (getServer().transport.pending() != 0)
+        {
+            ByteBuffer serverBuffer = getServer().transport.getOutputBuffer();
+
+            if (shouldLogPumpedBytes())
+            {
+                getTestLoggingHelper().prettyPrint("          <<<" + TestLoggingHelper.SERVER_PREFIX + " ", serverBuffer);
+            }
+            assertTrue("Server expected to produce some output", serverBuffer.hasRemaining());
+
+            ByteBuffer clientBuffer = getClient().transport.getInputBuffer();
+
+            clientBuffer.put(serverBuffer);
+
+            assertEquals("Client expected to consume all server's output", 0, serverBuffer.remaining());
+
+            getClient().transport.processInput().checkIsOk();
+            getServer().transport.outputConsumed();
+        }
+
+        assertEquals("Client expected to consume all server's pending work", 0, getServer().transport.pending());
+    }
+
     protected void pumpClientToServer()
     {
         ByteBuffer clientBuffer = getClient().transport.getOutputBuffer();
diff --git a/tests/performance-jmh/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriterBenchmark.java b/tests/performance-jmh/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriterBenchmark.java
index e91f945..69920fa 100644
--- a/tests/performance-jmh/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriterBenchmark.java
+++ b/tests/performance-jmh/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriterBenchmark.java
@@ -27,8 +27,6 @@
 import org.apache.qpid.proton.codec.DecoderImpl;
 import org.apache.qpid.proton.codec.EncoderImpl;
 import org.apache.qpid.proton.codec.ReadableBuffer;
-import org.apache.qpid.proton.engine.impl.FrameWriter;
-import org.apache.qpid.proton.engine.impl.TransportImpl;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Measurement;
@@ -90,7 +88,7 @@
         AMQPDefinedTypes.registerAllTypes(decoder, encoder);
 
         transport = (TransportImpl) Proton.transport();
-        frameWriter = new FrameWriter(encoder, 16 * 1024, (byte) 0, null, transport);
+        frameWriter = new FrameWriter(encoder, 16 * 1024, (byte) 0, transport);
 
         transfer = new Transfer();
         transfer.setDeliveryId(UnsignedInteger.ONE);