PROTON-1967 Reduce garbage created in the transport

Used cached isntances of objects that are created and immediately discared
after being sent into the frame writer.  In order to fix tests the FrameBody
types all need a copy method so that the events coming through the transport
can be recorded for later analysis.
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java b/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java
index ab1bfe5..f65a117 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java
@@ -27,7 +27,6 @@
 
 public final class Binary
 {
-
     private final byte[] _data;
     private final int _offset;
     private final int _length;
@@ -102,7 +101,6 @@
         return true;
     }
 
-
     public int getArrayOffset()
     {
         return _offset;
@@ -118,11 +116,11 @@
         return _length;
     }
 
+    @Override
     public String toString()
     {
         StringBuilder str = new StringBuilder();
 
-
         for (int i = 0; i < _length; i++)
         {
             byte c = _data[_offset + i];
@@ -138,12 +136,10 @@
         }
 
         return str.toString();
-
     }
 
     public static Binary combine(final Collection<Binary> binaries)
     {
-
         if(binaries.size() == 1)
         {
             return binaries.iterator().next();
@@ -190,9 +186,11 @@
 
     public static Binary create(ByteBuffer buffer)
     {
-        if( buffer == null )
+        if (buffer == null)
+        {
             return null;
-        if( buffer.isDirect() || buffer.isReadOnly() )
+        }
+        if (buffer.isDirect() || buffer.isReadOnly())
         {
             byte data[] = new byte [buffer.remaining()];
             ByteBuffer dup = buffer.duplicate();
@@ -205,4 +203,17 @@
         }
     }
 
+    public static Binary copy(Binary source)
+    {
+        if (source == null)
+        {
+            return null;
+        }
+        else
+        {
+            byte[] data = new byte[source.getLength()];
+            System.arraycopy(source.getArray(), source.getArrayOffset(), data, 0, source.getLength());
+            return new Binary(data);
+        }
+    }
 }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Attach.java b/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Attach.java
index 1d1746f..fa6dbad 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Attach.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Attach.java
@@ -23,17 +23,18 @@
 
 package org.apache.qpid.proton.amqp.transport;
 
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.UnsignedLong;
 
-import java.util.Arrays;
-import java.util.Map;
-
+@SuppressWarnings("rawtypes")
 public final class Attach implements FrameBody
 {
-
     private String _name;
     private UnsignedInteger _handle;
     private Role _role = Role.SENDER;
@@ -49,6 +50,39 @@
     private Symbol[] _desiredCapabilities;
     private Map _properties;
 
+    public Attach() {}
+
+    @SuppressWarnings("unchecked")
+    public Attach(Attach other)
+    {
+        this._name = other._name;
+        this._handle = other._handle;
+        this._role = other._role;
+        this._sndSettleMode = other._sndSettleMode;
+        this._rcvSettleMode = other._rcvSettleMode;
+        if (other._source != null) {
+            this._source = other._source.copy();
+        }
+        if (other._target != null) {
+            this._target = other._target.copy();
+        }
+        if (other._unsettled != null) {
+            this._unsettled = new LinkedHashMap<>(other._unsettled);
+        }
+        this._incompleteUnsettled = other._incompleteUnsettled;
+        this._initialDeliveryCount = other._initialDeliveryCount;
+        this._maxMessageSize = other._maxMessageSize;
+        if (other._offeredCapabilities != null) {
+            this._offeredCapabilities = Arrays.copyOf(other._offeredCapabilities, other._offeredCapabilities.length);
+        }
+        if (other._desiredCapabilities != null) {
+            this._desiredCapabilities = Arrays.copyOf(other._desiredCapabilities, other._desiredCapabilities.length);
+        }
+        if (other._properties != null) {
+            this._properties = new LinkedHashMap<>(other._properties);
+        }
+    }
+
     public String getName()
     {
         return _name;
@@ -203,12 +237,12 @@
         _properties = properties;
     }
 
+    @Override
     public <E> void invoke(FrameBodyHandler<E> handler, Binary payload, E context)
     {
         handler.handleAttach(this, payload, context);
     }
 
-
     @Override
     public String toString()
     {
@@ -229,4 +263,10 @@
                ", properties=" + _properties +
                '}';
     }
+
+    @Override
+    public Attach copy()
+    {
+        return new Attach(this);
+    }
 }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Begin.java b/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Begin.java
index ead4050..c4859fb 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Begin.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Begin.java
@@ -23,18 +23,18 @@
 
 package org.apache.qpid.proton.amqp.transport;
 
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.UnsignedShort;
 
-import java.util.Arrays;
-import java.util.Map;
-
-
+@SuppressWarnings("rawtypes")
 public final class Begin implements FrameBody
 {
-
     private UnsignedShort _remoteChannel;
     private UnsignedInteger _nextOutgoingId;
     private UnsignedInteger _incomingWindow;
@@ -44,6 +44,27 @@
     private Symbol[] _desiredCapabilities;
     private Map _properties;
 
+    public Begin() {}
+
+    @SuppressWarnings("unchecked")
+    public Begin(Begin other)
+    {
+        this._remoteChannel = other._remoteChannel;
+        this._nextOutgoingId = other._nextOutgoingId;
+        this._incomingWindow = other._incomingWindow;
+        this._outgoingWindow = other._outgoingWindow;
+        this._handleMax = other._handleMax;
+        if (other._offeredCapabilities != null) {
+            this._offeredCapabilities = Arrays.copyOf(other._offeredCapabilities, other._offeredCapabilities.length);
+        }
+        if (other._desiredCapabilities != null) {
+            this._desiredCapabilities = Arrays.copyOf(other._desiredCapabilities, other._desiredCapabilities.length);
+        }
+        if (other._properties != null) {
+            this._properties = new LinkedHashMap<>(other._properties);
+        }
+    }
+
     public UnsignedShort getRemoteChannel()
     {
         return _remoteChannel;
@@ -139,12 +160,12 @@
         _properties = properties;
     }
 
+    @Override
     public <E> void invoke(FrameBodyHandler<E> handler, Binary payload, E context)
     {
         handler.handleBegin(this, payload, context);
     }
 
-
     @Override
     public String toString()
     {
@@ -159,5 +180,10 @@
                ", properties=" + _properties +
                '}';
     }
+
+    @Override
+    public Begin copy()
+    {
+        return new Begin(this);
+    }
 }
-  
\ No newline at end of file
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Close.java b/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Close.java
index ca141f8..05145bb 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Close.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Close.java
@@ -29,6 +29,17 @@
 {
     private ErrorCondition _error;
 
+    public Close() {}
+
+    public Close(Close other)
+    {
+        if (other._error != null)
+        {
+            this._error = new ErrorCondition();
+            this._error.copyFrom(other._error);
+        }
+    }
+
     public ErrorCondition getError()
     {
         return _error;
@@ -39,6 +50,7 @@
         _error = error;
     }
 
+    @Override
     public <E> void invoke(FrameBodyHandler<E> handler, Binary payload, E context)
     {
         handler.handleClose(this, payload, context);
@@ -51,5 +63,10 @@
                "error=" + _error +
                '}';
     }
+
+    @Override
+    public Close copy()
+    {
+        return new Close(this);
+    }
 }
-  
\ No newline at end of file
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Detach.java b/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Detach.java
index b0f2936..2b96611 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Detach.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Detach.java
@@ -28,11 +28,23 @@
 
 public final class Detach implements FrameBody
 {
-
     private UnsignedInteger _handle;
     private boolean _closed;
     private ErrorCondition _error;
 
+    public Detach() {}
+
+    public Detach(Detach other)
+    {
+        this._handle = other._handle;
+        this._closed = other._closed;
+        if (other._error != null)
+        {
+            this._error = new ErrorCondition();
+            this._error.copyFrom(other._error);
+        }
+    }
+
     public UnsignedInteger getHandle()
     {
         return _handle;
@@ -67,7 +79,8 @@
     {
         _error = error;
     }
-    
+
+    @Override
     public <E> void invoke(FrameBodyHandler<E> handler, Binary payload, E context)
     {
         handler.handleDetach(this, payload, context);
@@ -82,5 +95,10 @@
                ", error=" + _error +
                '}';
     }
+
+    @Override
+    public Detach copy()
+    {
+        return new Detach(this);
+    }
 }
-  
\ No newline at end of file
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Disposition.java b/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Disposition.java
index a850940..771789b 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Disposition.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Disposition.java
@@ -35,6 +35,18 @@
     private DeliveryState _state;
     private boolean _batchable;
 
+    public Disposition() {}
+
+    public Disposition(Disposition other)
+    {
+        this._role = other._role;
+        this._first = other._first;
+        this._last = other._last;
+        this._settled = other._settled;
+        this._state = other._state;
+        this._batchable = other._batchable;
+    }
+
     public Role getRole()
     {
         return _role;
@@ -104,6 +116,7 @@
         _batchable = batchable;
     }
 
+    @Override
     public <E> void invoke(FrameBodyHandler<E> handler, Binary payload, E context)
     {
         handler.handleDisposition(this, payload, context);
@@ -121,4 +134,10 @@
                ", batchable=" + _batchable +
                '}';
     }
+
+    @Override
+    public Disposition copy()
+    {
+        return new Disposition(this);
+    }
 }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/EmptyFrame.java b/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/EmptyFrame.java
index bb76ba9..15f0308 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/EmptyFrame.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/EmptyFrame.java
@@ -37,4 +37,10 @@
     {
         return "Empty Frame";
     }
+
+    @Override
+    public EmptyFrame copy()
+    {
+        return new EmptyFrame();
+    }
 }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/End.java b/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/End.java
index e53a456..7cbd774 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/End.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/End.java
@@ -29,6 +29,17 @@
 {
     private ErrorCondition _error;
 
+    public End() {}
+
+    public End(End other)
+    {
+        if (other._error != null)
+        {
+            this._error = new ErrorCondition();
+            this._error.copyFrom(other._error);
+        }
+    }
+
     public ErrorCondition getError()
     {
         return _error;
@@ -39,6 +50,7 @@
         _error = error;
     }
 
+    @Override
     public <E> void invoke(FrameBodyHandler<E> handler, Binary payload, E context)
     {
         handler.handleEnd(this, payload, context);
@@ -51,5 +63,10 @@
                "error=" + _error +
                '}';
     }
+
+    @Override
+    public End copy()
+    {
+        return new End(this);
+    }
 }
-  
\ No newline at end of file
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Flow.java b/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Flow.java
index b421a00..a321bd3 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Flow.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Flow.java
@@ -23,11 +23,13 @@
 
 package org.apache.qpid.proton.amqp.transport;
 
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 
+@SuppressWarnings("rawtypes")
 public final class Flow implements FrameBody
 {
     private UnsignedInteger _nextIncomingId;
@@ -42,6 +44,27 @@
     private boolean _echo;
     private Map _properties;
 
+    public Flow() {}
+
+    @SuppressWarnings("unchecked")
+    public Flow(Flow other)
+    {
+        this._nextIncomingId = other._nextIncomingId;
+        this._incomingWindow = other._incomingWindow;
+        this._nextOutgoingId = other._nextOutgoingId;
+        this._outgoingWindow = other._outgoingWindow;
+        this._handle = other._handle;
+        this._deliveryCount = other._deliveryCount;
+        this._linkCredit = other._linkCredit;
+        this._available = other._available;
+        this._drain = other._drain;
+        this._echo = other._echo;
+        if (other._properties != null)
+        {
+            this._properties = new LinkedHashMap<>(other._properties);
+        }
+    }
+
     public UnsignedInteger getNextIncomingId()
     {
         return _nextIncomingId;
@@ -167,6 +190,7 @@
         _properties = properties;
     }
 
+    @Override
     public <E> void invoke(FrameBodyHandler<E> handler, Binary payload, E context)
     {
         handler.handleFlow(this, payload, context);
@@ -189,5 +213,10 @@
                ", properties=" + _properties +
                '}';
     }
+
+    @Override
+    public Flow copy()
+    {
+        return new Flow(this);
+    }
 }
-  
\ No newline at end of file
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/FrameBody.java b/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/FrameBody.java
index 90a2431..4fb08c3 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/FrameBody.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/FrameBody.java
@@ -40,4 +40,10 @@
     }
 
     <E> void invoke(FrameBodyHandler<E> handler, Binary payload, E context);
+
+    /**
+     * @return a deep copy of this FrameBody.
+     */
+    FrameBody copy();
+
 }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Open.java b/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Open.java
index e4ec98b..46283fd 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Open.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Open.java
@@ -23,15 +23,16 @@
 
 package org.apache.qpid.proton.amqp.transport;
 
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.UnsignedShort;
 
-import java.util.Arrays;
-import java.util.Map;
-
-
+@SuppressWarnings("rawtypes")
 public final class Open implements FrameBody
 {
     private String _containerId;
@@ -45,6 +46,33 @@
     private Symbol[] _desiredCapabilities;
     private Map _properties;
 
+    public Open() {}
+
+    @SuppressWarnings("unchecked")
+    public Open(Open other)
+    {
+        this._containerId = other._containerId;
+        this._hostname = other._hostname;
+        this._maxFrameSize = other._maxFrameSize;
+        this._channelMax = other._channelMax;
+        this._idleTimeOut = other._idleTimeOut;
+        if (other._outgoingLocales != null) {
+            this._outgoingLocales = Arrays.copyOf(other._outgoingLocales, other._outgoingLocales.length);
+        }
+        if (other._incomingLocales != null) {
+            this._incomingLocales = Arrays.copyOf(other._incomingLocales, other._incomingLocales.length);
+        }
+        if (other._offeredCapabilities != null) {
+            this._offeredCapabilities = Arrays.copyOf(other._offeredCapabilities, other._offeredCapabilities.length);
+        }
+        if (other._desiredCapabilities != null) {
+            this._desiredCapabilities = Arrays.copyOf(other._desiredCapabilities, other._desiredCapabilities.length);
+        }
+        if (other._properties != null) {
+            this._properties = new LinkedHashMap<>(other._properties);
+        }
+    }
+
     public String getContainerId()
     {
         return _containerId;
@@ -150,6 +178,7 @@
         _properties = properties;
     }
 
+    @Override
     public <E> void invoke(FrameBodyHandler<E> handler, Binary payload, E context)
     {
         handler.handleOpen(this, payload, context);
@@ -171,5 +200,10 @@
                ", properties=" + _properties +
                '}';
     }
+
+    @Override
+    public Open copy()
+    {
+        return new Open(this);
+    }
 }
-  
\ No newline at end of file
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Transfer.java b/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Transfer.java
index 35c421c..b95c476 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Transfer.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/amqp/transport/Transfer.java
@@ -40,6 +40,23 @@
     private boolean _aborted;
     private boolean _batchable;
 
+    public Transfer() {}
+
+    public Transfer(Transfer other)
+    {
+        this._handle = other._handle;
+        this._deliveryId = other._deliveryId;
+        this._deliveryTag = Binary.copy(other._deliveryTag);
+        this._messageFormat = other._messageFormat;
+        this._settled = other._settled;
+        this._more = other._more;
+        this._rcvSettleMode = other._rcvSettleMode;
+        this._state = other._state;
+        this._resume = other._resume;
+        this._aborted = other._aborted;
+        this._batchable = other._batchable;
+    }
+
     public UnsignedInteger getHandle()
     {
         return _handle;
@@ -155,6 +172,7 @@
         _batchable = batchable;
     }
 
+    @Override
     public <E> void invoke(FrameBodyHandler<E> handler, Binary payload, E context)
     {
         handler.handleTransfer(this, payload, context);
@@ -177,5 +195,10 @@
                ", batchable=" + _batchable +
                '}';
     }
+
+    @Override
+    public Transfer copy()
+    {
+        return new Transfer(this);
+    }
 }
-  
\ No newline at end of file
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 1a02cf3..9f68260 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -145,7 +145,11 @@
 
     private List<TransportLayer> _additionalTransportLayers;
 
-    private final PartialTransferHandler partialTransferHandler = new PartialTransferHandler();
+    // Cached instances used to carry the Performatives to the frame writer without the need to create
+    // a new instance on each operation that triggers a write
+    private final Disposition cachedDisposition = new Disposition();
+    private final Flow cachedFlow = new Flow();
+    private final Transfer cachedTransfer = new Transfer();
 
     /**
      * Application code should use {@link org.apache.qpid.proton.engine.Transport.Factory#create()} instead
@@ -155,7 +159,6 @@
         this(DEFAULT_MAX_FRAME_SIZE);
     }
 
-
     /**
      * Creates a transport with the given maximum frame size.
      * Note that the maximumFrameSize also determines the size of the output buffer.
@@ -447,7 +450,6 @@
                         transportLink.clearLocalHandle();
                         transportSession.freeLocalHandle(localHandle);
 
-
                         Detach detach = new Detach();
                         detach.setHandle(localHandle);
                         detach.setClosed(!link.detached());
@@ -458,7 +460,6 @@
                             detach.setError(localError);
                         }
 
-
                         writeFrame(transportSession.getLocalChannel(), detach, null, null);
                     }
 
@@ -472,19 +473,24 @@
 
     private void writeFlow(TransportSession ssn, TransportLink link)
     {
-        Flow flow = new Flow();
-        flow.setNextIncomingId(ssn.getNextIncomingId());
-        flow.setNextOutgoingId(ssn.getNextOutgoingId());
+        cachedFlow.setNextIncomingId(ssn.getNextIncomingId());
+        cachedFlow.setNextOutgoingId(ssn.getNextOutgoingId());
         ssn.updateIncomingWindow();
-        flow.setIncomingWindow(ssn.getIncomingWindowSize());
-        flow.setOutgoingWindow(ssn.getOutgoingWindowSize());
+        cachedFlow.setIncomingWindow(ssn.getIncomingWindowSize());
+        cachedFlow.setOutgoingWindow(ssn.getOutgoingWindowSize());
+        cachedFlow.setProperties(null);
         if (link != null) {
-            flow.setHandle(link.getLocalHandle());
-            flow.setDeliveryCount(link.getDeliveryCount());
-            flow.setLinkCredit(link.getLinkCredit());
-            flow.setDrain(link.getLink().getDrain());
+            cachedFlow.setHandle(link.getLocalHandle());
+            cachedFlow.setDeliveryCount(link.getDeliveryCount());
+            cachedFlow.setLinkCredit(link.getLinkCredit());
+            cachedFlow.setDrain(link.getLink().getDrain());
+        } else {
+            cachedFlow.setHandle(null);
+            cachedFlow.setDeliveryCount(null);
+            cachedFlow.setLinkCredit(null);
+            cachedFlow.setDrain(false);
         }
-        writeFrame(ssn.getLocalChannel(), flow, null, null);
+        writeFrame(ssn.getLocalChannel(), cachedFlow, null, null);
     }
 
     private void processSenderFlow()
@@ -494,7 +500,6 @@
             EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
             while(endpoint != null)
             {
-
                 if(endpoint instanceof SenderImpl)
                 {
                     SenderImpl sender = (SenderImpl) endpoint;
@@ -509,7 +514,6 @@
 
                         writeFlow(transportSession, transportLink);
                     }
-
                 }
 
                 endpoint = endpoint.transportNext();
@@ -578,35 +582,48 @@
             tpDelivery = new TransportDelivery(deliveryId, delivery, tpLink);
             delivery.setTransportDelivery(tpDelivery);
 
-            final Transfer transfer = new Transfer();
-            transfer.setDeliveryId(deliveryId);
-            transfer.setDeliveryTag(new Binary(delivery.getTag()));
-            transfer.setHandle(tpLink.getLocalHandle());
+            cachedTransfer.setDeliveryId(deliveryId);
+            cachedTransfer.setDeliveryTag(new Binary(delivery.getTag()));
+            cachedTransfer.setHandle(tpLink.getLocalHandle());
+            cachedTransfer.setRcvSettleMode(null);
+            cachedTransfer.setResume(false); // Ensure default is written
+            cachedTransfer.setAborted(false); // Ensure default is written
+            cachedTransfer.setBatchable(false); // Ensure default is written
 
             if(delivery.getLocalState() != null)
             {
-                transfer.setState(delivery.getLocalState());
+                cachedTransfer.setState(delivery.getLocalState());
+            }
+            else
+            {
+                cachedTransfer.setState(null);
             }
 
             if(delivery.isSettled())
             {
-                transfer.setSettled(Boolean.TRUE);
+                cachedTransfer.setSettled(Boolean.TRUE);
             }
             else
             {
+                cachedTransfer.setSettled(Boolean.FALSE);
                 tpSession.addUnsettledOutgoing(deliveryId, delivery);
             }
 
             if(snd.current() == delivery)
             {
-                transfer.setMore(true);
+                cachedTransfer.setMore(true);
+            }
+            else
+            {
+                // Partial transfers will reset this as needed to true in the FrameWriter
+                cachedTransfer.setMore(false);
             }
 
             int messageFormat = delivery.getMessageFormat();
             if(messageFormat == DeliveryImpl.DEFAULT_MESSAGE_FORMAT) {
-                transfer.setMessageFormat(UnsignedInteger.ZERO);
+                cachedTransfer.setMessageFormat(UnsignedInteger.ZERO);
             } else {
-                transfer.setMessageFormat(UnsignedInteger.valueOf(messageFormat));
+                cachedTransfer.setMessageFormat(UnsignedInteger.valueOf(messageFormat));
             }
 
             ReadableBuffer payload = delivery.getData();
@@ -614,9 +631,8 @@
             int pending = payload.remaining();
 
             try {
-                writeFrame(tpSession.getLocalChannel(), transfer, payload, partialTransferHandler.setTransfer(transfer));
+                writeFrame(tpSession.getLocalChannel(), cachedTransfer, payload, () -> cachedTransfer.setMore(true));
             } finally {
-                partialTransferHandler.setTransfer(null);
                 delivery.afterSend();  // Allow for freeing resources after write of buffered data
             }
 
@@ -627,7 +643,7 @@
             {
                 session.incrementOutgoingBytes(-pending);
 
-                if (!transfer.getMore()) {
+                if (!cachedTransfer.getMore()) {
                     // Clear the in-progress delivery marker
                     tpLink.setInProgressDelivery(null);
 
@@ -655,26 +671,25 @@
         if(wasDone && delivery.getLocalState() != null)
         {
             TransportDelivery tpDelivery = delivery.getTransportDelivery();
-            Disposition disposition = new Disposition();
-            disposition.setFirst(tpDelivery.getDeliveryId());
-            disposition.setLast(tpDelivery.getDeliveryId());
-            disposition.setRole(Role.SENDER);
-            disposition.setSettled(delivery.isSettled());
+            // Use cached object as holder of data for immediate write to the FrameWriter
+            cachedDisposition.setFirst(tpDelivery.getDeliveryId());
+            cachedDisposition.setLast(tpDelivery.getDeliveryId());
+            cachedDisposition.setRole(Role.SENDER);
+            cachedDisposition.setSettled(delivery.isSettled());
+            cachedDisposition.setBatchable(false);  // Enforce default is written
             if(delivery.isSettled())
             {
                 tpDelivery.settled();
             }
-            disposition.setState(delivery.getLocalState());
+            cachedDisposition.setState(delivery.getLocalState());
 
-            writeFrame(tpSession.getLocalChannel(), disposition, null,
-                       null);
+            writeFrame(tpSession.getLocalChannel(), cachedDisposition, null, null);
         }
 
         return !delivery.isBuffered();
     }
 
-    private boolean processTransportWorkReceiver(DeliveryImpl delivery,
-                                                 ReceiverImpl rcv)
+    private boolean processTransportWorkReceiver(DeliveryImpl delivery, ReceiverImpl rcv)
     {
         TransportDelivery tpDelivery = delivery.getTransportDelivery();
         SessionImpl session = rcv.getSession();
@@ -684,19 +699,19 @@
         {
             boolean settled = delivery.isSettled();
             DeliveryState localState = delivery.getLocalState();
-
-            Disposition disposition = new Disposition();
-            disposition.setFirst(tpDelivery.getDeliveryId());
-            disposition.setLast(tpDelivery.getDeliveryId());
-            disposition.setRole(Role.RECEIVER);
-            disposition.setSettled(settled);
-            disposition.setState(localState);
+            // Use cached object as holder of data for immediate write to the FrameWriter
+            cachedDisposition.setFirst(tpDelivery.getDeliveryId());
+            cachedDisposition.setLast(tpDelivery.getDeliveryId());
+            cachedDisposition.setRole(Role.RECEIVER);
+            cachedDisposition.setSettled(settled);
+            cachedDisposition.setState(localState);
+            cachedDisposition.setBatchable(false);  // Enforce default is written
 
             if(localState == null && settled) {
-                disposition.setState(delivery.getDefaultDeliveryState());
+                cachedDisposition.setState(delivery.getDefaultDeliveryState());
             }
 
-            writeFrame(tpSession.getLocalChannel(), disposition, null, null);
+            writeFrame(tpSession.getLocalChannel(), cachedDisposition, null, null);
             if (settled)
             {
                 tpDelivery.settled();
@@ -1685,23 +1700,6 @@
         return "TransportImpl [_connectionEndpoint=" + _connectionEndpoint + ", " + super.toString() + "]";
     }
 
-    private static class PartialTransferHandler implements Runnable
-    {
-        private Transfer _transfer;
-
-        PartialTransferHandler setTransfer(Transfer transfer)
-        {
-            this._transfer = transfer;
-            return this;
-        }
-
-        @Override
-        public void run()
-        {
-            _transfer.setMore(true);
-        }
-    }
-
     /**
      * Override the default frame handler. Must be called before the transport starts being used
      * (e.g. {@link #getInputBuffer()}, {@link #getOutputBuffer()}, {@link #ssl(SslDomain)} etc).
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/AttachTest.java b/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/AttachTest.java
new file mode 100644
index 0000000..4cc9137
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/AttachTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.amqp.transport;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+import org.junit.Test;
+
+public class AttachTest {
+
+    @Test
+    public void testCopy() {
+        Attach attach = new Attach();
+
+        attach.setName("test");
+        attach.setHandle(UnsignedInteger.ONE);
+        attach.setRole(Role.RECEIVER);
+        attach.setSndSettleMode(SenderSettleMode.MIXED);
+        attach.setRcvSettleMode(ReceiverSettleMode.SECOND);
+        attach.setSource(null);
+        attach.setTarget(new org.apache.qpid.proton.amqp.messaging.Target());
+        attach.setUnsettled(null);
+        attach.setIncompleteUnsettled(false);
+        attach.setInitialDeliveryCount(UnsignedInteger.valueOf(42));
+        attach.setMaxMessageSize(UnsignedLong.valueOf(1024));
+        attach.setOfferedCapabilities(new Symbol[] { Symbol.valueOf("anonymous-relay") });
+        attach.setDesiredCapabilities(new Symbol[0]);
+
+        final Attach copyOf = attach.copy();
+
+        assertEquals(attach.getName(), copyOf.getName());
+        assertArrayEquals(attach.getDesiredCapabilities(), copyOf.getDesiredCapabilities());
+        assertEquals(attach.getHandle(), copyOf.getHandle());
+        assertEquals(attach.getRole(), copyOf.getRole());
+        assertEquals(attach.getSndSettleMode(), copyOf.getSndSettleMode());
+        assertEquals(attach.getRcvSettleMode(), copyOf.getRcvSettleMode());
+        assertNull(copyOf.getSource());
+        assertNotNull(copyOf.getTarget());
+        assertEquals(attach.getUnsettled(), copyOf.getUnsettled());
+        assertEquals(attach.getIncompleteUnsettled(), copyOf.getIncompleteUnsettled());
+        assertEquals(attach.getMaxMessageSize(), copyOf.getMaxMessageSize());
+        assertEquals(attach.getInitialDeliveryCount(), copyOf.getInitialDeliveryCount());
+        assertArrayEquals(attach.getOfferedCapabilities(), copyOf.getOfferedCapabilities());
+    }
+}
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/BeginTest.java b/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/BeginTest.java
new file mode 100644
index 0000000..2ef3b80
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/BeginTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.amqp.transport;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.UnsignedShort;
+import org.junit.Test;
+
+public class BeginTest {
+
+    @Test
+    public void testCopy() {
+        Map<Symbol, Object> properties = new HashMap<>();
+        properties.put(Symbol.valueOf("x-opt"), "value");
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 2));
+        begin.setNextOutgoingId(UnsignedInteger.valueOf(10));
+        begin.setIncomingWindow(UnsignedInteger.valueOf(11));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(12));
+        begin.setHandleMax(UnsignedInteger.valueOf(13));
+        begin.setDesiredCapabilities(new Symbol[0]);
+        begin.setOfferedCapabilities(new Symbol[] { Symbol.valueOf("anonymous-relay") });
+        begin.setProperties(properties);
+
+        final Begin copyOf = begin.copy();
+
+        assertEquals(begin.getRemoteChannel(), copyOf.getRemoteChannel());
+        assertEquals(begin.getNextOutgoingId(), copyOf.getNextOutgoingId());
+        assertEquals(begin.getIncomingWindow(), copyOf.getIncomingWindow());
+        assertEquals(begin.getOutgoingWindow(), copyOf.getOutgoingWindow());
+        assertEquals(begin.getHandleMax(), copyOf.getHandleMax());
+        assertArrayEquals(begin.getDesiredCapabilities(), copyOf.getDesiredCapabilities());
+        assertArrayEquals(begin.getOfferedCapabilities(), copyOf.getOfferedCapabilities());
+        assertEquals(begin.getProperties(), copyOf.getProperties());
+    }
+}
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/CloseTest.java b/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/CloseTest.java
new file mode 100644
index 0000000..80f6f11
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/CloseTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.amqp.transport;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.junit.Test;
+
+public class CloseTest {
+
+    @Test
+    public void testCopy() {
+        Close close = new Close();
+        Close copyOf = close.copy();
+
+        assertNull(copyOf.getError());
+        close.setError(new ErrorCondition());
+        copyOf = close.copy();
+        assertNotNull(copyOf.getError());
+    }
+}
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/DetachTest.java b/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/DetachTest.java
new file mode 100644
index 0000000..7478304
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/DetachTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.amqp.transport;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.junit.Test;
+
+public class DetachTest {
+
+    @Test
+    public void testCopy() {
+        Detach detach = new Detach();
+        Detach copyOf = detach.copy();
+
+        assertNull(copyOf.getError());
+
+        detach.setError(new ErrorCondition());
+        detach.setClosed(true);
+        detach.setHandle(UnsignedInteger.ONE);
+
+        copyOf = detach.copy();
+
+        assertNotNull(copyOf.getError());
+        assertEquals(detach.getClosed(), copyOf.getClosed());
+        assertEquals(detach.getHandle(), copyOf.getHandle());
+    }
+}
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/EndTest.java b/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/EndTest.java
new file mode 100644
index 0000000..b0f23dc
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/EndTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.amqp.transport;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.junit.Test;
+
+public class EndTest {
+
+    @Test
+    public void testCopy() {
+        End end = new End();
+        End copyOf = end.copy();
+
+        assertNull(copyOf.getError());
+        end.setError(new ErrorCondition());
+        copyOf = end.copy();
+        assertNotNull(copyOf.getError());
+    }
+}
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/FlowTest.java b/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/FlowTest.java
new file mode 100644
index 0000000..fb7fdb3
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/FlowTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.amqp.transport;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.junit.Test;
+
+public class FlowTest {
+
+    @Test
+    public void test() {
+        Flow flow = new Flow();
+
+        flow.setNextIncomingId(UnsignedInteger.valueOf(1));
+        flow.setIncomingWindow(UnsignedInteger.valueOf(2));
+        flow.setNextOutgoingId(UnsignedInteger.valueOf(3));
+        flow.setOutgoingWindow(UnsignedInteger.valueOf(4));
+        flow.setHandle(UnsignedInteger.valueOf(5));
+        flow.setDeliveryCount(UnsignedInteger.valueOf(6));
+        flow.setLinkCredit(UnsignedInteger.valueOf(7));
+        flow.setAvailable(UnsignedInteger.valueOf(8));
+        flow.setDrain(true);
+        flow.setEcho(true);
+        flow.setProperties(new HashMap<>());
+
+        Flow copyOf = flow.copy();
+
+        assertEquals(flow.getNextIncomingId(), copyOf.getNextIncomingId());
+        assertEquals(flow.getIncomingWindow(), copyOf.getIncomingWindow());
+        assertEquals(flow.getNextOutgoingId(), copyOf.getNextOutgoingId());
+        assertEquals(flow.getOutgoingWindow(), copyOf.getOutgoingWindow());
+        assertEquals(flow.getHandle(), copyOf.getHandle());
+        assertEquals(flow.getDeliveryCount(), copyOf.getDeliveryCount());
+        assertEquals(flow.getLinkCredit(), copyOf.getLinkCredit());
+        assertEquals(flow.getAvailable(), copyOf.getAvailable());
+        assertEquals(flow.getDrain(), copyOf.getDrain());
+        assertEquals(flow.getEcho(), copyOf.getEcho());
+        assertEquals(flow.getProperties(), copyOf.getProperties());
+    }
+}
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/OpenTest.java b/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/OpenTest.java
new file mode 100644
index 0000000..91ea2b7
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/OpenTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.amqp.transport;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.UnsignedShort;
+import org.junit.Test;
+
+public class OpenTest {
+
+    @Test
+    public void testCopy() {
+        Map<Symbol, Object> properties = new HashMap<>();
+        properties.put(Symbol.valueOf("x-opt"), "value");
+
+        Open open = new Open();
+
+        open.setContainerId("test");
+        open.setHostname("host");
+        open.setMaxFrameSize(UnsignedInteger.valueOf(42));
+        open.setChannelMax(UnsignedShort.MAX_VALUE);
+        open.setIdleTimeOut(UnsignedInteger.valueOf(111));
+        open.setOfferedCapabilities(new Symbol[] { Symbol.valueOf("anonymous-relay") });
+        open.setDesiredCapabilities(new Symbol[0]);
+        open.setProperties(properties);
+
+        Open copyOf = open.copy();
+
+        assertEquals(open.getContainerId(), copyOf.getContainerId());
+        assertEquals(open.getHostname(), copyOf.getHostname());
+        assertEquals(open.getMaxFrameSize(), copyOf.getMaxFrameSize());
+        assertEquals(open.getChannelMax(), copyOf.getChannelMax());
+        assertEquals(open.getIdleTimeOut(), copyOf.getIdleTimeOut());
+        assertArrayEquals(open.getDesiredCapabilities(), copyOf.getDesiredCapabilities());
+        assertArrayEquals(open.getOfferedCapabilities(), copyOf.getOfferedCapabilities());
+        assertEquals(open.getProperties(), copyOf.getProperties());
+    }
+}
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/TransferTest.java b/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/TransferTest.java
new file mode 100644
index 0000000..62ebc97
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/amqp/transport/TransferTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.amqp.transport;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.junit.Test;
+
+public class TransferTest {
+
+    @Test
+    public void testCopyTransfers() {
+        Transfer transfer = new Transfer();
+        transfer.setHandle(UnsignedInteger.ONE);
+        transfer.setDeliveryTag(new Binary(new byte[] {0, 1}));
+        transfer.setMessageFormat(UnsignedInteger.ZERO);
+        transfer.setDeliveryId(UnsignedInteger.valueOf(127));
+        transfer.setAborted(false);
+        transfer.setBatchable(true);
+        transfer.setRcvSettleMode(ReceiverSettleMode.SECOND);
+
+        final Transfer copyOf = transfer.copy();
+
+        assertEquals(transfer.getHandle(), copyOf.getHandle());
+        assertEquals(transfer.getMessageFormat(), copyOf.getMessageFormat());
+        assertEquals(transfer.getDeliveryTag(), copyOf.getDeliveryTag());
+        assertEquals(transfer.getDeliveryId(), copyOf.getDeliveryId());
+        assertEquals(transfer.getAborted(), copyOf.getAborted());
+        assertEquals(transfer.getBatchable(), copyOf.getBatchable());
+        assertEquals(transfer.getRcvSettleMode(), copyOf.getRcvSettleMode());
+    }
+}
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/codec/TransferTypeTest.java b/proton-j/src/test/java/org/apache/qpid/proton/codec/TransferTypeTest.java
index 8e2d088..5bfba50 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/codec/TransferTypeTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/codec/TransferTypeTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.qpid.proton.codec;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index 203274f..0be219c 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -379,7 +379,7 @@
         protected void writeFrame(int channel, FrameBody frameBody,
                                   ReadableBuffer payload, Runnable onPayloadTooLarge) {
             super.writeFrame(channel, frameBody, payload, onPayloadTooLarge);
-            writes.addLast(frameBody);
+            writes.addLast(frameBody != null ? frameBody.copy() : null);
         }
     }