QPID-8349: [Tests][AMQP 1.0] Improve transaction and transfer tests
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 98ffe45..ab2c959 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -20,12 +20,6 @@
package org.apache.qpid.tests.protocol.v1_0;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -117,6 +111,7 @@
private Map<String, Object> _latestDeliveryApplicationProperties;
private Map<Class, FrameBody> _latestResponses = new HashMap<>();
private AtomicLong _receivedDeliveryCount = new AtomicLong();
+ private AtomicLong _coordinatorCredits = new AtomicLong();
Interaction(final FrameTransport frameTransport)
{
@@ -955,6 +950,12 @@
public Interaction txnAttachCoordinatorLink(InteractionTransactionalState transactionalState) throws Exception
{
+ return txnAttachCoordinatorLink(transactionalState, Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL);
+ }
+
+ public Interaction txnAttachCoordinatorLink(final InteractionTransactionalState transactionalState,
+ final Symbol... outcomes) throws Exception
+ {
Attach attach = new Attach();
attach.setName("testTransactionCoordinator-" + transactionalState.getHandle());
attach.setHandle(transactionalState.getHandle());
@@ -963,78 +964,99 @@
attach.setRole(Role.SENDER);
Source source = new Source();
attach.setSource(source);
- source.setOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL);
+ source.setOutcomes(outcomes);
sendPerformativeAndChainFuture(attach, _sessionChannel);
consumeResponse(Attach.class);
- consumeResponse(Flow.class);
+ final Flow flow = consumeResponse(Flow.class).getLatestResponse(Flow.class);
+ _coordinatorCredits.set(flow.getLinkCredit().longValue());
return this;
}
public Interaction txnDeclare(final InteractionTransactionalState txnState) throws Exception
{
- Transfer transfer = createTransactionTransfer(txnState.getHandle());
- transferPayload(transfer, new Declare());
- sendPerformativeAndChainFuture(transfer, _sessionChannel);
- consumeResponse(Disposition.class);
- Disposition declareTransactionDisposition = getLatestResponse(Disposition.class);
- assertThat(declareTransactionDisposition.getSettled(), is(equalTo(true)));
- assertThat(declareTransactionDisposition.getState(), is(instanceOf(Declared.class)));
- Binary transactionId = ((Declared) declareTransactionDisposition.getState()).getTxnId();
- assertThat(transactionId, is(notNullValue()));
- consumeResponse(Flow.class);
+ sendPayloadToCoordinator(new Declare(), txnState.getHandle());
+ final DeliveryState state = handleCoordinatorResponse();
+ txnState.setDeliveryState(state);
+ final Binary transactionId = ((Declared) state).getTxnId();
txnState.setLastTransactionId(transactionId);
return this;
}
- public Interaction discharge(final InteractionTransactionalState txnState, final boolean failed) throws Exception
+ public Interaction txnSendDischarge(final InteractionTransactionalState txnState, final boolean failed)
+ throws Exception
{
final Discharge discharge = new Discharge();
discharge.setTxnId(txnState.getCurrentTransactionId());
discharge.setFail(failed);
-
- Transfer transfer = createTransactionTransfer(txnState.getHandle());
- transferPayload(transfer, discharge);
- sendPerformativeAndChainFuture(transfer, _sessionChannel);
+ sendPayloadToCoordinator(discharge, txnState.getHandle());
return this;
}
public Interaction txnDischarge(final InteractionTransactionalState txnState, boolean failed) throws Exception
{
- discharge(txnState, failed);
-
- Disposition declareTransactionDisposition = null;
- Flow coordinatorFlow = null;
- do
- {
- consumeResponse(Disposition.class, Flow.class);
- Response<?> response = getLatestResponse();
- if (response.getBody() instanceof Disposition)
- {
- declareTransactionDisposition = (Disposition) response.getBody();
- }
- if (response.getBody() instanceof Flow)
- {
- final Flow flowResponse = (Flow) response.getBody();
- if (flowResponse.getHandle().equals(txnState.getHandle()))
- {
- coordinatorFlow = flowResponse;
- }
- }
- } while(declareTransactionDisposition == null || coordinatorFlow == null);
-
- assertThat(declareTransactionDisposition.getSettled(), is(equalTo(true)));
- assertThat(declareTransactionDisposition.getState(), is(instanceOf(Accepted.class)));
-
+ txnSendDischarge(txnState, failed);
+ final DeliveryState state = handleCoordinatorResponse();
+ txnState.setDeliveryState(state);
txnState.setLastTransactionId(null);
return this;
}
+ private void sendPayloadToCoordinator(final Object payload, final UnsignedInteger handle)
+ throws Exception
+ {
+ final Transfer transfer = createTransactionTransfer(handle);
+ transferPayload(transfer, payload);
+ sendPerformativeAndChainFuture(transfer, _sessionChannel);
+ }
+
+ private DeliveryState handleCoordinatorResponse() throws Exception
+ {
+ final Set<Class<?>> expected = new HashSet<>(Collections.singletonList(Disposition.class));
+
+ if (_coordinatorCredits.decrementAndGet() == 0)
+ {
+ expected.add(Flow.class);
+ }
+
+ final Map<Class<?>, ?> responses = consumeResponses(expected);
+
+ final Disposition disposition = (Disposition) responses.get(Disposition.class);
+ if (expected.contains(Flow.class))
+ {
+ Flow flow = (Flow) responses.get(Flow.class);
+ _coordinatorCredits.set(flow.getLinkCredit().longValue());
+ }
+ if (!Boolean.TRUE.equals(disposition.getSettled()))
+ {
+ throw new IllegalStateException("Coordinator disposition is not settled");
+ }
+ return disposition.getState();
+ }
+
+ private Map<Class<?>, ?> consumeResponses(final Set<Class<?>> responseTypes)
+ throws Exception
+ {
+ Map<Class<?>, Object> results = new HashMap<>();
+ do
+ {
+ Response<?> response = consumeResponse(responseTypes).getLatestResponse();
+ if (response != null && response.getBody() instanceof FrameBody)
+ {
+ Class<?> bodyClass = response.getBody().getClass();
+ results.put(bodyClass, response.getBody());
+ }
+ }
+ while (!results.keySet().containsAll(responseTypes));
+ return results;
+ }
+
private Transfer createTransactionTransfer(final UnsignedInteger handle)
{
Transfer transfer = new Transfer();
transfer.setHandle(handle);
transfer.setDeliveryId(getNextDeliveryId());
- transfer.setDeliveryTag(new Binary(("transaction-" + transfer.getDeliveryId()).getBytes(StandardCharsets.UTF_8)));
+ transfer.setDeliveryTag(new Binary(("transaction-"
+ + transfer.getDeliveryId()).getBytes(StandardCharsets.UTF_8)));
return transfer;
}
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InteractionTransactionalState.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InteractionTransactionalState.java
index 061be92..b0832cc 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InteractionTransactionalState.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InteractionTransactionalState.java
@@ -21,12 +21,14 @@
package org.apache.qpid.tests.protocol.v1_0;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
public class InteractionTransactionalState
{
private final UnsignedInteger _handle;
private Binary _lastTransactionId;
+ private DeliveryState _deliveryState;
public InteractionTransactionalState(final UnsignedInteger handle)
{
@@ -47,4 +49,14 @@
{
return _lastTransactionId;
}
+
+ public DeliveryState getDeliveryState()
+ {
+ return _deliveryState;
+ }
+
+ public void setDeliveryState(final DeliveryState deliveryState)
+ {
+ _deliveryState = deliveryState;
+ }
}
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
index e846b5f..2ac50f4 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
@@ -20,6 +20,7 @@
package org.apache.qpid.tests.protocol.v1_0;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.junit.Assume.assumeThat;
@@ -28,6 +29,7 @@
import java.util.stream.IntStream;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
@@ -37,6 +39,7 @@
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
import org.apache.qpid.tests.utils.BrokerAdmin;
public class Utils
@@ -83,7 +86,7 @@
.attachSourceAddress(queueName)
.attach().consumeResponse()
.flowIncomingWindow(UnsignedInteger.ONE)
- .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
.flowOutgoingWindow(UnsignedInteger.ZERO)
.flowNextOutgoingId(UnsignedInteger.ZERO)
.flowLinkCredit(UnsignedInteger.ONE)
@@ -157,6 +160,7 @@
.begin().consumeResponse(Begin.class)
.attachRole(Role.SENDER)
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachSndSettleMode(SenderSettleMode.SETTLED)
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
.getLatestResponse(Flow.class);
@@ -166,14 +170,21 @@
message.length),
flow.getLinkCredit().intValue(),
is(greaterThan(message.length)));
+
+ int tag = 0;
for (String payload : message)
{
interaction.transferPayloadData(payload)
.transferSettled(true)
+ .transferDeliveryId()
+ .transferDeliveryTag(new Binary(String.valueOf(tag).getBytes(UTF_8)))
.transfer()
.sync();
+ tag++;
}
+ interaction.doCloseConnection();
}
}
}
+
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
index 1ca4419..b079122 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
@@ -24,14 +24,18 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;
@@ -39,6 +43,8 @@
import org.apache.qpid.server.protocol.v1_0.codec.StringWriter;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotations;
import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotationsSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
@@ -53,11 +59,13 @@
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
public class DecodeErrorTest extends BrokerAdminUsingTestBase
{
@@ -74,44 +82,46 @@
@SpecificationTest(section = "3.2",
description = "Altogether a message consists of the following sections: Zero or one header,"
+ " Zero or one delivery-annotations, [...]")
- public void illegalMessageFormatPayload() throws Exception
+ @BrokerSpecific(kind = BrokerAdmin.KIND_BROKER_J)
+ public void illegalMessage() throws Exception
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
+ final Interaction interaction = transport.newInteraction();
+ final Attach attach = interaction.negotiateProtocol()
+ .consumeResponse()
+ .open()
+ .consumeResponse(Open.class)
+ .begin()
+ .consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+ .attach()
+ .consumeResponse(Attach.class)
+ .getLatestResponse(Attach.class);
+ assumeThat(attach.getRcvSettleMode(), is(equalTo(ReceiverSettleMode.SECOND)));
- List<QpidByteBuffer> payloads = new ArrayList<>();
- final HeaderSection headerSection = new Header().createEncodingRetainingSection();
- payloads.add(headerSection.getEncodedForm());
- headerSection.dispose();
- final StringWriter stringWriter = new StringWriter("string in between annotation sections");
- QpidByteBuffer encodedString = QpidByteBuffer.allocate(stringWriter.getEncodedSize());
- stringWriter.writeToBuffer(encodedString);
- encodedString.flip();
- payloads.add(encodedString);
- final DeliveryAnnotationsSection
- deliveryAnnotationsSection =
- new DeliveryAnnotations(Collections.emptyMap()).createEncodingRetainingSection();
- payloads.add(deliveryAnnotationsSection.getEncodedForm());
- deliveryAnnotationsSection.dispose();
+ final Flow flow = interaction.consumeResponse(Flow.class).getLatestResponse(Flow.class);
+ assumeThat(flow.getLinkCredit(), is(greaterThan(UnsignedInteger.ZERO)));
- final Detach detachResponse;
- try (QpidByteBuffer combinedPayload = QpidByteBuffer.concatenate(payloads))
+ final List<QpidByteBuffer> payloads = buildInvalidMessage();
+ try
{
- detachResponse = transport.newInteraction()
- .negotiateProtocol().consumeResponse()
- .open().consumeResponse(Open.class)
- .begin().consumeResponse(Begin.class)
- .attachRole(Role.SENDER)
- .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
- .attach().consumeResponse(Attach.class)
- .consumeResponse(Flow.class)
- .transferMessageFormat(UnsignedInteger.ZERO)
- .transferPayload(combinedPayload)
- .transfer()
- .consumeResponse()
- .getLatestResponse(Detach.class);
+ try (QpidByteBuffer combinedPayload = QpidByteBuffer.concatenate(payloads))
+ {
+ interaction.transferMessageFormat(UnsignedInteger.ZERO)
+ .transferPayload(combinedPayload)
+ .transfer();
+ }
}
- payloads.forEach(QpidByteBuffer::dispose);
+ finally
+ {
+ payloads.forEach(QpidByteBuffer::dispose);
+ }
+
+ final Detach detachResponse = interaction.consumeResponse()
+ .getLatestResponse(Detach.class);
assertThat(detachResponse.getError(), is(notNullValue()));
assertThat(detachResponse.getError().getCondition(), is(equalTo(DECODE_ERROR)));
}
@@ -148,6 +158,10 @@
{
error = ((Close) responseBody).getError();
}
+ else if (responseBody instanceof Detach)
+ {
+ error = ((Detach) responseBody).getError();
+ }
else
{
fail(String.format("Expected response of either Detach, End, or Close. Got '%s'", responseBody));
@@ -200,4 +214,51 @@
assertThat(error.getCondition(), is(equalTo(DECODE_ERROR)));
}
}
+
+ private List<QpidByteBuffer> buildInvalidMessage()
+ {
+ final List<QpidByteBuffer> payloads = new ArrayList<>();
+ final Header header = new Header();
+ header.setTtl(UnsignedInteger.valueOf(1000L));
+ final HeaderSection headerSection = header.createEncodingRetainingSection();
+ try
+ {
+ payloads.add(headerSection.getEncodedForm());
+ }
+ finally
+ {
+ headerSection.dispose();
+ }
+
+ final StringWriter stringWriter = new StringWriter("string in between annotation sections");
+ QpidByteBuffer encodedString = QpidByteBuffer.allocate(stringWriter.getEncodedSize());
+ stringWriter.writeToBuffer(encodedString);
+ encodedString.flip();
+ payloads.add(encodedString);
+
+ final Map<Symbol, Object> annoationMap = Collections.singletonMap(Symbol.valueOf("foo"), "bar");
+ final DeliveryAnnotations annotations = new DeliveryAnnotations(annoationMap);
+ final DeliveryAnnotationsSection deliveryAnnotationsSection = annotations.createEncodingRetainingSection();
+ try
+ {
+
+ payloads.add(deliveryAnnotationsSection.getEncodedForm());
+ }
+ finally
+ {
+ deliveryAnnotationsSection.dispose();
+ }
+
+ final AmqpValueSection payload = new AmqpValue(getTestName()).createEncodingRetainingSection();
+ try
+ {
+ payloads.add(payload.getEncodedForm());
+ }
+ finally
+ {
+ payload.dispose();
+ }
+ return payloads;
+ }
+
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java
index 1878950..a7ef076 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java
@@ -46,8 +46,6 @@
import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
@@ -104,6 +102,7 @@
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
+ .transferDeliveryId()
.transferPayload(generateMessagePayloadToDestination(BrokerAdmin.TEST_QUEUE_NAME))
.transferSettled(Boolean.TRUE)
.transferDeliveryTag(_deliveryTag)
@@ -139,12 +138,13 @@
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
+ .transferDeliveryId()
.transferPayload(generateMessagePayloadToDestination("Unknown"))
.transferSettled(Boolean.TRUE)
.transferDeliveryTag(_deliveryTag)
.transfer();
- Detach detach = interaction.consumeResponse().getLatestResponse(Detach.class);
+ Detach detach = interaction.consumeResponse(Detach.class).getLatestResponse(Detach.class);
Error error = detach.getError();
assertThat(error, is(notNullValue()));
assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND)));
@@ -179,6 +179,7 @@
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
+ .transferDeliveryId()
.transferPayload(generateMessagePayloadToDestination("Unknown"))
.transferDeliveryTag(_deliveryTag)
.transfer()
@@ -226,6 +227,7 @@
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
+ .transferDeliveryId()
.transferPayload(generateMessagePayloadToDestination("Unknown"))
.transferDeliveryTag(_deliveryTag)
.transfer();
@@ -262,6 +264,7 @@
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
+ .transferDeliveryId()
.transferHandle(linkHandle)
.transferPayload(generateMessagePayloadToDestination(BrokerAdmin.TEST_QUEUE_NAME))
.transferDeliveryTag(_deliveryTag)
@@ -271,6 +274,8 @@
.txnDischarge(txnState, false);
+ assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+
Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
}
@@ -295,6 +300,7 @@
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
+ .transferDeliveryId()
.transferHandle(linkHandle)
.transferPayload(generateMessagePayloadToDestination(BrokerAdmin.TEST_QUEUE_NAME))
.transferDeliveryTag(_deliveryTag)
@@ -314,6 +320,8 @@
interaction.txnDischarge(txnState, false);
+ assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+
Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
}
@@ -339,6 +347,7 @@
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
+ .transferDeliveryId()
.transferHandle(linkHandle)
.transferPayload(generateMessagePayloadToDestination("Unknown"))
.transferDeliveryTag(_deliveryTag)
@@ -362,6 +371,8 @@
assertThat(rejectedError.getInfo().get(DELIVERY_TAG), is(equalTo(_deliveryTag)));
interaction.txnDischarge(txnState, false);
+
+ assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
}
}
@@ -385,6 +396,7 @@
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
+ .transferDeliveryId()
.transferHandle(linkHandle)
.transferPayload(generateMessagePayloadToDestination("Unknown"))
.transferDeliveryId(UnsignedInteger.valueOf(1))
@@ -400,23 +412,11 @@
assertThat(senderLinkDetachError.getInfo(), is(notNullValue()));
assertThat(senderLinkDetachError.getInfo().get(DELIVERY_TAG), is(equalTo(_deliveryTag)));
- final Discharge discharge = new Discharge();
- discharge.setTxnId(txnState.getCurrentTransactionId());
- discharge.setFail(false);
+ interaction.txnDischarge(txnState, false);
- interaction.transferHandle(txnState.getHandle())
- .transferSettled(Boolean.FALSE)
- .transferDeliveryId(UnsignedInteger.valueOf(2))
- .transferDeliveryTag(new Binary(("transaction-" + 2).getBytes(StandardCharsets.UTF_8)))
- .transferPayloadData(discharge).transfer();
-
- Disposition dischargeTransactionDisposition =
- getDispositionForDeliveryId(interaction, UnsignedInteger.valueOf(2));
-
- assertThat(dischargeTransactionDisposition.getSettled(), is(equalTo(true)));
- assertThat(dischargeTransactionDisposition.getState(), is(instanceOf(Rejected.class)));
-
- Rejected rejected = (Rejected) dischargeTransactionDisposition.getState();
+ DeliveryState txnDischargeDeliveryState = txnState.getDeliveryState();
+ assertThat(txnDischargeDeliveryState, is(instanceOf(Rejected.class)));
+ Rejected rejected = (Rejected) txnDischargeDeliveryState;
Error error = rejected.getError();
assertThat(error, is(notNullValue()));
@@ -468,30 +468,20 @@
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
+ .transferDeliveryId()
.transferHandle(linkHandle)
.transferPayload(generateMessagePayloadToDestination("Unknown"))
.transferDeliveryTag(_deliveryTag)
.transferTransactionalState(txnState.getCurrentTransactionId())
.transferSettled(Boolean.TRUE)
- .transferDeliveryId(UnsignedInteger.valueOf(1))
.transfer();
- final Discharge discharge = new Discharge();
- discharge.setTxnId(txnState.getCurrentTransactionId());
- discharge.setFail(false);
- interaction.transferHandle(txnState.getHandle())
- .transferDeliveryId(UnsignedInteger.valueOf(2))
- .transferSettled(Boolean.FALSE)
- .transferDeliveryTag(new Binary(("transaction-" + 2).getBytes(StandardCharsets.UTF_8)))
- .transferPayloadData(discharge).transfer();
+ interaction.txnDischarge(txnState, false);
- Disposition dischargeTransactionDisposition =
- getDispositionForDeliveryId(interaction, UnsignedInteger.valueOf(2));
+ DeliveryState txDischargeDeliveryState = txnState.getDeliveryState();
+ assertThat(txDischargeDeliveryState, is(instanceOf(Rejected.class)));
- assertThat(dischargeTransactionDisposition.getSettled(), is(equalTo(true)));
- assertThat(dischargeTransactionDisposition.getState(), is(instanceOf(Rejected.class)));
-
- Rejected rejected = (Rejected) dischargeTransactionDisposition.getState();
+ Rejected rejected = (Rejected) txDischargeDeliveryState;
Error error = rejected.getError();
assertThat(error, is(notNullValue()));
@@ -536,14 +526,7 @@
interaction.begin()
.consumeResponse(Begin.class)
- .attachRole(Role.SENDER)
- .attachName("testTransactionCoordinator-" + txnState.getHandle())
- .attachHandle(txnState.getHandle())
- .attachInitialDeliveryCount(UnsignedInteger.ZERO)
- .attachTarget(new Coordinator())
- .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
- .attach().consumeResponse(Attach.class)
- .consumeResponse(Flow.class)
+ .txnAttachCoordinatorLink(txnState, Accepted.ACCEPTED_SYMBOL)
.txnDeclare(txnState)
.attachRole(Role.SENDER)
@@ -554,22 +537,14 @@
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
+ .transferDeliveryId()
.transferHandle(linkHandle)
.transferPayload(generateMessagePayloadToDestination("Unknown"))
.transferDeliveryTag(_deliveryTag)
.transferTransactionalState(txnState.getCurrentTransactionId())
.transferSettled(Boolean.TRUE)
- .transfer();
-
- final Discharge discharge = new Discharge();
- discharge.setTxnId(txnState.getCurrentTransactionId());
- discharge.setFail(false);
-
- interaction.transferHandle(txnState.getHandle())
- .transferSettled(Boolean.FALSE)
- .transferDeliveryId(UnsignedInteger.valueOf(4))
- .transferDeliveryTag(new Binary(("transaction-" + 4).getBytes(StandardCharsets.UTF_8)))
- .transferPayloadData(discharge).transfer();
+ .transfer()
+ .txnSendDischarge(txnState, false);
Detach transactionCoordinatorDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
Error transactionCoordinatorDetachError = transactionCoordinatorDetach.getError();
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java
index d6eb3d1..39a8a9b 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java
@@ -171,7 +171,7 @@
assertThat(receivedDetach.getError().getCondition(), is(AmqpError.RESOURCE_DELETED));
assertThat(receivedDetach.getHandle(), is(equalTo(attach.getHandle())));
- interaction.discharge(txnState, false);
+ interaction.txnSendDischarge(txnState, false);
assertTransactionRollbackOnly(interaction, txnState);
}
@@ -244,7 +244,7 @@
assertThat(receivedDetach.getError().getCondition(), is(AmqpError.RESOURCE_DELETED));
assertThat(receivedDetach.getHandle(), is(equalTo(attach.getHandle())));
- interaction.discharge(txnState, false);
+ interaction.txnSendDischarge(txnState, false);
assertTransactionRollbackOnly(interaction, txnState);
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
index 1cf1ff0..6ac8e7f 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
@@ -70,7 +70,7 @@
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
- QpidByteBuffer[] payloads = Utils.splitPayload("testData", 2);
+ QpidByteBuffer[] payloads = Utils.splitPayload(getTestName(), 2);
final Response<?> latestResponse = transport.newInteraction()
.negotiateProtocol().consumeResponse()
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
index ed9e58d..3b421c7 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
@@ -49,6 +49,7 @@
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.ChannelClosedResponse;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.SpecificationTest;
@@ -229,9 +230,11 @@
{
payload.dispose();
}
- Response<?> latestResponse = interaction.consumeResponse(new Class<?>[] {null}).getLatestResponse();
- assertThat(latestResponse, is(nullValue()));
+ interaction.consumeResponse(null, Flow.class);
}
+ String secondMessage = getTestName() + "_2";
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, secondMessage);
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(secondMessage)));
}
@Test
@@ -387,7 +390,7 @@
payload.dispose();
}
- interaction.consumeResponse(Detach.class, End.class, Close.class);
+ interaction.consumeResponse(Detach.class, End.class, Close.class, ChannelClosedResponse.class);
}
}
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 7950741..42f5dfc 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -38,6 +38,7 @@
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
@@ -62,7 +63,6 @@
import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Received;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
@@ -78,6 +78,7 @@
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.ChannelClosedResponse;
import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
@@ -121,13 +122,13 @@
@Test
@SpecificationTest(section = "1.3.4",
- description = "Transfer without mandatory fields should result in a decoding error.")
+ description = "mandatory [...] a non null value for the field is always encoded.")
public void emptyTransfer() throws Exception
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
- Close responseClose = transport.newInteraction()
- .negotiateProtocol().consumeResponse()
+ Interaction interact = transport.newInteraction();
+ Response<?> response = interact.negotiateProtocol().consumeResponse()
.open().consumeResponse(Open.class)
.begin().consumeResponse(Begin.class)
.attachRole(Role.SENDER)
@@ -136,9 +137,27 @@
.transferHandle(null)
.transfer()
.consumeResponse()
- .getLatestResponse(Close.class);
- assertThat(responseClose.getError(), is(notNullValue()));
- assertThat(responseClose.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR));
+ .getLatestResponse();
+
+ assertThat(response.getBody(), is(notNullValue()));
+
+ if (response.getBody() instanceof Close)
+ {
+ final Close responseClose = (Close)response.getBody();
+ assertThat(responseClose.getError(), is(notNullValue()));
+ assertThat(responseClose.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR));
+
+ interact.close().sync();
+ }
+ else if (response.getBody() instanceof End)
+ {
+ final End responseEnd = (End)response.getBody();
+ assertThat(responseEnd.getError(), is(notNullValue()));
+ assertThat(responseEnd.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR));
+
+ interact.end().doCloseConnection();
+ }
+ transport.assertNoMoreResponses();
}
}
@@ -158,10 +177,11 @@
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
+ .transferDeliveryId()
.transferDeliveryTag(null)
.transferPayloadData(getTestName())
.transfer();
- interaction.consumeResponse(Detach.class, End.class, Close.class);
+ interaction.consumeResponse(Detach.class, End.class, Close.class, ChannelClosedResponse.class);
}
}
@@ -182,6 +202,7 @@
.attachHandle(linkHandle)
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
+ .transferDeliveryId()
.transferHandle(linkHandle)
.transferPayloadData(getTestName())
.transfer()
@@ -283,12 +304,18 @@
@Test
@SpecificationTest(section = "2.7.5",
- description = "If the negotiated link value is first, then it is illegal to set this field to second.")
+ description = "rcv-settle-mode "
+ + "If first, this indicates that the receiver MUST settle the delivery once it has arrived"
+ + " without waiting for the sender to settle first."
+ + " If second, this indicates that the receiver MUST NOT settle until sending its disposition"
+ + " to the sender and receiving a settled disposition from the sender."
+ + " If not set, this value is defaulted to the value negotiated on link attach."
+ + " If the negotiated link value is first, then it is illegal to set this field to second.")
public void transferReceiverSettleModeCannotBeSecondWhenLinkModeIsFirst() throws Exception
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
- Detach detach = transport.newInteraction()
+ Response<?> response = transport.newInteraction()
.negotiateProtocol().consumeResponse()
.open().consumeResponse(Open.class)
.begin().consumeResponse(Begin.class)
@@ -301,10 +328,24 @@
.transferRcvSettleMode(ReceiverSettleMode.SECOND)
.transfer()
.consumeResponse()
- .getLatestResponse(Detach.class);
- Error error = detach.getError();
- assertThat(error, is(notNullValue()));
- assertThat(error.getCondition(), is(equalTo(AmqpError.INVALID_FIELD)));
+ .getLatestResponse();
+
+ if (response.getBody() instanceof Detach)
+ {
+ final Detach detach = (Detach) response.getBody();
+ Error error = detach.getError();
+ assertThat(error, is(notNullValue()));
+ assertThat(error.getCondition(), is(equalTo(AmqpError.INVALID_FIELD)));
+ }
+ else
+ {
+ if (response.getBody() instanceof Disposition)
+ {
+ // clean up
+ Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
+ }
+ fail("it is illegal to set transfer 'rcv-settle-mode' to 'second' when link 'rcv-settle-mode' is set to 'first'");
+ }
}
}
@@ -366,6 +407,7 @@
Rejected.REJECTED_SYMBOL)
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
+ .transferDeliveryId()
.transferPayload(messageEncoder.getPayload())
.transferRcvSettleMode(ReceiverSettleMode.FIRST)
.transfer()
@@ -411,6 +453,7 @@
.attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
+ .transferDeliveryId()
.transferPayload(messageEncoder.getPayload())
.transferRcvSettleMode(ReceiverSettleMode.FIRST)
.transfer()
@@ -453,7 +496,7 @@
.attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
.attach().consumeResponse()
.flowIncomingWindow(UnsignedInteger.ONE)
- .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
.flowOutgoingWindow(UnsignedInteger.ZERO)
.flowNextOutgoingId(UnsignedInteger.ZERO)
.flowLinkCredit(UnsignedInteger.ONE)
@@ -493,7 +536,7 @@
.attachRcvSettleMode(ReceiverSettleMode.FIRST)
.attach().consumeResponse()
.flowIncomingWindow(UnsignedInteger.ONE)
- .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
.flowOutgoingWindow(UnsignedInteger.ZERO)
.flowNextOutgoingId(UnsignedInteger.ZERO)
.flowLinkCredit(UnsignedInteger.ONE)
@@ -519,8 +562,6 @@
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void receiveTransferReceiverSettleSecond() throws Exception
{
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
-
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction()
@@ -532,19 +573,22 @@
.attachRcvSettleMode(ReceiverSettleMode.SECOND)
.attach().consumeResponse()
.flowIncomingWindow(UnsignedInteger.ONE)
- .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
.flowOutgoingWindow(UnsignedInteger.ZERO)
.flowNextOutgoingId(UnsignedInteger.ZERO)
.flowLinkCredit(UnsignedInteger.ONE)
.flowHandleFromLinkHandle()
- .flow()
- .receiveDelivery()
- .decodeLatestDelivery();
+ .flow();
- Object data = interaction.getDecodedLatestDelivery();
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
+
+ Object data = interaction.receiveDelivery()
+ .decodeLatestDelivery()
+ .getDecodedLatestDelivery();
assertThat(data, is(equalTo(getTestName())));
Disposition disposition = interaction.dispositionSettled(false)
+ .dispositionFirstFromLatestDelivery()
.dispositionRole(Role.RECEIVER)
.dispositionState(new Accepted())
.disposition()
@@ -552,8 +596,11 @@
.getLatestResponse(Disposition.class);
assertThat(disposition.getSettled(), is(true));
- interaction.consumeResponse(null, Flow.class);
-
+ interaction.dispositionSettled(true)
+ .dispositionFirstFromLatestDelivery()
+ .dispositionRole(Role.RECEIVER)
+ .dispositionState(new Accepted())
+ .disposition();
}
}
@@ -561,8 +608,6 @@
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void receiveTransferReceiverSettleSecondWithRejectedOutcome() throws Exception
{
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
-
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction()
@@ -575,17 +620,20 @@
.attachRcvSettleMode(ReceiverSettleMode.SECOND)
.attach().consumeResponse()
.flowIncomingWindow(UnsignedInteger.ONE)
- .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
.flowOutgoingWindow(UnsignedInteger.ZERO)
.flowNextOutgoingId(UnsignedInteger.ZERO)
.flowLinkCredit(UnsignedInteger.ONE)
.flowHandleFromLinkHandle()
.flow();
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
+
Object data = interaction.receiveDelivery().decodeLatestDelivery().getDecodedLatestDelivery();
assertThat(data, is(equalTo(getTestName())));
interaction.dispositionSettled(false)
+ .dispositionFirstFromLatestDelivery()
.dispositionRole(Role.RECEIVER)
.dispositionState(new Rejected())
.disposition()
@@ -599,9 +647,11 @@
Disposition disposition = interaction.getLatestResponse(Disposition.class);
assertThat(disposition.getSettled(), is(true));
- interaction.consumeResponse(null, Flow.class);
-
-
+ interaction.dispositionSettled(true)
+ .dispositionFirstFromLatestDelivery()
+ .dispositionRole(Role.RECEIVER)
+ .dispositionState(new Rejected())
+ .disposition();
}
assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
@@ -627,7 +677,7 @@
.attachSourceDefaultOutcome(null)
.attach().consumeResponse()
.flowIncomingWindow(UnsignedInteger.ONE)
- .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
.flowOutgoingWindow(UnsignedInteger.ZERO)
.flowNextOutgoingId(UnsignedInteger.ZERO)
.flowLinkCredit(UnsignedInteger.ONE)
@@ -640,6 +690,7 @@
assertThat(data, is(equalTo(getTestName())));
Disposition disposition = interaction.dispositionSettled(false)
+ .dispositionFirstFromLatestDelivery()
.dispositionRole(Role.RECEIVER)
.dispositionState(null)
.disposition()
@@ -658,7 +709,6 @@
+ " non-terminal delivery states to the sender")
public void receiveTransferReceiverIndicatesNonTerminalDeliveryState() throws Exception
{
- String testMessageData;
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
@@ -667,19 +717,13 @@
.openMaxFrameSize(UnsignedInteger.valueOf(4096))
.open().consumeResponse()
.getLatestResponse(Open.class);
-
- int negotiatedFrameSize = open.getMaxFrameSize().intValue();
- testMessageData = Stream.generate(() -> "*").limit(negotiatedFrameSize).collect(Collectors.joining());
-
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, testMessageData);
-
interaction.begin().consumeResponse()
.attachRole(Role.RECEIVER)
.attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
.attachRcvSettleMode(ReceiverSettleMode.SECOND)
.attach().consumeResponse()
.flowIncomingWindow(UnsignedInteger.ONE)
- .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
.flowOutgoingWindow(UnsignedInteger.ZERO)
.flowNextOutgoingId(UnsignedInteger.ZERO)
.flowLinkCredit(UnsignedInteger.ONE)
@@ -687,6 +731,11 @@
.flow()
.sync();
+ final int negotiatedFrameSize = open.getMaxFrameSize().intValue();
+ final String testMessageData = Stream.generate(() -> "*").limit(negotiatedFrameSize).collect(Collectors.joining());
+
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, testMessageData);
+
MessageDecoder messageDecoder = new MessageDecoder();
Transfer first = interaction.consumeResponse(Transfer.class)
@@ -745,14 +794,14 @@
assumeThat(attach.getSndSettleMode(), is(equalTo(SenderSettleMode.SETTLED)));
interaction.flowIncomingWindow(UnsignedInteger.ONE)
- .flowNextIncomingId(UnsignedInteger.ZERO)
- .flowOutgoingWindow(UnsignedInteger.ZERO)
- .flowNextOutgoingId(UnsignedInteger.ZERO)
- .flowLinkCredit(UnsignedInteger.ONE)
- .flowHandleFromLinkHandle()
- .flow();
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow();
- List<Transfer> transfers = interaction.receiveDelivery().getLatestDelivery();
+ final List<Transfer> transfers = interaction.receiveDelivery().getLatestDelivery();
final AtomicBoolean isSettled = new AtomicBoolean();
transfers.forEach(transfer -> { if (Boolean.TRUE.equals(transfer.getSettled())) { isSettled.set(true);}});
@@ -762,10 +811,6 @@
interaction.doCloseConnection();
}
- if (getBrokerAdmin().isQueueDepthSupported())
- {
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
- }
Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, "test");
assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo("test")));
}
@@ -793,17 +838,13 @@
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
.attach()
.consumeResponse(Attach.class)
- .consumeResponse(Flow.class);
-
- Flow flow = interaction.getLatestResponse(Flow.class);
- assertThat(flow.getLinkCredit().intValue(), is(greaterThan(1)));
-
- interaction.transferDeliveryId(UnsignedInteger.ZERO)
+ .consumeResponse(Flow.class)
+ .transferDeliveryId()
.transferDeliveryTag(deliveryTag)
.transferPayloadData(content1)
.transfer()
.transferDeliveryTag(deliveryTag)
- .transferDeliveryId(UnsignedInteger.ONE)
+ .transferDeliveryId()
.transferPayloadData(getTestName() + "_2")
.transfer()
.sync();
@@ -1019,28 +1060,21 @@
interaction.txnAttachCoordinatorLink(txnState)
.txnDeclare(txnState);
- interaction.transferDeliveryId(UnsignedInteger.ONE)
+ interaction.transferDeliveryId()
.transferDeliveryTag(new Binary("A".getBytes(StandardCharsets.UTF_8)))
.transferPayloadData(contents[0])
.transfer()
- .transferDeliveryId(UnsignedInteger.valueOf(2))
+ .transferDeliveryId()
.transferDeliveryTag(new Binary("B".getBytes(StandardCharsets.UTF_8)))
.transferPayloadData(contents[1])
.transfer()
- .transferDeliveryId(UnsignedInteger.valueOf(3))
+ .transferDeliveryId()
.transferDeliveryTag(new Binary("C".getBytes(StandardCharsets.UTF_8)))
.transferTransactionalState(txnState.getCurrentTransactionId())
.transferPayloadData(contents[2])
.transfer();
- final Discharge discharge = new Discharge();
- discharge.setTxnId(txnState.getCurrentTransactionId());
- discharge.setFail(false);
-
- interaction.transferHandle(txnState.getHandle())
- .transferDeliveryId(UnsignedInteger.valueOf(4))
- .transferDeliveryTag(new Binary(("transaction-" + 4).getBytes(StandardCharsets.UTF_8)))
- .transferPayloadData(discharge).transfer();
+ interaction.txnSendDischarge(txnState, false);
assertDeliveries(interaction, Sets.newTreeSet(Arrays.asList(UnsignedInteger.ONE,
UnsignedInteger.valueOf(2),
@@ -1069,39 +1103,32 @@
.attachRcvSettleMode(ReceiverSettleMode.FIRST)
.attach().consumeResponse()
.flowIncomingWindow(UnsignedInteger.valueOf(numberOfMessages))
- .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
.flowOutgoingWindow(UnsignedInteger.ZERO)
.flowNextOutgoingId(UnsignedInteger.ZERO)
.flowLinkCredit(UnsignedInteger.valueOf(numberOfMessages))
.flowHandleFromLinkHandle()
.flow();
- for (int i = 0; i < contents.length; i++)
+ UnsignedInteger firstDeliveryId = null;
+ for (final String content : contents)
{
interaction.receiveDelivery(Flow.class).decodeLatestDelivery();
Object data = interaction.getDecodedLatestDelivery();
- assertThat(data, is(equalTo(contents[i])));
- assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.valueOf(i))));
+ assertThat(data, is(equalTo(content)));
+ if (firstDeliveryId == null)
+ {
+ firstDeliveryId = interaction.getLatestDeliveryId();
+ }
}
interaction.dispositionSettled(true)
.dispositionRole(Role.RECEIVER)
- .dispositionFirst(UnsignedInteger.ZERO)
+ .dispositionFirst(firstDeliveryId)
.dispositionLast(interaction.getLatestDeliveryId())
.dispositionState(new Accepted())
.disposition();
-
- // make sure sure the disposition is handled by making drain request
- interaction.flowLinkCredit(UnsignedInteger.ONE)
- .flowNextIncomingId(UnsignedInteger.valueOf(numberOfMessages))
- .flowDrain(Boolean.TRUE)
- .flow()
- .consumeResponse(Flow.class);
-
- if (getBrokerAdmin().isQueueDepthSupported())
- {
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
- }
+ interaction.doCloseConnection();
}
final String messageText = getTestName() + "_" + 4;
@@ -1116,7 +1143,6 @@
{
final int numberOfMessages = 4;
final String[] contents = Utils.createTestMessageContents(numberOfMessages, getTestName());
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, contents);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -1130,19 +1156,22 @@
.attachHandle(UnsignedInteger.ZERO)
.attach().consumeResponse()
.flowIncomingWindow(UnsignedInteger.valueOf(numberOfMessages))
- .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
.flowOutgoingWindow(UnsignedInteger.ZERO)
.flowNextOutgoingId(UnsignedInteger.ZERO)
.flowLinkCredit(UnsignedInteger.valueOf(numberOfMessages))
.flowHandleFromLinkHandle()
.flow();
- for (int i = 0; i < contents.length; i++)
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, contents);
+
+ final List<UnsignedInteger> deliveryIds = new ArrayList<>();
+ for (final String content : contents)
{
interaction.receiveDelivery(Flow.class).decodeLatestDelivery();
Object data = interaction.getDecodedLatestDelivery();
- assertThat(data, is(equalTo(contents[i])));
- assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.valueOf(i))));
+ assertThat(data, is(equalTo(content)));
+ deliveryIds.add(interaction.getLatestDeliveryId());
}
final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ONE);
@@ -1151,31 +1180,18 @@
interaction.dispositionSettled(true)
.dispositionRole(Role.RECEIVER)
- .dispositionFirst(UnsignedInteger.ZERO)
- .dispositionLast(UnsignedInteger.ONE)
+ .dispositionFirst(deliveryIds.get(0))
+ .dispositionLast(deliveryIds.get(1))
.dispositionState(new Accepted())
.disposition()
.dispositionSettled(true)
.dispositionRole(Role.RECEIVER)
- .dispositionFirst(UnsignedInteger.valueOf(2))
- .dispositionLast(UnsignedInteger.valueOf(3))
+ .dispositionFirst(deliveryIds.get(2))
+ .dispositionLast(deliveryIds.get(3))
.dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
.disposition();
-
- final Discharge discharge = new Discharge();
- discharge.setTxnId(txnState.getCurrentTransactionId());
- discharge.setFail(false);
-
- interaction.transferHandle(txnState.getHandle())
- .transferDeliveryId(UnsignedInteger.valueOf(4))
- .transferDeliveryTag(new Binary(("transaction-" + 4).getBytes(StandardCharsets.UTF_8)))
- .transferPayloadData(discharge)
- .transfer();
-
-
- final Flow coordinatorFlow = interaction.consume(Flow.class, Disposition.class);
- assertThat(coordinatorFlow.getHandle(), is(equalTo(txnState.getHandle())));
+ interaction.txnDischarge(txnState, false);
}
String messageText = getTestName() + "_" + 4;
@@ -1203,6 +1219,10 @@
expectedDeliveryIds.remove(deliveryId);
});
}
+ else if (response.getBody() instanceof Flow)
+ {
+ // ignore flows
+ }
}
while (!expectedDeliveryIds.isEmpty());
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
index 32a96b0..85040c8 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
@@ -21,6 +21,7 @@
package org.apache.qpid.tests.protocol.v1_0.transaction;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.notNullValue;
@@ -28,7 +29,6 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
-import static org.junit.Assume.assumeThat;
import java.net.InetSocketAddress;
import java.util.List;
@@ -40,10 +40,7 @@
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
@@ -55,10 +52,10 @@
import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
-import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -76,86 +73,76 @@
@Test
@SpecificationTest(section = "4.3",
- description = "If the coordinator is unable to complete the discharge, the coordinator MUST convey the error to the controller "
- + "as a transaction-error. If the source for the link to the coordinator supports the rejected outcome, then the "
- + "message MUST be rejected with this outcome carrying the transaction-error.")
+ description = "If the coordinator is unable to complete the discharge,"
+ + " the coordinator MUST convey the error to the controller as a transaction-error."
+ + " If the source for the link to the coordinator supports the rejected outcome, then the "
+ + " message MUST be rejected with this outcome carrying the transaction-error.")
public void dischargeUnknownTransactionIdWhenSourceSupportsRejectedOutcome() throws Exception
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
+ final InteractionTransactionalState txnState = new InteractionTransactionalState(UnsignedInteger.ZERO);
final Interaction interaction = transport.newInteraction();
- final Disposition disposition = interaction.negotiateProtocol().consumeResponse()
- .open().consumeResponse(Open.class)
- .begin().consumeResponse(Begin.class)
- .attachRole(Role.SENDER)
- .attachSourceOutcomes(Rejected.REJECTED_SYMBOL)
- .attachTarget(new Coordinator())
- .attach().consumeResponse(Attach.class)
- .consumeResponse(Flow.class)
- .transferPayloadData(new Declare())
- .transfer().consumeResponse()
- .getLatestResponse(Disposition.class);
+ interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
- assertThat(disposition.getSettled(), is(equalTo(true)));
- assertThat(disposition.getState(), is(instanceOf(Declared.class)));
- assertThat(((Declared) disposition.getState()).getTxnId(), is(notNullValue()));
+ .txnAttachCoordinatorLink(txnState, Rejected.REJECTED_SYMBOL)
+ .txnDeclare(txnState);
- interaction.consumeResponse(Flow.class);
+ assertThat(txnState.getDeliveryState(), is(instanceOf(Declared.class)));
+ assertThat(txnState.getCurrentTransactionId(), is(notNullValue()));
- final Discharge discharge = new Discharge();
- discharge.setTxnId(new Binary("nonExistingTransaction".getBytes(UTF_8)));
- final Disposition dischargeDisposition = interaction.transferDeliveryId(UnsignedInteger.ONE)
- .transferDeliveryTag(new Binary("discharge".getBytes(UTF_8)))
- .transferPayloadData(discharge)
- .transfer().consumeResponse()
- .getLatestResponse(Disposition.class);
- assertThat(dischargeDisposition.getState(), is(instanceOf(Rejected.class)));
- final Error error = ((Rejected) dischargeDisposition.getState()).getError();
+ txnState.setLastTransactionId(new Binary("nonExistingTransaction".getBytes(UTF_8)));
+ interaction.txnDischarge(txnState, false);
+ interaction.doCloseConnection();
+
+ assertThat(txnState.getDeliveryState(), is(instanceOf(Rejected.class)));
+ final Error error = ((Rejected) txnState.getDeliveryState()).getError();
assertThat(error, is(notNullValue()));
- assertThat(error.getCondition(), is(equalTo(TransactionError.UNKNOWN_ID)));
+
+ if (KIND_BROKER_J.equals(getBrokerAdmin().getKind()))
+ {
+ assertThat(error.getCondition(), is(equalTo(TransactionError.UNKNOWN_ID)));
+ }
}
}
@Test
@SpecificationTest(section = "4.3",
- description = "If the coordinator is unable to complete the discharge, the coordinator MUST convey the error to the controller "
- + "as a transaction-error. [...] If the source does not support "
- + "the rejected outcome, the transactional resource MUST detach the link to the coordinator, with the detach "
- + "performative carrying the transaction-error.")
+ description = "If the coordinator is unable to complete the discharge,"
+ + " the coordinator MUST convey the error to the controller as a transaction-error."
+ + " [...] If the source does not support the rejected outcome, the transactional resource"
+ + " MUST detach the link to the coordinator, with the detach performative carrying"
+ + " the transaction-error.")
public void dischargeUnknownTransactionIdWhenSourceDoesNotSupportRejectedOutcome() throws Exception
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
+ final InteractionTransactionalState txnState = new InteractionTransactionalState(UnsignedInteger.ZERO);
final Interaction interaction = transport.newInteraction();
- final Disposition disposition = interaction.negotiateProtocol().consumeResponse()
- .open().consumeResponse(Open.class)
- .begin().consumeResponse(Begin.class)
- .attachRole(Role.SENDER)
- .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
- .attachTarget(new Coordinator())
- .attach().consumeResponse(Attach.class)
- .consumeResponse(Flow.class)
- .transferPayloadData(new Declare())
- .transfer().consumeResponse()
- .getLatestResponse(Disposition.class);
+ interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .txnAttachCoordinatorLink(txnState, Accepted.ACCEPTED_SYMBOL)
+ .txnDeclare(txnState);
+ assertThat(txnState.getDeliveryState(), is(instanceOf(Declared.class)));
+ assertThat(txnState.getCurrentTransactionId(), is(notNullValue()));
- assertThat(disposition.getSettled(), is(equalTo(true)));
- assertThat(disposition.getState(), is(instanceOf(Declared.class)));
- assertThat(((Declared) disposition.getState()).getTxnId(), is(notNullValue()));
+ txnState.setLastTransactionId(new Binary("nonExistingTransaction".getBytes(UTF_8)));
+ interaction.txnSendDischarge(txnState, false);
- interaction.consumeResponse(Flow.class);
+ final Detach detachResponse = interaction.consumeResponse(Detach.class).getLatestResponse(Detach.class);
+ interaction.doCloseConnection();
- final Discharge discharge = new Discharge();
- discharge.setTxnId(new Binary("nonExistingTransaction".getBytes(UTF_8)));
- final Detach detachResponse = interaction.transferDeliveryId(UnsignedInteger.ONE)
- .transferDeliveryTag(new Binary("discharge".getBytes(UTF_8)))
- .transferPayloadData(discharge)
- .transfer().consumeResponse(Detach.class)
- .getLatestResponse(Detach.class);
- Error error = detachResponse.getError();
+ final Error error = detachResponse.getError();
assertThat(error, is(notNullValue()));
- assertThat(error.getCondition(), is(equalTo(TransactionError.UNKNOWN_ID)));
+
+ if (KIND_BROKER_J.equals(getBrokerAdmin().getKind()))
+ {
+ assertThat(error.getCondition(), is(equalTo(TransactionError.UNKNOWN_ID)));
+ }
}
}
@@ -167,33 +154,34 @@
+ " desired transaction identifier and the outcome to be applied upon a successful discharge.")
public void dischargeSettledAfterReceiverDetach() throws Exception
{
- assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
-
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, "test message");
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
- List<Transfer> transfers = interaction.negotiateProtocol().consumeResponse()
- .open().consumeResponse(Open.class)
- .begin().consumeResponse(Begin.class)
+ interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(txnState)
+ .txnDeclare(txnState)
- .attachRole(Role.RECEIVER)
- .attachHandle(UnsignedInteger.ONE)
- .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
- .attachRcvSettleMode(ReceiverSettleMode.FIRST)
- .attach().consumeResponse(Attach.class)
+ .attachRole(Role.RECEIVER)
+ .attachHandle(UnsignedInteger.ONE)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+ .attach().consumeResponse(Attach.class)
- .flowIncomingWindow(UnsignedInteger.ONE)
- .flowLinkCredit(UnsignedInteger.ONE)
- .flowHandleFromLinkHandle()
- .flow()
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowIncomingWindow(UnsignedInteger.ONE)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow();
- .receiveDelivery()
- .getLatestDelivery();
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
+
+ List<Transfer> transfers = interaction.receiveDelivery().getLatestDelivery();
assertThat(transfers, is(notNullValue()));
assertThat(transfers, is(not(empty())));
final UnsignedInteger deliveryId = transfers.get(0).getDeliveryId();
@@ -204,8 +192,15 @@
.disposition()
.txnDischarge(txnState, false);
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+
+ interaction.doCloseConnection();
}
+
+ String secondMessage = getTestName() + "_2";
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, secondMessage);
+ Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
+ assertThat(receivedMessage, is(equalTo(secondMessage)));
}
@Test
@@ -219,8 +214,6 @@
+ " was associated.")
public void dischargeSettledAfterSenderDetach() throws Exception
{
- assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
-
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
@@ -238,20 +231,21 @@
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
+ .transferDeliveryId()
.transferTransactionalState(txnState.getCurrentTransactionId())
- .transferPayloadData("test message")
+ .transferPayloadData(getTestName())
.transferHandle(UnsignedInteger.ONE)
.transfer().consumeResponse(Disposition.class)
.detachHandle(UnsignedInteger.ONE)
.detach().consumeResponse(Detach.class);
-
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
-
interaction.txnDischarge(txnState, false);
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+ assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
}
+
+ final Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
+ assertThat(receivedMessage, is(equalTo(getTestName())));
}
@Test
@@ -262,8 +256,6 @@
+ " reflect the outcome that was applied.")
public void dischargeUnsettledAfterSenderClose() throws Exception
{
- assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
-
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
@@ -282,21 +274,22 @@
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
+ .transferDeliveryId()
.transferTransactionalState(txnState.getCurrentTransactionId())
- .transferPayloadData("test message")
+ .transferPayloadData(getTestName())
.transferHandle(UnsignedInteger.ONE)
.transfer().consumeResponse(Disposition.class)
.detachHandle(UnsignedInteger.ONE)
.detachClose(true)
.detach().consumeResponse(Detach.class);
-
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
-
interaction.txnDischarge(txnState, false);
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+ assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
}
+
+ final Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
+ assertThat(receivedMessage, is(equalTo(getTestName())));
}
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
index 853fdd9..f44790f 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -26,6 +26,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
import java.net.InetSocketAddress;
import java.util.Collections;
@@ -102,6 +103,7 @@
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
+ .transferDeliveryId()
.transferHandle(linkHandle)
.transferPayloadData(getTestName())
.transferTransactionalState(txnState.getCurrentTransactionId())
@@ -116,9 +118,10 @@
interaction.txnDischarge(txnState, false);
- Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
- assertThat(receivedMessage, is(equalTo(getTestName())));
+ assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
}
+ Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
+ assertThat(receivedMessage, is(equalTo(getTestName())));
}
@Test
@@ -150,6 +153,7 @@
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
+ .transferDeliveryId()
.transferHandle(linkHandle)
.transferPayloadData(getTestName())
.transferTransactionalState(txnState.getCurrentTransactionId())
@@ -164,16 +168,11 @@
interaction.txnDischarge(txnState, true);
- if (getBrokerAdmin().isQueueDepthSupported())
- {
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
- }
- else
- {
- final String content = getTestName() + "_2";
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content);
- assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(content)));
- }
+ assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+
+ final String content = getTestName() + "_2";
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content);
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(content)));
}
}
@@ -204,9 +203,11 @@
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
.attachRcvSettleMode(ReceiverSettleMode.SECOND)
.attachHandle(linkHandle)
- .attach().consumeResponse(Attach.class)
+ .attach()
+ .consumeResponse(Attach.class)
.consumeResponse(Flow.class)
+ .transferDeliveryId()
.transferHandle(linkHandle)
.transferPayloadData(getTestName())
.transferTransactionalState(txnState.getCurrentTransactionId())
@@ -225,6 +226,8 @@
.disposition();
interaction.txnDischarge(txnState, false);
+
+ assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
}
assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
}
@@ -255,6 +258,7 @@
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
+ .transferDeliveryId()
.transferHandle(linkHandle)
.transferPayloadData(getTestName())
.transferTransactionalState(integerToBinary(Integer.MAX_VALUE))
@@ -295,7 +299,7 @@
.consumeResponse(Attach.class)
.flowIncomingWindow(UnsignedInteger.ONE)
- .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
.flowOutgoingWindow(UnsignedInteger.ZERO)
.flowNextOutgoingId(UnsignedInteger.ZERO)
.flowLinkCredit(UnsignedInteger.ONE)
@@ -313,6 +317,9 @@
.dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
.disposition()
.txnDischarge(txnState, false);
+
+
+ assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
}
}
@@ -344,7 +351,7 @@
.consumeResponse(Attach.class)
.flowIncomingWindow(UnsignedInteger.ONE)
- .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
.flowOutgoingWindow(UnsignedInteger.ZERO)
.flowNextOutgoingId(UnsignedInteger.ZERO)
.flowLinkCredit(UnsignedInteger.ONE)
@@ -363,6 +370,8 @@
.disposition()
.txnDischarge(txnState, true);
+ assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+
Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
assertThat(receivedMessage, is(equalTo(getTestName())));
}
@@ -396,7 +405,7 @@
.attach().consumeResponse(Attach.class)
.flowIncomingWindow(UnsignedInteger.ONE)
- .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
.flowOutgoingWindow(UnsignedInteger.ZERO)
.flowNextOutgoingId(UnsignedInteger.ZERO)
.flowLinkCredit(UnsignedInteger.ONE)
@@ -421,7 +430,10 @@
.consumeResponse().getLatestResponse();
assertUnknownTransactionIdError(response);
}
- assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
+ finally
+ {
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
+ }
}
@Ignore("TODO disposition is currently not being sent by Broker")
@@ -479,6 +491,8 @@
assertThat(((TransactionalState) settledDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class)));
interaction.txnDischarge(txnState, false);
+
+ assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
}
}
@@ -514,7 +528,7 @@
.consumeResponse(Attach.class)
.flowIncomingWindow(UnsignedInteger.ONE)
- .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
.flowOutgoingWindow(UnsignedInteger.ZERO)
.flowNextOutgoingId(UnsignedInteger.ZERO)
.flowLinkCredit(UnsignedInteger.ONE)
@@ -526,9 +540,6 @@
List<Transfer> transfers = interaction.getLatestDelivery();
assertThat(transfers.size(), is(equalTo(1)));
- Transfer transfer = transfers.get(0);
- assertThat(transfer.getState(), is(instanceOf(TransactionalState.class)));
- assertThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(txnState.getCurrentTransactionId())));
Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
assertThat(data, is(equalTo(getTestName())));
@@ -536,19 +547,20 @@
interaction.dispositionSettled(true)
.dispositionRole(Role.RECEIVER)
.dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
+ .dispositionFirstFromLatestDelivery()
.disposition()
.txnDischarge(txnState, false);
- if (getBrokerAdmin().isQueueDepthSupported())
- {
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
- }
- else
- {
- final String content = getTestName() + "_2";
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content);
- assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(content)));
- }
+
+ assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+
+ Transfer transfer = transfers.get(0);
+ assumeThat(transfer.getState(), is(instanceOf(TransactionalState.class)));
+ assumeThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(txnState.getCurrentTransactionId())));
+
+ final String content = getTestName() + "_2";
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content);
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(content)));
}
}
@@ -584,7 +596,7 @@
.consumeResponse(Attach.class)
.flowIncomingWindow(UnsignedInteger.ONE)
- .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
.flowOutgoingWindow(UnsignedInteger.ZERO)
.flowNextOutgoingId(UnsignedInteger.ZERO)
.flowLinkCredit(UnsignedInteger.ONE)
@@ -596,9 +608,6 @@
List<Transfer> transfers = interaction.getLatestDelivery();
assertThat(transfers.size(), is(equalTo(1)));
- Transfer transfer = transfers.get(0);
- assertThat(transfer.getState(), is(instanceOf(TransactionalState.class)));
- assertThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(txnState.getCurrentTransactionId())));
Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
assertThat(data, is(equalTo(getTestName())));
@@ -609,11 +618,13 @@
.disposition()
.txnDischarge(txnState, true);
- if (getBrokerAdmin().isQueueDepthSupported())
- {
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
- }
+ assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+
assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
+
+ Transfer transfer = transfers.get(0);
+ assumeThat(transfer.getState(), is(instanceOf(TransactionalState.class)));
+ assumeThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(txnState.getCurrentTransactionId())));
}
}
@@ -661,7 +672,10 @@
assertUnknownTransactionIdError(response);
}
- assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
+ finally
+ {
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
+ }
}
private void assertUnknownTransactionIdError(final Response<?> response)
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
index 1bfa6d3..448de42 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
@@ -21,6 +21,7 @@
package org.apache.qpid.tests.protocol.v1_0.transport.link;
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -42,6 +43,7 @@
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.SessionError;
+import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
@@ -53,7 +55,7 @@
{
@Test
@SpecificationTest(section = "1.3.4",
- description = "Flow without mandatory fields should result in a decoding error.")
+ description = "mandatory [...] a non null value for the field is always encoded.")
public void emptyFlow() throws Exception
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
@@ -69,7 +71,7 @@
.flowOutgoingWindow(null)
.flowNextOutgoingId(null)
.flow()
- .consumeResponse()
+ .consumeResponse(Close.class)
.getLatestResponse(Close.class);
assertThat(responseClose.getError(), is(notNullValue()));
assertThat(responseClose.getError().getCondition(), is(AmqpError.DECODE_ERROR));
@@ -89,6 +91,11 @@
.open().consumeResponse(Open.class)
.begin().consumeResponse(Begin.class)
.flowEcho(true)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+ .flowIncomingWindow(UnsignedInteger.ONE)
+ .flowHandle(null)
.flow()
.consumeResponse()
.getLatestResponse(Flow.class);
@@ -118,7 +125,10 @@
.flowHandleFromLinkHandle()
.flowAvailable(UnsignedInteger.valueOf(10))
.flowDeliveryCount(UnsignedInteger.ZERO)
- .flowLinkCredit(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
.flow().consumeResponse()
.getLatestResponse(Flow.class);
assertThat(responseFlow.getEcho(), not(equalTo(Boolean.TRUE)));
@@ -146,7 +156,7 @@
.attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
.attach().consumeResponse()
.flowIncomingWindow(UnsignedInteger.ONE)
- .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
.flowOutgoingWindow(UnsignedInteger.ZERO)
.flowNextOutgoingId(UnsignedInteger.ZERO)
.flowLinkCredit(UnsignedInteger.ONE)
@@ -156,7 +166,7 @@
.decodeLatestDelivery()
.dispositionSettled(true)
.dispositionRole(Role.RECEIVER)
- .dispositionFirst(interaction.getLatestDeliveryId())
+ .dispositionFirstFromLatestDelivery()
.dispositionLast(interaction.getLatestDeliveryId())
.dispositionState(new Accepted())
.disposition()
@@ -215,7 +225,12 @@
.open().consumeResponse(Open.class)
.begin().consumeResponse(Begin.class)
.flowEcho(true)
- .flowHandle(UnsignedInteger.ONE)
+ .flowIncomingWindow(UnsignedInteger.ONE)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandle(UnsignedInteger.valueOf(Integer.MAX_VALUE))
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
.flow()
.consumeResponse().getLatestResponse(End.class);
@@ -249,20 +264,22 @@
UnsignedInteger remoteHandle = remoteAttach.getHandle();
assertThat(remoteHandle, is(notNullValue()));
- Flow responseFlow = interaction.flowIncomingWindow(UnsignedInteger.valueOf(1))
- .flowNextIncomingId(UnsignedInteger.ZERO)
+ interaction.flowIncomingWindow(UnsignedInteger.ONE)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
.flowLinkCredit(UnsignedInteger.ONE)
.flowDrain(Boolean.FALSE)
.flowEcho(Boolean.TRUE)
.flowHandleFromLinkHandle()
.flow()
- .consumeResponse().getLatestResponse(Flow.class);
+ .consumeResponse(null, Flow.class);
- assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle)));
- assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ONE)));
- assertThat(responseFlow.getDrain(), is(equalTo(Boolean.FALSE)));
-
- responseFlow = interaction.flowLinkCredit(UnsignedInteger.ONE)
+ Flow responseFlow = interaction.flowIncomingWindow(UnsignedInteger.ONE)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
.flowDrain(Boolean.TRUE)
.flowEcho(Boolean.FALSE)
.flowHandleFromLinkHandle()
@@ -271,6 +288,7 @@
assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle)));
assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
+ assertThat(responseFlow.getDrain(), is(equalTo(Boolean.TRUE)));
}
}
@@ -286,8 +304,6 @@
{
BrokerAdmin brokerAdmin = getBrokerAdmin();
brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- String messageContent = getTestName();
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, messageContent);
final InetSocketAddress addr = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
try (FrameTransport transport = new FrameTransport(addr).connect())
@@ -304,35 +320,42 @@
UnsignedInteger remoteHandle = remoteAttach.getHandle();
assertThat(remoteHandle, is(notNullValue()));
- Object receivedMessageContent = interaction.flowIncomingWindow(UnsignedInteger.valueOf(1))
- .flowNextIncomingId(UnsignedInteger.ZERO)
+ interaction.flowIncomingWindow(UnsignedInteger.valueOf(1))
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
.flowLinkCredit(UnsignedInteger.ONE)
.flowDrain(Boolean.FALSE)
.flowEcho(Boolean.FALSE)
.flowHandleFromLinkHandle()
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
.flow()
- .receiveDelivery()
- .decodeLatestDelivery()
- .getDecodedLatestDelivery();
+ .sync();
- assertThat(receivedMessageContent, is(equalTo(messageContent)));
- UnsignedInteger firstDeliveryId = interaction.getLatestDeliveryId();
- assertThat(firstDeliveryId, is(equalTo(UnsignedInteger.ZERO)));
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
- Flow responseFlow = interaction.flowNextIncomingId(UnsignedInteger.ONE)
- .flowLinkCredit(UnsignedInteger.ONE)
- .flowDrain(Boolean.TRUE)
- .flowEcho(Boolean.FALSE)
- .flowHandleFromLinkHandle()
- .flow()
- .consumeResponse().getLatestResponse(Flow.class);
+ final Object receivedMessageContent = interaction.receiveDelivery()
+ .decodeLatestDelivery()
+ .getDecodedLatestDelivery();
+
+ assertThat(receivedMessageContent, is(equalTo(getTestName())));
+
+ final Flow responseFlow = interaction.flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowDrain(Boolean.TRUE)
+ .flowEcho(Boolean.FALSE)
+ .flowHandleFromLinkHandle()
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowDeliveryCount()
+ .flow()
+ .consumeResponse().getLatestResponse(Flow.class);
assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle)));
assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
interaction.dispositionSettled(true)
.dispositionRole(Role.RECEIVER)
- .dispositionFirst(firstDeliveryId)
+ .dispositionFirst(interaction.getLatestDeliveryId())
.dispositionState(new Accepted())
.disposition()
.sync();
@@ -373,9 +396,12 @@
UnsignedInteger delta = UnsignedInteger.ONE;
UnsignedInteger incomingWindow = UnsignedInteger.valueOf(3);
Object receivedMessageContent1 = interaction.flowIncomingWindow(incomingWindow)
- .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
.flowLinkCredit(delta)
.flowHandleFromLinkHandle()
+ .flowDeliveryCount()
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
.flow()
.receiveDelivery()
.decodeLatestDelivery()
@@ -383,12 +409,14 @@
assertThat(receivedMessageContent1, is(equalTo(contents[0])));
UnsignedInteger firstDeliveryId = interaction.getLatestDeliveryId();
- assertThat(firstDeliveryId, is(equalTo(UnsignedInteger.ZERO)));
Object receivedMessageContent2 = interaction.flowIncomingWindow(incomingWindow)
- .flowNextIncomingId(UnsignedInteger.ONE)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
.flowLinkCredit(delta)
.flowHandleFromLinkHandle()
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowDeliveryCount()
.flow()
.receiveDelivery()
.decodeLatestDelivery()
@@ -396,17 +424,18 @@
assertThat(receivedMessageContent2, is(equalTo(contents[1])));
UnsignedInteger secondDeliveryId = interaction.getLatestDeliveryId();
- assertThat(secondDeliveryId, is(equalTo(UnsignedInteger.ONE)));
// send session flow with echo=true to verify that no message is delivered without issuing a credit
- Flow responseFlow = interaction.flowNextIncomingId(UnsignedInteger.valueOf(2))
- .flowLinkCredit(null)
- .flowHandle(null)
- .flowEcho(Boolean.TRUE)
- .flow()
- .consumeResponse().getLatestResponse(Flow.class);
-
- assertThat(responseFlow.getHandle(), is(nullValue()));
+ interaction.flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+ .flowIncomingWindow(incomingWindow)
+ .flowLinkCredit(null)
+ .flowHandle(null)
+ .flowDeliveryCount(null)
+ .flowEcho(Boolean.TRUE)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flow()
+ .consumeResponse(null, Flow.class);
interaction.dispositionSettled(true)
.dispositionRole(Role.RECEIVER)
@@ -429,10 +458,8 @@
{
BrokerAdmin brokerAdmin = getBrokerAdmin();
brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- final String[] contents = Utils.createTestMessageContents(2, getTestName());
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, contents);
-
final InetSocketAddress addr = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
try (FrameTransport transport = new FrameTransport(addr).connect())
{
Interaction interaction = transport.newInteraction()
@@ -447,36 +474,68 @@
UnsignedInteger remoteHandle = remoteAttach.getHandle();
assertThat(remoteHandle, is(notNullValue()));
- Object receivedMessageContent1 = interaction.flowIncomingWindow(UnsignedInteger.valueOf(2))
- .flowNextIncomingId(UnsignedInteger.ZERO)
- .flowLinkCredit(UnsignedInteger.ONE)
+ UnsignedInteger incomingWindow = UnsignedInteger.valueOf(2);
+ Object receivedMessageContent1 = interaction.flowIncomingWindow(incomingWindow)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+ .flowLinkCredit(incomingWindow)
+ .flowNextOutgoingId()
.flowHandleFromLinkHandle()
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
.flow()
.receiveDelivery()
.decodeLatestDelivery()
.getDecodedLatestDelivery();
+ assertThat(receivedMessageContent1, is(equalTo(getTestName())));
- assertThat(receivedMessageContent1, is(equalTo(contents[0])));
- assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.ZERO)));
+ final Response<?> response = interaction.flowIncomingWindow(incomingWindow)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+ .flowLinkCredit(UnsignedInteger.ZERO)
+ .flowHandleFromLinkHandle()
+ .flowEcho(Boolean.TRUE)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowDeliveryCount()
+ .flow()
+ .consumeResponse(null, Flow.class)
+ .getLatestResponse();
- Flow responseFlow = interaction.flowNextIncomingId(UnsignedInteger.ONE)
- .flowLinkCredit(UnsignedInteger.ZERO)
- .flowHandleFromLinkHandle()
- .flowEcho(Boolean.TRUE)
- .flow()
- .consumeResponse().getLatestResponse(Flow.class);
+ if (response != null)
+ {
+ assertThat(response.getBody(), is(instanceOf(Flow.class)));
+ final Flow responseFlow = (Flow) response.getBody();
+ assertThat(responseFlow.getEcho(), not(equalTo(Boolean.TRUE)));
+ assertThat(responseFlow.getHandle(), is(notNullValue()));
+ }
- assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle)));
- assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
+ final String message2 = getTestName() + "_2";
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, message2);
+ try
+ {
+ // send session flow with echo=true to verify that no message is delivered without issuing a credit
+ interaction.flowIncomingWindow(incomingWindow)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+ .flowLinkCredit(null)
+ .flowHandle(null)
+ .flowDeliveryCount(null)
+ .flowEcho(Boolean.TRUE)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flow()
+ .consumeResponse(null, Flow.class);
+ }
+ finally
+ {
+ assertThat(Utils.receiveMessage(addr, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(message2)));
- interaction.dispositionSettled(true)
- .dispositionRole(Role.RECEIVER)
- .dispositionFirst(interaction.getLatestDeliveryId())
- .dispositionState(new Accepted())
- .disposition()
- .sync();
+ interaction.dispositionSettled(true)
+ .dispositionRole(Role.RECEIVER)
+ .dispositionFirst(interaction.getLatestDeliveryId())
+ .dispositionState(new Accepted())
+ .disposition()
+ .sync();
+ }
}
- assertThat(Utils.receiveMessage(addr, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(contents[1])));
}
@Test
@@ -485,7 +544,7 @@
+ " available to consume the current link-credit. If set, the sender will"
+ " (after sending all available messages) advance the delivery-count as much as possible,"
+ " consuming all link-credit, and send the flow state to the receiver.")
- public void drainWithZeroCredits() throws Exception
+ public void drain() throws Exception
{
BrokerAdmin brokerAdmin = getBrokerAdmin();
brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
@@ -507,16 +566,26 @@
assertThat(remoteHandle, is(notNullValue()));
Flow responseFlow = interaction.flowIncomingWindow(UnsignedInteger.valueOf(2))
- .flowNextIncomingId(UnsignedInteger.ZERO)
- .flowLinkCredit(UnsignedInteger.ZERO)
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+ .flowLinkCredit(UnsignedInteger.valueOf(2))
.flowDrain(Boolean.TRUE)
.flowHandleFromLinkHandle()
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
.flow()
- .consumeResponse().getLatestResponse(Flow.class);
+ .receiveDelivery()
+ .decodeLatestDelivery()
+ .consumeResponse(Flow.class).getLatestResponse(Flow.class);
assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle)));
assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
+
+ interaction.dispositionSettled(true)
+ .dispositionRole(Role.RECEIVER)
+ .dispositionFirstFromLatestDelivery()
+ .dispositionState(new Accepted())
+ .disposition()
+ .sync();
}
- assertThat(Utils.receiveMessage(addr, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
}
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
index 337c129..61884e3 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
@@ -591,11 +591,9 @@
interaction.doCloseConnection();
- if (getBrokerAdmin().isQueueDepthSupported())
- {
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME),
- is(equalTo(0)));
- }
+ final String content = getTestName() + "_2";
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content);
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), Matchers.is(Matchers.equalTo(content)));
}
}
@@ -675,11 +673,9 @@
interaction.doCloseConnection();
- if (getBrokerAdmin().isQueueDepthSupported())
- {
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME),
- Matchers.is(Matchers.equalTo(0)));
- }
+ final String content = getTestName() + "_2";
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content);
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), Matchers.is(Matchers.equalTo(content)));
}
}
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java
index f9640dd..801033e 100644
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java
@@ -44,9 +44,14 @@
public I consumeResponse(final Class<?>... responseTypes) throws Exception
{
+ final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes));
+ return consumeResponse(acceptableResponseClasses);
+ }
+
+ protected I consumeResponse(final Set<Class<?>> acceptableResponseClasses) throws Exception
+ {
sync();
_latestResponse = getNextResponse();
- final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes));
if ((acceptableResponseClasses.isEmpty() && _latestResponse != null)
|| (acceptableResponseClasses.contains(null) && _latestResponse == null))
{