diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
index 2878a39..57d4a1e 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
@@ -442,6 +442,8 @@
             next.setWorkPrev(prev);
         }
 
+        delivery.setWorkNext(null);
+        delivery.setWorkPrev(null);
 
         if(_workHead == delivery)
         {
@@ -550,6 +552,8 @@
             next.setTransportWorkPrev(prev);
         }
 
+        delivery.setTransportWorkNext(null);
+        delivery.setTransportWorkPrev(null);
 
         if(_transportWorkHead == delivery)
         {
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
index 0bdb163..8b47231 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
@@ -151,6 +151,7 @@
         {
             _transportDelivery.settled();
         }
+
         if(_link.current() == this)
         {
             _link.advance();
@@ -161,11 +162,16 @@
         {
             _linkPrevious._linkNext = _linkNext;
         }
+
         if(_linkNext != null)
         {
             _linkNext._linkPrevious = _linkPrevious;
         }
+
         updateWork();
+
+        _linkNext= null;
+        _linkPrevious = null;
     }
 
     DeliveryImpl getLinkNext()
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
index a67785e..0d030cb 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
@@ -130,8 +130,9 @@
     {
         DeliveryImpl dlv = _head;
         while (dlv != null) {
+            DeliveryImpl next = dlv.next();
             dlv.free();
-            dlv = dlv.next();
+            dlv = next;
         }
 
         _session.getConnectionImpl().removeLinkEndpoint(_node);
@@ -142,7 +143,11 @@
         modified();
     }
 
-    public void remove(DeliveryImpl delivery)
+    /*
+     * Called when settling a message to ensure that the head/tail refs of the link are updated.
+     * The caller ensures the delivery updates its own refs appropriately.
+     */
+    void remove(DeliveryImpl delivery)
     {
         if(_head == delivery)
         {
@@ -152,10 +157,6 @@
         {
             _tail = delivery.getLinkPrevious();
         }
-        if(_current == delivery)
-        {
-            // TODO - what???
-        }
     }
 
     @Override
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeferredSettlementTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeferredSettlementTest.java
new file mode 100644
index 0000000..34a26e6
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeferredSettlementTest.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.proton.engine.impl;
+
+import static java.util.EnumSet.of;
+import static org.apache.qpid.proton.engine.EndpointState.ACTIVE;
+import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED;
+import static org.apache.qpid.proton.systemtests.TestLoggingHelper.bold;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.logging.Logger;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.systemtests.EngineTestBase;
+import org.apache.qpid.proton.systemtests.ProtocolTracerEnabler;
+import org.apache.qpid.proton.systemtests.TestLoggingHelper;
+import org.junit.Test;
+
+public class DeferredSettlementTest extends EngineTestBase
+{
+    private static final Logger LOGGER = Logger.getLogger(DeferredSettlementTest.class.getName());
+
+    private static final int BUFFER_SIZE = 4096;
+
+    private final String _sourceAddress = getServer().getContainerId() + "-link1-source";
+
+    @Test
+    public void testDeferredOutOfOrderSettlement() throws Exception
+    {
+        LOGGER.fine(bold("======== About to create transports"));
+
+        Transport clientTransport = Proton.transport();
+        getClient().setTransport(clientTransport);
+        ProtocolTracerEnabler.setProtocolTracer(clientTransport, TestLoggingHelper.CLIENT_PREFIX);
+
+        Transport serverTransport = Proton.transport();
+        getServer().setTransport(serverTransport);
+        ProtocolTracerEnabler.setProtocolTracer(serverTransport, "            " + TestLoggingHelper.SERVER_PREFIX);
+
+        doOutputInputCycle();
+
+        Connection clientConnection = Proton.connection();
+        getClient().setConnection(clientConnection);
+        clientTransport.bind(clientConnection);
+
+        Connection serverConnection = Proton.connection();
+        getServer().setConnection(serverConnection);
+        serverTransport.bind(serverConnection);
+
+        LOGGER.fine(bold("======== About to open connections"));
+        clientConnection.open();
+        serverConnection.open();
+
+        doOutputInputCycle();
+
+        LOGGER.fine(bold("======== About to open sessions"));
+        Session clientSession = clientConnection.session();
+        getClient().setSession(clientSession);
+        clientSession.open();
+
+        pumpClientToServer();
+
+        Session serverSession = serverConnection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
+        getServer().setSession(serverSession);
+        assertEndpointState(serverSession, UNINITIALIZED, ACTIVE);
+
+        serverSession.open();
+        assertEndpointState(serverSession, ACTIVE, ACTIVE);
+
+        pumpServerToClient();
+        assertEndpointState(clientSession, ACTIVE, ACTIVE);
+
+        LOGGER.fine(bold("======== About to create receiver"));
+
+        Source clientSource = new Source();
+        getClient().setSource(clientSource);
+        clientSource.setAddress(_sourceAddress);
+
+        Target clientTarget = new Target();
+        getClient().setTarget(clientTarget);
+        clientTarget.setAddress(null);
+
+        Receiver clientReceiver = clientSession.receiver("link1");
+        getClient().setReceiver(clientReceiver);
+        clientReceiver.setTarget(clientTarget);
+        clientReceiver.setSource(clientSource);
+
+        clientReceiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+        clientReceiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+
+        assertEndpointState(clientReceiver, UNINITIALIZED, UNINITIALIZED);
+
+        clientReceiver.open();
+        assertEndpointState(clientReceiver, ACTIVE, UNINITIALIZED);
+
+        pumpClientToServer();
+
+        LOGGER.fine(bold("======== About to set up implicitly created sender"));
+
+        Sender serverSender = (Sender) getServer().getConnection().linkHead(of(UNINITIALIZED), of(ACTIVE));
+        getServer().setSender(serverSender);
+
+        serverSender.setReceiverSettleMode(serverSender.getRemoteReceiverSettleMode());
+        serverSender.setSenderSettleMode(serverSender.getRemoteSenderSettleMode());
+
+        org.apache.qpid.proton.amqp.transport.Source serverRemoteSource = serverSender.getRemoteSource();
+        serverSender.setSource(serverRemoteSource);
+
+        assertEndpointState(serverSender, UNINITIALIZED, ACTIVE);
+        serverSender.open();
+
+        assertEndpointState(serverSender, ACTIVE, ACTIVE);
+
+        pumpServerToClient();
+
+        assertEndpointState(clientReceiver, ACTIVE, ACTIVE);
+
+        int messageCount = 5;
+        clientReceiver.flow(messageCount);
+
+        pumpClientToServer();
+
+        LOGGER.fine(bold("======== About to create messages and send to the client"));
+
+        DeliveryImpl[] serverDeliveries = sendMessagesToClient(messageCount);
+
+        pumpServerToClient();
+
+        for (int i = 0; i < messageCount; i++) {
+            Delivery d = serverDeliveries[i];
+            assertNotNull("Should have had a delivery", d);
+            assertNull("Delivery shouldnt have local state", d.getLocalState());
+            assertNull("Delivery shouldnt have remote state", d.getRemoteState());
+        }
+
+        LOGGER.fine(bold("======== About to process the messages on the client"));
+
+        // Grab the original linkHead, assert deliveries are there, keep refs for later
+        DeliveryImpl d0 = (DeliveryImpl) clientReceiver.head();
+        assertNotNull("Should have a link head", d0);
+        DeliveryImpl[] origClientLinkDeliveries = new DeliveryImpl[messageCount];
+        for (int i = 0 ; i < messageCount; i++) {
+            origClientLinkDeliveries[i] = d0;
+
+            DeliveryImpl linkPrevious = d0.getLinkPrevious();
+            DeliveryImpl linkNext = d0.getLinkNext();
+
+            if(i == 0) {
+                assertNull("should not have link prev", linkPrevious);
+            } else {
+                assertNotNull("should have link prev", linkPrevious);
+                assertSame("Unexpected delivery at link prev", origClientLinkDeliveries[i - 1], linkPrevious);
+                assertSame("Expected to be prior deliveries link next", d0, origClientLinkDeliveries[i - 1].getLinkNext());
+            }
+
+            if(i != messageCount - 1) {
+                assertNotNull("should have link next", linkNext);
+            } else {
+                assertNull("should not have link next", linkNext);
+            }
+
+            d0 = linkNext;
+        }
+
+        // Receive the deliveries and verify contents, marking with matching integer context for easy identification.
+        DeliveryImpl[] clientDeliveries = receiveMessagesFromServer(messageCount);
+
+        // Accept but don't settle them all
+        for (int i = 0; i < messageCount; i++) {
+            Delivery d = clientDeliveries[i];
+            assertNotNull("Should have had a delivery", d);
+
+            d.disposition(Accepted.getInstance());
+        }
+
+        // Verify the client lists, i.e. deliveries now point to each other where expected
+        for (int i = 0 ; i < messageCount; i++) {
+            DeliveryImpl d = origClientLinkDeliveries[i];
+
+            assertSame("Unexpected delivery", origClientLinkDeliveries[i], clientDeliveries[i]);
+
+            // Verify the Transport and Link list entries
+            if(i == 0) {
+                assertDeliveryLinkReferences(d, i, null, origClientLinkDeliveries[1]);
+                assertDeliveryTransportWorkReferences(d, i, null, origClientLinkDeliveries[1]);
+            } else if (i != messageCount - 1) {
+                assertDeliveryLinkReferences(d, i, origClientLinkDeliveries[i - 1], origClientLinkDeliveries[i+1]);
+                assertDeliveryTransportWorkReferences(d, i, origClientLinkDeliveries[i - 1], origClientLinkDeliveries[i+1]);
+            }
+            else {
+                assertDeliveryLinkReferences(d, i, origClientLinkDeliveries[i - 1], null);
+                assertDeliveryTransportWorkReferences(d, i, origClientLinkDeliveries[i - 1], null);
+            }
+
+            // Assert there are no 'work' list entries, as those are for remote peer updates.
+            assertDeliveryWorkReferences(d, i, null, null);
+        }
+
+        // Verify the server gets intended state changes
+        pumpClientToServer();
+        for (int i = 0; i < messageCount; i++) {
+            DeliveryImpl d = serverDeliveries[i];
+            assertNotNull("Should have had a delivery", d);
+            assertNull("Delivery shouldnt have local state", d.getLocalState());
+            assertEquals("Delivery should have remote state", Accepted.getInstance(), d.getRemoteState());
+
+            // Verify the Link and Work list entries
+            if(i == 0) {
+                assertDeliveryLinkReferences(d, null, null, serverDeliveries[1]);
+                assertDeliveryWorkReferences(d, null, null, serverDeliveries[1]);
+            } else if (i != messageCount - 1) {
+                assertDeliveryLinkReferences(d, null, serverDeliveries[i - 1], serverDeliveries[i+1]);
+                assertDeliveryWorkReferences(d, null, serverDeliveries[i - 1], serverDeliveries[i+1]);
+            }
+            else {
+                assertDeliveryLinkReferences(d, null, serverDeliveries[i - 1], null);
+                assertDeliveryWorkReferences(d, null, serverDeliveries[i - 1], null);
+            }
+
+            // Assert there are no 'transport work' list entries, as those are for local updates.
+            assertDeliveryTransportWorkReferences(d, null, null, null);
+        }
+
+        // Settle one from the middle
+        int toSettle = messageCount/2;
+        assertTrue("need more deliveries", toSettle > 1);
+        assertTrue("need more deliveries", toSettle < messageCount - 1);
+
+        DeliveryImpl dSettle = clientDeliveries[toSettle];
+        Integer index = getDeliveryContextIndex(dSettle);
+
+        // Verify the server gets intended state changes when settled
+        assertFalse("Delivery should not have been remotely settled yet", serverDeliveries[toSettle].remotelySettled());
+        dSettle.settle();
+
+        // Verify the client delivery Link and Work list entries are cleared, tpWork is set
+        assertDeliveryLinkReferences(dSettle, index, null, null);
+        assertDeliveryWorkReferences(dSettle, index, null, null);
+        assertDeliveryTransportWorkReferences(dSettle, index, null, null);
+        assertSame("expected settled delivery to be client connection tpWork head", dSettle, ((ConnectionImpl) clientConnection).getTransportWorkHead());
+
+        // Verify the client Link and Work list entries are correct for neighbouring deliveries
+        assertDeliveryLinkReferences(clientDeliveries[toSettle - 1], index - 1, clientDeliveries[toSettle - 2], clientDeliveries[toSettle + 1]);
+        assertDeliveryTransportWorkReferences(clientDeliveries[toSettle - 1], index - 1, null, null);
+        assertDeliveryWorkReferences(clientDeliveries[toSettle - 1], index - 1, null, null);
+
+        assertDeliveryLinkReferences(clientDeliveries[toSettle + 1], index + 1, clientDeliveries[toSettle - 1], clientDeliveries[toSettle + 2]);
+        assertDeliveryTransportWorkReferences(clientDeliveries[toSettle + 1], index + 1, null, null);
+        assertDeliveryWorkReferences(clientDeliveries[toSettle + 1], index + 1, null, null);
+
+        // Update the server with the changes
+        pumpClientToServer();
+
+        // Verify server delivery is now remotelySettled, its Link and Work entries are NOT yet clear, but tpWork IS clear
+        DeliveryImpl dSettleServer = serverDeliveries[toSettle];
+        assertTrue("Delivery should have been remotely settled on server", dSettleServer.remotelySettled());
+
+        assertDeliveryLinkReferences(dSettleServer, null, serverDeliveries[toSettle - 1], serverDeliveries[toSettle+1]);
+        assertDeliveryWorkReferences(dSettleServer, null, serverDeliveries[toSettle - 1], serverDeliveries[toSettle+1]);
+        assertDeliveryTransportWorkReferences(dSettleServer, null, null, null);
+
+        assertNull("expected client connection tpWork head to now be null", ((ConnectionImpl) clientConnection).getTransportWorkHead());
+
+        // Settle on server, expect Link and Work list entries to be cleared, tpWork to remain clear (as delivery
+        // is already remotely settled). Note 'work next' returns list head if none present, so we verify that here.
+        dSettleServer.settle();
+
+        assertDeliveryLinkReferences(dSettleServer, null, null, null);
+        assertNull("Unexpected workPrev", dSettleServer.getWorkPrev());
+        assertSame("Unexpected workNext", serverDeliveries[0], dSettleServer.getWorkNext());
+        assertDeliveryTransportWorkReferences(dSettleServer, null, null, null);
+
+        assertNull("expected server connection tpWork head to still be null", ((ConnectionImpl) serverConnection).getTransportWorkHead());
+
+        // Verify the server entries are correct for neighbouring deliveries updated to reflect the settle
+        assertDeliveryLinkReferences(serverDeliveries[toSettle - 1], null, serverDeliveries[toSettle - 2], serverDeliveries[toSettle + 1]);
+        assertDeliveryWorkReferences(serverDeliveries[toSettle - 1], null, serverDeliveries[toSettle - 2], serverDeliveries[toSettle + 1]);
+        assertDeliveryTransportWorkReferences(serverDeliveries[toSettle - 1], null, null, null);
+
+        assertDeliveryLinkReferences(serverDeliveries[toSettle + 1], null, serverDeliveries[toSettle - 1], serverDeliveries[toSettle + 2]);
+        assertDeliveryWorkReferences(serverDeliveries[toSettle + 1], null, serverDeliveries[toSettle - 1], serverDeliveries[toSettle + 2]);
+        assertDeliveryTransportWorkReferences(serverDeliveries[toSettle + 1], null, null, null);
+    }
+
+    private Integer getDeliveryContextIndex(DeliveryImpl d) {
+        assertNotNull("Should have had a delivery", d);
+        Integer index = (Integer) d.getContext();
+        assertNotNull("Should have had a context index", index);
+
+        return index;
+    }
+
+    private void assertDeliveryWorkReferences(DeliveryImpl delivery, Integer index, DeliveryImpl deliveryWorkPrev, DeliveryImpl deliveryWorkNext) {
+        assertNotNull("No delivery given", delivery);
+        if(index != null) {
+            assertEquals("Unexpected context index", Integer.valueOf(index), getDeliveryContextIndex(delivery));
+        }
+
+        if(deliveryWorkPrev == null) {
+            assertNull("Unexpected workPrev", delivery.getWorkPrev());
+        } else {
+            assertSame("Unexpected workPrev", deliveryWorkPrev, delivery.getWorkPrev());
+            assertSame("Unexpected workNext on previous delivery", delivery, deliveryWorkPrev.getWorkNext());
+        }
+
+        if(deliveryWorkNext == null) {
+            assertNull("Unexpected workNext", delivery.getWorkNext());
+        } else {
+            assertSame("Unexpected workNext", deliveryWorkNext, delivery.getWorkNext());
+            assertSame("Unexpected workPrev on next delivery",  delivery , deliveryWorkNext.getWorkPrev());
+        }
+    }
+
+    private void assertDeliveryTransportWorkReferences(DeliveryImpl delivery, Integer index, DeliveryImpl deliveryTpWorkPrev, DeliveryImpl deliveryTpWorkNext) {
+        assertNotNull("No delivery given", delivery);
+        if(index != null) {
+            assertEquals("Unexpected context index", Integer.valueOf(index), getDeliveryContextIndex(delivery));
+        }
+
+        if(deliveryTpWorkPrev == null) {
+            assertNull("Unexpected transportWorkPrev", delivery.getTransportWorkPrev());
+        } else {
+            assertSame("Unexpected transportWorkPrev", deliveryTpWorkPrev, delivery.getTransportWorkPrev());
+            assertSame("Unexpected transportWorkNext on previous delivery", delivery, deliveryTpWorkPrev.getTransportWorkNext());
+        }
+
+        if (deliveryTpWorkNext == null) {
+            assertNull("Unexpected transportWorkNext", delivery.getTransportWorkNext());
+        } else {
+            assertSame("Unexpected transportWorkNext", deliveryTpWorkNext, delivery.getTransportWorkNext());
+            assertSame("Unexpected transportWorkPrev on next delivery",  delivery , deliveryTpWorkNext.getTransportWorkPrev());
+        }
+    }
+
+    private void assertDeliveryLinkReferences(DeliveryImpl delivery, Integer index, DeliveryImpl deliveryLinkPrev, DeliveryImpl deliveryLinkNext) {
+        assertNotNull("No delivery given", delivery);
+        if(index != null) {
+            assertEquals("Unexpected context index", Integer.valueOf(index), getDeliveryContextIndex(delivery));
+        }
+
+        if(deliveryLinkPrev == null) {
+            assertNull("Unexpected linkPrev", delivery.getLinkPrevious());
+        } else {
+            assertSame("Unexpected linkPrev", deliveryLinkPrev, delivery.getLinkPrevious());
+            assertSame("Unexpected linkPrev on previous delivery", delivery, deliveryLinkPrev.getLinkNext());
+        }
+
+        if(deliveryLinkNext == null) {
+            assertNull("Unexpected linkNext", delivery.getLinkNext());
+        } else {
+            assertSame("Unexpected linkNext", deliveryLinkNext, delivery.getLinkNext());
+            assertSame("Unexpected linkPrev on next delivery",  delivery , deliveryLinkNext.getLinkPrevious());
+        }
+    }
+
+    private DeliveryImpl[] receiveMessagesFromServer(int count) {
+        DeliveryImpl[] deliveries = new DeliveryImpl[count];
+        for(int i = 0; i < count; i++) {
+            deliveries[i] = (DeliveryImpl) receiveMessageFromServer("Message" + i, i);
+        }
+
+        return deliveries;
+    }
+
+    private Delivery receiveMessageFromServer(String deliveryTag, int count)
+    {
+        Delivery delivery = getClient().getConnection().getWorkHead();
+        Receiver clientReceiver = getClient().getReceiver();
+
+        assertTrue(Arrays.equals(deliveryTag.getBytes(StandardCharsets.UTF_8), delivery.getTag()));
+        assertEquals("The received delivery should be on our receiver",
+                clientReceiver, delivery.getLink());
+
+        assertNull(delivery.getLocalState());
+        assertNull(delivery.getRemoteState());
+
+        assertFalse(delivery.isPartial());
+        assertTrue(delivery.isReadable());
+
+        byte[] received = new byte[BUFFER_SIZE];
+        int len = clientReceiver.recv(received, 0, BUFFER_SIZE);
+
+        assertTrue("given array was too small", len < BUFFER_SIZE);
+
+        Message m = Proton.message();
+        m.decode(received, 0, len);
+
+        Object messageBody = ((AmqpValue)m.getBody()).getValue();
+        assertEquals("Unexpected message content", count, messageBody);
+
+        boolean receiverAdvanced = clientReceiver.advance();
+        assertTrue("receiver has not advanced", receiverAdvanced);
+
+        delivery.setContext(count);
+
+        return delivery;
+    }
+
+    private DeliveryImpl[] sendMessagesToClient(int count) {
+        DeliveryImpl[] deliveries = new DeliveryImpl[count];
+        for(int i = 0; i< count; i++) {
+            deliveries[i] = (DeliveryImpl) sendMessageToClient("Message" + i, i);
+        }
+
+        return deliveries;
+    }
+
+    private Delivery sendMessageToClient(String deliveryTag, int messageBody)
+    {
+        byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8);
+
+        Message m = Proton.message();
+        m.setBody(new AmqpValue(messageBody));
+
+        byte[] encoded = new byte[BUFFER_SIZE];
+        int len = m.encode(encoded, 0, BUFFER_SIZE);
+
+        assertTrue("given array was too small", len < BUFFER_SIZE);
+
+        Sender serverSender = getServer().getSender();
+        Delivery serverDelivery = serverSender.delivery(tag);
+        int sent = serverSender.send(encoded, 0, len);
+
+        assertEquals("sender unable to send all data at once as assumed for simplicity", len, sent);
+
+        boolean senderAdvanced = serverSender.advance();
+        assertTrue("sender has not advanced", senderAdvanced);
+
+        return serverDelivery;
+    }
+}
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonContainer.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonContainer.java
index d24d1ce..fbe9d6a 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonContainer.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonContainer.java
@@ -49,4 +49,92 @@
     {
         this.containerId = containerId;
     }
+
+    public String getContainerId() {
+        return containerId;
+    }
+
+    public void setContainerId(String containerId) {
+        this.containerId = containerId;
+    }
+
+    public Connection getConnection() {
+        return connection;
+    }
+
+    public void setConnection(Connection connection) {
+        this.connection = connection;
+    }
+
+    public Transport getTransport() {
+        return transport;
+    }
+
+    public void setTransport(Transport transport) {
+        this.transport = transport;
+    }
+
+    public Session getSession() {
+        return session;
+    }
+
+    public void setSession(Session session) {
+        this.session = session;
+    }
+
+    public Sender getSender() {
+        return sender;
+    }
+
+    public void setSender(Sender sender) {
+        this.sender = sender;
+    }
+
+    public Receiver getReceiver() {
+        return receiver;
+    }
+
+    public void setReceiver(Receiver receiver) {
+        this.receiver = receiver;
+    }
+
+    public Source getSource() {
+        return source;
+    }
+
+    public void setSource(Source source) {
+        this.source = source;
+    }
+
+    public Target getTarget() {
+        return target;
+    }
+
+    public void setTarget(Target target) {
+        this.target = target;
+    }
+
+    public Delivery getDelivery() {
+        return delivery;
+    }
+
+    public void setDelivery(Delivery delivery) {
+        this.delivery = delivery;
+    }
+
+    public Message getMessage() {
+        return message;
+    }
+
+    public void setMessage(Message message) {
+        this.message = message;
+    }
+
+    public byte[] getMessageData() {
+        return messageData;
+    }
+
+    public void setMessageData(byte[] messageData) {
+        this.messageData = messageData;
+    }
 }
