QPID-8349: [Tests][AMQP 1.0] Improve transaction and transfer tests
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 98ffe45..ab2c959 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -20,12 +20,6 @@
 
 package org.apache.qpid.tests.protocol.v1_0;
 
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -117,6 +111,7 @@
     private Map<String, Object> _latestDeliveryApplicationProperties;
     private Map<Class, FrameBody> _latestResponses = new HashMap<>();
     private AtomicLong _receivedDeliveryCount = new AtomicLong();
+    private AtomicLong _coordinatorCredits = new AtomicLong();
 
     Interaction(final FrameTransport frameTransport)
     {
@@ -955,6 +950,12 @@
 
     public Interaction txnAttachCoordinatorLink(InteractionTransactionalState transactionalState) throws Exception
     {
+        return txnAttachCoordinatorLink(transactionalState, Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL);
+    }
+
+    public Interaction txnAttachCoordinatorLink(final InteractionTransactionalState transactionalState,
+                                                final Symbol... outcomes) throws Exception
+    {
         Attach attach = new Attach();
         attach.setName("testTransactionCoordinator-" + transactionalState.getHandle());
         attach.setHandle(transactionalState.getHandle());
@@ -963,78 +964,99 @@
         attach.setRole(Role.SENDER);
         Source source = new Source();
         attach.setSource(source);
-        source.setOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL);
+        source.setOutcomes(outcomes);
         sendPerformativeAndChainFuture(attach, _sessionChannel);
         consumeResponse(Attach.class);
-        consumeResponse(Flow.class);
+        final Flow flow = consumeResponse(Flow.class).getLatestResponse(Flow.class);
+        _coordinatorCredits.set(flow.getLinkCredit().longValue());
         return this;
     }
 
     public Interaction txnDeclare(final InteractionTransactionalState txnState) throws Exception
     {
-        Transfer transfer = createTransactionTransfer(txnState.getHandle());
-        transferPayload(transfer, new Declare());
-        sendPerformativeAndChainFuture(transfer, _sessionChannel);
-        consumeResponse(Disposition.class);
-        Disposition declareTransactionDisposition = getLatestResponse(Disposition.class);
-        assertThat(declareTransactionDisposition.getSettled(), is(equalTo(true)));
-        assertThat(declareTransactionDisposition.getState(), is(instanceOf(Declared.class)));
-        Binary transactionId = ((Declared) declareTransactionDisposition.getState()).getTxnId();
-        assertThat(transactionId, is(notNullValue()));
-        consumeResponse(Flow.class);
+        sendPayloadToCoordinator(new Declare(), txnState.getHandle());
+        final DeliveryState state = handleCoordinatorResponse();
+        txnState.setDeliveryState(state);
+        final Binary transactionId = ((Declared) state).getTxnId();
         txnState.setLastTransactionId(transactionId);
         return this;
     }
 
-    public Interaction discharge(final InteractionTransactionalState txnState, final boolean failed) throws Exception
+    public Interaction txnSendDischarge(final InteractionTransactionalState txnState, final boolean failed)
+            throws Exception
     {
         final Discharge discharge = new Discharge();
         discharge.setTxnId(txnState.getCurrentTransactionId());
         discharge.setFail(failed);
-
-        Transfer transfer = createTransactionTransfer(txnState.getHandle());
-        transferPayload(transfer, discharge);
-        sendPerformativeAndChainFuture(transfer, _sessionChannel);
+        sendPayloadToCoordinator(discharge, txnState.getHandle());
         return this;
     }
 
     public Interaction txnDischarge(final InteractionTransactionalState txnState, boolean failed) throws Exception
     {
-        discharge(txnState, failed);
-
-        Disposition declareTransactionDisposition = null;
-        Flow coordinatorFlow = null;
-        do
-        {
-            consumeResponse(Disposition.class, Flow.class);
-            Response<?> response = getLatestResponse();
-            if (response.getBody() instanceof Disposition)
-            {
-                declareTransactionDisposition = (Disposition) response.getBody();
-            }
-            if (response.getBody() instanceof Flow)
-            {
-                final Flow flowResponse = (Flow) response.getBody();
-                if (flowResponse.getHandle().equals(txnState.getHandle()))
-                {
-                    coordinatorFlow = flowResponse;
-                }
-            }
-        } while(declareTransactionDisposition == null || coordinatorFlow == null);
-
-        assertThat(declareTransactionDisposition.getSettled(), is(equalTo(true)));
-        assertThat(declareTransactionDisposition.getState(), is(instanceOf(Accepted.class)));
-
+        txnSendDischarge(txnState, failed);
+        final DeliveryState state = handleCoordinatorResponse();
+        txnState.setDeliveryState(state);
         txnState.setLastTransactionId(null);
         return this;
     }
 
+    private void sendPayloadToCoordinator(final Object payload, final UnsignedInteger handle)
+            throws Exception
+    {
+        final Transfer transfer = createTransactionTransfer(handle);
+        transferPayload(transfer, payload);
+        sendPerformativeAndChainFuture(transfer, _sessionChannel);
+    }
+
+    private DeliveryState handleCoordinatorResponse() throws Exception
+    {
+        final Set<Class<?>> expected = new HashSet<>(Collections.singletonList(Disposition.class));
+
+        if (_coordinatorCredits.decrementAndGet() == 0)
+        {
+            expected.add(Flow.class);
+        }
+
+        final Map<Class<?>, ?> responses = consumeResponses(expected);
+
+        final Disposition disposition = (Disposition) responses.get(Disposition.class);
+        if (expected.contains(Flow.class))
+        {
+            Flow flow = (Flow) responses.get(Flow.class);
+            _coordinatorCredits.set(flow.getLinkCredit().longValue());
+        }
+        if (!Boolean.TRUE.equals(disposition.getSettled()))
+        {
+            throw new IllegalStateException("Coordinator disposition is not settled");
+        }
+        return disposition.getState();
+    }
+
+    private Map<Class<?>, ?> consumeResponses(final Set<Class<?>> responseTypes)
+            throws Exception
+    {
+        Map<Class<?>, Object> results = new HashMap<>();
+        do
+        {
+            Response<?> response = consumeResponse(responseTypes).getLatestResponse();
+            if (response != null && response.getBody() instanceof FrameBody)
+            {
+                Class<?> bodyClass = response.getBody().getClass();
+                results.put(bodyClass, response.getBody());
+            }
+        }
+        while (!results.keySet().containsAll(responseTypes));
+        return results;
+    }
+
     private Transfer createTransactionTransfer(final UnsignedInteger handle)
     {
         Transfer transfer = new Transfer();
         transfer.setHandle(handle);
         transfer.setDeliveryId(getNextDeliveryId());
-        transfer.setDeliveryTag(new Binary(("transaction-" + transfer.getDeliveryId()).getBytes(StandardCharsets.UTF_8)));
+        transfer.setDeliveryTag(new Binary(("transaction-"
+                                            + transfer.getDeliveryId()).getBytes(StandardCharsets.UTF_8)));
         return transfer;
     }
 
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InteractionTransactionalState.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InteractionTransactionalState.java
index 061be92..b0832cc 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InteractionTransactionalState.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InteractionTransactionalState.java
@@ -21,12 +21,14 @@
 package org.apache.qpid.tests.protocol.v1_0;
 
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 
 public class InteractionTransactionalState
 {
     private final UnsignedInteger _handle;
     private Binary _lastTransactionId;
+    private DeliveryState _deliveryState;
 
     public InteractionTransactionalState(final UnsignedInteger handle)
     {
@@ -47,4 +49,14 @@
     {
         return _lastTransactionId;
     }
+
+    public DeliveryState getDeliveryState()
+    {
+        return _deliveryState;
+    }
+
+    public void setDeliveryState(final DeliveryState deliveryState)
+    {
+        _deliveryState = deliveryState;
+    }
 }
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
index e846b5f..2ac50f4 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
@@ -20,6 +20,7 @@
 
 package org.apache.qpid.tests.protocol.v1_0;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assume.assumeThat;
@@ -28,6 +29,7 @@
 import java.util.stream.IntStream;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
@@ -37,6 +39,7 @@
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 
 public class Utils
@@ -83,7 +86,7 @@
                        .attachSourceAddress(queueName)
                        .attach().consumeResponse()
                        .flowIncomingWindow(UnsignedInteger.ONE)
-                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
                        .flowOutgoingWindow(UnsignedInteger.ZERO)
                        .flowNextOutgoingId(UnsignedInteger.ZERO)
                        .flowLinkCredit(UnsignedInteger.ONE)
@@ -157,6 +160,7 @@
                                              .begin().consumeResponse(Begin.class)
                                              .attachRole(Role.SENDER)
                                              .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                             .attachSndSettleMode(SenderSettleMode.SETTLED)
                                              .attach().consumeResponse(Attach.class)
                                              .consumeResponse(Flow.class)
                                              .getLatestResponse(Flow.class);
@@ -166,14 +170,21 @@
                                          message.length),
                            flow.getLinkCredit().intValue(),
                            is(greaterThan(message.length)));
+
+                int tag = 0;
                 for (String payload : message)
                 {
                     interaction.transferPayloadData(payload)
                                .transferSettled(true)
+                               .transferDeliveryId()
+                               .transferDeliveryTag(new Binary(String.valueOf(tag).getBytes(UTF_8)))
                                .transfer()
                                .sync();
+                    tag++;
                 }
+                interaction.doCloseConnection();
             }
         }
     }
+
 }
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
index 1ca4419..b079122 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
@@ -24,14 +24,18 @@
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
+import org.hamcrest.Matchers;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -39,6 +43,8 @@
 import org.apache.qpid.server.protocol.v1_0.codec.StringWriter;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotations;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotationsSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
@@ -53,11 +59,13 @@
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
 
 public class DecodeErrorTest extends BrokerAdminUsingTestBase
 {
@@ -74,44 +82,46 @@
     @SpecificationTest(section = "3.2",
             description = "Altogether a message consists of the following sections: Zero or one header,"
                           + " Zero or one delivery-annotations, [...]")
-    public void illegalMessageFormatPayload() throws Exception
+    @BrokerSpecific(kind = BrokerAdmin.KIND_BROKER_J)
+    public void illegalMessage() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
+            final Interaction interaction = transport.newInteraction();
+            final Attach attach = interaction.negotiateProtocol()
+                                             .consumeResponse()
+                                             .open()
+                                             .consumeResponse(Open.class)
+                                             .begin()
+                                             .consumeResponse(Begin.class)
+                                             .attachRole(Role.SENDER)
+                                             .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                             .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+                                             .attach()
+                                             .consumeResponse(Attach.class)
+                                             .getLatestResponse(Attach.class);
+            assumeThat(attach.getRcvSettleMode(), is(equalTo(ReceiverSettleMode.SECOND)));
 
-            List<QpidByteBuffer> payloads = new ArrayList<>();
-            final HeaderSection headerSection = new Header().createEncodingRetainingSection();
-            payloads.add(headerSection.getEncodedForm());
-            headerSection.dispose();
-            final StringWriter stringWriter = new StringWriter("string in between annotation sections");
-            QpidByteBuffer encodedString = QpidByteBuffer.allocate(stringWriter.getEncodedSize());
-            stringWriter.writeToBuffer(encodedString);
-            encodedString.flip();
-            payloads.add(encodedString);
-            final DeliveryAnnotationsSection
-                    deliveryAnnotationsSection =
-                    new DeliveryAnnotations(Collections.emptyMap()).createEncodingRetainingSection();
-            payloads.add(deliveryAnnotationsSection.getEncodedForm());
-            deliveryAnnotationsSection.dispose();
+            final Flow flow = interaction.consumeResponse(Flow.class).getLatestResponse(Flow.class);
+            assumeThat(flow.getLinkCredit(), is(greaterThan(UnsignedInteger.ZERO)));
 
-            final Detach detachResponse;
-            try (QpidByteBuffer combinedPayload = QpidByteBuffer.concatenate(payloads))
+            final List<QpidByteBuffer> payloads = buildInvalidMessage();
+            try
             {
-                detachResponse = transport.newInteraction()
-                                          .negotiateProtocol().consumeResponse()
-                                          .open().consumeResponse(Open.class)
-                                          .begin().consumeResponse(Begin.class)
-                                          .attachRole(Role.SENDER)
-                                          .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
-                                          .attach().consumeResponse(Attach.class)
-                                          .consumeResponse(Flow.class)
-                                          .transferMessageFormat(UnsignedInteger.ZERO)
-                                          .transferPayload(combinedPayload)
-                                          .transfer()
-                                          .consumeResponse()
-                                          .getLatestResponse(Detach.class);
+                try (QpidByteBuffer combinedPayload = QpidByteBuffer.concatenate(payloads))
+                {
+                    interaction.transferMessageFormat(UnsignedInteger.ZERO)
+                               .transferPayload(combinedPayload)
+                               .transfer();
+                }
             }
-            payloads.forEach(QpidByteBuffer::dispose);
+            finally
+            {
+                payloads.forEach(QpidByteBuffer::dispose);
+            }
+
+            final Detach detachResponse = interaction.consumeResponse()
+                                                     .getLatestResponse(Detach.class);
             assertThat(detachResponse.getError(), is(notNullValue()));
             assertThat(detachResponse.getError().getCondition(), is(equalTo(DECODE_ERROR)));
         }
@@ -148,6 +158,10 @@
             {
                 error = ((Close) responseBody).getError();
             }
+            else if (responseBody instanceof Detach)
+            {
+                error = ((Detach) responseBody).getError();
+            }
             else
             {
                 fail(String.format("Expected response of either Detach, End, or Close. Got '%s'", responseBody));
@@ -200,4 +214,51 @@
             assertThat(error.getCondition(), is(equalTo(DECODE_ERROR)));
         }
     }
+
+    private List<QpidByteBuffer> buildInvalidMessage()
+    {
+        final List<QpidByteBuffer> payloads = new ArrayList<>();
+        final Header header = new Header();
+        header.setTtl(UnsignedInteger.valueOf(1000L));
+        final HeaderSection headerSection = header.createEncodingRetainingSection();
+        try
+        {
+            payloads.add(headerSection.getEncodedForm());
+        }
+        finally
+        {
+            headerSection.dispose();
+        }
+
+        final StringWriter stringWriter = new StringWriter("string in between annotation sections");
+        QpidByteBuffer encodedString = QpidByteBuffer.allocate(stringWriter.getEncodedSize());
+        stringWriter.writeToBuffer(encodedString);
+        encodedString.flip();
+        payloads.add(encodedString);
+
+        final Map<Symbol, Object> annoationMap = Collections.singletonMap(Symbol.valueOf("foo"), "bar");
+        final DeliveryAnnotations annotations = new DeliveryAnnotations(annoationMap);
+        final DeliveryAnnotationsSection deliveryAnnotationsSection = annotations.createEncodingRetainingSection();
+        try
+        {
+
+            payloads.add(deliveryAnnotationsSection.getEncodedForm());
+        }
+        finally
+        {
+            deliveryAnnotationsSection.dispose();
+        }
+
+        final AmqpValueSection payload = new AmqpValue(getTestName()).createEncodingRetainingSection();
+        try
+        {
+            payloads.add(payload.getEncodedForm());
+        }
+        finally
+        {
+            payload.dispose();
+        }
+        return payloads;
+    }
+
 }
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java
index 1878950..a7ef076 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java
@@ -46,8 +46,6 @@
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
@@ -104,6 +102,7 @@
                        .attach().consumeResponse(Attach.class)
                        .consumeResponse(Flow.class)
 
+                       .transferDeliveryId()
                        .transferPayload(generateMessagePayloadToDestination(BrokerAdmin.TEST_QUEUE_NAME))
                        .transferSettled(Boolean.TRUE)
                        .transferDeliveryTag(_deliveryTag)
@@ -139,12 +138,13 @@
                        .attach().consumeResponse(Attach.class)
                        .consumeResponse(Flow.class)
 
+                       .transferDeliveryId()
                        .transferPayload(generateMessagePayloadToDestination("Unknown"))
                        .transferSettled(Boolean.TRUE)
                        .transferDeliveryTag(_deliveryTag)
                        .transfer();
 
-            Detach detach = interaction.consumeResponse().getLatestResponse(Detach.class);
+            Detach detach = interaction.consumeResponse(Detach.class).getLatestResponse(Detach.class);
             Error error = detach.getError();
             assertThat(error, is(notNullValue()));
             assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND)));
@@ -179,6 +179,7 @@
                        .attach().consumeResponse(Attach.class)
                        .consumeResponse(Flow.class)
 
+                       .transferDeliveryId()
                        .transferPayload(generateMessagePayloadToDestination("Unknown"))
                        .transferDeliveryTag(_deliveryTag)
                        .transfer()
@@ -226,6 +227,7 @@
                        .attach().consumeResponse(Attach.class)
                        .consumeResponse(Flow.class)
 
+                       .transferDeliveryId()
                        .transferPayload(generateMessagePayloadToDestination("Unknown"))
                        .transferDeliveryTag(_deliveryTag)
                        .transfer();
@@ -262,6 +264,7 @@
                        .attach().consumeResponse(Attach.class)
                        .consumeResponse(Flow.class)
 
+                       .transferDeliveryId()
                        .transferHandle(linkHandle)
                        .transferPayload(generateMessagePayloadToDestination(BrokerAdmin.TEST_QUEUE_NAME))
                        .transferDeliveryTag(_deliveryTag)
@@ -271,6 +274,8 @@
 
                        .txnDischarge(txnState, false);
 
+            assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+
             Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
             assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
         }
@@ -295,6 +300,7 @@
                        .attach().consumeResponse(Attach.class)
                        .consumeResponse(Flow.class)
 
+                       .transferDeliveryId()
                        .transferHandle(linkHandle)
                        .transferPayload(generateMessagePayloadToDestination(BrokerAdmin.TEST_QUEUE_NAME))
                        .transferDeliveryTag(_deliveryTag)
@@ -314,6 +320,8 @@
 
             interaction.txnDischarge(txnState, false);
 
+            assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+
             Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
             assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
         }
@@ -339,6 +347,7 @@
                        .attach().consumeResponse(Attach.class)
                        .consumeResponse(Flow.class)
 
+                       .transferDeliveryId()
                        .transferHandle(linkHandle)
                        .transferPayload(generateMessagePayloadToDestination("Unknown"))
                        .transferDeliveryTag(_deliveryTag)
@@ -362,6 +371,8 @@
             assertThat(rejectedError.getInfo().get(DELIVERY_TAG), is(equalTo(_deliveryTag)));
 
             interaction.txnDischarge(txnState, false);
+
+            assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
         }
     }
 
@@ -385,6 +396,7 @@
                        .attach().consumeResponse(Attach.class)
                        .consumeResponse(Flow.class)
 
+                       .transferDeliveryId()
                        .transferHandle(linkHandle)
                        .transferPayload(generateMessagePayloadToDestination("Unknown"))
                        .transferDeliveryId(UnsignedInteger.valueOf(1))
@@ -400,23 +412,11 @@
             assertThat(senderLinkDetachError.getInfo(), is(notNullValue()));
             assertThat(senderLinkDetachError.getInfo().get(DELIVERY_TAG), is(equalTo(_deliveryTag)));
 
-            final Discharge discharge = new Discharge();
-            discharge.setTxnId(txnState.getCurrentTransactionId());
-            discharge.setFail(false);
+            interaction.txnDischarge(txnState, false);
 
-            interaction.transferHandle(txnState.getHandle())
-                       .transferSettled(Boolean.FALSE)
-                       .transferDeliveryId(UnsignedInteger.valueOf(2))
-                       .transferDeliveryTag(new Binary(("transaction-" + 2).getBytes(StandardCharsets.UTF_8)))
-                       .transferPayloadData(discharge).transfer();
-
-            Disposition dischargeTransactionDisposition =
-                    getDispositionForDeliveryId(interaction, UnsignedInteger.valueOf(2));
-
-            assertThat(dischargeTransactionDisposition.getSettled(), is(equalTo(true)));
-            assertThat(dischargeTransactionDisposition.getState(), is(instanceOf(Rejected.class)));
-
-            Rejected rejected = (Rejected) dischargeTransactionDisposition.getState();
+            DeliveryState txnDischargeDeliveryState = txnState.getDeliveryState();
+            assertThat(txnDischargeDeliveryState, is(instanceOf(Rejected.class)));
+            Rejected rejected = (Rejected) txnDischargeDeliveryState;
             Error error = rejected.getError();
 
             assertThat(error, is(notNullValue()));
@@ -468,30 +468,20 @@
                        .attach().consumeResponse(Attach.class)
                        .consumeResponse(Flow.class)
 
+                       .transferDeliveryId()
                        .transferHandle(linkHandle)
                        .transferPayload(generateMessagePayloadToDestination("Unknown"))
                        .transferDeliveryTag(_deliveryTag)
                        .transferTransactionalState(txnState.getCurrentTransactionId())
                        .transferSettled(Boolean.TRUE)
-                       .transferDeliveryId(UnsignedInteger.valueOf(1))
                        .transfer();
 
-            final Discharge discharge = new Discharge();
-            discharge.setTxnId(txnState.getCurrentTransactionId());
-            discharge.setFail(false);
-            interaction.transferHandle(txnState.getHandle())
-                       .transferDeliveryId(UnsignedInteger.valueOf(2))
-                       .transferSettled(Boolean.FALSE)
-                       .transferDeliveryTag(new Binary(("transaction-" + 2).getBytes(StandardCharsets.UTF_8)))
-                       .transferPayloadData(discharge).transfer();
+            interaction.txnDischarge(txnState, false);
 
-            Disposition dischargeTransactionDisposition =
-                    getDispositionForDeliveryId(interaction, UnsignedInteger.valueOf(2));
+            DeliveryState txDischargeDeliveryState = txnState.getDeliveryState();
+            assertThat(txDischargeDeliveryState, is(instanceOf(Rejected.class)));
 
-            assertThat(dischargeTransactionDisposition.getSettled(), is(equalTo(true)));
-            assertThat(dischargeTransactionDisposition.getState(), is(instanceOf(Rejected.class)));
-
-            Rejected rejected = (Rejected) dischargeTransactionDisposition.getState();
+            Rejected rejected = (Rejected) txDischargeDeliveryState;
             Error error = rejected.getError();
 
             assertThat(error, is(notNullValue()));
@@ -536,14 +526,7 @@
             interaction.begin()
                        .consumeResponse(Begin.class)
 
-                       .attachRole(Role.SENDER)
-                       .attachName("testTransactionCoordinator-" + txnState.getHandle())
-                       .attachHandle(txnState.getHandle())
-                       .attachInitialDeliveryCount(UnsignedInteger.ZERO)
-                       .attachTarget(new Coordinator())
-                       .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
-                       .attach().consumeResponse(Attach.class)
-                       .consumeResponse(Flow.class)
+                       .txnAttachCoordinatorLink(txnState, Accepted.ACCEPTED_SYMBOL)
                        .txnDeclare(txnState)
 
                        .attachRole(Role.SENDER)
@@ -554,22 +537,14 @@
                        .attach().consumeResponse(Attach.class)
                        .consumeResponse(Flow.class)
 
+                       .transferDeliveryId()
                        .transferHandle(linkHandle)
                        .transferPayload(generateMessagePayloadToDestination("Unknown"))
                        .transferDeliveryTag(_deliveryTag)
                        .transferTransactionalState(txnState.getCurrentTransactionId())
                        .transferSettled(Boolean.TRUE)
-                       .transfer();
-
-            final Discharge discharge = new Discharge();
-            discharge.setTxnId(txnState.getCurrentTransactionId());
-            discharge.setFail(false);
-
-            interaction.transferHandle(txnState.getHandle())
-                       .transferSettled(Boolean.FALSE)
-                       .transferDeliveryId(UnsignedInteger.valueOf(4))
-                       .transferDeliveryTag(new Binary(("transaction-" + 4).getBytes(StandardCharsets.UTF_8)))
-                       .transferPayloadData(discharge).transfer();
+                       .transfer()
+                       .txnSendDischarge(txnState, false);
 
             Detach transactionCoordinatorDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
             Error transactionCoordinatorDetachError = transactionCoordinatorDetach.getError();
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java
index d6eb3d1..39a8a9b 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java
@@ -171,7 +171,7 @@
             assertThat(receivedDetach.getError().getCondition(), is(AmqpError.RESOURCE_DELETED));
             assertThat(receivedDetach.getHandle(), is(equalTo(attach.getHandle())));
 
-            interaction.discharge(txnState, false);
+            interaction.txnSendDischarge(txnState, false);
 
             assertTransactionRollbackOnly(interaction, txnState);
         }
@@ -244,7 +244,7 @@
             assertThat(receivedDetach.getError().getCondition(), is(AmqpError.RESOURCE_DELETED));
             assertThat(receivedDetach.getHandle(), is(equalTo(attach.getHandle())));
 
-            interaction.discharge(txnState, false);
+            interaction.txnSendDischarge(txnState, false);
 
             assertTransactionRollbackOnly(interaction, txnState);
         }
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
index 1cf1ff0..6ac8e7f 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
@@ -70,7 +70,7 @@
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
-            QpidByteBuffer[] payloads = Utils.splitPayload("testData", 2);
+            QpidByteBuffer[] payloads = Utils.splitPayload(getTestName(), 2);
 
             final Response<?> latestResponse = transport.newInteraction()
                                                         .negotiateProtocol().consumeResponse()
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
index ed9e58d..3b421c7 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
@@ -49,6 +49,7 @@
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.ChannelClosedResponse;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.SpecificationTest;
@@ -229,9 +230,11 @@
             {
                 payload.dispose();
             }
-            Response<?> latestResponse = interaction.consumeResponse(new Class<?>[] {null}).getLatestResponse();
-            assertThat(latestResponse, is(nullValue()));
+            interaction.consumeResponse(null, Flow.class);
         }
+        String secondMessage = getTestName() + "_2";
+        Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, secondMessage);
+        assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(secondMessage)));
     }
 
     @Test
@@ -387,7 +390,7 @@
                 payload.dispose();
             }
 
-            interaction.consumeResponse(Detach.class, End.class, Close.class);
+            interaction.consumeResponse(Detach.class, End.class, Close.class, ChannelClosedResponse.class);
         }
     }
 }
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 7950741..42f5dfc 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -38,6 +38,7 @@
 
 import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.TreeSet;
@@ -62,7 +63,6 @@
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Received;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
@@ -78,6 +78,7 @@
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.ChannelClosedResponse;
 import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
@@ -121,13 +122,13 @@
 
     @Test
     @SpecificationTest(section = "1.3.4",
-            description = "Transfer without mandatory fields should result in a decoding error.")
+            description = "mandatory [...] a non null value for the field is always encoded.")
     public void emptyTransfer() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
-            Close responseClose = transport.newInteraction()
-                                           .negotiateProtocol().consumeResponse()
+            Interaction interact = transport.newInteraction();
+            Response<?> response = interact.negotiateProtocol().consumeResponse()
                                            .open().consumeResponse(Open.class)
                                            .begin().consumeResponse(Begin.class)
                                            .attachRole(Role.SENDER)
@@ -136,9 +137,27 @@
                                            .transferHandle(null)
                                            .transfer()
                                            .consumeResponse()
-                                           .getLatestResponse(Close.class);
-            assertThat(responseClose.getError(), is(notNullValue()));
-            assertThat(responseClose.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR));
+                                           .getLatestResponse();
+
+            assertThat(response.getBody(), is(notNullValue()));
+
+            if (response.getBody() instanceof Close)
+            {
+                final Close responseClose = (Close)response.getBody();
+                assertThat(responseClose.getError(), is(notNullValue()));
+                assertThat(responseClose.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR));
+
+                interact.close().sync();
+            }
+            else if (response.getBody() instanceof End)
+            {
+                final End responseEnd = (End)response.getBody();
+                assertThat(responseEnd.getError(), is(notNullValue()));
+                assertThat(responseEnd.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR));
+
+                interact.end().doCloseConnection();
+            }
+            transport.assertNoMoreResponses();
         }
     }
 
@@ -158,10 +177,11 @@
                                                  .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                                                  .attach().consumeResponse(Attach.class)
                                                  .consumeResponse(Flow.class)
+                                                 .transferDeliveryId()
                                                  .transferDeliveryTag(null)
                                                  .transferPayloadData(getTestName())
                                                  .transfer();
-            interaction.consumeResponse(Detach.class, End.class, Close.class);
+            interaction.consumeResponse(Detach.class, End.class, Close.class, ChannelClosedResponse.class);
         }
     }
 
@@ -182,6 +202,7 @@
                                                        .attachHandle(linkHandle)
                                                        .attach().consumeResponse(Attach.class)
                                                        .consumeResponse(Flow.class)
+                                                       .transferDeliveryId()
                                                        .transferHandle(linkHandle)
                                                        .transferPayloadData(getTestName())
                                                        .transfer()
@@ -283,12 +304,18 @@
 
     @Test
     @SpecificationTest(section = "2.7.5",
-            description = "If the negotiated link value is first, then it is illegal to set this field to second.")
+            description = "rcv-settle-mode "
+                          + "If first, this indicates that the receiver MUST settle the delivery once it has arrived"
+                          + " without waiting for the sender to settle first."
+                          + " If second, this indicates that the receiver MUST NOT settle until sending its disposition"
+                          + " to the sender and receiving a settled disposition from the sender."
+                          + " If not set, this value is defaulted to the value negotiated on link attach."
+                          + " If the negotiated link value is first, then it is illegal to set this field to second.")
     public void transferReceiverSettleModeCannotBeSecondWhenLinkModeIsFirst() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
-            Detach detach = transport.newInteraction()
+            Response<?> response = transport.newInteraction()
                                      .negotiateProtocol().consumeResponse()
                                      .open().consumeResponse(Open.class)
                                      .begin().consumeResponse(Begin.class)
@@ -301,10 +328,24 @@
                                      .transferRcvSettleMode(ReceiverSettleMode.SECOND)
                                      .transfer()
                                      .consumeResponse()
-                                     .getLatestResponse(Detach.class);
-            Error error = detach.getError();
-            assertThat(error, is(notNullValue()));
-            assertThat(error.getCondition(), is(equalTo(AmqpError.INVALID_FIELD)));
+                                     .getLatestResponse();
+
+            if (response.getBody() instanceof Detach)
+            {
+                final Detach detach = (Detach) response.getBody();
+                Error error = detach.getError();
+                assertThat(error, is(notNullValue()));
+                assertThat(error.getCondition(), is(equalTo(AmqpError.INVALID_FIELD)));
+            }
+            else
+            {
+                if (response.getBody() instanceof Disposition)
+                {
+                    // clean up
+                    Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
+                }
+                fail("it is illegal to set transfer 'rcv-settle-mode' to 'second' when link 'rcv-settle-mode' is set to 'first'");
+            }
         }
     }
 
@@ -366,6 +407,7 @@
                                                                                    Rejected.REJECTED_SYMBOL)
                                                              .attach().consumeResponse(Attach.class)
                                                              .consumeResponse(Flow.class)
+                                                             .transferDeliveryId()
                                                              .transferPayload(messageEncoder.getPayload())
                                                              .transferRcvSettleMode(ReceiverSettleMode.FIRST)
                                                              .transfer()
@@ -411,6 +453,7 @@
                                                   .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
                                                   .attach().consumeResponse(Attach.class)
                                                   .consumeResponse(Flow.class)
+                                                  .transferDeliveryId()
                                                   .transferPayload(messageEncoder.getPayload())
                                                   .transferRcvSettleMode(ReceiverSettleMode.FIRST)
                                                   .transfer()
@@ -453,7 +496,7 @@
                                                      .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
                                                      .attach().consumeResponse()
                                                      .flowIncomingWindow(UnsignedInteger.ONE)
-                                                     .flowNextIncomingId(UnsignedInteger.ZERO)
+                                                     .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
                                                      .flowOutgoingWindow(UnsignedInteger.ZERO)
                                                      .flowNextOutgoingId(UnsignedInteger.ZERO)
                                                      .flowLinkCredit(UnsignedInteger.ONE)
@@ -493,7 +536,7 @@
                                                      .attachRcvSettleMode(ReceiverSettleMode.FIRST)
                                                      .attach().consumeResponse()
                                                      .flowIncomingWindow(UnsignedInteger.ONE)
-                                                     .flowNextIncomingId(UnsignedInteger.ZERO)
+                                                     .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
                                                      .flowOutgoingWindow(UnsignedInteger.ZERO)
                                                      .flowNextOutgoingId(UnsignedInteger.ZERO)
                                                      .flowLinkCredit(UnsignedInteger.ONE)
@@ -519,8 +562,6 @@
     @SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
     public void receiveTransferReceiverSettleSecond() throws Exception
     {
-        Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
-
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
             final Interaction interaction = transport.newInteraction()
@@ -532,19 +573,22 @@
                                                      .attachRcvSettleMode(ReceiverSettleMode.SECOND)
                                                      .attach().consumeResponse()
                                                      .flowIncomingWindow(UnsignedInteger.ONE)
-                                                     .flowNextIncomingId(UnsignedInteger.ZERO)
+                                                     .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
                                                      .flowOutgoingWindow(UnsignedInteger.ZERO)
                                                      .flowNextOutgoingId(UnsignedInteger.ZERO)
                                                      .flowLinkCredit(UnsignedInteger.ONE)
                                                      .flowHandleFromLinkHandle()
-                                                     .flow()
-                                                     .receiveDelivery()
-                                                     .decodeLatestDelivery();
+                                                     .flow();
 
-            Object data = interaction.getDecodedLatestDelivery();
+            Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
+
+            Object data = interaction.receiveDelivery()
+                                     .decodeLatestDelivery()
+                                     .getDecodedLatestDelivery();
             assertThat(data, is(equalTo(getTestName())));
 
             Disposition disposition = interaction.dispositionSettled(false)
+                                                 .dispositionFirstFromLatestDelivery()
                                                  .dispositionRole(Role.RECEIVER)
                                                  .dispositionState(new Accepted())
                                                  .disposition()
@@ -552,8 +596,11 @@
                                                  .getLatestResponse(Disposition.class);
             assertThat(disposition.getSettled(), is(true));
 
-            interaction.consumeResponse(null, Flow.class);
-
+            interaction.dispositionSettled(true)
+                       .dispositionFirstFromLatestDelivery()
+                       .dispositionRole(Role.RECEIVER)
+                       .dispositionState(new Accepted())
+                       .disposition();
         }
     }
 
@@ -561,8 +608,6 @@
     @SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
     public void receiveTransferReceiverSettleSecondWithRejectedOutcome() throws Exception
     {
-        Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
-
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
             final Interaction interaction = transport.newInteraction()
@@ -575,17 +620,20 @@
                                                      .attachRcvSettleMode(ReceiverSettleMode.SECOND)
                                                      .attach().consumeResponse()
                                                      .flowIncomingWindow(UnsignedInteger.ONE)
-                                                     .flowNextIncomingId(UnsignedInteger.ZERO)
+                                                     .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
                                                      .flowOutgoingWindow(UnsignedInteger.ZERO)
                                                      .flowNextOutgoingId(UnsignedInteger.ZERO)
                                                      .flowLinkCredit(UnsignedInteger.ONE)
                                                      .flowHandleFromLinkHandle()
                                                      .flow();
 
+            Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
+
             Object data = interaction.receiveDelivery().decodeLatestDelivery().getDecodedLatestDelivery();
             assertThat(data, is(equalTo(getTestName())));
 
             interaction.dispositionSettled(false)
+                       .dispositionFirstFromLatestDelivery()
                        .dispositionRole(Role.RECEIVER)
                        .dispositionState(new Rejected())
                        .disposition()
@@ -599,9 +647,11 @@
             Disposition disposition = interaction.getLatestResponse(Disposition.class);
             assertThat(disposition.getSettled(), is(true));
 
-            interaction.consumeResponse(null, Flow.class);
-
-
+            interaction.dispositionSettled(true)
+                       .dispositionFirstFromLatestDelivery()
+                       .dispositionRole(Role.RECEIVER)
+                       .dispositionState(new Rejected())
+                       .disposition();
 
         }
         assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
@@ -627,7 +677,7 @@
                                                      .attachSourceDefaultOutcome(null)
                                                      .attach().consumeResponse()
                                                      .flowIncomingWindow(UnsignedInteger.ONE)
-                                                     .flowNextIncomingId(UnsignedInteger.ZERO)
+                                                     .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
                                                      .flowOutgoingWindow(UnsignedInteger.ZERO)
                                                      .flowNextOutgoingId(UnsignedInteger.ZERO)
                                                      .flowLinkCredit(UnsignedInteger.ONE)
@@ -640,6 +690,7 @@
             assertThat(data, is(equalTo(getTestName())));
 
             Disposition disposition = interaction.dispositionSettled(false)
+                                                 .dispositionFirstFromLatestDelivery()
                                                  .dispositionRole(Role.RECEIVER)
                                                  .dispositionState(null)
                                                  .disposition()
@@ -658,7 +709,6 @@
                                                          + " non-terminal delivery states to the sender")
     public void receiveTransferReceiverIndicatesNonTerminalDeliveryState() throws Exception
     {
-        String testMessageData;
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
             final Interaction interaction = transport.newInteraction();
@@ -667,19 +717,13 @@
                                    .openMaxFrameSize(UnsignedInteger.valueOf(4096))
                                    .open().consumeResponse()
                                    .getLatestResponse(Open.class);
-
-            int negotiatedFrameSize = open.getMaxFrameSize().intValue();
-            testMessageData = Stream.generate(() -> "*").limit(negotiatedFrameSize).collect(Collectors.joining());
-
-            Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, testMessageData);
-
             interaction.begin().consumeResponse()
                        .attachRole(Role.RECEIVER)
                        .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
                        .attachRcvSettleMode(ReceiverSettleMode.SECOND)
                        .attach().consumeResponse()
                        .flowIncomingWindow(UnsignedInteger.ONE)
-                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
                        .flowOutgoingWindow(UnsignedInteger.ZERO)
                        .flowNextOutgoingId(UnsignedInteger.ZERO)
                        .flowLinkCredit(UnsignedInteger.ONE)
@@ -687,6 +731,11 @@
                        .flow()
                        .sync();
 
+            final int negotiatedFrameSize = open.getMaxFrameSize().intValue();
+            final String testMessageData = Stream.generate(() -> "*").limit(negotiatedFrameSize).collect(Collectors.joining());
+
+            Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, testMessageData);
+
             MessageDecoder messageDecoder = new MessageDecoder();
 
             Transfer first = interaction.consumeResponse(Transfer.class)
@@ -745,14 +794,14 @@
             assumeThat(attach.getSndSettleMode(), is(equalTo(SenderSettleMode.SETTLED)));
 
             interaction.flowIncomingWindow(UnsignedInteger.ONE)
-                                                     .flowNextIncomingId(UnsignedInteger.ZERO)
-                                                     .flowOutgoingWindow(UnsignedInteger.ZERO)
-                                                     .flowNextOutgoingId(UnsignedInteger.ZERO)
-                                                     .flowLinkCredit(UnsignedInteger.ONE)
-                                                     .flowHandleFromLinkHandle()
-                                                     .flow();
+                       .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandleFromLinkHandle()
+                       .flow();
 
-            List<Transfer> transfers = interaction.receiveDelivery().getLatestDelivery();
+            final List<Transfer> transfers = interaction.receiveDelivery().getLatestDelivery();
             final AtomicBoolean isSettled = new AtomicBoolean();
             transfers.forEach(transfer -> { if (Boolean.TRUE.equals(transfer.getSettled())) { isSettled.set(true);}});
 
@@ -762,10 +811,6 @@
             interaction.doCloseConnection();
         }
 
-        if (getBrokerAdmin().isQueueDepthSupported())
-        {
-            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
-        }
         Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, "test");
         assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo("test")));
     }
@@ -793,17 +838,13 @@
                        .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                        .attach()
                        .consumeResponse(Attach.class)
-                       .consumeResponse(Flow.class);
-
-            Flow flow = interaction.getLatestResponse(Flow.class);
-            assertThat(flow.getLinkCredit().intValue(), is(greaterThan(1)));
-
-            interaction.transferDeliveryId(UnsignedInteger.ZERO)
+                       .consumeResponse(Flow.class)
+                       .transferDeliveryId()
                        .transferDeliveryTag(deliveryTag)
                        .transferPayloadData(content1)
                        .transfer()
                        .transferDeliveryTag(deliveryTag)
-                       .transferDeliveryId(UnsignedInteger.ONE)
+                       .transferDeliveryId()
                        .transferPayloadData(getTestName() + "_2")
                        .transfer()
                        .sync();
@@ -1019,28 +1060,21 @@
             interaction.txnAttachCoordinatorLink(txnState)
                        .txnDeclare(txnState);
 
-            interaction.transferDeliveryId(UnsignedInteger.ONE)
+            interaction.transferDeliveryId()
                        .transferDeliveryTag(new Binary("A".getBytes(StandardCharsets.UTF_8)))
                        .transferPayloadData(contents[0])
                        .transfer()
-                       .transferDeliveryId(UnsignedInteger.valueOf(2))
+                       .transferDeliveryId()
                        .transferDeliveryTag(new Binary("B".getBytes(StandardCharsets.UTF_8)))
                        .transferPayloadData(contents[1])
                        .transfer()
-                       .transferDeliveryId(UnsignedInteger.valueOf(3))
+                       .transferDeliveryId()
                        .transferDeliveryTag(new Binary("C".getBytes(StandardCharsets.UTF_8)))
                        .transferTransactionalState(txnState.getCurrentTransactionId())
                        .transferPayloadData(contents[2])
                        .transfer();
 
-            final Discharge discharge = new Discharge();
-            discharge.setTxnId(txnState.getCurrentTransactionId());
-            discharge.setFail(false);
-
-            interaction.transferHandle(txnState.getHandle())
-                       .transferDeliveryId(UnsignedInteger.valueOf(4))
-                       .transferDeliveryTag(new Binary(("transaction-" + 4).getBytes(StandardCharsets.UTF_8)))
-                       .transferPayloadData(discharge).transfer();
+            interaction.txnSendDischarge(txnState, false);
 
             assertDeliveries(interaction, Sets.newTreeSet(Arrays.asList(UnsignedInteger.ONE,
                                                                         UnsignedInteger.valueOf(2),
@@ -1069,39 +1103,32 @@
                                                      .attachRcvSettleMode(ReceiverSettleMode.FIRST)
                                                      .attach().consumeResponse()
                                                      .flowIncomingWindow(UnsignedInteger.valueOf(numberOfMessages))
-                                                     .flowNextIncomingId(UnsignedInteger.ZERO)
+                                                     .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
                                                      .flowOutgoingWindow(UnsignedInteger.ZERO)
                                                      .flowNextOutgoingId(UnsignedInteger.ZERO)
                                                      .flowLinkCredit(UnsignedInteger.valueOf(numberOfMessages))
                                                      .flowHandleFromLinkHandle()
                                                      .flow();
 
-            for (int i = 0; i < contents.length; i++)
+            UnsignedInteger firstDeliveryId = null;
+            for (final String content : contents)
             {
                 interaction.receiveDelivery(Flow.class).decodeLatestDelivery();
                 Object data = interaction.getDecodedLatestDelivery();
-                assertThat(data, is(equalTo(contents[i])));
-                assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.valueOf(i))));
+                assertThat(data, is(equalTo(content)));
+                if (firstDeliveryId == null)
+                {
+                    firstDeliveryId = interaction.getLatestDeliveryId();
+                }
             }
 
             interaction.dispositionSettled(true)
                        .dispositionRole(Role.RECEIVER)
-                       .dispositionFirst(UnsignedInteger.ZERO)
+                       .dispositionFirst(firstDeliveryId)
                        .dispositionLast(interaction.getLatestDeliveryId())
                        .dispositionState(new Accepted())
                        .disposition();
-
-            // make sure sure the disposition is handled by making drain request
-            interaction.flowLinkCredit(UnsignedInteger.ONE)
-                       .flowNextIncomingId(UnsignedInteger.valueOf(numberOfMessages))
-                       .flowDrain(Boolean.TRUE)
-                       .flow()
-                       .consumeResponse(Flow.class);
-
-            if (getBrokerAdmin().isQueueDepthSupported())
-            {
-                assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
-            }
+            interaction.doCloseConnection();
         }
 
         final String messageText = getTestName() + "_" + 4;
@@ -1116,7 +1143,6 @@
     {
         final int numberOfMessages = 4;
         final String[] contents = Utils.createTestMessageContents(numberOfMessages, getTestName());
-        Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, contents);
 
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
@@ -1130,19 +1156,22 @@
                                                      .attachHandle(UnsignedInteger.ZERO)
                                                      .attach().consumeResponse()
                                                      .flowIncomingWindow(UnsignedInteger.valueOf(numberOfMessages))
-                                                     .flowNextIncomingId(UnsignedInteger.ZERO)
+                                                     .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
                                                      .flowOutgoingWindow(UnsignedInteger.ZERO)
                                                      .flowNextOutgoingId(UnsignedInteger.ZERO)
                                                      .flowLinkCredit(UnsignedInteger.valueOf(numberOfMessages))
                                                      .flowHandleFromLinkHandle()
                                                      .flow();
 
-            for (int i = 0; i < contents.length; i++)
+            Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, contents);
+
+            final List<UnsignedInteger> deliveryIds = new ArrayList<>();
+            for (final String content : contents)
             {
                 interaction.receiveDelivery(Flow.class).decodeLatestDelivery();
                 Object data = interaction.getDecodedLatestDelivery();
-                assertThat(data, is(equalTo(contents[i])));
-                assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.valueOf(i))));
+                assertThat(data, is(equalTo(content)));
+                deliveryIds.add(interaction.getLatestDeliveryId());
             }
 
             final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ONE);
@@ -1151,31 +1180,18 @@
 
             interaction.dispositionSettled(true)
                        .dispositionRole(Role.RECEIVER)
-                       .dispositionFirst(UnsignedInteger.ZERO)
-                       .dispositionLast(UnsignedInteger.ONE)
+                       .dispositionFirst(deliveryIds.get(0))
+                       .dispositionLast(deliveryIds.get(1))
                        .dispositionState(new Accepted())
                        .disposition()
                        .dispositionSettled(true)
                        .dispositionRole(Role.RECEIVER)
-                       .dispositionFirst(UnsignedInteger.valueOf(2))
-                       .dispositionLast(UnsignedInteger.valueOf(3))
+                       .dispositionFirst(deliveryIds.get(2))
+                       .dispositionLast(deliveryIds.get(3))
                        .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
                        .disposition();
 
-
-            final Discharge discharge = new Discharge();
-            discharge.setTxnId(txnState.getCurrentTransactionId());
-            discharge.setFail(false);
-
-            interaction.transferHandle(txnState.getHandle())
-                       .transferDeliveryId(UnsignedInteger.valueOf(4))
-                       .transferDeliveryTag(new Binary(("transaction-" + 4).getBytes(StandardCharsets.UTF_8)))
-                       .transferPayloadData(discharge)
-                       .transfer();
-
-
-            final Flow coordinatorFlow = interaction.consume(Flow.class, Disposition.class);
-            assertThat(coordinatorFlow.getHandle(), is(equalTo(txnState.getHandle())));
+            interaction.txnDischarge(txnState, false);
         }
 
         String messageText = getTestName() + "_" + 4;
@@ -1203,6 +1219,10 @@
                               expectedDeliveryIds.remove(deliveryId);
                           });
             }
+            else if (response.getBody() instanceof Flow)
+            {
+                // ignore flows
+            }
         }
         while (!expectedDeliveryIds.isEmpty());
     }
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
index 32a96b0..85040c8 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
@@ -21,6 +21,7 @@
 package org.apache.qpid.tests.protocol.v1_0.transaction;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.notNullValue;
@@ -28,7 +29,6 @@
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
-import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
 import java.util.List;
@@ -40,10 +40,7 @@
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
@@ -55,10 +52,10 @@
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
-import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -76,86 +73,76 @@
 
     @Test
     @SpecificationTest(section = "4.3",
-            description = "If the coordinator is unable to complete the discharge, the coordinator MUST convey the error to the controller "
-                          + "as a transaction-error. If the source for the link to the coordinator supports the rejected outcome, then the "
-                          + "message MUST be rejected with this outcome carrying the transaction-error.")
+            description = "If the coordinator is unable to complete the discharge,"
+                          + " the coordinator MUST convey the error to the controller as a transaction-error."
+                          + " If the source for the link to the coordinator supports the rejected outcome, then the "
+                          + " message MUST be rejected with this outcome carrying the transaction-error.")
     public void dischargeUnknownTransactionIdWhenSourceSupportsRejectedOutcome() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
+            final InteractionTransactionalState txnState = new InteractionTransactionalState(UnsignedInteger.ZERO);
             final Interaction interaction = transport.newInteraction();
-            final Disposition disposition = interaction.negotiateProtocol().consumeResponse()
-                                                       .open().consumeResponse(Open.class)
-                                                       .begin().consumeResponse(Begin.class)
-                                                       .attachRole(Role.SENDER)
-                                                       .attachSourceOutcomes(Rejected.REJECTED_SYMBOL)
-                                                       .attachTarget(new Coordinator())
-                                                       .attach().consumeResponse(Attach.class)
-                                                       .consumeResponse(Flow.class)
-                                                       .transferPayloadData(new Declare())
-                                                       .transfer().consumeResponse()
-                                                       .getLatestResponse(Disposition.class);
+            interaction.negotiateProtocol().consumeResponse()
+                       .open().consumeResponse(Open.class)
+                       .begin().consumeResponse(Begin.class)
 
-            assertThat(disposition.getSettled(), is(equalTo(true)));
-            assertThat(disposition.getState(), is(instanceOf(Declared.class)));
-            assertThat(((Declared) disposition.getState()).getTxnId(), is(notNullValue()));
+                       .txnAttachCoordinatorLink(txnState, Rejected.REJECTED_SYMBOL)
+                       .txnDeclare(txnState);
 
-            interaction.consumeResponse(Flow.class);
+            assertThat(txnState.getDeliveryState(), is(instanceOf(Declared.class)));
+            assertThat(txnState.getCurrentTransactionId(), is(notNullValue()));
 
-            final Discharge discharge = new Discharge();
-            discharge.setTxnId(new Binary("nonExistingTransaction".getBytes(UTF_8)));
-            final Disposition dischargeDisposition = interaction.transferDeliveryId(UnsignedInteger.ONE)
-                                                                .transferDeliveryTag(new Binary("discharge".getBytes(UTF_8)))
-                                                                .transferPayloadData(discharge)
-                                                                .transfer().consumeResponse()
-                                                                .getLatestResponse(Disposition.class);
-            assertThat(dischargeDisposition.getState(), is(instanceOf(Rejected.class)));
-            final Error error = ((Rejected) dischargeDisposition.getState()).getError();
+            txnState.setLastTransactionId(new Binary("nonExistingTransaction".getBytes(UTF_8)));
+            interaction.txnDischarge(txnState, false);
+            interaction.doCloseConnection();
+
+            assertThat(txnState.getDeliveryState(), is(instanceOf(Rejected.class)));
+            final Error error = ((Rejected) txnState.getDeliveryState()).getError();
             assertThat(error, is(notNullValue()));
-            assertThat(error.getCondition(), is(equalTo(TransactionError.UNKNOWN_ID)));
+
+            if (KIND_BROKER_J.equals(getBrokerAdmin().getKind()))
+            {
+                assertThat(error.getCondition(), is(equalTo(TransactionError.UNKNOWN_ID)));
+            }
         }
     }
 
     @Test
     @SpecificationTest(section = "4.3",
-            description = "If the coordinator is unable to complete the discharge, the coordinator MUST convey the error to the controller "
-                          + "as a transaction-error. [...] If the source does not support "
-                          + "the rejected outcome, the transactional resource MUST detach the link to the coordinator, with the detach "
-                          + "performative carrying the transaction-error.")
+            description = "If the coordinator is unable to complete the discharge,"
+                          + " the coordinator MUST convey the error to the controller as a transaction-error."
+                          + " [...] If the source does not support the rejected outcome, the transactional resource"
+                          + " MUST detach the link to the coordinator, with the detach performative carrying"
+                          + " the transaction-error.")
     public void dischargeUnknownTransactionIdWhenSourceDoesNotSupportRejectedOutcome() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
+            final InteractionTransactionalState txnState = new InteractionTransactionalState(UnsignedInteger.ZERO);
             final Interaction interaction = transport.newInteraction();
-            final Disposition disposition = interaction.negotiateProtocol().consumeResponse()
-                                                       .open().consumeResponse(Open.class)
-                                                       .begin().consumeResponse(Begin.class)
-                                                       .attachRole(Role.SENDER)
-                                                       .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
-                                                       .attachTarget(new Coordinator())
-                                                       .attach().consumeResponse(Attach.class)
-                                                       .consumeResponse(Flow.class)
-                                                       .transferPayloadData(new Declare())
-                                                       .transfer().consumeResponse()
-                                                       .getLatestResponse(Disposition.class);
+            interaction.negotiateProtocol().consumeResponse()
+                                           .open().consumeResponse(Open.class)
+                                           .begin().consumeResponse(Begin.class)
+                                           .txnAttachCoordinatorLink(txnState, Accepted.ACCEPTED_SYMBOL)
+                                           .txnDeclare(txnState);
 
+            assertThat(txnState.getDeliveryState(), is(instanceOf(Declared.class)));
+            assertThat(txnState.getCurrentTransactionId(), is(notNullValue()));
 
-            assertThat(disposition.getSettled(), is(equalTo(true)));
-            assertThat(disposition.getState(), is(instanceOf(Declared.class)));
-            assertThat(((Declared) disposition.getState()).getTxnId(), is(notNullValue()));
+            txnState.setLastTransactionId(new Binary("nonExistingTransaction".getBytes(UTF_8)));
+            interaction.txnSendDischarge(txnState, false);
 
-            interaction.consumeResponse(Flow.class);
+            final Detach detachResponse = interaction.consumeResponse(Detach.class).getLatestResponse(Detach.class);
+            interaction.doCloseConnection();
 
-            final Discharge discharge = new Discharge();
-            discharge.setTxnId(new Binary("nonExistingTransaction".getBytes(UTF_8)));
-            final Detach detachResponse = interaction.transferDeliveryId(UnsignedInteger.ONE)
-                                                                .transferDeliveryTag(new Binary("discharge".getBytes(UTF_8)))
-                                                                .transferPayloadData(discharge)
-                                                                .transfer().consumeResponse(Detach.class)
-                                                                .getLatestResponse(Detach.class);
-            Error error = detachResponse.getError();
+            final Error error = detachResponse.getError();
             assertThat(error, is(notNullValue()));
-            assertThat(error.getCondition(), is(equalTo(TransactionError.UNKNOWN_ID)));
+
+            if (KIND_BROKER_J.equals(getBrokerAdmin().getKind()))
+            {
+                assertThat(error.getCondition(), is(equalTo(TransactionError.UNKNOWN_ID)));
+            }
         }
     }
 
@@ -167,33 +154,34 @@
                           + " desired transaction identifier and the outcome to be applied upon a successful discharge.")
     public void dischargeSettledAfterReceiverDetach() throws Exception
     {
-        assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
-
-        Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, "test message");
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
             final Interaction interaction = transport.newInteraction();
             final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
-            List<Transfer> transfers = interaction.negotiateProtocol().consumeResponse()
-                                                  .open().consumeResponse(Open.class)
-                                                  .begin().consumeResponse(Begin.class)
+            interaction.negotiateProtocol().consumeResponse()
+                       .open().consumeResponse(Open.class)
+                       .begin().consumeResponse(Begin.class)
 
-                                                  .txnAttachCoordinatorLink(txnState)
-                                                  .txnDeclare(txnState)
+                       .txnAttachCoordinatorLink(txnState)
+                       .txnDeclare(txnState)
 
-                                                  .attachRole(Role.RECEIVER)
-                                                  .attachHandle(UnsignedInteger.ONE)
-                                                  .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
-                                                  .attachRcvSettleMode(ReceiverSettleMode.FIRST)
-                                                  .attach().consumeResponse(Attach.class)
+                       .attachRole(Role.RECEIVER)
+                       .attachHandle(UnsignedInteger.ONE)
+                       .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                       .attach().consumeResponse(Attach.class)
 
-                                                  .flowIncomingWindow(UnsignedInteger.ONE)
-                                                  .flowLinkCredit(UnsignedInteger.ONE)
-                                                  .flowHandleFromLinkHandle()
-                                                  .flow()
+                       .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandleFromLinkHandle()
+                       .flow();
 
-                                                  .receiveDelivery()
-                                                  .getLatestDelivery();
+            Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
+
+            List<Transfer> transfers = interaction.receiveDelivery().getLatestDelivery();
             assertThat(transfers, is(notNullValue()));
             assertThat(transfers, is(not(empty())));
             final UnsignedInteger deliveryId = transfers.get(0).getDeliveryId();
@@ -204,8 +192,15 @@
                        .disposition()
                        .txnDischarge(txnState, false);
 
-            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+            assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+
+            interaction.doCloseConnection();
         }
+
+        String secondMessage = getTestName() + "_2";
+        Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, secondMessage);
+        Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
+        assertThat(receivedMessage, is(equalTo(secondMessage)));
     }
 
     @Test
@@ -219,8 +214,6 @@
                           + " was associated.")
     public void dischargeSettledAfterSenderDetach() throws Exception
     {
-        assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
-
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
             final Interaction interaction = transport.newInteraction();
@@ -238,20 +231,21 @@
                        .attach().consumeResponse(Attach.class)
                        .consumeResponse(Flow.class)
 
+                       .transferDeliveryId()
                        .transferTransactionalState(txnState.getCurrentTransactionId())
-                       .transferPayloadData("test message")
+                       .transferPayloadData(getTestName())
                        .transferHandle(UnsignedInteger.ONE)
                        .transfer().consumeResponse(Disposition.class)
 
                        .detachHandle(UnsignedInteger.ONE)
                        .detach().consumeResponse(Detach.class);
-
-            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
-
             interaction.txnDischarge(txnState, false);
 
-            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+            assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
         }
+
+        final Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
+        assertThat(receivedMessage, is(equalTo(getTestName())));
     }
 
     @Test
@@ -262,8 +256,6 @@
                           + " reflect the outcome that was applied.")
     public void dischargeUnsettledAfterSenderClose() throws Exception
     {
-        assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
-
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
             final Interaction interaction = transport.newInteraction();
@@ -282,21 +274,22 @@
                        .attach().consumeResponse(Attach.class)
                        .consumeResponse(Flow.class)
 
+                       .transferDeliveryId()
                        .transferTransactionalState(txnState.getCurrentTransactionId())
-                       .transferPayloadData("test message")
+                       .transferPayloadData(getTestName())
                        .transferHandle(UnsignedInteger.ONE)
                        .transfer().consumeResponse(Disposition.class)
 
                        .detachHandle(UnsignedInteger.ONE)
                        .detachClose(true)
                        .detach().consumeResponse(Detach.class);
-
-            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
-
             interaction.txnDischarge(txnState, false);
 
-            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+            assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
         }
+
+        final Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
+        assertThat(receivedMessage, is(equalTo(getTestName())));
     }
 
 }
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
index 853fdd9..f44790f 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -26,6 +26,7 @@
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
 import java.util.Collections;
@@ -102,6 +103,7 @@
                                                          .attach().consumeResponse(Attach.class)
                                                          .consumeResponse(Flow.class)
 
+                                                         .transferDeliveryId()
                                                          .transferHandle(linkHandle)
                                                          .transferPayloadData(getTestName())
                                                          .transferTransactionalState(txnState.getCurrentTransactionId())
@@ -116,9 +118,10 @@
 
             interaction.txnDischarge(txnState, false);
 
-            Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
-            assertThat(receivedMessage, is(equalTo(getTestName())));
+            assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
         }
+        Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
+        assertThat(receivedMessage, is(equalTo(getTestName())));
     }
 
     @Test
@@ -150,6 +153,7 @@
                                                          .attach().consumeResponse(Attach.class)
                                                          .consumeResponse(Flow.class)
 
+                                                         .transferDeliveryId()
                                                          .transferHandle(linkHandle)
                                                          .transferPayloadData(getTestName())
                                                          .transferTransactionalState(txnState.getCurrentTransactionId())
@@ -164,16 +168,11 @@
 
             interaction.txnDischarge(txnState, true);
 
-            if (getBrokerAdmin().isQueueDepthSupported())
-            {
-                assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
-            }
-            else
-            {
-                final String content = getTestName() + "_2";
-                Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content);
-                assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(content)));
-            }
+            assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+
+            final String content = getTestName() + "_2";
+            Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content);
+            assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(content)));
         }
     }
 
@@ -204,9 +203,11 @@
                                                          .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                                                          .attachRcvSettleMode(ReceiverSettleMode.SECOND)
                                                          .attachHandle(linkHandle)
-                                                         .attach().consumeResponse(Attach.class)
+                                                         .attach()
+                                                         .consumeResponse(Attach.class)
                                                          .consumeResponse(Flow.class)
 
+                                                         .transferDeliveryId()
                                                          .transferHandle(linkHandle)
                                                          .transferPayloadData(getTestName())
                                                          .transferTransactionalState(txnState.getCurrentTransactionId())
@@ -225,6 +226,8 @@
                        .disposition();
 
             interaction.txnDischarge(txnState, false);
+
+            assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
         }
         assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
     }
@@ -255,6 +258,7 @@
                                               .attach().consumeResponse(Attach.class)
                                               .consumeResponse(Flow.class)
 
+                                              .transferDeliveryId()
                                               .transferHandle(linkHandle)
                                               .transferPayloadData(getTestName())
                                               .transferTransactionalState(integerToBinary(Integer.MAX_VALUE))
@@ -295,7 +299,7 @@
                        .consumeResponse(Attach.class)
 
                        .flowIncomingWindow(UnsignedInteger.ONE)
-                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
                        .flowOutgoingWindow(UnsignedInteger.ZERO)
                        .flowNextOutgoingId(UnsignedInteger.ZERO)
                        .flowLinkCredit(UnsignedInteger.ONE)
@@ -313,6 +317,9 @@
                        .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
                        .disposition()
                        .txnDischarge(txnState, false);
+
+
+            assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
         }
     }
 
@@ -344,7 +351,7 @@
                        .consumeResponse(Attach.class)
 
                        .flowIncomingWindow(UnsignedInteger.ONE)
-                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
                        .flowOutgoingWindow(UnsignedInteger.ZERO)
                        .flowNextOutgoingId(UnsignedInteger.ZERO)
                        .flowLinkCredit(UnsignedInteger.ONE)
@@ -363,6 +370,8 @@
                        .disposition()
                        .txnDischarge(txnState, true);
 
+            assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+
             Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
             assertThat(receivedMessage, is(equalTo(getTestName())));
         }
@@ -396,7 +405,7 @@
                                                   .attach().consumeResponse(Attach.class)
 
                                                   .flowIncomingWindow(UnsignedInteger.ONE)
-                                                  .flowNextIncomingId(UnsignedInteger.ZERO)
+                                                  .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
                                                   .flowOutgoingWindow(UnsignedInteger.ZERO)
                                                   .flowNextOutgoingId(UnsignedInteger.ZERO)
                                                   .flowLinkCredit(UnsignedInteger.ONE)
@@ -421,7 +430,10 @@
                                               .consumeResponse().getLatestResponse();
             assertUnknownTransactionIdError(response);
         }
-        assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
+        finally
+        {
+            assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
+        }
     }
 
     @Ignore("TODO disposition is currently not being sent by Broker")
@@ -479,6 +491,8 @@
             assertThat(((TransactionalState) settledDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class)));
 
             interaction.txnDischarge(txnState, false);
+
+            assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
         }
     }
 
@@ -514,7 +528,7 @@
                        .consumeResponse(Attach.class)
 
                        .flowIncomingWindow(UnsignedInteger.ONE)
-                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
                        .flowOutgoingWindow(UnsignedInteger.ZERO)
                        .flowNextOutgoingId(UnsignedInteger.ZERO)
                        .flowLinkCredit(UnsignedInteger.ONE)
@@ -526,9 +540,6 @@
 
             List<Transfer> transfers = interaction.getLatestDelivery();
             assertThat(transfers.size(), is(equalTo(1)));
-            Transfer transfer = transfers.get(0);
-            assertThat(transfer.getState(), is(instanceOf(TransactionalState.class)));
-            assertThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(txnState.getCurrentTransactionId())));
 
             Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
             assertThat(data, is(equalTo(getTestName())));
@@ -536,19 +547,20 @@
             interaction.dispositionSettled(true)
                        .dispositionRole(Role.RECEIVER)
                        .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
+                       .dispositionFirstFromLatestDelivery()
                        .disposition()
                        .txnDischarge(txnState, false);
 
-            if (getBrokerAdmin().isQueueDepthSupported())
-            {
-                assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
-            }
-            else
-            {
-                final String content = getTestName() + "_2";
-                Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content);
-                assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(content)));
-            }
+
+            assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+
+            Transfer transfer = transfers.get(0);
+            assumeThat(transfer.getState(), is(instanceOf(TransactionalState.class)));
+            assumeThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(txnState.getCurrentTransactionId())));
+
+            final String content = getTestName() + "_2";
+            Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content);
+            assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(content)));
         }
     }
 
@@ -584,7 +596,7 @@
                        .consumeResponse(Attach.class)
 
                        .flowIncomingWindow(UnsignedInteger.ONE)
-                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
                        .flowOutgoingWindow(UnsignedInteger.ZERO)
                        .flowNextOutgoingId(UnsignedInteger.ZERO)
                        .flowLinkCredit(UnsignedInteger.ONE)
@@ -596,9 +608,6 @@
 
             List<Transfer> transfers = interaction.getLatestDelivery();
             assertThat(transfers.size(), is(equalTo(1)));
-            Transfer transfer = transfers.get(0);
-            assertThat(transfer.getState(), is(instanceOf(TransactionalState.class)));
-            assertThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(txnState.getCurrentTransactionId())));
 
             Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
             assertThat(data, is(equalTo(getTestName())));
@@ -609,11 +618,13 @@
                        .disposition()
                        .txnDischarge(txnState, true);
 
-            if (getBrokerAdmin().isQueueDepthSupported())
-            {
-                assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
-            }
+            assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+
             assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
+
+            Transfer transfer = transfers.get(0);
+            assumeThat(transfer.getState(), is(instanceOf(TransactionalState.class)));
+            assumeThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(txnState.getCurrentTransactionId())));
         }
     }
 
@@ -661,7 +672,10 @@
 
             assertUnknownTransactionIdError(response);
         }
-        assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
+        finally
+        {
+            assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
+        }
     }
 
     private void assertUnknownTransactionIdError(final Response<?> response)
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
index 1bfa6d3..448de42 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
@@ -21,6 +21,7 @@
 package org.apache.qpid.tests.protocol.v1_0.transport.link;
 
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -42,6 +43,7 @@
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SessionError;
+import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
@@ -53,7 +55,7 @@
 {
     @Test
     @SpecificationTest(section = "1.3.4",
-            description = "Flow without mandatory fields should result in a decoding error.")
+            description = "mandatory [...] a non null value for the field is always encoded.")
     public void emptyFlow() throws Exception
     {
         getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
@@ -69,7 +71,7 @@
                                            .flowOutgoingWindow(null)
                                            .flowNextOutgoingId(null)
                                            .flow()
-                                           .consumeResponse()
+                                           .consumeResponse(Close.class)
                                            .getLatestResponse(Close.class);
             assertThat(responseClose.getError(), is(notNullValue()));
             assertThat(responseClose.getError().getCondition(), is(AmqpError.DECODE_ERROR));
@@ -89,6 +91,11 @@
                                          .open().consumeResponse(Open.class)
                                          .begin().consumeResponse(Begin.class)
                                          .flowEcho(true)
+                                         .flowOutgoingWindow(UnsignedInteger.ZERO)
+                                         .flowNextOutgoingId(UnsignedInteger.ZERO)
+                                         .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+                                         .flowIncomingWindow(UnsignedInteger.ONE)
+                                         .flowHandle(null)
                                          .flow()
                                          .consumeResponse()
                                          .getLatestResponse(Flow.class);
@@ -118,7 +125,10 @@
                                          .flowHandleFromLinkHandle()
                                          .flowAvailable(UnsignedInteger.valueOf(10))
                                          .flowDeliveryCount(UnsignedInteger.ZERO)
-                                         .flowLinkCredit(UnsignedInteger.ZERO)
+                                         .flowLinkCredit(UnsignedInteger.ONE)
+                                         .flowOutgoingWindow(UnsignedInteger.ZERO)
+                                         .flowNextOutgoingId(UnsignedInteger.ZERO)
+                                         .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
                                          .flow().consumeResponse()
                                          .getLatestResponse(Flow.class);
             assertThat(responseFlow.getEcho(), not(equalTo(Boolean.TRUE)));
@@ -146,7 +156,7 @@
                        .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
                        .attach().consumeResponse()
                        .flowIncomingWindow(UnsignedInteger.ONE)
-                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
                        .flowOutgoingWindow(UnsignedInteger.ZERO)
                        .flowNextOutgoingId(UnsignedInteger.ZERO)
                        .flowLinkCredit(UnsignedInteger.ONE)
@@ -156,7 +166,7 @@
                        .decodeLatestDelivery()
                        .dispositionSettled(true)
                        .dispositionRole(Role.RECEIVER)
-                       .dispositionFirst(interaction.getLatestDeliveryId())
+                       .dispositionFirstFromLatestDelivery()
                        .dispositionLast(interaction.getLatestDeliveryId())
                        .dispositionState(new Accepted())
                        .disposition()
@@ -215,7 +225,12 @@
                                        .open().consumeResponse(Open.class)
                                        .begin().consumeResponse(Begin.class)
                                        .flowEcho(true)
-                                       .flowHandle(UnsignedInteger.ONE)
+                                       .flowIncomingWindow(UnsignedInteger.ONE)
+                                       .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+                                       .flowLinkCredit(UnsignedInteger.ONE)
+                                       .flowHandle(UnsignedInteger.valueOf(Integer.MAX_VALUE))
+                                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                                       .flowNextOutgoingId(UnsignedInteger.ZERO)
                                        .flow()
                                        .consumeResponse().getLatestResponse(End.class);
 
@@ -249,20 +264,22 @@
             UnsignedInteger remoteHandle = remoteAttach.getHandle();
             assertThat(remoteHandle, is(notNullValue()));
 
-            Flow responseFlow = interaction.flowIncomingWindow(UnsignedInteger.valueOf(1))
-                                           .flowNextIncomingId(UnsignedInteger.ZERO)
+            interaction.flowIncomingWindow(UnsignedInteger.ONE)
+                                           .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+                                           .flowOutgoingWindow(UnsignedInteger.ZERO)
+                                           .flowNextOutgoingId(UnsignedInteger.ZERO)
                                            .flowLinkCredit(UnsignedInteger.ONE)
                                            .flowDrain(Boolean.FALSE)
                                            .flowEcho(Boolean.TRUE)
                                            .flowHandleFromLinkHandle()
                                            .flow()
-                                           .consumeResponse().getLatestResponse(Flow.class);
+                                           .consumeResponse(null, Flow.class);
 
-            assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle)));
-            assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ONE)));
-            assertThat(responseFlow.getDrain(), is(equalTo(Boolean.FALSE)));
-
-            responseFlow = interaction.flowLinkCredit(UnsignedInteger.ONE)
+            Flow responseFlow = interaction.flowIncomingWindow(UnsignedInteger.ONE)
+                                      .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+                                      .flowOutgoingWindow(UnsignedInteger.ZERO)
+                                      .flowNextOutgoingId(UnsignedInteger.ZERO)
+                                      .flowLinkCredit(UnsignedInteger.ONE)
                                       .flowDrain(Boolean.TRUE)
                                       .flowEcho(Boolean.FALSE)
                                       .flowHandleFromLinkHandle()
@@ -271,6 +288,7 @@
 
             assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle)));
             assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
+            assertThat(responseFlow.getDrain(), is(equalTo(Boolean.TRUE)));
         }
     }
 
@@ -286,8 +304,6 @@
     {
         BrokerAdmin brokerAdmin = getBrokerAdmin();
         brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
-        String messageContent = getTestName();
-        Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, messageContent);
 
         final InetSocketAddress addr = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
         try (FrameTransport transport = new FrameTransport(addr).connect())
@@ -304,35 +320,42 @@
             UnsignedInteger remoteHandle = remoteAttach.getHandle();
             assertThat(remoteHandle, is(notNullValue()));
 
-            Object receivedMessageContent = interaction.flowIncomingWindow(UnsignedInteger.valueOf(1))
-                                                       .flowNextIncomingId(UnsignedInteger.ZERO)
+            interaction.flowIncomingWindow(UnsignedInteger.valueOf(1))
+                                                       .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
                                                        .flowLinkCredit(UnsignedInteger.ONE)
                                                        .flowDrain(Boolean.FALSE)
                                                        .flowEcho(Boolean.FALSE)
                                                        .flowHandleFromLinkHandle()
+                                                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                                                       .flowNextOutgoingId(UnsignedInteger.ZERO)
                                                        .flow()
-                                                       .receiveDelivery()
-                                                       .decodeLatestDelivery()
-                                                       .getDecodedLatestDelivery();
+                                                       .sync();
 
-            assertThat(receivedMessageContent, is(equalTo(messageContent)));
-            UnsignedInteger firstDeliveryId = interaction.getLatestDeliveryId();
-            assertThat(firstDeliveryId, is(equalTo(UnsignedInteger.ZERO)));
+            Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
 
-            Flow responseFlow = interaction.flowNextIncomingId(UnsignedInteger.ONE)
-                                           .flowLinkCredit(UnsignedInteger.ONE)
-                                           .flowDrain(Boolean.TRUE)
-                                           .flowEcho(Boolean.FALSE)
-                                           .flowHandleFromLinkHandle()
-                                           .flow()
-                                           .consumeResponse().getLatestResponse(Flow.class);
+            final Object receivedMessageContent = interaction.receiveDelivery()
+                                                             .decodeLatestDelivery()
+                                                             .getDecodedLatestDelivery();
+
+            assertThat(receivedMessageContent, is(equalTo(getTestName())));
+
+            final Flow responseFlow = interaction.flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+                                                 .flowLinkCredit(UnsignedInteger.ONE)
+                                                 .flowDrain(Boolean.TRUE)
+                                                 .flowEcho(Boolean.FALSE)
+                                                 .flowHandleFromLinkHandle()
+                                                 .flowOutgoingWindow(UnsignedInteger.ZERO)
+                                                 .flowNextOutgoingId(UnsignedInteger.ZERO)
+                                                 .flowDeliveryCount()
+                                                 .flow()
+                                                 .consumeResponse().getLatestResponse(Flow.class);
 
             assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle)));
             assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
 
             interaction.dispositionSettled(true)
                        .dispositionRole(Role.RECEIVER)
-                       .dispositionFirst(firstDeliveryId)
+                       .dispositionFirst(interaction.getLatestDeliveryId())
                        .dispositionState(new Accepted())
                        .disposition()
                        .sync();
@@ -373,9 +396,12 @@
             UnsignedInteger delta = UnsignedInteger.ONE;
             UnsignedInteger incomingWindow = UnsignedInteger.valueOf(3);
             Object receivedMessageContent1 = interaction.flowIncomingWindow(incomingWindow)
-                                                        .flowNextIncomingId(UnsignedInteger.ZERO)
+                                                        .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
                                                         .flowLinkCredit(delta)
                                                         .flowHandleFromLinkHandle()
+                                                        .flowDeliveryCount()
+                                                        .flowOutgoingWindow(UnsignedInteger.ZERO)
+                                                        .flowNextOutgoingId(UnsignedInteger.ZERO)
                                                         .flow()
                                                         .receiveDelivery()
                                                         .decodeLatestDelivery()
@@ -383,12 +409,14 @@
 
             assertThat(receivedMessageContent1, is(equalTo(contents[0])));
             UnsignedInteger firstDeliveryId = interaction.getLatestDeliveryId();
-            assertThat(firstDeliveryId, is(equalTo(UnsignedInteger.ZERO)));
 
             Object receivedMessageContent2 = interaction.flowIncomingWindow(incomingWindow)
-                                                        .flowNextIncomingId(UnsignedInteger.ONE)
+                                                        .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
                                                         .flowLinkCredit(delta)
                                                         .flowHandleFromLinkHandle()
+                                                        .flowOutgoingWindow(UnsignedInteger.ZERO)
+                                                        .flowNextOutgoingId(UnsignedInteger.ZERO)
+                                                        .flowDeliveryCount()
                                                         .flow()
                                                         .receiveDelivery()
                                                         .decodeLatestDelivery()
@@ -396,17 +424,18 @@
 
             assertThat(receivedMessageContent2, is(equalTo(contents[1])));
             UnsignedInteger secondDeliveryId = interaction.getLatestDeliveryId();
-            assertThat(secondDeliveryId, is(equalTo(UnsignedInteger.ONE)));
 
             // send session flow with echo=true to verify that no message is delivered without issuing a credit
-            Flow responseFlow = interaction.flowNextIncomingId(UnsignedInteger.valueOf(2))
-                                           .flowLinkCredit(null)
-                                           .flowHandle(null)
-                                           .flowEcho(Boolean.TRUE)
-                                           .flow()
-                                           .consumeResponse().getLatestResponse(Flow.class);
-
-            assertThat(responseFlow.getHandle(), is(nullValue()));
+            interaction.flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+                       .flowIncomingWindow(incomingWindow)
+                       .flowLinkCredit(null)
+                       .flowHandle(null)
+                       .flowDeliveryCount(null)
+                       .flowEcho(Boolean.TRUE)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flow()
+                       .consumeResponse(null, Flow.class);
 
             interaction.dispositionSettled(true)
                        .dispositionRole(Role.RECEIVER)
@@ -429,10 +458,8 @@
     {
         BrokerAdmin brokerAdmin = getBrokerAdmin();
         brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
-        final String[] contents = Utils.createTestMessageContents(2, getTestName());
-        Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, contents);
-
         final InetSocketAddress addr = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
         try (FrameTransport transport = new FrameTransport(addr).connect())
         {
             Interaction interaction = transport.newInteraction()
@@ -447,36 +474,68 @@
             UnsignedInteger remoteHandle = remoteAttach.getHandle();
             assertThat(remoteHandle, is(notNullValue()));
 
-            Object receivedMessageContent1 = interaction.flowIncomingWindow(UnsignedInteger.valueOf(2))
-                                                        .flowNextIncomingId(UnsignedInteger.ZERO)
-                                                        .flowLinkCredit(UnsignedInteger.ONE)
+            UnsignedInteger incomingWindow = UnsignedInteger.valueOf(2);
+            Object receivedMessageContent1 = interaction.flowIncomingWindow(incomingWindow)
+                                                        .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+                                                        .flowLinkCredit(incomingWindow)
+                                                        .flowNextOutgoingId()
                                                         .flowHandleFromLinkHandle()
+                                                        .flowOutgoingWindow(UnsignedInteger.ZERO)
+                                                        .flowNextOutgoingId(UnsignedInteger.ZERO)
                                                         .flow()
                                                         .receiveDelivery()
                                                         .decodeLatestDelivery()
                                                         .getDecodedLatestDelivery();
+            assertThat(receivedMessageContent1, is(equalTo(getTestName())));
 
-            assertThat(receivedMessageContent1, is(equalTo(contents[0])));
-            assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.ZERO)));
+            final Response<?> response = interaction.flowIncomingWindow(incomingWindow)
+                                                    .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+                                                    .flowLinkCredit(UnsignedInteger.ZERO)
+                                                    .flowHandleFromLinkHandle()
+                                                    .flowEcho(Boolean.TRUE)
+                                                    .flowOutgoingWindow(UnsignedInteger.ZERO)
+                                                    .flowNextOutgoingId(UnsignedInteger.ZERO)
+                                                    .flowDeliveryCount()
+                                                    .flow()
+                                                    .consumeResponse(null, Flow.class)
+                                                    .getLatestResponse();
 
-            Flow responseFlow = interaction.flowNextIncomingId(UnsignedInteger.ONE)
-                                           .flowLinkCredit(UnsignedInteger.ZERO)
-                                           .flowHandleFromLinkHandle()
-                                           .flowEcho(Boolean.TRUE)
-                                           .flow()
-                                           .consumeResponse().getLatestResponse(Flow.class);
+            if (response != null)
+            {
+                assertThat(response.getBody(), is(instanceOf(Flow.class)));
+                final Flow responseFlow = (Flow) response.getBody();
+                assertThat(responseFlow.getEcho(), not(equalTo(Boolean.TRUE)));
+                assertThat(responseFlow.getHandle(), is(notNullValue()));
+            }
 
-            assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle)));
-            assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
+            final String message2 = getTestName() + "_2";
+            Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, message2);
+            try
+            {
+                // send session flow with echo=true to verify that no message is delivered without issuing a credit
+                interaction.flowIncomingWindow(incomingWindow)
+                           .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+                           .flowLinkCredit(null)
+                           .flowHandle(null)
+                           .flowDeliveryCount(null)
+                           .flowEcho(Boolean.TRUE)
+                           .flowOutgoingWindow(UnsignedInteger.ZERO)
+                           .flowNextOutgoingId(UnsignedInteger.ZERO)
+                           .flow()
+                           .consumeResponse(null, Flow.class);
+            }
+            finally
+            {
+                assertThat(Utils.receiveMessage(addr, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(message2)));
 
-            interaction.dispositionSettled(true)
-                       .dispositionRole(Role.RECEIVER)
-                       .dispositionFirst(interaction.getLatestDeliveryId())
-                       .dispositionState(new Accepted())
-                       .disposition()
-                       .sync();
+                interaction.dispositionSettled(true)
+                           .dispositionRole(Role.RECEIVER)
+                           .dispositionFirst(interaction.getLatestDeliveryId())
+                           .dispositionState(new Accepted())
+                           .disposition()
+                           .sync();
+            }
         }
-        assertThat(Utils.receiveMessage(addr, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(contents[1])));
     }
 
     @Test
@@ -485,7 +544,7 @@
                           + " available to consume the current link-credit. If set, the sender will"
                           + " (after sending all available messages) advance the delivery-count as much as possible,"
                           + " consuming all link-credit, and send the flow state to the receiver.")
-    public void drainWithZeroCredits() throws Exception
+    public void drain() throws Exception
     {
         BrokerAdmin brokerAdmin = getBrokerAdmin();
         brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
@@ -507,16 +566,26 @@
             assertThat(remoteHandle, is(notNullValue()));
 
             Flow responseFlow = interaction.flowIncomingWindow(UnsignedInteger.valueOf(2))
-                                           .flowNextIncomingId(UnsignedInteger.ZERO)
-                                           .flowLinkCredit(UnsignedInteger.ZERO)
+                                           .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+                                           .flowLinkCredit(UnsignedInteger.valueOf(2))
                                            .flowDrain(Boolean.TRUE)
                                            .flowHandleFromLinkHandle()
+                                           .flowOutgoingWindow(UnsignedInteger.ZERO)
+                                           .flowNextOutgoingId(UnsignedInteger.ZERO)
                                            .flow()
-                                           .consumeResponse().getLatestResponse(Flow.class);
+                                           .receiveDelivery()
+                                           .decodeLatestDelivery()
+                                           .consumeResponse(Flow.class).getLatestResponse(Flow.class);
 
             assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle)));
             assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
+
+            interaction.dispositionSettled(true)
+                       .dispositionRole(Role.RECEIVER)
+                       .dispositionFirstFromLatestDelivery()
+                       .dispositionState(new Accepted())
+                       .disposition()
+                       .sync();
         }
-        assertThat(Utils.receiveMessage(addr, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
     }
 }
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
index 337c129..61884e3 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
@@ -591,11 +591,9 @@
 
             interaction.doCloseConnection();
 
-            if (getBrokerAdmin().isQueueDepthSupported())
-            {
-                assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME),
-                           is(equalTo(0)));
-            }
+            final String content = getTestName() + "_2";
+            Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content);
+            assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), Matchers.is(Matchers.equalTo(content)));
         }
     }
 
@@ -675,11 +673,9 @@
 
             interaction.doCloseConnection();
 
-            if (getBrokerAdmin().isQueueDepthSupported())
-            {
-                assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME),
-                           Matchers.is(Matchers.equalTo(0)));
-            }
+            final String content = getTestName() + "_2";
+            Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content);
+            assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), Matchers.is(Matchers.equalTo(content)));
         }
     }
 
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java
index f9640dd..801033e 100644
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java
@@ -44,9 +44,14 @@
 
     public I consumeResponse(final Class<?>... responseTypes) throws Exception
     {
+        final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes));
+        return consumeResponse(acceptableResponseClasses);
+    }
+
+    protected I consumeResponse(final Set<Class<?>> acceptableResponseClasses) throws Exception
+    {
         sync();
         _latestResponse = getNextResponse();
-        final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes));
         if ((acceptableResponseClasses.isEmpty() && _latestResponse != null)
             || (acceptableResponseClasses.contains(null) && _latestResponse == null))
         {