PROTON-1672 Handle multi-frame transfer payloads more efficiently

Replace reallocation and consolidation of transfer payloads when
multiple framed transfers are inbound.  Creates a
CompositeReadableBuffer that can be used to house the assembled payload
for use in the decoder. The decoder implementation refactored to handle
ReadableBuffer as the source of bytes as well as ByteBuffer.  Adds
no-copy method variants to the Sender and Receiver API such that clients
or servers can process inbound and outbound deliveries without copying
the payloads when it is known to be safe not to copy.

Adds tests and jacoco reports to validate test coverage.
diff --git a/pom.xml b/pom.xml
index ba6869b..945058f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -42,6 +42,10 @@
 
     <!-- Plugin versions -->
     <maven-bundle-plugin-version>3.2.0</maven-bundle-plugin-version>
+    <jacoco-plugin-version>0.7.9</jacoco-plugin-version>
+
+    <!-- surefire forked jvm arguments -->
+    <argLine>-Xmx2g -enableassertions ${jacoco-config}</argLine>
   </properties>
 
   <dependencyManagement>
@@ -116,6 +120,20 @@
           </excludes>
         </configuration>
       </plugin>
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>prepare-agent</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <propertyName>jacoco-config</propertyName>
+        </configuration>
+      </plugin>
     </plugins>
     <pluginManagement>
       <plugins>
@@ -154,6 +172,11 @@
             <pushChanges>true</pushChanges>
           </configuration>
         </plugin>
+        <plugin>
+          <groupId>org.jacoco</groupId>
+          <artifactId>jacoco-maven-plugin</artifactId>
+          <version>${jacoco-plugin-version}</version>
+        </plugin>
       </plugins>
     </pluginManagement>
   </build>
@@ -181,6 +204,16 @@
     <url>https://builds.apache.org/view/M-R/view/Qpid/job/Qpid-proton-j/</url>
   </ciManagement>
 
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+        <version>${jacoco-plugin-version}</version>
+      </plugin>
+    </plugins>
+  </reporting>
+
   <profiles>
     <!-- Override the apache-release profile from the parent pom -->
     <profile>
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java b/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java
index aac3fc5..ab1bfe5 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java
@@ -23,6 +23,8 @@
 import java.nio.ByteBuffer;
 import java.util.Collection;
 
+import org.apache.qpid.proton.codec.ReadableBuffer;
+
 public final class Binary
 {
 
@@ -167,7 +169,26 @@
         return new Binary(_data, _offset+offset, length);
     }
 
-    public static Binary create(ByteBuffer buffer) 
+    public static Binary create(ReadableBuffer buffer)
+    {
+        if (buffer == null)
+        {
+            return null;
+        }
+        else if (!buffer.hasArray())
+        {
+            byte data[] = new byte [buffer.remaining()];
+            ReadableBuffer dup = buffer.duplicate();
+            dup.get(data);
+            return new Binary(data);
+        }
+        else
+        {
+            return new Binary(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+        }
+    }
+
+    public static Binary create(ByteBuffer buffer)
     {
         if( buffer == null )
             return null;
@@ -178,7 +199,7 @@
             dup.get(data);
             return new Binary(data);
         }
-        else 
+        else
         {
             return new Binary(buffer.array(), buffer.arrayOffset()+buffer.position(), buffer.remaining());
         }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/ArrayType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/ArrayType.java
index 32d6f85..f4f0c8a 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/ArrayType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/ArrayType.java
@@ -21,7 +21,6 @@
 package org.apache.qpid.proton.codec;
 
 import java.lang.reflect.Array;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 
@@ -48,7 +47,7 @@
         void writeValue(double[] a);
         void writeValue(char[] a);
 
-        void setValue(Object[] val, TypeEncoding encoder, int size);
+        void setValue(Object[] val, TypeEncoding<?> encoder, int size);
 
         int getSizeBytes();
 
@@ -92,7 +91,7 @@
 
     public ArrayEncoding getEncoding(final Object[] val)
     {
-        TypeEncoding encoder = calculateEncoder(val,_encoder);
+        TypeEncoding<?> encoder = calculateEncoder(val,_encoder);
         int size = calculateSize(val, encoder);
         ArrayEncoding arrayEncoding = (val.length > 255 || size > 254)
                                       ? _arrayEncoding
@@ -101,7 +100,7 @@
         return arrayEncoding;
     }
 
-    private static TypeEncoding calculateEncoder(final Object[] val, final EncoderImpl encoder)
+    private static TypeEncoding<?> calculateEncoder(final Object[] val, final EncoderImpl encoder)
     {
 
         if(val.length == 0)
@@ -156,7 +155,6 @@
             }
             else
             {
-
                 if(underlyingType == null)
                 {
                     checkTypes = true;
@@ -173,7 +171,6 @@
                                 .getType(val[i]) + " in array");
                     }
 
-
                     TypeEncoding elementEncoding = underlyingType.getEncoding(val[i]);
                     if(elementEncoding != underlyingEncoding && !underlyingEncoding.encodesSuperset(elementEncoding))
                     {
@@ -438,7 +435,6 @@
             extends LargeFloatingSizePrimitiveTypeEncoding<Object[]>
             implements ArrayEncoding
     {
-
         private Object[] _val;
         private TypeEncoding _underlyingEncoder;
         private int _size;
@@ -635,7 +631,7 @@
         public void skipValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = decoder.readRawInt();
             buffer.position(buffer.position() + size);
         }
@@ -900,7 +896,7 @@
         public void skipValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = ((int)decoder.readRawByte()) & 0xFF;
             buffer.position(buffer.position() + size);
         }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/BinaryType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/BinaryType.java
index 1d739b8..be0a8f5 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/BinaryType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/BinaryType.java
@@ -22,7 +22,6 @@
 
 import org.apache.qpid.proton.amqp.Binary;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 
@@ -135,7 +134,7 @@
         public void skipValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = decoder.readRawInt();
             buffer.position(buffer.position() + size);
         }
@@ -190,7 +189,8 @@
 
         public void skipValue()
         {
-            ByteBuffer buffer = getDecoder().getByteBuffer();
+            DecoderImpl decoder = getDecoder();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = ((int)getDecoder().readRawByte()) & 0xff;
             buffer.position(buffer.position() + size);
         }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java
new file mode 100644
index 0000000..e6652e4
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java
@@ -0,0 +1,744 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.codec;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.InvalidMarkException;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CoderResult;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * ReadableBuffer implementation whose content is made up of one or more
+ * byte arrays.
+ */
+public class CompositeReadableBuffer implements ReadableBuffer {
+
+    private static final List<byte[]> EMPTY_LIST = Collections.unmodifiableList(new ArrayList<byte[]>());
+    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap(new byte[0]);
+    private static final CompositeReadableBuffer EMPTY_SLICE = new CompositeReadableBuffer(true);
+    private static int UNSET_MARK = -1;
+
+    private ArrayList<byte[]> contents;
+
+    // Track active array and our offset into it.
+    private int currentArrayIndex = -1;
+    private byte[] currentArray;
+    private int currentOffset;
+
+    // State global to the buffer.
+    private int position;
+    private int limit;
+    private int capacity;
+    private int mark = -1;
+    private boolean compactable = true;
+
+    /**
+     * Creates a default empty composite buffer
+     */
+    public CompositeReadableBuffer() {
+    }
+
+    private CompositeReadableBuffer(byte[] array, int offset) {
+        this.currentArray = array;
+        this.currentOffset = offset;
+        this.capacity = array.length;
+        this.limit = capacity;
+    }
+
+    private CompositeReadableBuffer(boolean compactable) {
+        this.compactable = compactable;
+    }
+
+    public List<byte[]> getArrays() {
+        return contents == null ? EMPTY_LIST : Collections.unmodifiableList(contents);
+    }
+
+    public int getCurrentIndex() {
+        return currentArrayIndex;
+    }
+
+    @Override
+    public boolean hasArray() {
+        return currentArray != null && (contents == null || contents.size() == 1);
+    }
+
+    public int capacity() {
+        return capacity;
+    }
+
+    @Override
+    public byte[] array() {
+        if (hasArray()) {
+            return currentArray;
+        }
+
+        throw new UnsupportedOperationException("Buffer not backed by a single array");
+    }
+
+    @Override
+    public int arrayOffset() {
+        if (hasArray()) {
+            return currentOffset;
+        }
+
+        throw new UnsupportedOperationException("Buffer not backed by a single array");
+    }
+
+    @Override
+    public byte get() {
+        if (position == limit) {
+            throw new BufferUnderflowException();
+        }
+
+        final byte result = currentArray[currentOffset++];
+        position++;
+        maybeMoveToNextArray();
+
+        return result;
+    }
+
+    @Override
+    public byte get(int index) {
+        if (index < 0 || index >= limit) {
+            throw new IndexOutOfBoundsException("The given index is not valid: " + index);
+        }
+
+        byte result = 0;
+
+        if (index == position) {
+            result = currentArray[currentOffset];
+        } else if (index < position) {
+            result = getBackwards(index);
+        } else {
+            result = getForward(index);
+        }
+
+        return result;
+    }
+
+    private byte getForward(int index) {
+        byte result = 0;
+
+        int currentArrayIndex = this.currentArrayIndex;
+        int currentOffset = this.currentOffset;
+        byte[] currentArray = this.currentArray;
+
+        for (int amount = index - position; amount >= 0;) {
+            if (amount < currentArray.length - currentOffset) {
+                result = currentArray[currentOffset + amount];
+                break;
+            } else {
+                amount -= currentArray.length - currentOffset;
+                currentArray = contents.get(++currentArrayIndex);
+                currentOffset = 0;
+            }
+        }
+
+        return result;
+    }
+
+    private byte getBackwards(int index) {
+        byte result = 0;
+
+        int currentArrayIndex = this.currentArrayIndex;
+        int currentOffset = this.currentOffset;
+        byte[] currentArray = this.currentArray;
+
+        for (int amount = position - index; amount >= 0;) {
+            if ((currentOffset - amount) >= 0) {
+                result = currentArray[currentOffset - amount];
+                break;
+            } else {
+                amount -= currentOffset;
+                currentArray = contents.get(--currentArrayIndex);
+                currentOffset = currentArray.length;
+            }
+        }
+
+        return result;
+    }
+
+    @Override
+    public int getInt() {
+        if (remaining() < Integer.BYTES) {
+            throw new BufferUnderflowException();
+        }
+
+        int result = 0;
+
+        if (currentArray.length - currentOffset >= 4) {
+            result = (int)(currentArray[currentOffset++] & 0xFF) << 24 |
+                     (int)(currentArray[currentOffset++] & 0xFF) << 16 |
+                     (int)(currentArray[currentOffset++] & 0xFF) << 8 |
+                     (int)(currentArray[currentOffset++] & 0xFF) << 0;
+        } else {
+            for (int i = Integer.BYTES - 1; i >= 0; --i) {
+                result |= (int)(currentArray[currentOffset++] & 0xFF) << (i * Byte.SIZE);
+                maybeMoveToNextArray();
+            }
+        }
+
+        position += 4;
+
+        return result;
+    }
+
+    @Override
+    public long getLong() {
+        if (remaining() < Long.BYTES) {
+            throw new BufferUnderflowException();
+        }
+
+        long result = 0;
+
+        if (currentArray.length - currentOffset >= 8) {
+            result = (long)(currentArray[currentOffset++] & 0xFF) << 56 |
+                     (long)(currentArray[currentOffset++] & 0xFF) << 48 |
+                     (long)(currentArray[currentOffset++] & 0xFF) << 40 |
+                     (long)(currentArray[currentOffset++] & 0xFF) << 32 |
+                     (long)(currentArray[currentOffset++] & 0xFF) << 24 |
+                     (long)(currentArray[currentOffset++] & 0xFF) << 16 |
+                     (long)(currentArray[currentOffset++] & 0xFF) << 8 |
+                     (long)(currentArray[currentOffset++] & 0xFF) << 0;
+        } else {
+            for (int i = Long.BYTES - 1; i >= 0; --i) {
+                result |= (long)(currentArray[currentOffset++] & 0xFF) << (i * Byte.SIZE);
+                maybeMoveToNextArray();
+            }
+        }
+
+        position += 8;
+
+        return result;
+    }
+
+    @Override
+    public short getShort() {
+        if (remaining() < Short.BYTES) {
+            throw new BufferUnderflowException();
+        }
+
+        short result = 0;
+
+        for (int i = Short.BYTES - 1; i >= 0; --i) {
+            result |= (currentArray[currentOffset++] & 0xFF) << (i * Byte.SIZE);
+            maybeMoveToNextArray();
+        }
+
+        position += 2;
+
+        return result;
+    }
+
+    @Override
+    public float getFloat() {
+        return Float.intBitsToFloat(getInt());
+    }
+
+    @Override
+    public double getDouble() {
+        return Double.longBitsToDouble(getLong());
+    }
+
+    @Override
+    public CompositeReadableBuffer get(byte[] data) {
+        return get(data, 0, data.length);
+    }
+
+    @Override
+    public CompositeReadableBuffer get(byte[] data, int offset, int length) {
+        validateReadTarget(data.length, offset, length);
+
+        if (length > remaining()) {
+            throw new BufferUnderflowException();
+        }
+
+        int copied = 0;
+        while (length > 0) {
+            final int chunk = Math.min((currentArray.length - currentOffset), length);
+            System.arraycopy(currentArray, currentOffset, data, offset + copied, chunk);
+
+            currentOffset += chunk;
+            length -= chunk;
+            copied += chunk;
+
+            maybeMoveToNextArray();
+        }
+
+        position += copied;
+
+        return this;
+    }
+
+    @Override
+    public CompositeReadableBuffer get(WritableBuffer target) {
+        int length = Math.min(target.remaining(), remaining());
+
+        do {
+            final int chunk = Math.min((currentArray.length - currentOffset), length);
+
+            if (chunk == 0) {
+                break;  // This buffer is out of data
+            }
+
+            target.put(currentArray, currentOffset, chunk);
+
+            currentOffset += chunk;
+            position += chunk;
+            length -= chunk;
+
+            maybeMoveToNextArray();
+        } while (length > 0);
+
+        return this;
+    }
+
+    @Override
+    public CompositeReadableBuffer position(int position) {
+        if (position < 0 || position > limit) {
+            throw new IllegalArgumentException("position must be non-negative and no greater than the limit");
+        }
+
+        int moveBy = position - this.position;
+        if (moveBy >= 0) {
+            moveForward(moveBy);
+        } else {
+            moveBackwards(Math.abs(moveBy));
+        }
+
+        this.position = position;
+
+        if (mark > position) {
+            mark = UNSET_MARK;
+        }
+
+        return this;
+    }
+
+    private void moveForward(int moveBy) {
+        while (moveBy > 0) {
+            if (moveBy < currentArray.length - currentOffset) {
+                currentOffset += moveBy;
+                break;
+            } else {
+                moveBy -= currentArray.length - currentOffset;
+                if (currentArrayIndex != -1 && currentArrayIndex < contents.size() - 1) {
+                    currentArray = contents.get(++currentArrayIndex);
+                    currentOffset = 0;
+                } else {
+                    currentOffset = currentArray.length;
+                }
+            }
+        }
+    }
+
+    private void moveBackwards(int moveBy) {
+        while (moveBy > 0) {
+            if ((currentOffset - moveBy) >= 0) {
+                currentOffset -= moveBy;
+                break;
+            } else {
+                moveBy -= currentOffset;
+                currentArray = contents.get(--currentArrayIndex);
+                currentOffset = currentArray.length;
+            }
+        }
+    }
+
+    @Override
+    public int position() {
+        return position;
+    }
+
+    @Override
+    public CompositeReadableBuffer slice() {
+        int newCapacity = limit() - position();
+
+        final CompositeReadableBuffer result;
+
+        if (newCapacity == 0) {
+            result = EMPTY_SLICE;
+        } else {
+            result = new CompositeReadableBuffer(currentArray, currentOffset);
+            result.contents = contents;
+            result.currentArrayIndex = currentArrayIndex;
+            result.capacity = newCapacity;
+            result.limit = newCapacity;
+            result.position = 0;
+            result.compactable = false;
+        }
+
+        return result;
+    }
+
+    @Override
+    public CompositeReadableBuffer flip() {
+        limit = position;
+        position(0); // Move by index to avoid corrupting a slice.
+        mark = UNSET_MARK;
+
+        return this;
+    }
+
+    @Override
+    public CompositeReadableBuffer limit(int limit) {
+        if (limit < 0 || limit > capacity) {
+            throw new IllegalArgumentException("limit must be non-negative and no greater than the capacity");
+        }
+
+        if (mark > limit) {
+            mark = UNSET_MARK;
+        }
+
+        if (position > limit) {
+            position(limit);
+        }
+
+        this.limit = limit;
+
+        return this;
+    }
+
+    @Override
+    public int limit() {
+        return limit;
+    }
+
+    @Override
+    public CompositeReadableBuffer mark() {
+        this.mark = position;
+        return this;
+    }
+
+    @Override
+    public CompositeReadableBuffer reset() {
+        if (mark < 0) {
+            throw new InvalidMarkException();
+        }
+
+        position(mark);
+
+        return this;
+    }
+
+    @Override
+    public CompositeReadableBuffer rewind() {
+        return position(0);
+    }
+
+    @Override
+    public CompositeReadableBuffer clear() {
+        mark = UNSET_MARK;
+        limit = capacity;
+
+        return position(0);
+    }
+
+    @Override
+    public int remaining() {
+        return limit - position;
+    }
+
+    @Override
+    public boolean hasRemaining() {
+        return remaining() > 0;
+    }
+
+    @Override
+    public CompositeReadableBuffer duplicate() {
+        CompositeReadableBuffer duplicated =
+            new CompositeReadableBuffer(currentArray, currentOffset);
+
+        if (contents != null) {
+            duplicated.contents = new ArrayList<>(contents);
+        }
+
+        duplicated.capacity = capacity;
+        duplicated.currentArrayIndex = currentArrayIndex;
+        duplicated.limit = limit;
+        duplicated.position = position;
+        duplicated.mark = mark;
+        duplicated.compactable = compactable;   // A slice duplicated should not allow compaction.
+
+        return duplicated;
+    }
+
+    @Override
+    public ByteBuffer byteBuffer() {
+        int viewSpan = limit() - position();
+
+        final ByteBuffer result;
+
+        if (viewSpan == 0) {
+            result = EMPTY_BUFFER;
+        } else if (viewSpan <= currentArray.length - currentOffset) {
+            result = ByteBuffer.wrap(currentArray, currentOffset, viewSpan);
+        } else {
+            result = buildByteBuffer(viewSpan);
+        }
+
+        return result.asReadOnlyBuffer();
+    }
+
+    private ByteBuffer buildByteBuffer(int span) {
+        byte[] compactedView = new byte[span];
+        int arrayIndex = currentArrayIndex;
+
+        // Take whatever is left from the current array;
+        System.arraycopy(currentArray, currentOffset, compactedView, 0, currentArray.length - currentOffset);
+        int copied = currentArray.length - currentOffset;
+
+        while (copied < span) {
+            byte[] next = contents.get(++arrayIndex);
+            final int length = Math.min(span - copied, next.length);
+            System.arraycopy(next, 0, compactedView, copied, length);
+            copied += length;
+        }
+
+        return ByteBuffer.wrap(compactedView);
+    }
+
+    @Override
+    public String readUTF8() throws CharacterCodingException {
+        return readString(StandardCharsets.UTF_8.newDecoder());
+    }
+
+    @Override
+    public String readString(CharsetDecoder decoder) throws CharacterCodingException {
+        if (!hasRemaining()) {
+            return null;
+        }
+
+        CharBuffer decoded = null;
+
+        if (hasArray()) {
+            decoded = decoder.decode(ByteBuffer.wrap(currentArray, currentOffset, remaining()));
+        } else {
+            decoded = readStringFromComponents(decoder);
+        }
+
+        return decoded.toString();
+    }
+
+    private CharBuffer readStringFromComponents(CharsetDecoder decoder) throws CharacterCodingException {
+        int size = (int)(remaining() * decoder.averageCharsPerByte());
+        CharBuffer decoded = CharBuffer.allocate(size);
+
+        int arrayIndex = currentArrayIndex;
+        final int viewSpan = limit() - position();
+        int processed = Math.min(currentArray.length - currentOffset, viewSpan);
+        ByteBuffer wrapper = ByteBuffer.wrap(currentArray, currentOffset, processed);
+
+        CoderResult step = CoderResult.OVERFLOW;
+
+        do {
+            boolean endOfInput = processed == viewSpan;
+            step = decoder.decode(wrapper, decoded, endOfInput);
+            if (step.isUnderflow() && endOfInput) {
+                step = decoder.flush(decoded);
+                break;
+            }
+
+            if (step.isOverflow()) {
+                size = 2 * size + 1;
+                CharBuffer upsized = CharBuffer.allocate(size);
+                decoded.flip();
+                upsized.put(decoded);
+                decoded = upsized;
+                continue;
+            }
+
+            byte[] next = contents.get(++arrayIndex);
+            int wrapSize = Math.min(next.length, viewSpan - processed);
+            wrapper = ByteBuffer.wrap(next, 0, wrapSize);
+            processed += wrapSize;
+        } while (!step.isError());
+
+        if (step.isError()) {
+            step.throwException();
+        }
+
+        return (CharBuffer) decoded.flip();
+    }
+
+    /**
+     * Compact the buffer dropping arrays that have been consumed by previous
+     * reads from this Composite buffer.  The limit is reset to the new capacity
+     */
+    @Override
+    public CompositeReadableBuffer reclaimRead() {
+        if (!compactable || (currentArray == null && contents == null)) {
+            return this;
+        }
+
+        int totalCompaction = 0;
+        int totalRemovals = 0;
+
+        for (; totalRemovals < currentArrayIndex; ++totalRemovals) {
+            byte[] element = contents.remove(0);
+            totalCompaction += element.length;
+        }
+
+        currentArrayIndex -= totalRemovals;
+
+        if (currentArray.length == currentOffset) {
+            totalCompaction += currentArray.length;
+
+            // If we are sitting on the end of the data (length == offest) then
+            // we are also at the last element in the ArrayList if one is currently
+            // in use, so remove the data and release the list.
+            if (currentArrayIndex == 0) {
+                contents.clear();
+                contents = null;
+            }
+
+            currentArray = null;
+            currentArrayIndex = -1;
+            currentOffset = 0;
+        }
+
+        position -= totalCompaction;
+        limit = capacity -= totalCompaction;
+
+        if (mark != UNSET_MARK) {
+            mark -= totalCompaction;
+        }
+
+        return this;
+    }
+
+    /**
+     * Adds the given array into the composite buffer at the end.
+     * <p>
+     * The appended array is not copied so changes to the source array are visible in this
+     * buffer and vice versa.  If this composite was empty than it would return true for the
+     * {@link #hasArray()} method until another array is appended.
+     * <p>
+     * Calling this method resets the limit to the new capacity.
+     *
+     * @param array
+     *      The array to add to this composite buffer.
+     *
+     * @throws IllegalArgumentException if the array is null or zero size.
+     * @throws IllegalStateException if the buffer does not allow appends.
+     *
+     * @return a reference to this {@link CompositeReadableBuffer}.
+     */
+    public CompositeReadableBuffer append(byte[] array) {
+        if (!compactable) {
+            throw new IllegalStateException();
+        }
+
+        if (array == null || array.length == 0) {
+            throw new IllegalArgumentException("Array must not be empty or null");
+        }
+
+        if (currentArray == null) {
+            currentArray = array;
+            currentOffset = 0;
+        } else if (contents == null) {
+            contents = new ArrayList<>();
+            contents.add(currentArray);
+            contents.add(array);
+            currentArrayIndex = 0;
+        } else {
+            contents.add(array);
+        }
+
+        capacity += array.length;
+        limit = capacity;
+
+        return this;
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+
+        if (currentArrayIndex < 0) {
+            int span = limit() - position();
+            while (span > 0) {
+                hash = 31 * hash + currentArray[currentOffset + --span];
+            }
+        } else {
+            final int currentPos = position();
+            for (int i = limit() - 1; i >= currentPos; i--) {
+                hash = 31 * hash + (int)get(i);
+            }
+        }
+
+        return hash;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (!(other instanceof ReadableBuffer)) {
+            return false;
+        }
+
+        ReadableBuffer buffer = (ReadableBuffer)other;
+        if (this.remaining() != buffer.remaining()) {
+            return false;
+        }
+
+        final int currentPos = position();
+
+        for (int i = buffer.position(); hasRemaining(); i++) {
+            if (!equals(this.get(), buffer.get(i))) {
+                return false;
+            }
+        }
+
+        position(currentPos);
+
+        return true;
+    }
+
+    private static boolean equals(byte x, byte y) {
+        return x == y;
+    }
+
+    private void maybeMoveToNextArray() {
+        if (currentArray.length == currentOffset) {
+            if (currentArrayIndex >= 0 && currentArrayIndex < (contents.size() - 1)) {
+                currentArray = contents.get(++currentArrayIndex);
+                currentOffset = 0;
+            }
+        }
+    }
+
+    private static void validateReadTarget(int destSize, int offset, int length) {
+        if ((offset | length) < 0) {
+            throw new IndexOutOfBoundsException("offset and legnth must be non-negative");
+        }
+
+        if (((long) offset + (long) length) > destSize) {
+            throw new IndexOutOfBoundsException("target is to small for specified read size");
+        }
+    }
+}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java
index 5786924..5b2c71c 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java
@@ -188,4 +188,25 @@
     {
         return _first.toString() + " + "+_second.toString();
     }
+
+    @Override
+    public void put(ReadableBuffer payload) {
+        int firstRemaining = _first.remaining();
+        if(firstRemaining > 0)
+        {
+            if(firstRemaining >= payload.remaining())
+            {
+                _first.put(payload);
+                return;
+            }
+            else
+            {
+                int limit = payload.limit();
+                payload.limit(payload.position()+firstRemaining);
+                _first.put(payload);
+                payload.limit(limit);
+            }
+        }
+        _second.put(payload);
+    }
 }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/DecoderImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/DecoderImpl.java
index c305916..2d4989b 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/DecoderImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/DecoderImpl.java
@@ -32,7 +32,6 @@
 import org.apache.qpid.proton.amqp.UnsignedLong;
 import org.apache.qpid.proton.amqp.UnsignedShort;
 
-import java.io.IOException;
 import java.lang.reflect.Array;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharsetDecoder;
@@ -41,7 +40,7 @@
 
 public class DecoderImpl implements ByteBufferDecoder
 {
-    private ByteBuffer _buffer;
+    private ReadableBuffer _buffer;
 
     private final CharsetDecoder _charsetDecoder = StandardCharsets.UTF_8.newDecoder();
 
@@ -58,7 +57,7 @@
 
     DecoderImpl(final ByteBuffer buffer)
     {
-        _buffer = buffer;
+        _buffer = new ReadableBuffer.ByteBufferReader(buffer);
     }
 
     public TypeConstructor<?> peekConstructor()
@@ -998,21 +997,30 @@
         _buffer.get(data, offset, length);
     }
 
-
     <V> V readRaw(TypeDecoder<V> decoder, int size)
     {
-        V decode = decoder.decode(this, (ByteBuffer) _buffer.slice().limit(size));
+        V decode = decoder.decode(this, _buffer.slice().limit(size));
         _buffer.position(_buffer.position()+size);
         return decode;
     }
 
     public void setByteBuffer(final ByteBuffer buffer)
     {
-        _buffer = buffer;
+        _buffer = new ReadableBuffer.ByteBufferReader(buffer);
     }
 
     public ByteBuffer getByteBuffer()
     {
+        return _buffer.byteBuffer();
+    }
+
+    public void setBuffer(final ReadableBuffer buffer)
+    {
+        _buffer = buffer;
+    }
+
+    public ReadableBuffer getBuffer()
+    {
         return _buffer;
     }
 
@@ -1023,7 +1031,7 @@
 
     interface TypeDecoder<V>
     {
-        V decode(DecoderImpl decoder, ByteBuffer buf);
+        V decode(DecoderImpl decoder, ReadableBuffer buf);
     }
 
     private static class UnknownDescribedType implements DescribedType
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java
index a6949b5..ade5a08 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java
@@ -27,7 +27,7 @@
     private int _pos = 0;
 
     @Override
-    public boolean hasRemaining() 
+    public boolean hasRemaining()
     {
         return true;
     }
@@ -104,4 +104,10 @@
     {
         return Integer.MAX_VALUE;
     }
+
+    @Override
+    public void put(ReadableBuffer payload) {
+        _pos += payload.remaining();
+        payload.position(payload.limit());
+    }
 }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/FixedSizePrimitiveTypeEncoding.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/FixedSizePrimitiveTypeEncoding.java
index 7c055ae..872414a 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/FixedSizePrimitiveTypeEncoding.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/FixedSizePrimitiveTypeEncoding.java
@@ -40,7 +40,7 @@
 
     public final void skipValue()
     {
-        getDecoder().getByteBuffer().position(getDecoder().getByteBuffer().position() + getFixedSize());
+        getDecoder().getBuffer().position(getDecoder().getBuffer().position() + getFixedSize());
     }
 
     protected abstract int getFixedSize();
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/ListType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/ListType.java
index ba84141..a3ff4af 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/ListType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/ListType.java
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.proton.codec;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -151,7 +150,7 @@
         public List readValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
 
             int size = decoder.readRawInt();
             // todo - limit the decoder with size
@@ -228,7 +227,7 @@
         public void skipValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = decoder.readRawInt();
             buffer.position(buffer.position() + size);
         }
@@ -295,7 +294,7 @@
         public List readValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
 
             int size = ((int)decoder.readRawByte()) & 0xff;
             // todo - limit the decoder with size
@@ -367,7 +366,7 @@
         public void skipValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = ((int)decoder.readRawByte()) & 0xff;
             buffer.position(buffer.position() + size);
         }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/MapType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/MapType.java
index 72d1bbd..b0e2330 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/MapType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/MapType.java
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.proton.codec;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
@@ -33,7 +32,7 @@
     private final MapEncoding _shortMapEncoding;
     private EncoderImpl _encoder;
 
-    private AMQPType fixedKeyType;
+    private AMQPType<?> fixedKeyType;
 
     private static interface MapEncoding extends PrimitiveTypeEncoding<Map>
     {
@@ -95,7 +94,7 @@
         return len;
     }
 
-    private static TypeConstructor<?> findNextDecoder(DecoderImpl decoder, ByteBuffer buffer, TypeConstructor<?> previousConstructor)
+    private static TypeConstructor<?> findNextDecoder(DecoderImpl decoder, ReadableBuffer buffer, TypeConstructor<?> previousConstructor)
     {
         if (previousConstructor == null)
         {
@@ -205,7 +204,7 @@
         public Map readValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
 
             int size = decoder.readRawInt();
             // todo - limit the decoder with size
@@ -264,7 +263,7 @@
         public void skipValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = decoder.readRawInt();
             buffer.position(buffer.position() + size);
         }
@@ -343,7 +342,7 @@
         public Map readValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
 
             int size = (decoder.readRawByte()) & 0xff;
             // todo - limit the decoder with size
@@ -398,7 +397,7 @@
         public void skipValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = ((int)decoder.readRawByte()) & 0xff;
             buffer.position(buffer.position() + size);
         }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/ReadableBuffer.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/ReadableBuffer.java
index 1360d76..ea3ef17 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/ReadableBuffer.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/ReadableBuffer.java
@@ -18,59 +18,318 @@
  */
 package org.apache.qpid.proton.codec;
 
+import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.Charset;
+import java.nio.InvalidMarkException;
+import java.nio.ReadOnlyBufferException;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
 
 /**
  * Interface to abstract a buffer, similar to {@link WritableBuffer}
  */
 public interface ReadableBuffer {
 
-    void put(ReadableBuffer other);
+    /**
+     * Returns the capacity of the backing buffer of this ReadableBuffer
+     * @return the capacity of the backing buffer of this ReadableBuffer
+     */
+    int capacity();
 
+    /**
+     * Returns true if this ReadableBuffer is backed by an array which can be
+     * accessed by the {@link #array()} and {@link #arrayOffset()} methods.
+     *
+     * @return true if the buffer is backed by a primitive array.
+     */
+    boolean hasArray();
+
+    /**
+     * Returns the primitive array that backs this buffer if one exists and the
+     * buffer is not read-only.  The caller should have checked the {@link #hasArray()}
+     * method before calling this method.
+     *
+     * @return the array that backs this buffer is available.
+     *
+     * @throws UnsupportedOperationException if this {@link ReadableBuffer} doesn't support array access.
+     * @throws ReadOnlyBufferException if the ReadableBuffer is read-only.
+     */
+    byte[] array();
+
+    /**
+     * Returns the offset into the backing array where data should be read from.  The caller
+     * should have checked the {@link #hasArray()} method before calling this method.
+     *
+     * @return the offset into the backing array to start reading from.
+     *
+     * @throws UnsupportedOperationException if this {@link ReadableBuffer} doesn't support array access.
+     * @throws ReadOnlyBufferException if the ReadableBuffer is read-only.
+     */
+    int arrayOffset();
+
+    /**
+     * Compact the backing storage of this ReadableBuffer, possibly freeing previously-read
+     * portions of pooled data or reducing the number of backing arrays if present.
+     * <p>
+     * This is an optional operation and care should be taken in its implementation.
+     *
+     * @return a reference to this buffer
+     */
+    ReadableBuffer reclaimRead();
+
+    /**
+     * Reads the byte at the current position and advances the position by 1.
+     *
+     * @return the byte at the current position.
+     *
+     * @throws BufferUnderflowException if the buffer position has reached the limit.
+     */
     byte get();
 
+    /**
+     * Reads the byte at the given index, the buffer position is not affected.
+     *
+     * @param index
+     *      The index in the buffer from which to read the byte.
+     *
+     * @return the byte value stored at the target index.
+     *
+     * @throws IndexOutOfBoundsException if the index is not in range for this buffer.
+     */
+    byte get(int index);
+
+    /**
+     * Reads four bytes from the buffer and returns them as an integer value.  The
+     * buffer position is advanced by four byes.
+     *
+     * @return and integer value composed of bytes read from the buffer.
+     *
+     * @throws BufferUnderflowException if the buffer position has reached the limit.
+     */
     int getInt();
 
+    /**
+     * Reads eight bytes from the buffer and returns them as an long value.  The
+     * buffer position is advanced by eight byes.
+     *
+     * @return and long value composed of bytes read from the buffer.
+     *
+     * @throws BufferUnderflowException if the buffer position has reached the limit.
+     */
     long getLong();
 
+    /**
+     * Reads two bytes from the buffer and returns them as an short value.  The
+     * buffer position is advanced by two byes.
+     *
+     * @return and short value composed of bytes read from the buffer.
+     *
+     * @throws BufferUnderflowException if the buffer position has reached the limit.
+     */
     short getShort();
 
+    /**
+     * Reads four bytes from the buffer and returns them as an float value.  The
+     * buffer position is advanced by four byes.
+     *
+     * @return and float value composed of bytes read from the buffer.
+     *
+     * @throws BufferUnderflowException if the buffer position has reached the limit.
+     */
     float getFloat();
 
+    /**
+     * Reads eight bytes from the buffer and returns them as an double value.  The
+     * buffer position is advanced by eight byes.
+     *
+     * @return and double value composed of bytes read from the buffer.
+     *
+     * @throws BufferUnderflowException if the buffer position has reached the limit.
+     */
     double getDouble();
 
-    ReadableBuffer get(final byte[] data, final int offset, final int length);
+    /**
+     * A bulk read method that copies bytes from this buffer into the target byte array.
+     *
+     * @param target
+     *      The byte array to copy bytes read from this buffer.
+     * @param offset
+     *      The offset into the given array where the copy starts.
+     * @param length
+     *      The number of bytes to copy into the target array.
+     *
+     * @return a reference to this ReadableBuffer instance.
+     *
+     * @throws BufferUnderflowException if the are less readable bytes than the array length.
+     * @throws IndexOutOfBoundsException if the offset or length values are invalid.
+     */
+    ReadableBuffer get(final byte[] target, final int offset, final int length);
 
-    ReadableBuffer get(final byte[] data);
+    /**
+     * A bulk read method that copies bytes from this buffer into the target byte array.
+     *
+     * @param target
+     *      The byte array to copy bytes read from this buffer.
+     *
+     * @return a reference to this ReadableBuffer instance.
+     *
+     * @throws BufferUnderflowException if the are less readable bytes than the array length.
+     */
+    ReadableBuffer get(final byte[] target);
 
-    ReadableBuffer position(int position);
+    /**
+     * Copy data from this buffer to the target buffer starting from the current
+     * position and continuing until either this buffer's remaining bytes are
+     * consumed or the target is full.
+     *
+     * @param target
+     *      The WritableBuffer to transfer this buffer's data to.
+     *
+     * @return a reference to this ReadableBuffer instance.
+     */
+    ReadableBuffer get(WritableBuffer target);
 
+    /**
+     * Creates a new ReadableBuffer instance that is a view of the readable portion of
+     * this buffer.  The position will be set to zero and the limit and the reported capacity
+     * will match the value returned by this buffer's {@link #remaining()} method, the mark
+     * will be undefined.
+     *
+     * @return a new ReadableBuffer that is a view of the readable portion of this buffer.
+     */
     ReadableBuffer slice();
 
+    /**
+     * Sets the buffer limit to the current position and the position is set to zero, the
+     * mark value reset to undefined.
+     *
+     * @return a reference to this {@link ReadableBuffer}.
+     */
     ReadableBuffer flip();
 
+    /**
+     * Sets the current read limit of this buffer to the given value.  If the buffer mark
+     * value is defined and is larger than the limit the mark will be discarded.  If the
+     * position is larger than the new limit it will be reset to the new limit.
+     *
+     * @param limit
+     *      The new read limit to set for this buffer.
+     *
+     * @return a reference to this {@link ReadableBuffer}.
+     *
+     * @throws IllegalArgumentException if the limit value is negative or greater than the capacity.
+     */
     ReadableBuffer limit(int limit);
 
+    /**
+     * @return the current value of this buffer's limit.
+     */
     int limit();
 
-    int remaining();
+    /**
+     * Sets the current position of this buffer to the given value.  If the buffer mark
+     * value is defined and is larger than the newly set position is must be discarded.
+     *
+     * @param position
+     *      The new position to set for this buffer.
+     *
+     * @return a reference to this {@link ReadableBuffer}.
+     *
+     * @throws IllegalArgumentException if the position value is negative or greater than the limit.
+     */
+    ReadableBuffer position(int position);
 
+    /**
+     * @return the current position from which the next read operation will start.
+     */
     int position();
 
+    /**
+     * Mark the current position of this buffer which can be returned to after a
+     * read operation by calling {@link #reset()}.
+     *
+     * @return a reference to this {@link ReadableBuffer}.
+     */
+    ReadableBuffer mark();
+
+    /**
+     * Reset the buffer's position to a previously marked value, the mark should remain
+     * set after calling this method.
+     *
+     * @return a reference to this {@link ReadableBuffer}.
+     *
+     * @throws InvalidMarkException if the mark value is undefined.
+     */
+    ReadableBuffer reset();
+
+    /**
+     * Resets the buffer position to zero and clears and previously set mark.
+     *
+     * @return a reference to this {@link ReadableBuffer}.
+     */
+    ReadableBuffer rewind();
+
+    /**
+     * Resets the buffer position to zero and sets the limit to the buffer capacity,
+     * the mark value is discarded if set.
+     *
+     * @return a reference to this {@link ReadableBuffer}.
+     */
+    ReadableBuffer clear();
+
+    /**
+     * @return the remaining number of readable bytes in this buffer.
+     */
+    int remaining();
+
+    /**
+     * @return true if there are readable bytes still remaining in this buffer.
+     */
     boolean hasRemaining();
 
+    /**
+     * Creates a duplicate {@link ReadableBuffer} to this instance.
+     * <p>
+     * The duplicated buffer will have the same position, limit and mark as this
+     * buffer.  The two buffers share the same backing data.
+     *
+     * @return a duplicate of this {@link ReadableBuffer}.
+     */
     ReadableBuffer duplicate();
 
+    /**
+     * @return a ByteBuffer view of the current readable portion of this buffer.
+     */
     ByteBuffer byteBuffer();
 
-    String readUTF8();
+    /**
+     * Reads a UTF-8 encoded String from the buffer starting the decode at the
+     * current position and reading until the limit is reached.  The position
+     * is advanced to the limit once this method returns.  If there is no bytes
+     * remaining in the buffer when this method is called a null is returned.
+     *
+     * @return a string decoded from the remaining bytes in this buffer.
+     *
+     * @throws CharacterCodingException if the encoding is invalid for any reason.
+     */
+    String readUTF8() throws CharacterCodingException;
+
+    /**
+     * Decodes a String from the buffer using the provided {@link CharsetDecoder}
+     * starting the decode at the current position and reading until the limit is
+     * reached.  The position is advanced to the limit once this method returns.
+     * If there is no bytes remaining in the buffer when this method is called a
+     * null is returned.
+     *
+     * @return a string decoded from the remaining bytes in this buffer.
+     *
+     * @throws CharacterCodingException if the encoding is invalid for any reason.
+     */
+    String readString(CharsetDecoder decoder) throws CharacterCodingException;
 
     final class ByteBufferReader implements ReadableBuffer {
 
-        private static final Charset Charset_UTF8 = Charset.forName("UTF-8");
-
         private ByteBuffer buffer;
 
         public static ByteBufferReader allocate(int size) {
@@ -78,16 +337,34 @@
             return new ByteBufferReader(allocated);
         }
 
+        public static ByteBufferReader wrap(ByteBuffer buffer) {
+            return new ByteBufferReader(buffer);
+        }
+
+        public static ByteBufferReader wrap(byte[] array) {
+            return new ByteBufferReader(ByteBuffer.wrap(array));
+        }
+
         public ByteBufferReader(ByteBuffer buffer) {
             this.buffer = buffer;
         }
 
         @Override
+        public int capacity() {
+            return buffer.capacity();
+        }
+
+        @Override
         public byte get() {
             return buffer.get();
         }
 
         @Override
+        public byte get(int index) {
+            return buffer.get(index);
+        }
+
+        @Override
         public int getInt() {
             return buffer.getInt();
         }
@@ -179,13 +456,95 @@
 
         @Override
         public String readUTF8() {
-            CharBuffer charBuf = Charset_UTF8.decode(buffer);
-            return charBuf.toString();
+            return StandardCharsets.UTF_8.decode(buffer).toString();
         }
 
         @Override
-        public void put(ReadableBuffer other) {
-            this.buffer.put(other.byteBuffer());
+        public String readString(CharsetDecoder decoder) throws CharacterCodingException {
+            return decoder.decode(buffer).toString();
+        }
+
+        @Override
+        public boolean hasArray() {
+            return buffer.hasArray();
+        }
+
+        @Override
+        public byte[] array() {
+            return buffer.array();
+        }
+
+        @Override
+        public int arrayOffset() {
+            return buffer.arrayOffset();
+        }
+
+        @Override
+        public ReadableBuffer reclaimRead() {
+            // Don't compact ByteBuffer due to the expense of the copy
+            return this;
+        }
+
+        @Override
+        public ReadableBuffer mark() {
+            buffer.mark();
+            return this;
+        }
+
+        @Override
+        public ReadableBuffer reset() {
+            buffer.reset();
+            return this;
+        }
+
+        @Override
+        public ReadableBuffer rewind() {
+            buffer.rewind();
+            return this;
+        }
+
+        @Override
+        public ReadableBuffer clear() {
+            buffer.clear();
+            return this;
+        }
+
+        @Override
+        public ReadableBuffer get(WritableBuffer target) {
+            target.put(buffer);
+            return this;
+        }
+
+        @Override
+        public String toString() {
+            return buffer.toString();
+        }
+
+        @Override
+        public int hashCode() {
+            return buffer.hashCode();
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (this == other) {
+                return true;
+            }
+
+            if (!(other instanceof ReadableBuffer)) {
+                return false;
+            }
+
+            ReadableBuffer readable = (ReadableBuffer) other;
+            if (this.remaining() != readable.remaining()) {
+                return false;
+            }
+
+            if (other instanceof CompositeReadableBuffer) {
+                return other.equals(this);
+            }
+
+            return buffer.equals(readable.byteBuffer());
         }
     }
 }
\ No newline at end of file
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/StringType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/StringType.java
index 91476bc..dfc449c 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/StringType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/StringType.java
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.proton.codec;
 
-import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.CharsetDecoder;
 import java.util.Arrays;
@@ -31,13 +30,12 @@
     private static final DecoderImpl.TypeDecoder<String> _stringCreator =
         new DecoderImpl.TypeDecoder<String>()
         {
-
-            public String decode(DecoderImpl decoder, final ByteBuffer buf)
+            public String decode(DecoderImpl decoder, final ReadableBuffer buffer)
             {
                 CharsetDecoder charsetDecoder = decoder.getCharsetDecoder();
                 try
                 {
-                    return decoder.getCharsetDecoder().decode(buf).toString();
+                    return buffer.readString(charsetDecoder);
                 }
                 catch (CharacterCodingException e)
                 {
@@ -50,7 +48,6 @@
             }
         };
 
-
     public static interface StringEncoding extends PrimitiveTypeEncoding<String>
     {
         void setValue(String val, int length);
@@ -106,7 +103,6 @@
         return len;
     }
 
-
     public StringEncoding getCanonicalEncoding()
     {
         return _stringEncoding;
@@ -121,11 +117,9 @@
             extends LargeFloatingSizePrimitiveTypeEncoding<String>
             implements StringEncoding
     {
-
         private String _value;
         private int _length;
 
-
         public AllStringEncoding(final EncoderImpl encoder, final DecoderImpl decoder)
         {
             super(encoder, decoder);
@@ -143,7 +137,6 @@
             return (val == _value) ? _length : calculateUTF8Length(val);
         }
 
-
         @Override
         public byte getEncodingCode()
         {
@@ -162,7 +155,6 @@
 
         public String readValue()
         {
-
             DecoderImpl decoder = getDecoder();
             int size = decoder.readRawInt();
             return decoder.readRaw(_stringCreator, size);
@@ -177,7 +169,7 @@
         public void skipValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = decoder.readRawInt();
             buffer.position(buffer.position() + size);
         }
@@ -187,7 +179,6 @@
             extends SmallFloatingSizePrimitiveTypeEncoding<String>
             implements StringEncoding
     {
-
         private String _value;
         private int _length;
 
@@ -196,7 +187,6 @@
             super(encoder, decoder);
         }
 
-
         @Override
         protected void writeEncodedValue(final String val)
         {
@@ -209,7 +199,6 @@
             return (val == _value) ? _length : calculateUTF8Length(val);
         }
 
-
         @Override
         public byte getEncodingCode()
         {
@@ -228,7 +217,6 @@
 
         public String readValue()
         {
-
             DecoderImpl decoder = getDecoder();
             int size = ((int)decoder.readRawByte()) & 0xff;
             return decoder.readRaw(_stringCreator, size);
@@ -243,10 +231,9 @@
         public void skipValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = ((int)decoder.readRawByte()) & 0xff;
             buffer.position(buffer.position() + size);
         }
     }
-
 }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/SymbolType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/SymbolType.java
index e333e6a..00051ac 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/SymbolType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/SymbolType.java
@@ -22,7 +22,6 @@
 
 import org.apache.qpid.proton.amqp.Symbol;
 
-import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.Collection;
@@ -35,27 +34,27 @@
     private final SymbolEncoding _symbolEncoding;
     private final SymbolEncoding _shortSymbolEncoding;
 
-    private final Map<ByteBuffer, Symbol> _symbolCache = new HashMap<ByteBuffer, Symbol>();
+    private final Map<ReadableBuffer, Symbol> _symbolCache = new HashMap<ReadableBuffer, Symbol>();
     private DecoderImpl.TypeDecoder<Symbol> _symbolCreator =
-            new DecoderImpl.TypeDecoder<Symbol>()
+        new DecoderImpl.TypeDecoder<Symbol>()
+        {
+            @Override
+            public Symbol decode(DecoderImpl decoder, ReadableBuffer buffer)
             {
-                @Override
-                public Symbol decode(DecoderImpl decoder, ByteBuffer buf)
+                Symbol symbol = _symbolCache.get(buffer);
+                if (symbol == null)
                 {
-                    Symbol symbol = _symbolCache.get(buf);
-                    if(symbol == null)
-                    {
-                        byte[] bytes = new byte[buf.limit()];
-                        buf.get(bytes);
+                    byte[] bytes = new byte[buffer.limit()];
+                    buffer.get(bytes);
 
-                        String str = new String(bytes, ASCII_CHARSET);
-                        symbol = Symbol.getSymbol(str);
+                    String str = new String(bytes, ASCII_CHARSET);
+                    symbol = Symbol.getSymbol(str);
 
-                        _symbolCache.put(ByteBuffer.wrap(bytes), symbol);
-                    }
-                    return symbol;
+                    _symbolCache.put(ReadableBuffer.ByteBufferReader.wrap(bytes), symbol);
                 }
-            };
+                return symbol;
+            }
+        };
 
     public static interface SymbolEncoding extends PrimitiveTypeEncoding<Symbol>
     {
@@ -155,7 +154,7 @@
         public void skipValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = decoder.readRawInt();
             buffer.position(buffer.position() + size);
         }
@@ -210,7 +209,7 @@
         public void skipValue()
         {
             DecoderImpl decoder = getDecoder();
-            ByteBuffer buffer = decoder.getByteBuffer();
+            ReadableBuffer buffer = decoder.getBuffer();
             int size = ((int)decoder.readRawByte()) & 0xff;
             buffer.position(buffer.position() + size);
         }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java
index 79676b3..67c8292 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java
@@ -49,6 +49,8 @@
 
     void put(ByteBuffer payload);
 
+    void put(ReadableBuffer payload);
+
     int limit();
 
     class ByteBufferWrapper implements WritableBuffer
@@ -133,11 +135,27 @@
         }
 
         @Override
+        public void put(ReadableBuffer src)
+        {
+            src.get(this);
+        }
+
+        @Override
         public int limit()
         {
             return _buf.limit();
         }
 
+        public ByteBuffer byteBuffer()
+        {
+            return _buf;
+        }
+
+        public ReadableBuffer toReadableBuffer()
+        {
+            return ReadableBuffer.ByteBufferReader.wrap((ByteBuffer) _buf.duplicate().flip());
+        }
+
         @Override
         public String toString()
         {
@@ -149,5 +167,15 @@
             ByteBuffer allocated = ByteBuffer.allocate(size);
             return new ByteBufferWrapper(allocated);
         }
+
+        public static ByteBufferWrapper wrap(ByteBuffer buffer)
+        {
+            return new ByteBufferWrapper(buffer);
+        }
+
+        public static ByteBufferWrapper wrap(byte[] bytes)
+        {
+            return new ByteBufferWrapper(ByteBuffer.wrap(bytes));
+        }
     }
 }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathAcceptedType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathAcceptedType.java
index 3624836..d9eb991 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathAcceptedType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathAcceptedType.java
@@ -80,18 +80,18 @@
     @Override
     public Accepted readValue() {
         DecoderImpl decoder = getDecoder();
-        byte typeCode = decoder.getByteBuffer().get();
+        byte typeCode = decoder.getBuffer().get();
 
         switch (typeCode) {
             case EncodingCodes.LIST0:
                 break;
             case EncodingCodes.LIST8:
-                decoder.getByteBuffer().get();
-                decoder.getByteBuffer().get();
+                decoder.getBuffer().get();
+                decoder.getBuffer().get();
                 break;
             case EncodingCodes.LIST32:
-                decoder.getByteBuffer().getInt();
-                decoder.getByteBuffer().getInt();
+                decoder.getBuffer().getInt();
+                decoder.getBuffer().getInt();
                 break;
             default:
                 throw new DecodeException("Incorrect type found in Accepted type encoding: " + typeCode);
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathHeaderType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathHeaderType.java
index 189b360..06d8026 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathHeaderType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathHeaderType.java
@@ -55,7 +55,7 @@
     @Override
     public Header readValue() {
         DecoderImpl decoder = getDecoder();
-        byte typeCode = decoder.getByteBuffer().get();
+        byte typeCode = decoder.getBuffer().get();
 
         @SuppressWarnings("unused")
         int size = 0;
@@ -65,12 +65,12 @@
             case EncodingCodes.LIST0:
                 break;
             case EncodingCodes.LIST8:
-                size = ((int)decoder.getByteBuffer().get()) & 0xff;
-                count = ((int)decoder.getByteBuffer().get()) & 0xff;
+                size = ((int)decoder.getBuffer().get()) & 0xff;
+                count = ((int)decoder.getBuffer().get()) & 0xff;
                 break;
             case EncodingCodes.LIST32:
-                size = decoder.getByteBuffer().getInt();
-                count = decoder.getByteBuffer().getInt();
+                size = decoder.getBuffer().getInt();
+                count = decoder.getBuffer().getInt();
                 break;
             default:
                 throw new DecodeException("Incorrect type found in Header encoding: " + typeCode);
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathPropertiesType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathPropertiesType.java
index e3caca5..e071ea9 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathPropertiesType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/messaging/FastPathPropertiesType.java
@@ -55,7 +55,7 @@
     @Override
     public Properties readValue() {
         DecoderImpl decoder = getDecoder();
-        byte typeCode = decoder.getByteBuffer().get();
+        byte typeCode = decoder.getBuffer().get();
 
         @SuppressWarnings("unused")
         int size = 0;
@@ -65,12 +65,12 @@
             case EncodingCodes.LIST0:
                 break;
             case EncodingCodes.LIST8:
-                size = ((int)decoder.getByteBuffer().get()) & 0xff;
-                count = ((int)decoder.getByteBuffer().get()) & 0xff;
+                size = ((int)decoder.getBuffer().get()) & 0xff;
+                count = ((int)decoder.getBuffer().get()) & 0xff;
                 break;
             case EncodingCodes.LIST32:
-                size = decoder.getByteBuffer().getInt();
-                count = decoder.getByteBuffer().getInt();
+                size = decoder.getBuffer().getInt();
+                count = decoder.getBuffer().getInt();
                 break;
             default:
                 throw new DecodeException("Incorrect type found in Properties encoding: " + typeCode);
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java
index 01e18e7..c329aa7 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java
@@ -84,7 +84,7 @@
     @Override
     public Disposition readValue() {
         DecoderImpl decoder = getDecoder();
-        byte typeCode = decoder.getByteBuffer().get();
+        byte typeCode = decoder.getBuffer().get();
 
         @SuppressWarnings("unused")
         int size = 0;
@@ -95,12 +95,12 @@
                 // TODO - Technically invalid however old decoder also allowed this.
                 break;
             case EncodingCodes.LIST8:
-                size = ((int)decoder.getByteBuffer().get()) & 0xff;
-                count = ((int)decoder.getByteBuffer().get()) & 0xff;
+                size = ((int)decoder.getBuffer().get()) & 0xff;
+                count = ((int)decoder.getBuffer().get()) & 0xff;
                 break;
             case EncodingCodes.LIST32:
-                size = decoder.getByteBuffer().getInt();
-                count = decoder.getByteBuffer().getInt();
+                size = decoder.getBuffer().getInt();
+                count = decoder.getBuffer().getInt();
                 break;
             default:
                 throw new DecodeException("Incorrect type found in Disposition encoding: " + typeCode);
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java
index 78abc5c..6f500be 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java
@@ -80,7 +80,7 @@
     @Override
     public Flow readValue() {
         DecoderImpl decoder = getDecoder();
-        byte typeCode = decoder.getByteBuffer().get();
+        byte typeCode = decoder.getBuffer().get();
 
         @SuppressWarnings("unused")
         int size = 0;
@@ -91,12 +91,12 @@
                 // TODO - Technically invalid however old decoder also allowed this.
                 break;
             case EncodingCodes.LIST8:
-                size = ((int)decoder.getByteBuffer().get()) & 0xff;
-                count = ((int)decoder.getByteBuffer().get()) & 0xff;
+                size = ((int)decoder.getBuffer().get()) & 0xff;
+                count = ((int)decoder.getBuffer().get()) & 0xff;
                 break;
             case EncodingCodes.LIST32:
-                size = decoder.getByteBuffer().getInt();
-                count = decoder.getByteBuffer().getInt();
+                size = decoder.getBuffer().getInt();
+                count = decoder.getBuffer().getInt();
                 break;
             default:
                 throw new DecodeException("Incorrect type found in Flow encoding: " + typeCode);
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java
index 685890a..79842db 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java
@@ -61,7 +61,7 @@
     @Override
     public Transfer readValue() {
         DecoderImpl decoder = getDecoder();
-        byte typeCode = decoder.getByteBuffer().get();
+        byte typeCode = decoder.getBuffer().get();
 
         @SuppressWarnings("unused")
         int size = 0;
@@ -72,12 +72,12 @@
                 // TODO - Technically invalid however old decoder also allowed this.
                 break;
             case EncodingCodes.LIST8:
-                size = ((int)decoder.getByteBuffer().get()) & 0xff;
-                count = ((int)decoder.getByteBuffer().get()) & 0xff;
+                size = ((int)decoder.getBuffer().get()) & 0xff;
+                count = ((int)decoder.getBuffer().get()) & 0xff;
                 break;
             case EncodingCodes.LIST32:
-                size = decoder.getByteBuffer().getInt();
-                count = decoder.getByteBuffer().getInt();
+                size = decoder.getBuffer().getInt();
+                count = decoder.getBuffer().getInt();
                 break;
             default:
                 throw new DecodeException("Incorrect type found in Transfer encoding: " + typeCode);
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java
index f9d718f..fea4361 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.proton.engine;
 
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.codec.WritableBuffer;
 
 /**
@@ -69,6 +70,17 @@
      */
     public int recv(WritableBuffer buffer);
 
+    /**
+     * Receive message data for the current delivery returning the data in a Readable buffer.
+     *
+     * The delivery will return an empty buffer if there is no pending data to be read or if all
+     * data has been read either by a previous call to this method or by a call to one of the other
+     * receive methods.
+     *
+     * @return a ReadableBuffer that contains the currently available data for the current delivery.
+     */
+    public ReadableBuffer recv();
+
     public void drain(int credit);
 
     /**
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java
index 159d5c3..fdb2552 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java
@@ -64,6 +64,22 @@
     public int send(ReadableBuffer buffer);
 
     /**
+     * Sends data to the current delivery attempting not to copy the data unless a previous
+     * send has already added data to the Delivery in which case a copy may occur depending on
+     * the implementation.
+     * <p>
+     * Care should be taken when passing ReadableBuffer instances that wrapped pooled bytes
+     * as the send does not mean the data will be sent immediately when the transport is
+     * flushed so the pooled bytes could be held for longer than expected.
+     *
+     * @param buffer
+     *      An immutable ReadableBuffer that can be held until the next transport flush.
+     *
+     * @return the number of bytes read from the provided buffer.
+     */
+    public int sendNoCopy(ReadableBuffer buffer);
+
+    /**
      * Abort the current delivery.
      *
      * Note "pn_link_abort" is commented out in the .h
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
index 61f8ec8..e0b82b2 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
@@ -22,7 +22,9 @@
 
 import java.util.Arrays;
 
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.codec.CompositeReadableBuffer;
 import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.engine.Delivery;
@@ -33,6 +35,8 @@
 {
     public static final int DEFAULT_MESSAGE_FORMAT = 0;
 
+    private static final ReadableBuffer EMPTY_BUFFER = ReadableBuffer.ByteBufferReader.allocate(0);
+
     private DeliveryImpl _linkPrevious;
     private DeliveryImpl _linkNext;
 
@@ -63,12 +67,12 @@
     private int _flags = (byte) 0;
 
     private TransportDelivery _transportDelivery;
-    private byte[] _data;
-    private int _dataSize;
     private boolean _complete;
     private boolean _updated;
     private boolean _done;
-    private int _offset;
+
+    private CompositeReadableBuffer _dataBuffer;
+    private ReadableBuffer _dataView;
 
     DeliveryImpl(final byte[] tag, final LinkImpl link, DeliveryImpl previous)
     {
@@ -76,7 +80,7 @@
         _link = link;
         _link.incrementUnsettled();
         _linkPrevious = previous;
-        if(previous != null)
+        if (previous != null)
         {
             previous._linkNext = this;
         }
@@ -212,7 +216,6 @@
         return _workPrev;
     }
 
-
     void setWorkNext(DeliveryImpl workNext)
     {
         _workNext = workNext;
@@ -226,41 +229,53 @@
     int recv(final byte[] bytes, int offset, int size)
     {
         final int consumed;
-        if (_data != null)
+        if (_dataBuffer != null && _dataBuffer.hasRemaining())
         {
-            //TODO - should only be if no bytes left
-            consumed = Math.min(size, _dataSize);
+            consumed = Math.min(size, _dataBuffer.remaining());
 
-            System.arraycopy(_data, _offset, bytes, offset, consumed);
-            _offset += consumed;
-            _dataSize -= consumed;
+            _dataBuffer.get(bytes, offset, consumed);
+            _dataBuffer.reclaimRead();
         }
         else
         {
-            _dataSize = consumed = 0;
+            consumed = 0;
         }
 
         return (_complete && consumed == 0) ? Transport.END_OF_STREAM : consumed;  //TODO - Implement
     }
 
-    int recv(final WritableBuffer buffer) {
+    int recv(final WritableBuffer buffer)
+    {
         final int consumed;
-        if (_data != null)
+        if (_dataBuffer != null && _dataBuffer.hasRemaining())
         {
-            consumed = Math.min(buffer.remaining(), _dataSize);
-
-            buffer.put(_data, _offset, consumed);
-            _offset += consumed;
-            _dataSize -= consumed;
+            consumed = Math.min(buffer.remaining(), _dataBuffer.remaining());
+            buffer.put(_dataBuffer);
+            _dataBuffer.reclaimRead();
         }
         else
         {
-            _dataSize = consumed = 0;
+            consumed = 0;
         }
 
         return (_complete && consumed == 0) ? Transport.END_OF_STREAM : consumed;
     }
 
+    ReadableBuffer recv()
+    {
+        ReadableBuffer result = _dataView;
+        if (_dataView != null)
+        {
+            _dataView = _dataBuffer = null;
+        }
+        else
+        {
+            result = EMPTY_BUFFER;
+        }
+
+        return result;
+    }
+
     void updateWork()
     {
         getLink().getConnectionImpl().workUpdate(this);
@@ -278,13 +293,11 @@
         getLink().getConnectionImpl().addTransportWork(this);
     }
 
-
     DeliveryImpl getTransportWorkNext()
     {
         return _transportWorkNext;
     }
 
-
     DeliveryImpl getTransportWorkPrev()
     {
         return _transportWorkPrev;
@@ -318,78 +331,132 @@
 
     int send(byte[] bytes, int offset, int length)
     {
-        if(_data == null)
-        {
-            _data = new byte[length];
-        }
-        else if(_data.length - _dataSize < length)
-        {
-            byte[] oldData = _data;
-            _data = new byte[oldData.length + _dataSize];
-            System.arraycopy(oldData, _offset, _data, 0, _dataSize);
-            _offset = 0;
-        }
-        System.arraycopy(bytes, offset, _data, _dataSize + _offset, length);
-        _dataSize += length;
+        byte[] copy = new byte[length];
+        System.arraycopy(bytes, offset, copy, 0, length);
+        getOrCreateDataBuffer().append(copy);
         addToTransportWorkList();
-        return length;  //TODO - Implement.
+        return length;
     }
 
     int send(final ReadableBuffer buffer)
     {
         int length = buffer.remaining();
-
-        if(_data == null)
-        {
-            _data = new byte[length];
-        }
-        else if(_data.length - _dataSize < length)
-        {
-            byte[] oldData = _data;
-            _data = new byte[oldData.length + _dataSize];
-            System.arraycopy(oldData, _offset, _data, 0, _dataSize);
-            _offset = 0;
-        }
-        buffer.get(_data, _offset, length);
-        _dataSize+=length;
+        getOrCreateDataBuffer().append(copyContents(buffer));
         addToTransportWorkList();
         return length;
     }
 
-    byte[] getData()
+    int sendNoCopy(ReadableBuffer buffer)
     {
-        return _data;
+        int length = buffer.remaining();
+
+        if (_dataView == null || !_dataView.hasRemaining())
+        {
+            _dataView = buffer;
+        }
+        else
+        {
+            consolidateSendBuffers(buffer);
+        }
+
+        addToTransportWorkList();
+        return length;
     }
 
-    int getDataOffset()
+    private byte[] copyContents(ReadableBuffer buffer)
     {
-        return _offset;
+        byte[] copy = new byte[buffer.remaining()];
+
+        if (buffer.hasArray())
+        {
+            System.arraycopy(buffer.array(), buffer.arrayOffset(), copy, 0, buffer.remaining());
+            buffer.position(buffer.limit());
+        }
+        else
+        {
+            buffer.get(copy, 0, buffer.remaining());
+        }
+
+        return copy;
+    }
+
+    private void consolidateSendBuffers(ReadableBuffer buffer)
+    {
+        if (_dataView == _dataBuffer)
+        {
+            getOrCreateDataBuffer().append(copyContents(buffer));
+        }
+        else
+        {
+            ReadableBuffer oldView = _dataView;
+
+            CompositeReadableBuffer dataBuffer = getOrCreateDataBuffer();
+            dataBuffer.append(copyContents(oldView));
+            dataBuffer.append(copyContents(buffer));
+
+            oldView.reclaimRead();
+        }
+
+        buffer.reclaimRead();  // A pooled buffer could release now.
+    }
+
+    void append(Binary payload)
+    {
+        byte[] data = payload.getArray();
+
+        // The Composite buffer cannot handle composites where the array
+        // is a view of a larger array so we must copy the payload into
+        // an array of the exact size
+        if (payload.getArrayOffset() > 0 || payload.getLength() < data.length)
+        {
+            data = new byte[payload.getLength()];
+            System.arraycopy(payload.getArray(), payload.getArrayOffset(), data, 0, payload.getLength());
+        }
+
+        getOrCreateDataBuffer().append(data);
+    }
+
+    private CompositeReadableBuffer getOrCreateDataBuffer()
+    {
+        if (_dataBuffer == null)
+        {
+            _dataView = _dataBuffer = new CompositeReadableBuffer();
+        }
+
+        return _dataBuffer;
+    }
+
+    void append(byte[] data)
+    {
+        getOrCreateDataBuffer().append(data);
+    }
+
+    void afterSend()
+    {
+        if (_dataView != null)
+        {
+            _dataView.reclaimRead();
+            if (!_dataView.hasRemaining())
+            {
+                _dataView = _dataBuffer;
+            }
+        }
+    }
+
+    ReadableBuffer getData()
+    {
+        return _dataView == null ? EMPTY_BUFFER : _dataView;
     }
 
     int getDataLength()
     {
-        return _dataSize;  //TODO - Implement.
-    }
-
-    void setData(byte[] data)
-    {
-        _data = data;
-    }
-
-    void setDataLength(int length)
-    {
-        _dataSize = length;
-    }
-
-    public void setDataOffset(int arrayOffset)
-    {
-        _offset = arrayOffset;
+        return _dataView == null ? 0 : _dataView.remaining();
     }
 
     @Override
     public int available()
     {
-        return _dataSize;
+        return _dataView == null ? 0 : _dataView.remaining();
     }
 
     @Override
@@ -437,7 +504,6 @@
         getLink().getConnectionImpl().workUpdate(this);
     }
 
-
     void setDone()
     {
         _done = true;
@@ -462,7 +528,12 @@
             if (isDone()) {
                 return false;
             } else {
-                return _complete || _dataSize > 0;
+                boolean hasRemaining = false;
+                if (_dataView != null) {
+                    hasRemaining = _dataView.hasRemaining();
+                }
+
+                return _complete || hasRemaining;
             }
         } else {
             return false;
@@ -505,18 +576,18 @@
             .append(", _flags=").append(_flags)
             .append(", _defaultDeliveryState=").append(_defaultDeliveryState)
             .append(", _transportDelivery=").append(_transportDelivery)
-            .append(", _dataSize=").append(_dataSize)
+            .append(", _data Size=").append(getDataLength())
             .append(", _complete=").append(_complete)
             .append(", _updated=").append(_updated)
             .append(", _done=").append(_done)
-            .append(", _offset=").append(_offset).append("]");
+            .append("]");
         return builder.toString();
     }
 
     @Override
     public int pending()
     {
-        return _dataSize;
+        return _dataView == null ? 0 : _dataView.remaining();
     }
 
     @Override
@@ -530,5 +601,4 @@
     {
         return _defaultDeliveryState;
     }
-
 }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
index eb16624..abf4ba9 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
@@ -24,6 +24,7 @@
 import org.apache.qpid.proton.amqp.transport.EmptyFrame;
 import org.apache.qpid.proton.amqp.transport.FrameBody;
 import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.framing.TransportFrame;
 
@@ -96,7 +97,7 @@
         _frameStart = _buffer.position();
     }
 
-    private void writePerformative(Object frameBody, ByteBuffer payload, Runnable onPayloadTooLarge)
+    private void writePerformative(Object frameBody, ReadableBuffer payload, Runnable onPayloadTooLarge)
     {
         while (_buffer.remaining() < 8) {
             grow();
@@ -146,7 +147,7 @@
         _buffer.position(limit);
     }
 
-    void writeFrame(int channel, Object frameBody, ByteBuffer payload,
+    void writeFrame(int channel, Object frameBody, ReadableBuffer payload,
                     Runnable onPayloadTooLarge)
     {
         startFrame();
@@ -162,7 +163,7 @@
         int payloadSize = Math.min(payload == null ? 0 : payload.remaining(), capacity);
 
         ProtocolTracer tracer = _protocolTracer == null ? null : _protocolTracer.get();
-        if(tracer != null || _transport.isTraceFramesEnabled())
+        if (tracer != null || _transport.isTraceFramesEnabled())
         {
             logFrame(tracer, channel, frameBody, payload, payloadSize);
         }
@@ -185,13 +186,11 @@
         _framesOutput += 1;
     }
 
-    private void logFrame(ProtocolTracer tracer, int channel, Object frameBody, ByteBuffer payload, int payloadSize)
+    private void logFrame(ProtocolTracer tracer, int channel, Object frameBody, ReadableBuffer payload, int payloadSize)
     {
-        // XXX: this is a bit of a hack but it eliminates duplicate
-        // code, further refactor will fix this
         if (_frameType == AMQP_FRAME_TYPE)
         {
-            ByteBuffer originalPayload = null;
+            ReadableBuffer originalPayload = null;
             if (payload!=null)
             {
                 originalPayload = payload.duplicate();
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
index 6f86700..337f847 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
@@ -21,6 +21,7 @@
 package org.apache.qpid.proton.engine.impl;
 
 import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.engine.Receiver;
 
@@ -114,6 +115,23 @@
     }
 
     @Override
+    public ReadableBuffer recv()
+    {
+        if (_current == null) {
+            throw new IllegalStateException("no current delivery");
+        }
+
+        ReadableBuffer consumed = _current.recv();
+        if (consumed.remaining() > 0) {
+            getSession().incrementIncomingBytes(-consumed.remaining());
+            if (getSession().getTransportSession().getIncomingWindowSize().equals(UnsignedInteger.ZERO)) {
+                modified();
+            }
+        }
+        return consumed;
+    }
+
+    @Override
     void doFree()
     {
         getSession().freeReceiver(this);
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
index f418655..afb1b2e 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
@@ -79,6 +79,25 @@
     }
 
     @Override
+    public int sendNoCopy(final ReadableBuffer buffer)
+    {
+        if (getLocalState() == EndpointState.CLOSED)
+        {
+            throw new IllegalStateException("send not allowed after the sender is closed.");
+        }
+        DeliveryImpl current = current();
+        if (current == null || current.getLink() != this)
+        {
+            throw new IllegalArgumentException();
+        }
+        int sent = current.sendNoCopy(buffer);
+        if (sent > 0) {
+            getSession().incrementOutgoingBytes(sent);
+        }
+        return sent;
+    }
+
+    @Override
     public void abort()
     {
         //TODO.
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 0f969c8..1d0103e 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -47,6 +47,7 @@
 import org.apache.qpid.proton.codec.AMQPDefinedTypes;
 import org.apache.qpid.proton.codec.DecoderImpl;
 import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Event;
@@ -589,24 +590,23 @@
                 transfer.setMessageFormat(UnsignedInteger.valueOf(messageFormat));
             }
 
-            ByteBuffer payload = delivery.getData() ==  null ? null :
-                ByteBuffer.wrap(delivery.getData(), delivery.getDataOffset(),
-                                delivery.getDataLength());
+            ReadableBuffer payload = delivery.getData();
+
+            int pending = payload.remaining();
 
             try {
                 writeFrame(tpSession.getLocalChannel(), transfer, payload, partialTransferHandler.setTransfer(transfer));
             } finally {
                 partialTransferHandler.setTransfer(null);
+                delivery.afterSend();  // Allow for freeing resources after write of buffered data
             }
 
             tpSession.incrementOutgoingId();
             tpSession.decrementRemoteIncomingWindow();
 
-            if(payload == null || !payload.hasRemaining())
+            if (payload == null || !payload.hasRemaining())
             {
-                session.incrementOutgoingBytes(-delivery.pending());
-                delivery.setData(null);
-                delivery.setDataLength(0);
+                session.incrementOutgoingBytes(-pending);
 
                 if (!transfer.getMore()) {
                     // Clear the in-progress delivery marker
@@ -622,10 +622,7 @@
             }
             else
             {
-                int delta = delivery.getDataLength() - payload.remaining();
-                delivery.setDataOffset(delivery.getDataOffset() + delta);
-                delivery.setDataLength(payload.remaining());
-                session.incrementOutgoingBytes(-delta);
+                session.incrementOutgoingBytes(-(pending - payload.remaining()));
 
                 // Remember the delivery we are still processing
                 // the body transfer frames for
@@ -1072,7 +1069,7 @@
     }
 
     protected void writeFrame(int channel, FrameBody frameBody,
-                            ByteBuffer payload, Runnable onPayloadTooLarge)
+                              ReadableBuffer payload, Runnable onPayloadTooLarge)
     {
         _frameWriter.writeFrame(channel, frameBody, payload, onPayloadTooLarge);
     }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
index a09889e..bbacd30 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
@@ -298,28 +298,14 @@
             delivery.setRemoteDeliveryState(transfer.getState());
         }
         _unsettledIncomingSize++;
-        // TODO - should this be a copy?
-        if(payload != null)
+
+        if (payload != null)
         {
-            if(delivery.getDataLength() == 0)
-            {
-                delivery.setData(payload.getArray());
-                delivery.setDataLength(payload.getLength());
-                delivery.setDataOffset(payload.getArrayOffset());
-            }
-            else
-            {
-                byte[] data = new byte[delivery.getDataLength() + payload.getLength()];
-                System.arraycopy(delivery.getData(), delivery.getDataOffset(), data, 0, delivery.getDataLength());
-                System.arraycopy(payload.getArray(), payload.getArrayOffset(), data, delivery.getDataLength(), payload.getLength());
-                delivery.setData(data);
-                delivery.setDataOffset(0);
-                delivery.setDataLength(data.length);
-            }
+            delivery.append(payload);
             getSession().incrementIncomingBytes(payload.getLength());
         }
-        delivery.updateWork();
 
+        delivery.updateWork();
 
         if(!(transfer.getMore() || transfer.getAborted()))
         {
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java
index cd90789..a31c169 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java
@@ -40,7 +40,7 @@
     private ApplicationProperties _applicationProperties;
     private Section _body;
     private Footer _footer;
-    
+
     private static class EncoderDecoderPair {
       DecoderImpl decoder = new DecoderImpl();
       EncoderImpl encoder = new EncoderImpl(decoder);
@@ -576,8 +576,13 @@
 
     public void decode(ByteBuffer buffer)
     {
+        decode(ReadableBuffer.ByteBufferReader.wrap(buffer));
+    }
+
+    public void decode(ReadableBuffer buffer)
+    {
         DecoderImpl decoder = tlsCodec.get().decoder;
-        decoder.setByteBuffer(buffer);
+        decoder.setBuffer(buffer);
 
         _header = null;
         _deliveryAnnotations = null;
@@ -681,7 +686,7 @@
 
         }
 
-        decoder.setByteBuffer(null);
+        decoder.setBuffer(null);
     }
 
     @Override
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/codec/Benchmark.java b/proton-j/src/test/java/org/apache/qpid/proton/codec/Benchmark.java
index 529a808..5dd4077 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/codec/Benchmark.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/codec/Benchmark.java
@@ -22,7 +22,6 @@
 
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -44,12 +43,13 @@
 import org.apache.qpid.proton.amqp.transport.Flow;
 import org.apache.qpid.proton.amqp.transport.Role;
 import org.apache.qpid.proton.amqp.transport.Transfer;
+import org.apache.qpid.proton.codec.WritableBuffer.ByteBufferWrapper;
 
 public class Benchmark implements Runnable {
 
     private static final int ITERATIONS = 10 * 1024 * 1024;
 
-    private ByteBuffer byteBuf = ByteBuffer.allocate(8192);
+    private ByteBufferWrapper outputBuf = WritableBuffer.ByteBufferWrapper.allocate(8192);
     private BenchmarkResult resultSet = new BenchmarkResult();
     private boolean warming = true;
 
@@ -66,8 +66,7 @@
     public void run() {
         AMQPDefinedTypes.registerAllTypes(decoder, encoder);
 
-        encoder.setByteBuffer(byteBuf);
-        decoder.setByteBuffer(byteBuf);
+        encoder.setByteBuffer(outputBuf);
 
         try {
             doBenchmarks();
@@ -102,6 +101,16 @@
         warming = false;
     }
 
+    private CompositeReadableBuffer convertToComposite(WritableBuffer buffer) {
+        CompositeReadableBuffer composite = new CompositeReadableBuffer();
+        ReadableBuffer readableView = outputBuf.toReadableBuffer();
+
+        byte[] copy = new byte[readableView.remaining()];
+        readableView.get(copy);
+
+        return composite.append(copy);
+    }
+
     private void benchmarkListOfInts() throws IOException {
         ArrayList<Object> list = new ArrayList<>(10);
         for (int j = 0; j < 10; j++) {
@@ -110,15 +119,18 @@
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeList(list);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readList();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
@@ -130,15 +142,18 @@
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeUUID(uuid);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readUUID();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
@@ -152,15 +167,18 @@
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeObject(header);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readObject();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
@@ -175,15 +193,18 @@
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeObject(transfer);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readObject();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
@@ -202,15 +223,18 @@
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeObject(flow);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readObject();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
@@ -225,15 +249,18 @@
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeObject(properties);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readObject();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
@@ -248,22 +275,24 @@
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeObject(annotations);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readObject();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
         time("MessageAnnotations", resultSet);
     }
 
-    @SuppressWarnings("unchecked")
     private void benchmarkApplicationProperties() throws IOException {
         ApplicationProperties properties = new ApplicationProperties(new HashMap<String, Object>());
         properties.getValue().put("test1", UnsignedByte.valueOf((byte) 128));
@@ -272,15 +301,18 @@
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeObject(properties);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readObject();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
@@ -294,19 +326,22 @@
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeSymbol(symbol1);
             encoder.writeSymbol(symbol2);
             encoder.writeSymbol(symbol3);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readSymbol();
             decoder.readSymbol();
             decoder.readSymbol();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
@@ -323,15 +358,18 @@
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeObject(disposition);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readObject();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
@@ -345,19 +383,22 @@
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeString(string1);
             encoder.writeString(string2);
             encoder.writeString(string3);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readString();
             decoder.readString();
             decoder.readString();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
@@ -371,19 +412,22 @@
 
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.clear();
+            outputBuf.byteBuffer().clear();
             encoder.writeObject(data1);
             encoder.writeObject(data2);
             encoder.writeObject(data3);
         }
         resultSet.encodesComplete();
 
+        CompositeReadableBuffer inputBuf = convertToComposite(outputBuf);
+        decoder.setBuffer(inputBuf);
+
         resultSet.start();
         for (int i = 0; i < ITERATIONS; i++) {
-            byteBuf.flip();
             decoder.readObject();
             decoder.readObject();
             decoder.readObject();
+            inputBuf.flip();
         }
         resultSet.decodesComplete();
 
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/codec/CompositeReadableBufferTest.java b/proton-j/src/test/java/org/apache/qpid/proton/codec/CompositeReadableBufferTest.java
new file mode 100644
index 0000000..7ed5c3e
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/codec/CompositeReadableBufferTest.java
@@ -0,0 +1,2305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.codec;
+
+import static org.junit.Assert.*;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.InvalidMarkException;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.Test;
+
+/**
+ * Test for API of the CompositeReadableBuffer class.
+ */
+public class CompositeReadableBufferTest {
+
+    //----- Test newly create buffer behaviors -------------------------------//
+
+    @Test
+    public void testDefaultCtor() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        assertFalse(buffer.hasArray());
+        assertEquals(0, buffer.remaining());
+        assertEquals(0, buffer.position());
+        assertEquals(0, buffer.limit());
+    }
+
+    //----- Test limit handling ----------------------------------------------//
+
+    @Test
+    public void testLimitAppliesUpdatesToPositionAndMark() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 });
+        buffer.position(10);
+        buffer.mark();
+
+        assertEquals(10, buffer.capacity());
+        assertEquals(10, buffer.limit());
+        assertEquals(10, buffer.position());
+
+        buffer.limit(5);
+        assertEquals(5, buffer.limit());
+        assertEquals(5, buffer.position());
+
+        try {
+            buffer.reset();
+            fail("Should throw a InvalidMarkException");
+        } catch (InvalidMarkException e) {}
+
+        buffer.mark();
+        buffer.limit(10);
+        buffer.position(10);
+
+        try {
+            buffer.reset();
+        } catch (InvalidMarkException e) {
+            fail("Should not throw a InvalidMarkException");
+        }
+
+        assertEquals(5, buffer.position());
+    }
+
+    @Test
+    public void testLimitAppliesUpdatesToPositionAndMarkWithTwoArrays() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] { 0, 1, 2, 3, 4}).append(new byte[] { 5, 6, 7, 8, 9 });
+        buffer.position(10);
+        buffer.mark();
+
+        assertEquals(10, buffer.capacity());
+        assertEquals(10, buffer.limit());
+        assertEquals(10, buffer.position());
+
+        buffer.limit(5);
+        assertEquals(5, buffer.limit());
+        assertEquals(5, buffer.position());
+
+        try {
+            buffer.reset();
+            fail("Should throw a InvalidMarkException");
+        } catch (InvalidMarkException e) {}
+
+        buffer.mark();
+        buffer.limit(10);
+        buffer.position(10);
+
+        try {
+            buffer.reset();
+        } catch (InvalidMarkException e) {
+            fail("Should not throw a InvalidMarkException");
+        }
+
+        assertEquals(5, buffer.position());
+    }
+
+    @Test
+    public void testLimitWithOneArrayAppended() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 });
+
+        assertEquals(10, buffer.capacity());
+        assertEquals(10, buffer.limit());
+
+        buffer.limit(5);
+        assertEquals(5, buffer.limit());
+
+        buffer.limit(6);
+        assertEquals(6, buffer.limit());
+
+        try {
+            buffer.limit(11);
+            fail("Should throw a IllegalArgumentException");
+        } catch (IllegalArgumentException e) {}
+    }
+
+    @Test
+    public void testLimitWithTwoArraysAppended() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] { 0, 1, 2, 3, 4}).append(new byte[] { 5, 6, 7, 8, 9 });
+
+        assertEquals(10, buffer.capacity());
+        assertEquals(10, buffer.limit());
+
+        buffer.limit(5);
+        assertEquals(5, buffer.limit());
+
+        buffer.limit(6);
+        assertEquals(6, buffer.limit());
+
+        try {
+            buffer.limit(11);
+            fail("Should throw a IllegalArgumentException");
+        } catch (IllegalArgumentException e) {}
+    }
+
+    @Test
+    public void testLimitEnforcesPreconditions() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        // test with nothing appended.
+        try {
+            buffer.limit(2);
+            fail("Should throw a IllegalArgumentException");
+        } catch (IllegalArgumentException e) {}
+
+        try {
+            buffer.limit(-1);
+            fail("Should throw a IllegalArgumentException");
+        } catch (IllegalArgumentException e) {}
+
+        // Test with something appended
+        buffer.append(new byte[] { 127 });
+
+        try {
+            buffer.limit(2);
+            fail("Should throw a IllegalArgumentException");
+        } catch (IllegalArgumentException e) {}
+
+        try {
+            buffer.limit(-1);
+            fail("Should throw a IllegalArgumentException");
+        } catch (IllegalArgumentException e) {}
+    }
+
+    //----- Test position handling -------------------------------------------//
+
+    @Test
+    public void testPositionWithOneArrayAppended() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 });
+
+        assertEquals(10, buffer.capacity());
+        assertEquals(10, buffer.limit());
+        assertEquals(0, buffer.position());
+
+        buffer.position(5);
+        assertEquals(5, buffer.position());
+
+        buffer.position(6);
+        assertEquals(6, buffer.position());
+
+        buffer.position(10);
+        assertEquals(10, buffer.position());
+
+        try {
+            buffer.position(11);
+            fail("Should throw a IllegalArgumentException");
+        } catch (IllegalArgumentException e) {}
+
+        buffer.mark();
+
+        buffer.position(0);
+        assertEquals(0, buffer.position());
+
+        try {
+            buffer.reset();
+            fail("Should throw InvalidMarkException");
+        } catch (InvalidMarkException ime) {}
+    }
+
+    @Test
+    public void testPositionWithTwoArraysAppended() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] { 0, 1, 2, 3, 4}).append(new byte[] { 5, 6, 7, 8, 9 });
+
+        assertEquals(10, buffer.capacity());
+        assertEquals(10, buffer.limit());
+
+        buffer.position(5);
+        assertEquals(5, buffer.position());
+
+        buffer.position(6);
+        assertEquals(6, buffer.position());
+
+        buffer.position(10);
+        assertEquals(10, buffer.position());
+
+        try {
+            buffer.position(11);
+            fail("Should throw a IllegalArgumentException");
+        } catch (IllegalArgumentException e) {}
+
+        buffer.position(0);
+        assertEquals(0, buffer.position());
+    }
+
+    @Test
+    public void testPositionEnforcesPreconditions() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        // test with nothing appended.
+        try {
+            buffer.position(2);
+            fail("Should throw a IllegalArgumentException");
+        } catch (IllegalArgumentException e) {}
+
+        try {
+            buffer.position(-1);
+            fail("Should throw a IllegalArgumentException");
+        } catch (IllegalArgumentException e) {}
+
+        // Test with something appended
+        buffer.append(new byte[] { 127 });
+
+        try {
+            buffer.position(2);
+            fail("Should throw a IllegalArgumentException");
+        } catch (IllegalArgumentException e) {}
+
+        try {
+            buffer.position(-1);
+            fail("Should throw a IllegalArgumentException");
+        } catch (IllegalArgumentException e) {}
+    }
+
+    //----- Test buffer get methods ------------------------------------------//
+
+    @Test
+    public void testGetByteWithNothingAppended() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        try {
+            buffer.get();
+            fail("Should throw a BufferUnderflowException");
+        } catch (BufferUnderflowException e) {}
+    }
+
+    @Test
+    public void testGetByteWithOneArrayWithOneElement() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] { 127 });
+
+        assertEquals(1, buffer.remaining());
+        assertTrue(buffer.hasRemaining());
+        assertEquals(0, buffer.position());
+
+        assertEquals(127, buffer.get());
+
+        assertEquals(0, buffer.remaining());
+        assertFalse(buffer.hasRemaining());
+        assertEquals(1, buffer.position());
+
+        try {
+            buffer.get();
+            fail("Should not be able to read past end");
+        } catch (BufferUnderflowException e) {}
+    }
+
+    @Test
+    public void testGetByteWithOneArrayWithManyElements() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 });
+
+        assertEquals(10, buffer.remaining());
+        assertTrue(buffer.hasRemaining());
+        assertEquals(0, buffer.position());
+
+        for (int i = 0; i < 10; ++i) {
+            assertEquals(i, buffer.get());
+            assertEquals(i + 1, buffer.position());
+        }
+
+        assertEquals(0, buffer.remaining());
+        assertEquals(10, buffer.position());
+
+        try {
+            buffer.get();
+            fail("Should not be able to read past end");
+        } catch (BufferUnderflowException e) {}
+    }
+
+    @Test
+    public void testGetByteWithManyArraysWithOneElements() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] {0})
+              .append(new byte[] {1})
+              .append(new byte[] {2})
+              .append(new byte[] {3})
+              .append(new byte[] {4})
+              .append(new byte[] {5})
+              .append(new byte[] {6})
+              .append(new byte[] {7})
+              .append(new byte[] {8})
+              .append(new byte[] {9});
+
+        assertEquals(10, buffer.remaining());
+        assertFalse(buffer.hasArray());
+        assertTrue(buffer.hasRemaining());
+        assertEquals(0, buffer.position());
+
+        for (int i = 0; i < 10; ++i) {
+            assertEquals(i, buffer.get());
+        }
+
+        assertEquals(0, buffer.remaining());
+        assertEquals(10, buffer.position());
+
+        try {
+            buffer.get();
+            fail("Should not be able to read past end");
+        } catch (BufferUnderflowException e) {}
+    }
+
+    @Test
+    public void testGetByteWithManyArraysWithVariedElements() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] {0})
+              .append(new byte[] {1, 2})
+              .append(new byte[] {3, 4, 5})
+              .append(new byte[] {6})
+              .append(new byte[] {7, 8, 9});
+
+        assertEquals(10, buffer.remaining());
+        assertFalse(buffer.hasArray());
+        assertTrue(buffer.hasRemaining());
+        assertEquals(0, buffer.position());
+
+        for (int i = 0; i < 10; ++i) {
+            assertEquals(i, buffer.get());
+        }
+
+        assertEquals(0, buffer.remaining());
+        assertEquals(10, buffer.position());
+
+        try {
+            buffer.get();
+            fail("Should not be able to read past end");
+        } catch (BufferUnderflowException e) {}
+    }
+
+    @Test
+    public void testGetShortByteWithNothingAppended() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        try {
+            buffer.getShort();
+            fail("Should throw a BufferUnderflowException");
+        } catch (BufferUnderflowException e) {}
+    }
+
+    @Test
+    public void testGetShortWithOneArrayWithOneElement() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] { 8, 0 });
+
+        assertEquals(2, buffer.remaining());
+        assertTrue(buffer.hasRemaining());
+        assertEquals(0, buffer.position());
+
+        assertEquals(2048, buffer.getShort());
+
+        assertEquals(0, buffer.remaining());
+        assertFalse(buffer.hasRemaining());
+        assertEquals(2, buffer.position());
+
+        try {
+            buffer.getShort();
+            fail("Should not be able to read past end");
+        } catch (BufferUnderflowException e) {}
+    }
+
+    @Test
+    public void testGetShortWithTwoArraysContainingOneElement() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] {8}).append(new byte[] {0});
+
+        assertEquals(2, buffer.remaining());
+        assertTrue(buffer.hasRemaining());
+        assertEquals(0, buffer.position());
+
+        assertEquals(2048, buffer.getShort());
+
+        assertEquals(0, buffer.remaining());
+        assertFalse(buffer.hasRemaining());
+        assertEquals(2, buffer.position());
+
+        try {
+            buffer.getShort();
+            fail("Should not be able to read past end");
+        } catch (BufferUnderflowException e) {}
+    }
+
+    @Test
+    public void testGetIntByteWithNothingAppended() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        try {
+            buffer.getInt();
+            fail("Should throw a BufferUnderflowException");
+        } catch (BufferUnderflowException e) {}
+    }
+
+    @Test
+    public void testGetIntWithOneArrayWithOneElement() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] { 0, 0, 8, 0 });
+
+        assertEquals(4, buffer.remaining());
+        assertTrue(buffer.hasRemaining());
+        assertEquals(0, buffer.position());
+
+        assertEquals(2048, buffer.getInt());
+
+        assertEquals(0, buffer.remaining());
+        assertFalse(buffer.hasRemaining());
+        assertEquals(4, buffer.position());
+
+        try {
+            buffer.getInt();
+            fail("Should not be able to read past end");
+        } catch (BufferUnderflowException e) {}
+    }
+
+    @Test
+    public void testGetIntWithTwoArraysContainingOneElement() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] { 0 ,0 }).append(new byte[] { 8, 0 });
+
+        assertEquals(4, buffer.remaining());
+        assertTrue(buffer.hasRemaining());
+        assertEquals(0, buffer.position());
+
+        assertEquals(2048, buffer.getInt());
+
+        assertEquals(0, buffer.remaining());
+        assertFalse(buffer.hasRemaining());
+        assertEquals(4, buffer.position());
+
+        try {
+            buffer.getInt();
+            fail("Should not be able to read past end");
+        } catch (BufferUnderflowException e) {}
+    }
+
+    @Test
+    public void testGetLongByteWithNothingAppended() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        try {
+            buffer.getLong();
+            fail("Should throw a BufferUnderflowException");
+        } catch (BufferUnderflowException e) {}
+    }
+
+    @Test
+    public void testGetLongWithOneArrayWithOneElement() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] { 0, 0, 0, 0, 0, 0, 8, 0 });
+
+        assertEquals(8, buffer.remaining());
+        assertTrue(buffer.hasRemaining());
+        assertEquals(0, buffer.position());
+
+        assertEquals(2048, buffer.getLong());
+
+        assertEquals(0, buffer.remaining());
+        assertFalse(buffer.hasRemaining());
+        assertEquals(8, buffer.position());
+
+        try {
+            buffer.getLong();
+            fail("Should not be able to read past end");
+        } catch (BufferUnderflowException e) {}
+    }
+
+    @Test
+    public void testGetLongWithTwoArraysContainingOneElement() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] { 0 ,0, 0, 0 }).append(new byte[] { 0, 0, 8, 0 });
+
+        assertEquals(8, buffer.remaining());
+        assertTrue(buffer.hasRemaining());
+        assertEquals(0, buffer.position());
+
+        assertEquals(2048, buffer.getLong());
+
+        assertEquals(0, buffer.remaining());
+        assertFalse(buffer.hasRemaining());
+        assertEquals(8, buffer.position());
+
+        try {
+            buffer.getLong();
+            fail("Should not be able to read past end");
+        } catch (BufferUnderflowException e) {}
+    }
+
+    @Test
+    public void testGetByteArrayWithContentsInSingleArray() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+        buffer.append(data);
+
+        assertEquals(data.length, buffer.limit());
+        byte array[] = new byte[1];
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(buffer.position(), i);
+            ReadableBuffer self = buffer.get(array);
+            assertEquals(array[0], buffer.get(i));
+            assertSame(self, buffer);
+        }
+
+        try {
+            buffer.get(array);
+            fail("Should throw BufferUnderflowException");
+        } catch (BufferUnderflowException e) {
+        }
+
+        try {
+            buffer.get((byte[]) null);
+            fail("Should throw NullPointerException");
+        } catch (NullPointerException e) {
+        }
+    }
+
+    @Test
+    public void testGetWritableBufferWithContentsInSingleArray() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+        buffer.append(data);
+
+        assertEquals(data.length, buffer.limit());
+
+        ByteBuffer destination = ByteBuffer.allocate(1);
+        WritableBuffer target = WritableBuffer.ByteBufferWrapper.wrap(destination);
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(buffer.position(), i);
+            ReadableBuffer self = buffer.get(target);
+            assertEquals(destination.get(0), buffer.get(i));
+            assertSame(self, buffer);
+            destination.rewind();
+        }
+
+        try {
+            buffer.get(target);
+        } catch (Throwable e) {
+            fail("Should not throw: " + e.getClass().getSimpleName());
+        }
+
+        try {
+            buffer.get((WritableBuffer) null);
+            fail("Should throw NullPointerException");
+        } catch (NullPointerException e) {
+        }
+    }
+
+    @Test
+    public void testGetWritableBufferWithContentsInSeveralArrays() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        byte[] data1 = new byte[] {0, 1, 2, 3, 4};
+        byte[] data2 = new byte[] {5, 6, 7, 8, 9};
+        byte[] data3 = new byte[] {10, 11, 12};
+
+        int size = data1.length + data2.length + data3.length;
+
+        buffer.append(data1).append(data2).append(data3);
+
+        assertEquals(size, buffer.limit());
+
+        ByteBuffer destination = ByteBuffer.allocate(1);
+        WritableBuffer target = WritableBuffer.ByteBufferWrapper.wrap(destination);
+
+        for (int i = 0; i < size; i++) {
+            assertEquals(buffer.position(), i);
+            ReadableBuffer self = buffer.get(target);
+            assertEquals(destination.get(0), buffer.get(i));
+            assertSame(self, buffer);
+            destination.rewind();
+        }
+
+        try {
+            buffer.get(target);
+        } catch (Throwable e) {
+            fail("Should not throw: " + e.getClass().getSimpleName());
+        }
+
+        try {
+            buffer.get((WritableBuffer) null);
+            fail("Should throw NullPointerException");
+        } catch (NullPointerException e) {
+        }
+    }
+
+    @Test
+    public void testGetWritableBufferRespectsOwnLimit() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+        buffer.append(data);
+        buffer.limit(5);
+
+        ByteBuffer destination = ByteBuffer.allocate(data.length);
+        WritableBuffer target = WritableBuffer.ByteBufferWrapper.wrap(destination);
+
+        buffer.get(target);
+
+        assertEquals(5, buffer.position());
+        assertEquals(0, buffer.remaining());
+
+        assertEquals(5, target.position());
+        assertEquals(5, target.remaining());
+    }
+
+    @Test
+    public void testGetintWithContentsInSingleArray() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+        buffer.append(data);
+
+        for (int i = 0; i < buffer.capacity(); i++) {
+            assertEquals(buffer.position(), i);
+            assertEquals(buffer.get(i), buffer.get());
+        }
+
+        buffer.rewind();
+
+        for (int i = 0; i < buffer.capacity(); i++) {
+            assertEquals(buffer.position(), i);
+            assertEquals(buffer.get(), buffer.get(i));
+        }
+
+        try {
+            buffer.get(-1);
+            fail("Should throw IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException e) {
+        }
+
+        try {
+            buffer.get(buffer.limit());
+            fail("Should throw IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException e) {
+        }
+    }
+
+    @Test
+    public void testGetintWithContentsInMultipleArrays() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] {0, 1, 2, 3, 4}).append(new byte[] {5, 6, 7, 8, 9});
+
+        for (int i = 0; i < buffer.capacity(); i++) {
+            assertEquals(buffer.position(), i);
+            assertEquals(buffer.get(), buffer.get(i));
+        }
+
+        try {
+            buffer.get(-1);
+            fail("Should throw IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException e) {
+        }
+
+        try {
+            buffer.get(buffer.limit());
+            fail("Should throw IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException e) {
+        }
+    }
+
+    @Test
+    public void testGetbyteArrayIntIntWithContentsInSingleArray() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+        buffer.append(data);
+
+        byte array[] = new byte[data.length];
+
+        try {
+            buffer.get(new byte[data.length + 1], 0, data.length + 1);
+            fail("Should throw BufferUnderflowException");
+        } catch (BufferUnderflowException e) {
+        }
+
+        assertEquals(buffer.position(), 0);
+
+        try {
+            buffer.get(array, -1, array.length);
+            fail("Should throw IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException e) {
+        }
+
+        buffer.get(array, array.length, 0);
+
+        try {
+            buffer.get(array, array.length + 1, 1);
+            fail("Should throw IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException e) {
+        }
+
+        assertEquals(buffer.position(), 0);
+
+        try {
+            buffer.get(array, 2, -1);
+            fail("Should throw IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException e) {
+        }
+
+        try {
+            buffer.get(array, 2, array.length);
+            fail("Should throw IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException e) {
+        }
+
+        try {
+            buffer.get((byte[])null, -1, 0);
+            fail("Should throw NullPointerException");
+        } catch (NullPointerException e) {
+        }
+
+        try {
+            buffer.get(array, 1, Integer.MAX_VALUE);
+            fail("Should throw IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException e) {
+        }
+
+        try {
+            buffer.get(array, Integer.MAX_VALUE, 1);
+            fail("Should throw IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException e) {
+        }
+
+        assertEquals(buffer.position(), 0);
+
+        CompositeReadableBuffer self = buffer.get(array, 0, array.length);
+        assertEquals(buffer.position(), buffer.capacity());
+        assertContentEquals(buffer, array, 0, array.length);
+        assertSame(self, buffer);
+    }
+
+    @Test
+    public void testGetbyteArrayIntIntWithContentsInMultipleArrays() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        byte[] data1 = new byte[] {0, 1, 2, 3, 4};
+        byte[] data2 = new byte[] {5, 6, 7, 8, 9};
+
+        final int dataLength = data1.length + data2.length;
+
+        buffer.append(data1).append(data2);
+
+        assertEquals(dataLength, buffer.remaining());
+
+        byte array[] = new byte[buffer.remaining()];
+
+        try {
+            buffer.get(new byte[dataLength + 1], 0, dataLength + 1);
+            fail("Should throw BufferUnderflowException");
+        } catch (BufferUnderflowException e) {
+        }
+
+        assertEquals(buffer.position(), 0);
+
+        try {
+            buffer.get(array, -1, array.length);
+            fail("Should throw IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException e) {
+        }
+
+        buffer.get(array, array.length, 0);
+
+        try {
+            buffer.get(array, array.length + 1, 1);
+            fail("Should throw IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException e) {
+        }
+
+        assertEquals(buffer.position(), 0);
+
+        try {
+            buffer.get(array, 2, -1);
+            fail("Should throw IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException e) {
+        }
+
+        try {
+            buffer.get(array, 2, array.length);
+            fail("Should throw IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException e) {
+        }
+
+        try {
+            buffer.get((byte[])null, -1, 0);
+            fail("Should throw NullPointerException");
+        } catch (NullPointerException e) {
+        }
+
+        try {
+            buffer.get(array, 1, Integer.MAX_VALUE);
+            fail("Should throw IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException e) {
+        }
+
+        try {
+            buffer.get(array, Integer.MAX_VALUE, 1);
+            fail("Should throw IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException e) {
+        }
+
+        assertEquals(buffer.position(), 0);
+
+        CompositeReadableBuffer self = buffer.get(array, 0, array.length);
+        assertEquals(buffer.position(), buffer.capacity());
+        assertContentEquals(buffer, array, 0, array.length);
+        assertSame(self, buffer);
+    }
+
+    @Test
+    public void testGetFloat() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(float2bytes(3.14f));
+
+        assertEquals(4, buffer.remaining());
+        assertTrue(buffer.hasRemaining());
+        assertEquals(0, buffer.position());
+
+        assertEquals(3.14f, buffer.getFloat(), 0.1);
+
+        assertEquals(0, buffer.remaining());
+        assertFalse(buffer.hasRemaining());
+        assertEquals(4, buffer.position());
+
+        try {
+            buffer.getFloat();
+            fail("Should not be able to read past end");
+        } catch (BufferUnderflowException e) {}
+    }
+
+    @Test
+    public void testGetDouble() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(double2bytes(6.11));
+
+        assertEquals(8, buffer.remaining());
+        assertTrue(buffer.hasRemaining());
+        assertEquals(0, buffer.position());
+
+        assertEquals(6.11, buffer.getDouble(), 0.1);
+
+        assertEquals(0, buffer.remaining());
+        assertFalse(buffer.hasRemaining());
+        assertEquals(8, buffer.position());
+
+        try {
+            buffer.getDouble();
+            fail("Should not be able to read past end");
+        } catch (BufferUnderflowException e) {}
+    }
+
+    //----- Test hasArray method ---------------------------------------------//
+
+    @Test
+    public void testHasArray() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        byte[] data1 = new byte[] {0, 1, 2, 3, 4};
+        byte[] data2 = new byte[] {5, 6, 7, 8, 9};
+        buffer.append(data1);
+
+        assertTrue(buffer.hasArray());
+        assertNotNull(buffer.array());
+        assertSame(data1, buffer.array());
+        assertEquals(0, buffer.arrayOffset());
+
+        buffer.append(data2);
+
+        assertFalse(buffer.hasArray());
+        try {
+            buffer.array();
+            fail("Should throw UnsupportedOperationException");
+        } catch (UnsupportedOperationException e) {
+        }
+        try {
+            buffer.arrayOffset();
+            fail("Should throw UnsupportedOperationException");
+        } catch (UnsupportedOperationException e) {
+        }
+
+        byte[] result1 = new byte[data1.length];
+        byte[] result2 = new byte[data1.length];
+
+        buffer.get(result1);
+        assertArrayEquals(data1, result1);
+        assertFalse(buffer.hasArray());
+
+        buffer.reclaimRead();
+
+        assertTrue(buffer.hasArray());
+        assertNotNull(buffer.array());
+        assertSame(data2, buffer.array());
+        assertEquals(0, buffer.arrayOffset());
+
+        buffer.get(result2);
+        assertArrayEquals(data2, result2);
+        assertTrue(buffer.hasArray());
+
+        buffer.reclaimRead();
+        assertFalse(buffer.hasArray());
+
+        try {
+            buffer.array();
+            fail("Should throw UnsupportedOperationException");
+        } catch (UnsupportedOperationException e) {
+        }
+        try {
+            buffer.arrayOffset();
+            fail("Should throw UnsupportedOperationException");
+        } catch (UnsupportedOperationException e) {
+        }
+    }
+
+    //----- Test appending data to the buffer --------------------------------//
+
+    @Test
+    public void testAppendOne() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        byte[] source = new byte[] { 0, 1, 2, 3 };
+
+        buffer.append(source);
+
+        assertEquals(4, buffer.remaining());
+        assertTrue(buffer.hasRemaining());
+        assertEquals(0, buffer.position());
+
+        assertTrue(buffer.hasArray());
+        assertSame(source, buffer.array());
+        assertEquals(0, buffer.getArrays().size());
+
+        assertEquals(-1, buffer.getCurrentIndex());
+    }
+
+    @Test
+    public void testAppendMoreThanOne() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        byte[] source1 = new byte[] { 0, 1, 2, 3 };
+        byte[] source2 = new byte[] { 4, 5, 6, 7 };
+
+        buffer.append(source1);
+        assertTrue(buffer.hasArray());
+        assertSame(source1, buffer.array());
+        assertEquals(-1, buffer.getCurrentIndex());
+
+        buffer.append(source2);
+        assertFalse(buffer.hasArray());
+        assertEquals(2, buffer.getArrays().size());
+        assertEquals(0, buffer.getCurrentIndex());
+
+        byte[] source3 = new byte[] { 9, 10, 11, 12 };
+        buffer.append(source3);
+        assertFalse(buffer.hasArray());
+        assertEquals(3, buffer.getArrays().size());
+        assertEquals(0, buffer.getCurrentIndex());
+    }
+
+    @Test
+    public void testAppendNull() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        try {
+            buffer.append(null);
+            fail("Should not be able to add a null array");
+        } catch (IllegalArgumentException iae) {}
+    }
+
+    @Test
+    public void testAppendEmpty() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        try {
+            buffer.append(new byte[0]);
+            fail("Should not be able to add a empty array");
+        } catch (IllegalArgumentException iae) {}
+    }
+
+    //----- Test buffer compaction handling ----------------------------------//
+
+    @Test
+    public void testCompactEmptyBuffer() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        try {
+            buffer.reclaimRead();
+        } catch (Throwable t) {
+            fail("Should not fail to compact empty buffer");
+        }
+
+        CompositeReadableBuffer slice = buffer.slice();
+
+        try {
+            slice.reclaimRead();
+        } catch (Throwable t) {
+            fail("Should not fail to compact empty slice");
+        }
+    }
+
+    @Test
+    public void testCompactSignleArrayBuffer() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        byte[] source = new byte[] { 0, 1, 2, 3 };
+
+        buffer.append(source);
+
+        assertEquals(4, buffer.remaining());
+        assertTrue(buffer.hasRemaining());
+        assertEquals(0, buffer.position());
+        assertTrue(buffer.hasArray());
+        assertSame(source, buffer.array());
+        assertEquals(0, buffer.getArrays().size());
+
+        // Should not have any affect on buffer that is not consumed
+        buffer.reclaimRead();
+
+        assertEquals(4, buffer.remaining());
+        assertTrue(buffer.hasRemaining());
+        assertEquals(0, buffer.position());
+        assertTrue(buffer.hasArray());
+        assertSame(source, buffer.array());
+
+        // Should not have any affect on buffer that is not consumed
+        buffer.position(1);
+        buffer.reclaimRead();
+
+        assertEquals(3, buffer.remaining());
+        assertTrue(buffer.hasRemaining());
+        assertEquals(1, buffer.position());
+        assertTrue(buffer.hasArray());
+        assertSame(source, buffer.array());
+
+        // Should clear array from buffer as it is now consumed.
+        buffer.position(source.length);
+        buffer.reclaimRead();
+
+        assertEquals(0, buffer.remaining());
+        assertFalse(buffer.hasRemaining());
+        assertEquals(0, buffer.position());
+        assertFalse(buffer.hasArray());
+    }
+
+    @Test
+    public void testCompact() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] {0})
+              .append(new byte[] {1})
+              .append(new byte[] {2})
+              .append(new byte[] {3})
+              .append(new byte[] {4})
+              .append(new byte[] {5})
+              .append(new byte[] {6})
+              .append(new byte[] {7})
+              .append(new byte[] {8})
+              .append(new byte[] {9});
+
+        assertEquals(10, buffer.remaining());
+        assertEquals(10, buffer.getArrays().size());
+        assertFalse(buffer.hasArray());
+        assertTrue(buffer.hasRemaining());
+        assertEquals(0, buffer.position());
+
+        for (int i = 0; i < 10; ++i) {
+            assertEquals(i, buffer.get());
+            buffer.reclaimRead();
+            assertEquals(0, buffer.position());
+        }
+
+        assertTrue(buffer.getArrays().isEmpty());
+        assertFalse(buffer.hasArray());
+        assertEquals(0, buffer.remaining());
+        assertEquals(0, buffer.position());
+        assertEquals(0, buffer.getArrays().size());
+
+        try {
+            buffer.get();
+            fail("Should not be able to read past end");
+        } catch (BufferUnderflowException e) {}
+    }
+
+    @Test
+    public void testCompactWithDifferingBufferSizes() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] {0})
+              .append(new byte[] {1, 2})
+              .append(new byte[] {3, 4, 5})
+              .append(new byte[] {6})
+              .append(new byte[] {7})
+              .append(new byte[] {8, 9});
+
+        assertEquals(10, buffer.remaining());
+        assertFalse(buffer.hasArray());
+        assertTrue(buffer.hasRemaining());
+        assertEquals(0, buffer.position());
+
+        for (int i = 0; i < 10; ++i) {
+            assertEquals(i, buffer.get());
+            buffer.reclaimRead();
+        }
+
+        assertTrue(buffer.getArrays().isEmpty());
+        assertFalse(buffer.hasArray());
+        assertEquals(0, buffer.remaining());
+        assertEquals(0, buffer.position());
+
+        try {
+            buffer.get();
+            fail("Should not be able to read past end");
+        } catch (BufferUnderflowException e) {}
+    }
+
+    @Test
+    public void testCompactUpdatesMark() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        byte[] source1 = new byte[] { 0, 1, 2, 3 };
+        byte[] source2 = new byte[] { 4, 5, 6, 7 };
+
+        buffer.append(source1).append(source2);
+
+        assertFalse(buffer.hasArray());
+        assertEquals(2, buffer.getArrays().size());
+        assertEquals(0, buffer.getCurrentIndex());
+
+        buffer.position(5);
+        buffer.mark();
+        assertEquals(5, buffer.get());
+        buffer.position(8);
+        buffer.reset();
+        assertEquals(5, buffer.position());
+        buffer.mark();
+
+        buffer.reclaimRead();
+        buffer.position(buffer.limit());
+        buffer.reset();
+        assertEquals(5, buffer.get());
+
+        assertFalse(buffer.getArrays().isEmpty());
+    }
+
+    @Test
+    public void testCompactThreeArraysToNone() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        byte[] source1 = new byte[] { 0, 1, 2, 3 };
+        byte[] source2 = new byte[] { 4, 5, 6, 7 };
+        byte[] source3 = new byte[] { 8, 9, 10, 11 };
+
+        buffer.append(source1).append(source2).append(source3);
+
+        assertFalse(buffer.hasArray());
+        assertEquals(3, buffer.getArrays().size());
+        assertEquals(0, buffer.getCurrentIndex());
+        assertEquals(12, buffer.limit());
+        assertEquals(12, buffer.capacity());
+
+        buffer.position(4);
+        buffer.reclaimRead();
+
+        assertFalse(buffer.hasArray());
+        assertEquals(2, buffer.getArrays().size());
+        assertEquals(0, buffer.getCurrentIndex());
+        assertEquals(8, buffer.limit());
+        assertEquals(8, buffer.capacity());
+
+        buffer.position(4);
+        buffer.reclaimRead();
+
+        // TODO - Right now we hold off purging the array list until everything is consumed.
+        assertTrue(buffer.hasArray());
+        assertEquals(1, buffer.getArrays().size());
+        assertEquals(0, buffer.getCurrentIndex());
+        assertEquals(4, buffer.limit());
+        assertEquals(4, buffer.capacity());
+
+        buffer.position(4);
+        buffer.reclaimRead();
+
+        assertFalse(buffer.hasArray());
+        assertEquals(0, buffer.getArrays().size());
+        assertEquals(-1, buffer.getCurrentIndex());
+        assertEquals(0, buffer.limit());
+        assertEquals(0, buffer.capacity());
+    }
+
+    @Test
+    public void testCompactAllBuffersInOneShot() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        byte[] source1 = new byte[] { 0, 1, 2, 3 };
+        byte[] source2 = new byte[] { 4, 5, 6, 7 };
+        byte[] source3 = new byte[] { 8, 9, 10, 11 };
+        byte[] source4 = new byte[] { 12, 13, 14, 15 };
+
+        int size = source1.length + source2.length + source3.length + source4.length;
+
+        buffer.append(source1).append(source2).append(source3).append(source4);
+
+        assertFalse(buffer.hasArray());
+        assertEquals(4, buffer.getArrays().size());
+        assertEquals(0, buffer.getCurrentIndex());
+        assertEquals(size, buffer.limit());
+        assertEquals(size, buffer.capacity());
+
+        buffer.position(buffer.limit());
+        buffer.reclaimRead();
+
+        assertFalse(buffer.hasArray());
+        assertEquals(0, buffer.getArrays().size());
+        assertEquals(-1, buffer.getCurrentIndex());
+        assertEquals(0, buffer.limit());
+        assertEquals(0, buffer.capacity());
+    }
+
+    @Test
+    public void testAppendAfterCompact() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        byte[] source1 = new byte[] { 0, 1, 2, 3 };
+        byte[] source2 = new byte[] { 4, 5, 6, 7 };
+
+        buffer.append(source1).append(source2);
+
+        assertFalse(buffer.hasArray());
+        assertEquals(2, buffer.getArrays().size());
+        assertEquals(0, buffer.getCurrentIndex());
+
+        buffer.position(4);
+        buffer.reclaimRead();
+
+        assertEquals(0, buffer.position());
+        assertEquals(4, buffer.limit());
+        assertEquals(4, buffer.capacity());
+        assertEquals(1, buffer.getArrays().size());
+
+        byte[] source3 = new byte[] { 8, 9, 10, 11 };
+
+        buffer.append(source3);
+
+        buffer.position(4);
+
+        for (int i = 0; i < source3.length; ++i) {
+            assertEquals(source3[i], buffer.get());
+        }
+
+        assertFalse(buffer.getArrays().isEmpty());
+        buffer.reclaimRead();
+        assertTrue(buffer.getArrays().isEmpty());
+    }
+
+    //----- Tests on Mark ----------------------------------------------------//
+
+    @Test
+    public void testMark() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] {0, 1, 2, 3, 4}).append(new byte[] {5, 6, 7, 8, 9});
+
+        // save state
+        int oldPosition = buffer.position();
+        int oldLimit = buffer.limit();
+
+        assertEquals(0, oldPosition);
+        assertEquals(10, oldLimit);
+
+        CompositeReadableBuffer self = buffer.mark();
+        assertSame(self, buffer);
+
+        buffer.mark();
+        buffer.position(buffer.limit());
+        buffer.reset();
+        assertEquals(buffer.position(), oldPosition);
+
+        buffer.mark();
+        buffer.position(buffer.limit());
+        buffer.reset();
+        assertEquals(buffer.position(), oldPosition);
+
+        // restore state
+        buffer.limit(oldLimit);
+        buffer.position(oldPosition);
+    }
+
+    //----- Tests on Reset ---------------------------------------------------//
+
+    @Test
+    public void testReset() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] {0, 1, 2, 3, 4}).append(new byte[] {5, 6, 7, 8, 9});
+
+        try {
+            buffer.reset();
+            fail("Should throw InvalidMarkException when not marked.");
+        } catch (InvalidMarkException e) {
+        }
+
+        // save state
+        int oldPosition = buffer.position();
+        int oldLimit = buffer.limit();
+
+        assertEquals(0, oldPosition);
+        assertEquals(10, oldLimit);
+
+        buffer.mark();
+        buffer.position(buffer.limit());
+        buffer.reset();
+        assertEquals(buffer.position(), oldPosition);
+
+        buffer.mark();
+        buffer.position(buffer.limit());
+        buffer.reset();
+        assertEquals(buffer.position(), oldPosition);
+
+        // Can call as mark is not cleared on reset.
+        CompositeReadableBuffer self = buffer.reset();
+        assertSame(self, buffer);
+    }
+
+    //----- Tests on Rewind --------------------------------------------------//
+
+    @Test
+    public void testRewind() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] {0, 1, 2, 3, 4}).append(new byte[] {5, 6, 7, 8, 9});
+
+        // save state
+        int oldPosition = buffer.position();
+        int oldLimit = buffer.limit();
+
+        assertEquals(0, oldPosition);
+        assertEquals(10, oldLimit);
+
+        CompositeReadableBuffer self = buffer.rewind();
+        assertEquals(buffer.position(), 0);
+        assertSame(self, buffer);
+
+        try {
+            buffer.reset();
+            fail("Should throw InvalidMarkException");
+        } catch (InvalidMarkException e) {
+        }
+    }
+
+    //----- Tests on Flip ----------------------------------------------------//
+
+    @Test
+    public void testFlipWhenEmpty() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+        CompositeReadableBuffer ret = buffer.flip();
+
+        assertSame(ret, buffer);
+        assertEquals(buffer.position(), 0);
+        assertEquals(buffer.limit(), 0);
+    }
+
+    @Test
+    public void testFlipWithOneArray() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+
+        // save state
+        int oldPosition = buffer.position();
+        int oldLimit = buffer.limit();
+
+        assertEquals(0, oldPosition);
+        assertEquals(10, oldLimit);
+
+        CompositeReadableBuffer ret = buffer.flip();
+        assertSame(ret, buffer);
+        assertEquals(buffer.position(), 0);
+        assertEquals(buffer.limit(), oldPosition);
+
+        try {
+            buffer.reset();
+            fail("Should throw InvalidMarkException");
+        } catch (InvalidMarkException e) {
+        }
+    }
+
+    @Test
+    public void testFlipWithMultipleArrays() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] {0, 1, 2, 3, 4}).append(new byte[] {5, 6, 7, 8, 9});
+
+        // save state
+        int oldPosition = buffer.position();
+        int oldLimit = buffer.limit();
+
+        assertEquals(0, oldPosition);
+        assertEquals(10, oldLimit);
+
+        CompositeReadableBuffer ret = buffer.flip();
+        assertSame(ret, buffer);
+        assertEquals(buffer.position(), 0);
+        assertEquals(buffer.limit(), oldPosition);
+
+        try {
+            buffer.reset();
+            fail("Should throw InvalidMarkException");
+        } catch (InvalidMarkException e) {
+        }
+    }
+
+    @Test
+    public void testFlipSliceWithOneArray() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+        buffer.append(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+
+        buffer.mark();
+        buffer.position(1);
+        buffer.limit(buffer.remaining() - 1);
+
+        ReadableBuffer slice = buffer.slice();
+
+        assertEquals(1, slice.get(0));
+        slice.position(1);
+        slice.mark();
+        slice.flip();
+
+        assertEquals(1, slice.get(0));
+        assertEquals(1, slice.limit());
+
+        try {
+            slice.reset();
+            fail("Should throw InvalidMarkException");
+        } catch (InvalidMarkException e) {
+        }
+
+        buffer.reset();
+        assertEquals(0, buffer.position());
+        assertEquals(buffer.remaining(), buffer.limit());
+    }
+
+    @Test
+    public void testFlipSliceWithMultipleArrays() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+        buffer.append(new byte[] {0, 1, 2, 3, 4}).append(new byte[] {5, 6, 7, 8, 9});
+
+        buffer.mark();
+        buffer.position(5);
+
+        ReadableBuffer slice = buffer.slice();
+
+        assertEquals(5, slice.get(0));
+        slice.position(1);
+        slice.mark();
+        slice.flip();
+
+        assertEquals(5, slice.get(0));
+        assertEquals(1, slice.limit());
+
+        try {
+            slice.reset();
+            fail("Should throw InvalidMarkException");
+        } catch (InvalidMarkException e) {
+        }
+
+        buffer.reset();
+        assertEquals(0, buffer.position());
+        assertEquals(buffer.remaining(), buffer.limit());
+    }
+
+    //----- Tests on Clear --------------------------------------------------//
+
+    @Test
+    public void testClear() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] {0, 1, 2, 3, 4}).append(new byte[] {5, 6, 7, 8, 9});
+
+        assertEquals(0, buffer.position());
+        assertEquals(10, buffer.limit());
+
+        buffer.position(5);
+        assertEquals(5, buffer.position());
+        buffer.mark();
+
+        CompositeReadableBuffer self = buffer.clear();
+        assertEquals(0, buffer.position());
+        assertEquals(10, buffer.limit());
+        assertSame(self, buffer);
+
+        try {
+            buffer.reset();
+            fail("Should throw InvalidMarkException");
+        } catch (InvalidMarkException e) {
+        }
+    }
+
+    //----- Test various cases of Duplicate ----------------------------------//
+
+    @Test
+    public void testDuplicateWithSingleArrayContent() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+        buffer.mark();
+        buffer.position(buffer.limit());
+
+        // duplicate's contents should be the same as buffer
+        CompositeReadableBuffer duplicate = buffer.duplicate();
+        assertNotSame(buffer, duplicate);
+        assertEquals(buffer.capacity(), duplicate.capacity());
+        assertEquals(buffer.position(), duplicate.position());
+        assertEquals(buffer.limit(), duplicate.limit());
+        assertContentEquals(buffer, duplicate);
+
+        // duplicate's position, mark, and limit should be independent to buffer
+        duplicate.reset();
+        assertEquals(duplicate.position(), 0);
+        duplicate.clear();
+        assertEquals(buffer.position(), buffer.limit());
+        buffer.reset();
+        assertEquals(buffer.position(), 0);
+
+        // One array buffer should share backing array
+        assertTrue(buffer.hasArray());
+        assertTrue(duplicate.hasArray());
+        assertSame(buffer.array(), duplicate.array());
+        assertTrue(buffer.getArrays().isEmpty());
+        assertTrue(duplicate.getArrays().isEmpty());
+        assertSame(buffer.getArrays(), duplicate.getArrays());  // Same Empty Buffer
+    }
+
+    @Test
+    public void testDuplicateWithSingleArrayContentCompactionIsNoOpWhenNotRead() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+        buffer.append(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+
+        CompositeReadableBuffer duplicate = buffer.duplicate();
+
+        assertEquals(10, buffer.capacity());
+        assertEquals(buffer.capacity(), duplicate.capacity());
+
+        buffer.reclaimRead();
+        assertEquals(10, buffer.capacity());
+        assertEquals(buffer.capacity(), duplicate.capacity());
+
+        duplicate.reclaimRead();
+        assertEquals(10, buffer.capacity());
+        assertEquals(buffer.capacity(), duplicate.capacity());
+    }
+
+    @Test
+    public void testDuplicateWithSingleArrayContentCompactionLeavesOtherIntact() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+        buffer.append(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+
+        CompositeReadableBuffer duplicate = buffer.duplicate();
+
+        assertEquals(10, buffer.capacity());
+        assertEquals(buffer.capacity(), duplicate.capacity());
+
+        duplicate.position(duplicate.limit());
+        assertFalse(duplicate.hasRemaining());
+
+        assertTrue(duplicate.hasArray());
+        duplicate.reclaimRead();
+        assertFalse(duplicate.hasArray());
+        assertEquals(0, duplicate.capacity());
+
+        // Buffer should be unaffected
+        assertEquals(10, buffer.capacity());
+        assertTrue(buffer.hasArray());
+    }
+
+    @Test
+    public void testDuplicateWithMulitiArrayContent() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] {0, 1, 2, 3, 4}).append(new byte[] {5, 6, 7, 8, 9});
+        buffer.mark();
+        buffer.position(buffer.limit());
+
+        // duplicate's contents should be the same as buffer
+        CompositeReadableBuffer duplicate = buffer.duplicate();
+        assertNotSame(buffer, duplicate);
+        assertEquals(buffer.capacity(), duplicate.capacity());
+        assertEquals(buffer.position(), duplicate.position());
+        assertEquals(buffer.limit(), duplicate.limit());
+        assertContentEquals(buffer, duplicate);
+
+        // duplicate's position, mark, and limit should be independent to buffer
+        duplicate.reset();
+        assertEquals(duplicate.position(), 0);
+        duplicate.clear();
+        assertEquals(buffer.position(), buffer.limit());
+        buffer.reset();
+        assertEquals(buffer.position(), 0);
+
+        // One array buffer should share backing array
+        assertFalse(buffer.hasArray());
+        assertFalse(duplicate.hasArray());
+        assertFalse(buffer.getArrays().isEmpty());
+        assertFalse(duplicate.getArrays().isEmpty());
+        assertNotSame(buffer.getArrays(), duplicate.getArrays());
+    }
+
+    @Test
+    public void testDuplicateWithMultiArrayContentCompactionLeavesOtherIntact() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+        buffer.append(new byte[] {0, 1, 2, 3, 4}).append(new byte[] {5, 6, 7, 8, 9});
+
+        CompositeReadableBuffer duplicate = buffer.duplicate();
+
+        assertEquals(10, buffer.capacity());
+        assertEquals(buffer.capacity(), duplicate.capacity());
+
+        duplicate.position(duplicate.limit());
+        assertFalse(duplicate.hasRemaining());
+
+        assertFalse(duplicate.hasArray());
+        duplicate.reclaimRead();
+        assertFalse(duplicate.hasArray());
+        assertEquals(0, duplicate.capacity());
+        assertEquals(0, duplicate.getArrays().size());
+
+        // Buffer should be unaffected
+        assertEquals(10, buffer.capacity());
+        assertFalse(buffer.hasArray());
+        assertEquals(2, buffer.getArrays().size());
+    }
+
+    //----- Test various cases of Slice --------------------------------------//
+
+    @Test
+    public void testSlice() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+        buffer.append(data);
+
+        assertEquals(buffer.capacity(), data.length);
+        buffer.position(1);
+        buffer.limit(buffer.capacity() - 2);
+
+        ReadableBuffer slice = buffer.slice();
+        assertEquals(slice.position(), 0);
+        assertEquals(slice.limit(), buffer.remaining());
+        assertEquals(slice.capacity(), buffer.remaining());
+        assertEquals(1, slice.get());
+
+        try {
+            slice.reset();
+            fail("Should throw InvalidMarkException");
+        } catch (InvalidMarkException e) {
+        }
+    }
+
+    @Test
+    public void testSliceOnEmptyBuffer() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+        CompositeReadableBuffer slice = buffer.slice();
+
+        // Sliced contents should be the same as buffer
+        assertNotSame(buffer, slice);
+        assertEquals(buffer.capacity(), slice.capacity());
+        assertEquals(buffer.position(), slice.position());
+        assertEquals(buffer.limit(), slice.limit());
+        assertContentEquals(buffer, slice);
+
+        try {
+            slice.reclaimRead();
+        } catch (Throwable t) {
+            fail("Compacting an empty slice should not fail");
+        }
+    }
+
+    @Test
+    public void testSliceIgnoresAppends() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+        buffer.append(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+
+        // Sliced contents should be the same as buffer
+        CompositeReadableBuffer slice = buffer.slice();
+        assertNotSame(buffer, slice);
+
+        try {
+            slice.append(new byte[] { 10 });
+            fail("Should not be allowed to append to a slice, must throw ReadOnlyBufferException");
+        } catch (IllegalStateException ise) {}
+    }
+
+    @Test
+    public void testSliceWithSingleArrayContent() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+        buffer.mark();
+
+        // Sliced contents should be the same as buffer
+        CompositeReadableBuffer slice = buffer.slice();
+        assertNotSame(buffer, slice);
+        assertEquals(buffer.capacity(), slice.capacity());
+        assertEquals(buffer.position(), slice.position());
+        assertEquals(buffer.limit(), slice.limit());
+        assertContentEquals(buffer, slice);
+
+        // Sliced position, mark, and limit should be independent to buffer
+        try {
+            slice.reset();
+            fail("Mark should be undefined in the slice and throw InvalidMarkException");
+        } catch (InvalidMarkException e) {}
+
+        assertEquals(slice.position(), 0);
+        slice.clear();
+        assertEquals(10, buffer.limit());
+        buffer.reset();
+        assertEquals(buffer.position(), 0);
+
+        // One array buffer should share backing array
+        assertTrue(buffer.hasArray());
+        assertTrue(slice.hasArray());
+        assertSame(buffer.array(), slice.array());
+        assertTrue(buffer.getArrays().isEmpty());
+        assertTrue(slice.getArrays().isEmpty());
+        assertSame(buffer.getArrays(), slice.getArrays());  // Same Empty Buffer
+    }
+
+    @Test
+    public void testSliceWithSingleArrayContentSourcePartiallyRead() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+        buffer.position(5);
+
+        // Sliced contents should be the same as buffer
+        CompositeReadableBuffer slice = buffer.slice();
+        assertNotSame(buffer, slice);
+        assertEquals(10, buffer.capacity());
+        assertEquals(5, slice.capacity());
+        assertEquals(5, buffer.position());
+        assertEquals(0, slice.position());
+        assertEquals(10, buffer.limit());
+        assertEquals(5, slice.limit());
+        assertSpanEquals(buffer, slice);
+
+        slice.position(1);
+        assertEquals(6, slice.get());
+        assertEquals(5, slice.get(0));
+
+        try {
+            slice.limit(6);
+            fail("should throw IllegalArgumentException");
+        } catch (IllegalArgumentException iae) {}
+
+        try {
+            slice.position(6);
+            fail("should throw IllegalArgumentException");
+        } catch (IllegalArgumentException iae) {}
+    }
+
+    @Test
+    public void testSliceWithMulitpleArraysContentSourcePartiallyRead() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] {0, 1, 2, 3, 4}).append(new byte[] {5, 6, 7, 8, 9});
+        buffer.position(5);
+
+        // Sliced contents should be the same as buffer
+        CompositeReadableBuffer slice = buffer.slice();
+        assertNotSame(buffer, slice);
+        assertEquals(10, buffer.capacity());
+        assertEquals(5, slice.capacity());
+        assertEquals(5, buffer.position());
+        assertEquals(0, slice.position());
+        assertEquals(10, buffer.limit());
+        assertEquals(5, slice.limit());
+        assertSpanEquals(buffer, slice);
+
+        slice.position(1);
+        assertEquals(6, slice.get());
+        assertEquals(5, slice.get(0));
+
+        try {
+            slice.limit(6);
+            fail("should throw IllegalArgumentException");
+        } catch (IllegalArgumentException iae) {}
+
+        try {
+            slice.position(6);
+            fail("should throw IllegalArgumentException");
+        } catch (IllegalArgumentException iae) {}
+    }
+
+    //----- Test various cases of byteBuffer ---------------------------------//
+
+    @Test
+    public void testByteBufferFromEmptyBuffer() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        ByteBuffer byteBuffer = buffer.byteBuffer();
+
+        assertNotNull(byteBuffer);
+        assertEquals(0, byteBuffer.position());
+        assertEquals(0, byteBuffer.limit());
+        assertEquals(0, byteBuffer.capacity());
+
+        // Our ByteBuffer results should be read-only and indicate no array.
+        assertTrue(byteBuffer.isReadOnly());
+        assertFalse(byteBuffer.hasArray());
+    }
+
+    @Test
+    public void testByteBufferOnSingleArrayContent() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] {9, 8, 7, 6, 5, 4, 3, 2, 1, 0});
+
+        ByteBuffer byteBuffer = buffer.byteBuffer();
+
+        assertNotNull(byteBuffer);
+        assertEquals(0, byteBuffer.position());
+        assertEquals(10, byteBuffer.limit());
+        assertEquals(10, byteBuffer.capacity());
+
+        // Our ByteBuffer results should be read-only and indicate no array.
+        assertTrue(byteBuffer.isReadOnly());
+        assertFalse(byteBuffer.hasArray());
+
+        for (int i = 0; i < 10; ++i) {
+            assertEquals(buffer.get(i), byteBuffer.get(i));
+        }
+    }
+
+    @Test
+    public void testByteBufferOnMultipleArrayContent() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] {9, 8, 7, 6, 5}).append(new byte[] {4, 3, 2, 1, 0});
+
+        ByteBuffer byteBuffer = buffer.byteBuffer();
+
+        assertNotNull(byteBuffer);
+        assertEquals(0, byteBuffer.position());
+        assertEquals(10, byteBuffer.limit());
+        assertEquals(10, byteBuffer.capacity());
+
+        // Our ByteBuffer results should be read-only and indicate no array.
+        assertTrue(byteBuffer.isReadOnly());
+        assertFalse(byteBuffer.hasArray());
+
+        for (int i = 0; i < 10; ++i) {
+            assertEquals(buffer.get(i), byteBuffer.get(i));
+        }
+    }
+
+    @Test
+    public void testByteBufferOnMultipleArrayContentWithLimits() {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        buffer.append(new byte[] {9, 8, 7, 6, 5}).append(new byte[] {4, 3, 2, 1, 0});
+
+        buffer.position(3);
+        buffer.limit(9);
+
+        assertEquals(6, buffer.remaining());
+
+        ByteBuffer byteBuffer = buffer.byteBuffer();
+
+        assertNotNull(byteBuffer);
+        assertEquals(0, byteBuffer.position());
+        assertEquals(6, byteBuffer.limit());
+        assertEquals(6, byteBuffer.capacity());
+
+        // Our ByteBuffer results should be read-only and indicate no array.
+        assertTrue(byteBuffer.isReadOnly());
+        assertFalse(byteBuffer.hasArray());
+
+        for (int i = 0; i < 6; ++i) {
+            assertEquals(buffer.get(), byteBuffer.get());
+        }
+    }
+
+    //----- Test put ReadableBuffer ------------------------------------------//
+
+    @Test
+    public void testReadStringFromEmptyBuffer() throws CharacterCodingException {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        assertNull(buffer.readString(StandardCharsets.UTF_8.newDecoder()));
+    }
+
+    @Test
+    public void testReadStringFromUTF8InSingleArray() throws CharacterCodingException {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        final String testString = "Test String to Decode!";
+        byte[] encoded = testString.getBytes(StandardCharsets.UTF_8);
+
+        buffer.append(encoded);
+
+        assertEquals(testString, buffer.readString(StandardCharsets.UTF_8.newDecoder()));
+    }
+
+    @Test
+    public void testReadStringFromUTF8InSingleArrayWithLimits() throws CharacterCodingException {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        final String testString = "Test String to Decode!";
+        byte[] encoded = testString.getBytes(StandardCharsets.UTF_8);
+
+        // Only read the first character
+        buffer.append(encoded);
+        buffer.limit(1);
+
+        assertEquals("T", buffer.readString(StandardCharsets.UTF_8.newDecoder()));
+    }
+
+    @Test
+    public void testReadStringFromUTF8InMulitpleArrays() throws CharacterCodingException {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        final String testString = "Test String to Decode!!";
+        byte[] encoded = testString.getBytes(StandardCharsets.UTF_8);
+
+        byte[] first = new byte[encoded.length / 2];
+        byte[] second = new byte[encoded.length - (encoded.length / 2)];
+
+        System.arraycopy(encoded, 0, first, 0, first.length);
+        System.arraycopy(encoded, first.length, second, 0, second.length);
+
+        buffer.append(first).append(second);
+
+        String result = buffer.readString(StandardCharsets.UTF_8.newDecoder());
+
+        assertEquals(testString, result);
+    }
+
+    @Test
+    public void testReadStringFromUTF8InMultipleArraysWithLimits() throws CharacterCodingException {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        final String testString = "Test String to Decode!";
+        byte[] encoded = testString.getBytes(StandardCharsets.UTF_8);
+
+        byte[] first = new byte[encoded.length / 2];
+        byte[] second = new byte[encoded.length - (encoded.length / 2)];
+
+        System.arraycopy(encoded, 0, first, 0, first.length);
+        System.arraycopy(encoded, first.length, second, 0, second.length);
+
+        buffer.append(first).append(second);
+
+        // Only read the first character
+        buffer.limit(1);
+
+        assertEquals("T", buffer.readString(StandardCharsets.UTF_8.newDecoder()));
+    }
+
+    //----- Tests for hashCode -----------------------------------------------//
+
+    @Test
+    public void testHashCodeNotFromIdentity() throws CharacterCodingException {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        assertEquals(1, buffer.hashCode());
+
+        byte[] data = new byte[] {9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
+
+        buffer.append(data);
+
+        assertTrue(buffer.hashCode() != 1);
+        assertNotEquals(buffer.hashCode(), System.identityHashCode(buffer));
+        assertEquals(buffer.hashCode(), buffer.hashCode());
+    }
+
+    @Test
+    public void testHashCodeOnSameBackingBuffer() throws CharacterCodingException {
+        CompositeReadableBuffer buffer1 = new CompositeReadableBuffer();
+        CompositeReadableBuffer buffer2 = new CompositeReadableBuffer();
+        CompositeReadableBuffer buffer3 = new CompositeReadableBuffer();
+
+        byte[] data = new byte[] {9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
+
+        buffer1.append(data);
+        buffer2.append(data);
+        buffer3.append(data);
+
+        assertEquals(buffer1.hashCode(), buffer2.hashCode());
+        assertEquals(buffer2.hashCode(), buffer3.hashCode());
+        assertEquals(buffer3.hashCode(), buffer1.hashCode());
+    }
+
+    @Test
+    public void testHashCodeOnDifferentBackingBuffer() throws CharacterCodingException {
+        CompositeReadableBuffer buffer1 = new CompositeReadableBuffer();
+        CompositeReadableBuffer buffer2 = new CompositeReadableBuffer();
+
+        byte[] data1 = new byte[] {9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
+        byte[] data2 = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+
+        buffer1.append(data1);
+        buffer2.append(data2);
+
+        assertNotEquals(buffer1.hashCode(), buffer2.hashCode());
+    }
+
+    @Test
+    public void testHashCodeOnSplitBufferContentsNotSame() throws CharacterCodingException {
+        CompositeReadableBuffer buffer1 = new CompositeReadableBuffer();
+        CompositeReadableBuffer buffer2 = new CompositeReadableBuffer();
+
+        byte[] data1 = new byte[] {9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
+        byte[] data2 = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+
+        buffer1.append(data1).append(data2);
+        buffer2.append(data2).append(data1);
+
+        assertNotEquals(buffer1.hashCode(), buffer2.hashCode());
+    }
+
+    @Test
+    public void testHashCodeOnSplitBufferContentsSame() throws CharacterCodingException {
+        CompositeReadableBuffer buffer1 = new CompositeReadableBuffer();
+        CompositeReadableBuffer buffer2 = new CompositeReadableBuffer();
+
+        byte[] data1 = new byte[] {9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
+        byte[] data2 = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+
+        buffer1.append(data1).append(data2);
+        buffer2.append(data1).append(data2);
+
+        assertEquals(buffer1.hashCode(), buffer2.hashCode());
+    }
+
+    @Test
+    public void testHashCodeMatchesByteBufferSingleArrayContents() throws CharacterCodingException {
+        byte[] data = new byte[] {9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
+
+        CompositeReadableBuffer buffer1 = new CompositeReadableBuffer();
+        buffer1.append(data);
+
+        ByteBuffer buffer2 = ByteBuffer.wrap(data);
+
+        assertEquals(buffer1.hashCode(), buffer2.hashCode());
+    }
+
+    @Test
+    public void testHashCodeMatchesByteBufferSingleArrayContentsWithSlice() throws CharacterCodingException {
+        byte[] data = new byte[] {9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
+
+        CompositeReadableBuffer buffer1 = new CompositeReadableBuffer();
+        buffer1.append(data);
+
+        ByteBuffer buffer2 = ByteBuffer.wrap(data);
+
+        ReadableBuffer slice1 = buffer1.position(1).slice();
+        ByteBuffer slice2 = ((ByteBuffer) buffer2.position(1)).slice();
+
+        assertEquals(slice1.hashCode(), slice2.hashCode());
+    }
+
+    @Test
+    public void testHashCodeMatchesByteBufferMultipleArrayContents() throws CharacterCodingException {
+        byte[] data = new byte[] {9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
+
+        byte[] data1 = new byte[] {9, 8, 7, 6, 5};
+        byte[] data2 = new byte[] {4, 3, 2, 1, 0};
+
+        CompositeReadableBuffer buffer1 = new CompositeReadableBuffer();
+        buffer1.append(data1);
+        buffer1.append(data2);
+
+        ByteBuffer buffer2 = ByteBuffer.wrap(data);
+
+        assertEquals(buffer1.hashCode(), buffer2.hashCode());
+    }
+
+    @Test
+    public void testHashCodeMatchesByteBufferMultipleArrayContentsWithSlice() throws CharacterCodingException {
+        byte[] data = new byte[] {9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
+
+        byte[] data1 = new byte[] {9, 8, 7, 6, 5};
+        byte[] data2 = new byte[] {4, 3, 2, 1, 0};
+
+        CompositeReadableBuffer buffer1 = new CompositeReadableBuffer();
+        buffer1.append(data1);
+        buffer1.append(data2);
+
+        ByteBuffer buffer2 = ByteBuffer.wrap(data);
+
+        ReadableBuffer slice1 = buffer1.position(1).limit(4).slice();
+        ByteBuffer slice2 = ((ByteBuffer) buffer2.position(1).limit(4)).slice();
+
+        assertEquals(slice1.hashCode(), slice2.hashCode());
+    }
+
+    //----- Tests for equals -------------------------------------------------//
+
+    @Test
+    public void testEqualsSelf() throws CharacterCodingException {
+        CompositeReadableBuffer buffer = new CompositeReadableBuffer();
+
+        assertEquals(buffer, buffer);
+
+        byte[] data = new byte[] {9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
+
+        buffer.append(data);
+
+        assertEquals(buffer, buffer);
+    }
+
+    @Test
+    public void testEqualsOnSameBackingBuffer() throws CharacterCodingException {
+        CompositeReadableBuffer buffer1 = new CompositeReadableBuffer();
+        CompositeReadableBuffer buffer2 = new CompositeReadableBuffer();
+        CompositeReadableBuffer buffer3 = new CompositeReadableBuffer();
+
+        byte[] data = new byte[] {9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
+
+        buffer1.append(data);
+        buffer2.append(data);
+        buffer3.append(data);
+
+        assertEquals(buffer1, buffer2);
+        assertEquals(buffer2, buffer3);
+        assertEquals(buffer3, buffer1);
+    }
+
+    @Test
+    public void testEqualsOnDifferentBackingBuffer() throws CharacterCodingException {
+        CompositeReadableBuffer buffer1 = new CompositeReadableBuffer();
+        CompositeReadableBuffer buffer2 = new CompositeReadableBuffer();
+
+        byte[] data1 = new byte[] {9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
+        byte[] data2 = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+
+        buffer1.append(data1);
+        buffer2.append(data2);
+
+        assertNotEquals(buffer1, buffer2);
+    }
+
+    @Test
+    public void testEqualsWhenContentsInMultipleArraysNotSame() throws CharacterCodingException {
+        CompositeReadableBuffer buffer1 = new CompositeReadableBuffer();
+        CompositeReadableBuffer buffer2 = new CompositeReadableBuffer();
+
+        byte[] data1 = new byte[] {9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
+        byte[] data2 = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+
+        buffer1.append(data1).append(data2);
+        buffer2.append(data2).append(data1);
+
+        assertNotEquals(buffer1, buffer2);
+    }
+
+    @Test
+    public void testEqualsWhenContentsInMultipleArraysSame() throws CharacterCodingException {
+        CompositeReadableBuffer buffer1 = new CompositeReadableBuffer();
+        CompositeReadableBuffer buffer2 = new CompositeReadableBuffer();
+
+        byte[] data1 = new byte[] {9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
+        byte[] data2 = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+
+        buffer1.append(data1).append(data2);
+        buffer2.append(data1).append(data2);
+
+        assertEquals(buffer1, buffer2);
+    }
+
+    //----- Utility Methods --------------------------------------------------//
+
+    private void assertContentEquals(CompositeReadableBuffer buffer, byte array[], int offset, int length) {
+        for (int i = 0; i < length; i++) {
+            assertEquals(buffer.get(i), array[offset + i]);
+        }
+    }
+
+    private void assertSpanEquals(ReadableBuffer source, ReadableBuffer other) {
+        assertEquals(source.remaining(), other.remaining());
+        for (int i = 0; i < source.remaining(); i++) {
+            assertEquals(source.get(), other.get());
+        }
+    }
+
+    private void assertContentEquals(ReadableBuffer source, ReadableBuffer other) {
+        assertEquals(source.capacity(), other.capacity());
+        for (int i = 0; i < source.capacity(); i++) {
+            assertEquals(source.get(i), other.get(i));
+        }
+    }
+
+    private byte[] int2bytes(int value) {
+        byte bytes[] = new byte[Integer.BYTES];
+        int index = 0;
+
+        bytes[index++] = (byte) (value >>> 24);
+        bytes[index++] = (byte) (value >>> 16);
+        bytes[index++] = (byte) (value >>> 8);
+        bytes[index++] = (byte) (value >>> 0);
+
+        return bytes;
+    }
+
+    private byte[] long2bytes(long value) {
+        byte bytes[] = new byte[Long.BYTES];
+        int index = 0;
+
+        bytes[index++] = (byte) (value >>> 56);
+        bytes[index++] = (byte) (value >>> 48);
+        bytes[index++] = (byte) (value >>> 40);
+        bytes[index++] = (byte) (value >>> 32);
+        bytes[index++] = (byte) (value >>> 24);
+        bytes[index++] = (byte) (value >>> 16);
+        bytes[index++] = (byte) (value >>> 8);
+        bytes[index++] = (byte) (value >>> 0);
+
+        return bytes;
+    }
+
+    private byte[] float2bytes(float value) {
+        return int2bytes(Float.floatToRawIntBits(value));
+    }
+
+    private byte[] double2bytes(double value) {
+        return long2bytes(Double.doubleToRawLongBits(value));
+    }
+}
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeliveryImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeliveryImplTest.java
index b754d5c..6b7eac1 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeliveryImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeliveryImplTest.java
@@ -19,13 +19,22 @@
  */
 package org.apache.qpid.proton.engine.impl;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.codec.CompositeReadableBuffer;
+import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer.ByteBufferWrapper;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Record;
 import org.junit.Test;
@@ -33,6 +42,18 @@
 
 public class DeliveryImplTest
 {
+    //----- Test for toString ------------------------------------------------//
+
+    @Test
+    public void testToStringOnEmptyDelivery() throws Exception
+    {
+        // Check that no NPE gets thrown when no data in delivery.
+        DeliveryImpl delivery = new DeliveryImpl(null, Mockito.mock(LinkImpl.class), null);
+        assertNotNull(delivery.toString());
+    }
+
+    //----- Tests for message format handling --------------------------------//
+
     @Test
     public void testDefaultMessageFormat() throws Exception
     {
@@ -86,8 +107,7 @@
         byte[] myData = "myData".getBytes(StandardCharsets.UTF_8);
 
         DeliveryImpl deliveyImpl = new DeliveryImpl(null, Mockito.mock(LinkImpl.class), null);
-        deliveyImpl.setData(myData);
-        deliveyImpl.setDataLength(myData.length);
+        deliveyImpl.append(myData);
 
         Delivery delivery = deliveyImpl;
 
@@ -113,4 +133,557 @@
         assertEquals("Unexpected data length received", remainderLength, received);
         assertEquals("Expected no data to remain available", 0, delivery.available());
     }
+
+    @Test
+    public void testAvailableWhenEmpty() throws Exception
+    {
+        DeliveryImpl delivery = new DeliveryImpl(null, Mockito.mock(LinkImpl.class), null);
+        assertEquals(0, delivery.available());
+    }
+
+    //----- Tests for getters of internal Data -------------------------------//
+
+    @Test
+    public void testGetDataOnEmptyDelivery() throws Exception
+    {
+        DeliveryImpl delivery = new DeliveryImpl(null, Mockito.mock(LinkImpl.class), null);
+
+        assertNotNull(delivery.getData());
+        assertFalse(delivery.getData().hasRemaining());
+    }
+
+    @Test
+    public void testGetDataLengthOnEmptyDelivery() throws Exception
+    {
+        DeliveryImpl delivery = new DeliveryImpl(null, Mockito.mock(LinkImpl.class), null);
+
+        assertEquals(0, delivery.getDataLength());
+    }
+
+    //----- Tests for Append of Data -----------------------------------------//
+
+    @Test
+    public void testAppendArraysToBuffer() throws Exception
+    {
+        DeliveryImpl delivery = new DeliveryImpl(null, Mockito.mock(LinkImpl.class), null);
+
+        byte[] data1 = new byte[] { 0, 1, 2, 3, 4, 5 };
+        byte[] data2 = new byte[] { 6, 7, 8, 9, 10, 11 };
+
+        delivery.append(data1);
+        delivery.append(data2);
+
+        assertEquals(data1.length + data2.length, delivery.getDataLength());
+        assertNotNull(delivery.getData());
+        assertEquals(data1.length + data2.length, delivery.getData().remaining());
+    }
+
+    @Test
+    public void testAppendBinaryToBuffer() throws Exception
+    {
+        DeliveryImpl delivery = new DeliveryImpl(null, Mockito.mock(LinkImpl.class), null);
+
+        byte[] data1 = new byte[] { 0, 1, 2, 3, 4, 5 };
+        byte[] data2 = new byte[] { 6, 7, 8, 9, 10, 11 };
+
+        Binary binary1 = new Binary(data1);
+        Binary binary2 = new Binary(data2);
+
+        delivery.append(binary1);
+        delivery.append(binary2);
+
+        assertEquals(data1.length + data2.length, delivery.getDataLength());
+        assertNotNull(delivery.getData());
+        assertEquals(data1.length + data2.length, delivery.getData().remaining());
+    }
+
+    @Test
+    public void testAppendBinaryWithOffsetsToBuffer() throws Exception
+    {
+        DeliveryImpl delivery = new DeliveryImpl(null, Mockito.mock(LinkImpl.class), null);
+
+        byte[] data1 = new byte[] { 0, 1, 2, 3, 4, 5 };
+        byte[] data2 = new byte[] { 6, 7, 8, 9, 10, 11 };
+
+        Binary binary1 = new Binary(data1, 1, 2);
+        Binary binary2 = new Binary(data2, 0, 4);
+
+        delivery.append(binary1);
+        delivery.append(binary2);
+
+        assertEquals(binary1.getLength() + binary2.getLength(), delivery.getDataLength());
+        assertNotNull(delivery.getData());
+        assertEquals(binary1.getLength() + binary2.getLength(), delivery.getData().remaining());
+    }
+
+    //----- Tests for recv all data ------------------------------------------//
+
+    @Test
+    public void testRecvAllAsReadableBufferWhenEmpty() throws Exception
+    {
+        DeliveryImpl delivery = new DeliveryImpl(null, Mockito.mock(LinkImpl.class), null);
+
+        ReadableBuffer payload = delivery.recv();
+
+        assertNotNull(payload);
+        assertEquals(0, payload.remaining());
+    }
+
+    @Test
+    public void testRecvAllAsReadableBuffer() throws Exception
+    {
+        DeliveryImpl delivery = new DeliveryImpl(null, Mockito.mock(LinkImpl.class), null);
+
+        byte[] data1 = new byte[] { 0, 1, 2, 3, 4, 5 };
+        byte[] data2 = new byte[] { 6, 7, 8, 9, 10, 11 };
+
+        int legnth = data1.length + data2.length;
+
+        Binary binary1 = new Binary(data1);
+        Binary binary2 = new Binary(data2);
+
+        delivery.append(binary1);
+        delivery.append(binary2);
+
+        ReadableBuffer payload = delivery.recv();
+        assertTrue(payload instanceof CompositeReadableBuffer);
+        CompositeReadableBuffer composite = (CompositeReadableBuffer) payload;
+        assertEquals(2, composite.getArrays().size());
+
+        assertNotNull(payload);
+        assertEquals(legnth, payload.remaining());
+        assertEquals(0, payload.get(0));
+        assertEquals(11, payload.get(11));
+    }
+
+    //----- Tests for recv array data ----------------------------------------//
+
+    @Test
+    public void testRecvArrayWithEmptyDelivery() throws Exception
+    {
+        DeliveryImpl delivery = new DeliveryImpl(null, Mockito.mock(LinkImpl.class), null);
+        byte[] received = new byte[5];
+        assertEquals(0, delivery.recv(received, 0, received.length));
+    }
+
+    @Test
+    public void testRecvArrayWhenIncomingIsOneArray() throws Exception
+    {
+        DeliveryImpl delivery = new DeliveryImpl(null, Mockito.mock(LinkImpl.class), null);
+
+        byte[] data = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11 };
+
+        int length = data.length;
+
+        delivery.append(data);
+
+        assertEquals(length, delivery.available());
+
+        byte[] received = new byte[length];
+
+        assertEquals(length, delivery.recv(received, 0, length));
+
+        for (int i = 0; i < length; ++i) {
+            assertEquals(received[i], data[i]);
+        }
+
+        assertEquals(0, delivery.recv(received, 0, length));
+    }
+
+    @Test
+    public void testRecvArrayWhenIncomingIsSplitArrays() throws Exception
+    {
+        DeliveryImpl delivery = new DeliveryImpl(null, Mockito.mock(LinkImpl.class), null);
+
+        byte[] data1 = new byte[] { 0, 1, 2, 3, 4, 5 };
+        byte[] data2 = new byte[] { 6, 7, 8, 9, 10, 11 };
+
+        int length = data1.length + data2.length;
+
+        Binary binary1 = new Binary(data1);
+        Binary binary2 = new Binary(data2);
+
+        delivery.append(binary1);
+        delivery.append(binary2);
+
+        assertEquals(length, delivery.available());
+
+        byte[] received = new byte[length];
+
+        assertEquals(length, delivery.recv(received, 0, length));
+
+        for (int i = 0; i < data1.length; ++i) {
+            assertEquals(received[i], data1[i]);
+        }
+
+        int offset = data1.length;
+
+        for (int i = 0; i < data2.length; ++i) {
+            assertEquals(received[i + offset], data2[i]);
+        }
+
+        assertEquals(0, delivery.recv(received, 0, length));
+    }
+
+    //----- Tests for recv WritableBuffer ------------------------------------//
+
+    @Test
+    public void testRecvWriteableBufferWithEmptyDelivery() throws Exception
+    {
+        DeliveryImpl delivery = new DeliveryImpl(null, Mockito.mock(LinkImpl.class), null);
+        WritableBuffer buffer = WritableBuffer.ByteBufferWrapper.allocate(5);
+        assertEquals(0, delivery.recv(buffer));
+    }
+
+    @Test
+    public void testRecvWritableWhenIncomingIsOneArray() throws Exception
+    {
+        DeliveryImpl delivery = new DeliveryImpl(null, Mockito.mock(LinkImpl.class), null);
+
+        byte[] data = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11 };
+
+        int length = data.length;
+
+        delivery.append(data);
+
+        assertEquals(length, delivery.available());
+
+        ByteBufferWrapper buffer = WritableBuffer.ByteBufferWrapper.allocate(length);
+
+        assertEquals(length, delivery.recv(buffer));
+
+        ByteBuffer received = buffer.byteBuffer();
+
+        for (int i = 0; i < length; ++i) {
+            assertEquals(received.get(i), data[i]);
+        }
+
+        assertEquals(0, delivery.recv(WritableBuffer.ByteBufferWrapper.allocate(length)));
+    }
+
+    @Test
+    public void testRecvWritableWhenIncomingIsSplitArrays() throws Exception
+    {
+        DeliveryImpl delivery = new DeliveryImpl(null, Mockito.mock(LinkImpl.class), null);
+
+        byte[] data1 = new byte[] { 0, 1, 2, 3, 4, 5 };
+        byte[] data2 = new byte[] { 6, 7, 8, 9, 10, 11 };
+
+        int length = data1.length + data2.length;
+
+        delivery.append(data1);
+        delivery.append(data2);
+
+        assertEquals(length, delivery.available());
+
+        ByteBufferWrapper buffer = WritableBuffer.ByteBufferWrapper.allocate(length);
+
+        assertEquals(length, delivery.recv(buffer));
+
+        ByteBuffer received = buffer.byteBuffer();
+
+        for (int i = 0; i < data1.length; ++i) {
+            assertEquals(received.get(i), data1[i]);
+        }
+
+        int offset = data1.length;
+
+        for (int i = 0; i < data2.length; ++i) {
+            assertEquals(received.get(i + offset), data2[i]);
+        }
+
+        assertEquals(0, delivery.recv(WritableBuffer.ByteBufferWrapper.allocate(length)));
+    }
+
+    //----- Test send with byte arrays ---------------------------------------//
+
+    @Test
+    public void testSendSingleByteArray() throws Exception
+    {
+        DeliveryImpl delivery = createSenderDelivery();
+
+        byte[] data = new byte[] { 0, 1, 2, 3, 4, 5 };
+
+        delivery.send(data, 0, data.length);
+
+        assertEquals(data.length, delivery.pending());
+        assertEquals(data.length, delivery.getData().remaining());
+
+        CompositeReadableBuffer composite = (CompositeReadableBuffer) delivery.getData();
+
+        assertNotSame(data, composite.array());
+        assertArrayEquals(data, composite.array());
+    }
+
+    @Test
+    public void testSendMultipleByteArrays() throws Exception
+    {
+        DeliveryImpl delivery = createSenderDelivery();
+
+        byte[] data1 = new byte[] { 0, 1, 2, 3, 4, 5 };
+        byte[] data2 = new byte[] { 6, 7, 8, 9, 10, 11 };
+
+        int length = data1.length + data2.length;
+
+        delivery.send(data1, 0, data1.length);
+        delivery.send(data2, 0, data2.length);
+
+        assertEquals(length, delivery.pending());
+        assertEquals(length, delivery.getData().remaining());
+
+        CompositeReadableBuffer composite = (CompositeReadableBuffer) delivery.getData();
+
+        assertNotSame(data1, composite.getArrays().get(0));
+        assertNotSame(data2, composite.getArrays().get(1));
+
+        assertArrayEquals(data1, composite.getArrays().get(0));
+        assertArrayEquals(data2, composite.getArrays().get(1));
+    }
+
+    //----- Test send with WritableBuffer ------------------------------------//
+
+    @Test
+    public void testSendSingleReadableBuffer() throws Exception
+    {
+        DeliveryImpl delivery = createSenderDelivery();
+
+        byte[] data = new byte[] { 0, 1, 2, 3, 4, 5 };
+        ReadableBuffer buffer = ReadableBuffer.ByteBufferReader.wrap(data);
+
+        delivery.send(buffer);
+
+        assertEquals(data.length, delivery.pending());
+        assertEquals(data.length, delivery.getData().remaining());
+
+        CompositeReadableBuffer composite = (CompositeReadableBuffer) delivery.getData();
+
+        assertNotSame(data, composite.array());
+        assertArrayEquals(data, composite.array());
+    }
+
+    @Test
+    public void testSendMultipleReadableBuffers() throws Exception
+    {
+        DeliveryImpl delivery = createSenderDelivery();
+
+        byte[] data1 = new byte[] { 0, 1, 2, 3, 4, 5 };
+        byte[] data2 = new byte[] { 6, 7, 8, 9, 10, 11 };
+
+        ReadableBuffer buffer1 = ReadableBuffer.ByteBufferReader.wrap(data1);
+        ReadableBuffer buffer2 = ReadableBuffer.ByteBufferReader.wrap(data2);
+
+        int length = data1.length + data2.length;
+
+        delivery.send(buffer1);
+        delivery.send(buffer2);
+
+        assertEquals(length, delivery.pending());
+        assertEquals(length, delivery.getData().remaining());
+
+        CompositeReadableBuffer composite = (CompositeReadableBuffer) delivery.getData();
+
+        assertNotSame(data1, composite.getArrays().get(0));
+        assertNotSame(data2, composite.getArrays().get(1));
+
+        assertArrayEquals(data1, composite.getArrays().get(0));
+        assertArrayEquals(data2, composite.getArrays().get(1));
+    }
+
+    //----- Test send with WritableBuffer ------------------------------------//
+
+    @Test
+    public void testSendNoCopySingleReadableBuffer() throws Exception
+    {
+        DeliveryImpl delivery = createSenderDelivery();
+
+        byte[] data = new byte[] { 0, 1, 2, 3, 4, 5 };
+        ReadableBuffer buffer = ReadableBuffer.ByteBufferReader.wrap(data);
+
+        delivery.sendNoCopy(buffer);
+
+        assertEquals(data.length, delivery.pending());
+        assertEquals(data.length, delivery.getData().remaining());
+
+        assertSame(buffer, delivery.getData());
+
+        assertSame(data, delivery.getData().array());
+        assertArrayEquals(data, delivery.getData().array());
+    }
+
+    @Test
+    public void testSendNoCopySingleReadableBufferWhenPreviousBufferWasConsumed() throws Exception
+    {
+        DeliveryImpl delivery = createSenderDelivery();
+
+        byte[] data1 = new byte[] { 0, 1, 2, 3, 4, 5 };
+        ReadableBuffer buffer1 = ReadableBuffer.ByteBufferReader.wrap(data1);
+
+        byte[] data2 = new byte[] { 0, 1, 2, 3, 4, 5, 6 };
+        ReadableBuffer buffer2 = ReadableBuffer.ByteBufferReader.wrap(data2);
+
+        delivery.sendNoCopy(buffer1);
+
+        assertEquals(data1.length, delivery.pending());
+        assertEquals(data1.length, delivery.getData().remaining());
+
+        assertSame(buffer1, delivery.getData());
+
+        assertSame(data1, delivery.getData().array());
+        assertArrayEquals(data1, delivery.getData().array());
+
+        delivery.getData().position(delivery.getDataLength());
+
+        delivery.sendNoCopy(buffer2);
+
+        assertEquals(data2.length, delivery.pending());
+        assertEquals(data2.length, delivery.getData().remaining());
+
+        assertSame(buffer2, delivery.getData());
+
+        assertSame(data2, delivery.getData().array());
+        assertArrayEquals(data2, delivery.getData().array());
+    }
+
+    @Test
+    public void testSendNoCopyMultipleReadableBuffers() throws Exception
+    {
+        DeliveryImpl delivery = createSenderDelivery();
+
+        byte[] data1 = new byte[] { 0, 1, 2, 3, 4, 5 };
+        byte[] data2 = new byte[] { 6, 7, 8, 9, 10, 11 };
+
+        ReadableBuffer buffer1 = ReadableBuffer.ByteBufferReader.wrap(data1);
+        ReadableBuffer buffer2 = ReadableBuffer.ByteBufferReader.wrap(data2);
+
+        int length = data1.length + data2.length;
+
+        delivery.sendNoCopy(buffer1);
+        delivery.sendNoCopy(buffer2);
+
+        assertEquals(length, delivery.pending());
+        assertEquals(length, delivery.getData().remaining());
+
+        // The Delivery had to copy because it doesn't aggregate buffers only arrays
+
+        CompositeReadableBuffer composite = (CompositeReadableBuffer) delivery.getData();
+
+        assertNotSame(data1, composite.getArrays().get(0));
+        assertNotSame(data2, composite.getArrays().get(1));
+
+        assertArrayEquals(data1, composite.getArrays().get(0));
+        assertArrayEquals(data2, composite.getArrays().get(1));
+
+        byte[] data3 = new byte[] { 12, 13, 14 };
+        ReadableBuffer buffer3 = ReadableBuffer.ByteBufferReader.wrap(data3);
+
+        length += data3.length;
+
+        delivery.sendNoCopy(buffer3);
+
+        assertEquals(length, delivery.pending());
+        assertEquals(length, delivery.getData().remaining());
+
+        assertSame(composite, delivery.getData());
+
+        assertNotSame(data1, composite.getArrays().get(0));
+        assertNotSame(data2, composite.getArrays().get(1));
+        assertNotSame(data3, composite.getArrays().get(2));
+
+        assertArrayEquals(data1, composite.getArrays().get(0));
+        assertArrayEquals(data2, composite.getArrays().get(1));
+        assertArrayEquals(data3, composite.getArrays().get(2));
+    }
+
+    //----- Tests for afterSend cleanup --------------------------------------//
+
+    @Test
+    public void testAfterSendOnEmptyDelivery() throws Exception
+    {
+        DeliveryImpl delivery = createSenderDelivery();
+
+        ReadableBuffer sendBuffer = delivery.getData();
+
+        delivery.afterSend();
+
+        assertSame(sendBuffer, delivery.getData());
+    }
+
+    @Test
+    public void testAfterSendPreservesInteralBufferWhenEmpty() throws Exception
+    {
+        DeliveryImpl delivery = createSenderDelivery();
+
+        byte[] data = new byte[] { 0, 1, 2, 3, 4, 5 };
+        ReadableBuffer buffer = ReadableBuffer.ByteBufferReader.wrap(data);
+
+        delivery.send(buffer);
+
+        assertEquals(data.length, delivery.pending());
+        assertEquals(data.length, delivery.getData().remaining());
+
+        CompositeReadableBuffer composite = (CompositeReadableBuffer) delivery.getData();
+
+        assertNotSame(data, composite.array());
+        assertArrayEquals(data, composite.array());
+
+        delivery.getData().position(delivery.getData().limit());
+        delivery.afterSend();
+
+        assertSame(composite, delivery.getData());
+    }
+
+    @Test
+    public void testAfterSendNoCopyClearsExternalReadableBuffer() throws Exception
+    {
+        DeliveryImpl delivery = createSenderDelivery();
+
+        byte[] data = new byte[] { 0, 1, 2, 3, 4, 5 };
+        ReadableBuffer buffer = ReadableBuffer.ByteBufferReader.wrap(data);
+
+        delivery.sendNoCopy(buffer);
+
+        ReadableBuffer sendBuffer = delivery.getData();
+
+        assertEquals(data.length, sendBuffer.remaining());
+        assertSame(buffer, sendBuffer);
+
+        sendBuffer.position(sendBuffer.limit());
+
+        delivery.afterSend();
+
+        assertNotSame(buffer, delivery.getData());
+    }
+
+    @Test
+    public void testAfterSendNoCopyPreservesExternalReadableBufferIfNotDrained() throws Exception
+    {
+        DeliveryImpl delivery = createSenderDelivery();
+
+        byte[] data = new byte[] { 0, 1, 2, 3, 4, 5 };
+        ReadableBuffer buffer = ReadableBuffer.ByteBufferReader.wrap(data);
+
+        delivery.sendNoCopy(buffer);
+
+        ReadableBuffer sendBuffer = delivery.getData();
+
+        assertEquals(data.length, sendBuffer.remaining());
+        assertSame(buffer, sendBuffer);
+
+        sendBuffer.position(sendBuffer.limit() - 1);
+
+        delivery.afterSend();
+
+        assertSame(buffer, delivery.getData());
+    }
+
+    //------------------------------------------------------------------------//
+
+    private DeliveryImpl createSenderDelivery() {
+        LinkImpl link = Mockito.mock(SenderImpl.class);
+        ConnectionImpl connection = Mockito.mock(ConnectionImpl.class);
+
+        Mockito.when(link.getConnectionImpl()).thenReturn(connection);
+
+        return new DeliveryImpl(null, link, null);
+    }
 }
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index ddff6b3..ead411f 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -51,6 +51,7 @@
 import org.apache.qpid.proton.amqp.transport.Open;
 import org.apache.qpid.proton.amqp.transport.Role;
 import org.apache.qpid.proton.amqp.transport.Transfer;
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Delivery;
@@ -366,9 +367,10 @@
         }
 
         LinkedList<FrameBody> writes = new LinkedList<FrameBody>();
+
         @Override
         protected void writeFrame(int channel, FrameBody frameBody,
-                                  ByteBuffer payload, Runnable onPayloadTooLarge) {
+                                  ReadableBuffer payload, Runnable onPayloadTooLarge) {
             super.writeFrame(channel, frameBody, payload, onPayloadTooLarge);
             writes.addLast(frameBody);
         }