QPID-8350: Strengthen test cases involiving message transfer
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
index 14b66e2..e846b5f 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
@@ -25,6 +25,7 @@
import static org.junit.Assume.assumeThat;
import java.net.InetSocketAddress;
+import java.util.stream.IntStream;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
@@ -74,27 +75,40 @@
{
try (FrameTransport transport = new FrameTransport(brokerAddress).connect())
{
- final Interaction interaction = transport.newInteraction()
- .negotiateProtocol().consumeResponse()
- .open().consumeResponse()
- .begin().consumeResponse()
- .attachRole(Role.RECEIVER)
- .attachSourceAddress(queueName)
- .attach().consumeResponse()
- .flowIncomingWindow(UnsignedInteger.ONE)
- .flowNextIncomingId(UnsignedInteger.ZERO)
- .flowOutgoingWindow(UnsignedInteger.ZERO)
- .flowNextOutgoingId(UnsignedInteger.ZERO)
- .flowLinkCredit(UnsignedInteger.ONE)
- .flowHandleFromLinkHandle()
- .flow()
- .receiveDelivery()
- .decodeLatestDelivery();
-
+ final Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse()
+ .begin().consumeResponse()
+ .attachRole(Role.RECEIVER)
+ .attachSourceAddress(queueName)
+ .attach().consumeResponse()
+ .flowIncomingWindow(UnsignedInteger.ONE)
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow()
+ .receiveDelivery()
+ .decodeLatestDelivery()
+ .dispositionSettled(true)
+ .dispositionRole(Role.RECEIVER)
+ .dispositionFirst(interaction.getLatestDeliveryId())
+ .dispositionLast(interaction.getLatestDeliveryId())
+ .dispositionState(new Accepted())
+ .disposition()
+ .sync();
return interaction.getDecodedLatestDelivery();
}
}
+ public static String[] createTestMessageContents(final int numberOfMessages, final String testName)
+ {
+ return IntStream.range(0, numberOfMessages)
+ .mapToObj(i -> String.format("%s_%d", testName, i))
+ .toArray(String[]::new);
+ }
+
public static QpidByteBuffer[] splitPayload(final String messageContent, int numberOfParts)
{
MessageEncoder messageEncoder = new MessageEncoder();
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java
index 6698337..d6eb3d1 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java
@@ -60,7 +60,6 @@
@BrokerSpecific(kind = KIND_BROKER_J)
public class QueueDeletionTest extends BrokerAdminUsingTestBase
{
- private static final String TEST_MESSAGE_CONTENT = "test";
private InetSocketAddress _brokerAddress;
@@ -153,7 +152,7 @@
Disposition responseDisposition = interaction.consumeResponse(Flow.class)
.transferHandle(linkHandle)
- .transferPayloadData(TEST_MESSAGE_CONTENT)
+ .transferPayloadData(getTestName())
.transferTransactionalState(txnState.getCurrentTransactionId())
.transfer()
.consumeResponse(Disposition.class)
@@ -183,8 +182,8 @@
{
Utils.putMessageOnQueue(getBrokerAdmin(),
BrokerAdmin.TEST_QUEUE_NAME,
- TEST_MESSAGE_CONTENT + 1,
- TEST_MESSAGE_CONTENT + 2);
+ getTestName() + 1,
+ getTestName() + 2);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
@@ -218,7 +217,7 @@
.decodeLatestDelivery();
Object data = interaction.getDecodedLatestDelivery();
- assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT + 1)));
+ assertThat(data, is(equalTo(getTestName() + 1)));
interaction.dispositionSettled(true)
.dispositionRole(Role.RECEIVER)
@@ -236,7 +235,7 @@
.decodeLatestDelivery();
data = interaction.getDecodedLatestDelivery();
- assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT + 2)));
+ assertThat(data, is(equalTo(getTestName() + 2)));
getBrokerAdmin().deleteQueue(BrokerAdmin.TEST_QUEUE_NAME);
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java
index 522fe4b..cd4f896 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java
@@ -24,10 +24,10 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
import java.net.InetSocketAddress;
-import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;
@@ -57,7 +57,6 @@
@ConfigItem(name = "virtualhost.storeTransactionOpenTimeoutClose", value = "1000")
public class TransactionTimeoutTest extends BrokerAdminUsingTestBase
{
- private static final String TEST_MESSAGE_CONTENT = "testMessageContent";
private InetSocketAddress _brokerAddress;
@Before
@@ -93,7 +92,7 @@
.consumeResponse(Flow.class)
.transferHandle(linkHandle)
- .transferPayloadData(TEST_MESSAGE_CONTENT)
+ .transferPayloadData(getTestName())
.transferTransactionalState(txnState.getCurrentTransactionId())
.transfer()
.consumeResponse(Disposition.class)
@@ -105,7 +104,7 @@
assertThat(((TransactionalState) responseDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class)));
Close responseClose = interaction.consumeResponse().getLatestResponse(Close.class);
- assertThat(responseClose.getError(), is(Matchers.notNullValue()));
+ assertThat(responseClose.getError(), is(notNullValue()));
assertThat(responseClose.getError().getCondition(), equalTo(TransactionError.TRANSACTION_TIMEOUT));
}
}
@@ -113,7 +112,7 @@
@Test
public void transactionalRetirementTimeout() throws Exception
{
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
@@ -147,7 +146,7 @@
.decodeLatestDelivery();
Object data = interaction.getDecodedLatestDelivery();
- assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+ assertThat(data, is(equalTo(getTestName())));
interaction.dispositionSettled(true)
.dispositionRole(Role.RECEIVER)
@@ -165,8 +164,9 @@
{
responseClose = interaction.consumeResponse().getLatestResponse(Close.class);
}
- assertThat(responseClose.getError(), is(Matchers.notNullValue()));
+ assertThat(responseClose.getError(), is(notNullValue()));
assertThat(responseClose.getError().getCondition(), equalTo(TransactionError.TRANSACTION_TIMEOUT));
}
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
}
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
index 36203a4..bec5067 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
@@ -83,9 +83,14 @@
assertThat(attachResponse.getSource(), is(notNullValue()));
final String newTempQueueAddress = ((Source) attachResponse.getSource()).getAddress();
- assertThat(Utils.doesNodeExist(_brokerAddress, newTempQueueAddress), is(true));
-
- interaction.detachClose(true).detach().consumeResponse().getLatestResponse(Detach.class);
+ try
+ {
+ assertThat(Utils.doesNodeExist(_brokerAddress, newTempQueueAddress), is(true));
+ }
+ finally
+ {
+ interaction.detachClose(true).detach().consumeResponse().getLatestResponse(Detach.class);
+ }
assertThat(Utils.doesNodeExist(_brokerAddress, newTempQueueAddress), is(false));
}
@@ -116,7 +121,14 @@
assertThat(Utils.doesNodeExist(_brokerAddress, newTempQueueAddress), is(true));
interaction.consumeResponse().getLatestResponse(Flow.class);
- interaction.detachClose(true).detach().consumeResponse().getLatestResponse(Detach.class);
+ try
+ {
+ assertThat(Utils.doesNodeExist(_brokerAddress, newTempQueueAddress), is(true));
+ }
+ finally
+ {
+ interaction.detachClose(true).detach().consumeResponse().getLatestResponse(Detach.class);
+ }
assertThat(Utils.doesNodeExist(_brokerAddress, newTempQueueAddress), is(false));
}
@@ -144,11 +156,24 @@
assertThat(attachResponse.getSource(), is(notNullValue()));
final String newTempQueueAddress = ((Source) attachResponse.getSource()).getAddress();
- assertThat(Utils.doesNodeExist(_brokerAddress, newTempQueueAddress), is(true));
-
- interaction.detach().consumeResponse().getLatestResponse(Detach.class);
+ try
+ {
+ assertThat(Utils.doesNodeExist(_brokerAddress, newTempQueueAddress), is(true));
+ }
+ finally
+ {
+ interaction.detach().consumeResponse().getLatestResponse(Detach.class);
+ }
assertThat(Utils.doesNodeExist(_brokerAddress, newTempQueueAddress), is(true));
+
+ interaction.attach()
+ .consumeResponse(Attach.class)
+ .detachClose(true)
+ .detach()
+ .consumeResponse()
+ .getLatestResponse(Detach.class);
+ assertThat(Utils.doesNodeExist(_brokerAddress, newTempQueueAddress), is(false));
}
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
index 126c882..ed9e58d 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
@@ -48,7 +48,6 @@
import org.apache.qpid.server.protocol.v1_0.type.transport.End;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
-import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
@@ -95,7 +94,7 @@
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
- QpidByteBuffer[] payloads = Utils.splitPayload("testData", 2);
+ QpidByteBuffer[] payloads = Utils.splitPayload(getTestName(), 2);
final UnsignedInteger deliveryId = UnsignedInteger.ZERO;
final Binary deliveryTag = new Binary("testTransfer".getBytes(UTF_8));
@@ -106,7 +105,6 @@
.begin().consumeResponse(Begin.class)
.attachRole(Role.SENDER)
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
- .attachRcvSettleMode(ReceiverSettleMode.SECOND)
.attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
@@ -128,8 +126,9 @@
}
assertThat(disposition.getFirst(), is(equalTo(deliveryId)));
assertThat(disposition.getLast(), oneOf(null, deliveryId));
- assertThat(disposition.getSettled(), is(equalTo(false)));
+ assertThat(disposition.getSettled(), is(equalTo(true)));
}
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
}
@Test
@@ -141,7 +140,7 @@
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
- QpidByteBuffer[] payloads = Utils.splitPayload("testData", 4);
+ QpidByteBuffer[] payloads = Utils.splitPayload(getTestName(), 4);
final UnsignedInteger deliveryId = UnsignedInteger.ZERO;
final Binary deliveryTag = new Binary("testTransfer".getBytes(UTF_8));
@@ -151,7 +150,6 @@
.begin().consumeResponse(Begin.class)
.attachRole(Role.SENDER)
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
- .attachRcvSettleMode(ReceiverSettleMode.SECOND)
.attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
@@ -188,14 +186,12 @@
}
assertThat(disposition.getFirst(), is(equalTo(deliveryId)));
assertThat(disposition.getLast(), oneOf(null, deliveryId));
- assertThat(disposition.getSettled(), is(equalTo(false)));
+ assertThat(disposition.getSettled(), is(equalTo(true)));
assertThat(disposition.getState(), is(instanceOf(Accepted.class)));
}
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
}
-
- //
-
@Test
@SpecificationTest(section = "2.6.14",
description = "The sender MAY indicate an aborted attempt to deliver a message by setting the abort flag on the last transfer."
@@ -204,7 +200,7 @@
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
- QpidByteBuffer[] payloads = Utils.splitPayload("testData", 2);
+ QpidByteBuffer[] payloads = Utils.splitPayload(getTestName(), 2);
final UnsignedInteger deliveryId = UnsignedInteger.ZERO;
final Binary deliveryTag = new Binary("testTransfer".getBytes(UTF_8));
@@ -215,7 +211,6 @@
.begin().consumeResponse(Begin.class)
.attachRole(Role.SENDER)
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
- .attachRcvSettleMode(ReceiverSettleMode.SECOND)
.attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
@@ -238,15 +233,18 @@
assertThat(latestResponse, is(nullValue()));
}
}
+
@Test
@SpecificationTest(section = "2.6.14",
description = "[...]messages being transferred along different links MAY be interleaved")
public void multiTransferInterleaved() throws Exception
{
+ String messageContent1 = getTestName() + "_1";
+ String messageContent2 = getTestName() + "_2";
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
- QpidByteBuffer[] messagePayload1 = Utils.splitPayload("testData1", 2);
- QpidByteBuffer[] messagePayload2 = Utils.splitPayload("testData2", 2);
+ QpidByteBuffer[] messagePayload1 = Utils.splitPayload(messageContent1, 2);
+ QpidByteBuffer[] messagePayload2 = Utils.splitPayload(messageContent2, 2);
UnsignedInteger linkHandle1 = UnsignedInteger.ZERO;
UnsignedInteger linkHandle2 = UnsignedInteger.ONE;
@@ -265,7 +263,6 @@
.attachHandle(linkHandle1)
.attachRole(Role.SENDER)
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
- .attachRcvSettleMode(ReceiverSettleMode.SECOND)
.attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
@@ -274,7 +271,6 @@
.attachHandle(linkHandle2)
.attachRole(Role.SENDER)
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
- .attachRcvSettleMode(ReceiverSettleMode.SECOND)
.attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
@@ -328,7 +324,7 @@
dispositionMap.put(disposition.getFirst(), disposition);
assertThat(disposition.getLast(), oneOf(null, disposition.getFirst()));
- assertThat(disposition.getSettled(), is(equalTo(false)));
+ assertThat(disposition.getSettled(), is(equalTo(true)));
assertThat(disposition.getState(), is(instanceOf(Accepted.class)));
}
@@ -336,6 +332,8 @@
assertThat(dispositionMap.containsKey(deliverId1), is(true));
assertThat(dispositionMap.containsKey(deliveryId2), is(true));
}
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(messageContent1)));
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(messageContent2)));
}
@Test
@@ -343,10 +341,12 @@
description = "[...]messages transferred along a single link MUST NOT be interleaved")
public void illegallyInterleavedMultiTransferOnSingleLink() throws Exception
{
+ String messageContent1 = getTestName() + "_1";
+ String messageContent2 = getTestName() + "_2";
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
- QpidByteBuffer[] messagePayload1 = Utils.splitPayload("testData1", 2);
- QpidByteBuffer[] messagePayload2 = Utils.splitPayload("testData2", 2);
+ QpidByteBuffer[] messagePayload1 = Utils.splitPayload(messageContent1, 2);
+ QpidByteBuffer[] messagePayload2 = Utils.splitPayload(messageContent2, 2);
Binary deliveryTag1 = new Binary("testTransfer1".getBytes(UTF_8));
Binary deliveryTag2 = new Binary("testTransfer2".getBytes(UTF_8));
@@ -361,7 +361,6 @@
.attachRole(Role.SENDER)
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
- .attachRcvSettleMode(ReceiverSettleMode.SECOND)
.attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
index 2d48b7e..fce9324 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
@@ -59,7 +59,9 @@
+ " MUST NOT be redelivered to the modifying link endpoint.")
public void modifiedOutcomeWithUndeliverableHere() throws Exception
{
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, "message1", "message2");
+ String content1 = getTestName() + "_1";
+ String content2 = getTestName() + "_2";
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content1, content2);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -79,7 +81,7 @@
.decodeLatestDelivery();
Object firstDeliveryPayload = interaction.getDecodedLatestDelivery();
- assertThat(firstDeliveryPayload, is(equalTo("message1")));
+ assertThat(firstDeliveryPayload, is(equalTo(content1)));
Modified modifiedOutcome = new Modified();
modifiedOutcome.setUndeliverableHere(Boolean.TRUE);
@@ -96,10 +98,12 @@
.decodeLatestDelivery();
Object secondDeliveryPayload = interaction.getDecodedLatestDelivery();
- assertThat(secondDeliveryPayload, is(equalTo("message2")));
+ assertThat(secondDeliveryPayload, is(equalTo(content2)));
// verify that no unexpected performative is received by closing
interaction.doCloseConnection();
}
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(content1)));
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(content2)));
}
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 4d6b99f..7950741 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -43,13 +43,10 @@
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import com.google.common.collect.Sets;
-import org.hamcrest.CoreMatchers;
-import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@@ -91,10 +88,10 @@
import org.apache.qpid.tests.protocol.v1_0.Utils;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
public class TransferTest extends BrokerAdminUsingTestBase
{
- private static final String TEST_MESSAGE_DATA = "foo";
private static final long MAX_MAX_MESSAGE_SIZE_WE_ARE_WILLING_TO_TEST = 200 * 1024 * 1024L;
private InetSocketAddress _brokerAddress;
private String _originalMmsMessageStorePersistence;
@@ -162,7 +159,7 @@
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
.transferDeliveryTag(null)
- .transferPayloadData("testData")
+ .transferPayloadData(getTestName())
.transfer();
interaction.consumeResponse(Detach.class, End.class, Close.class);
}
@@ -186,7 +183,7 @@
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
.transferHandle(linkHandle)
- .transferPayloadData("testData")
+ .transferPayloadData(getTestName())
.transfer()
.consumeResponse()
.getLatestResponse(Disposition.class);
@@ -194,6 +191,7 @@
assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
assertThat(responseDisposition.getState(), is(instanceOf(Accepted.class)));
}
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
}
@Test
@@ -202,6 +200,7 @@
+ " that could be considered unsettled by either end of the link.")
public void transferMessagesWithTheSameDeliveryTagOnSeparateLinksBelongingToTheSameSession() throws Exception
{
+ final String[] contents = Utils.createTestMessageContents(2, getTestName());
try (final FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final UnsignedInteger link1Handle = UnsignedInteger.ONE;
@@ -227,13 +226,13 @@
.consumeResponse(Flow.class)
.transferHandle(link1Handle)
- .transferPayloadData("testData")
+ .transferPayloadData(contents[0])
.transferDeliveryTag(deliveryTag)
.transferDeliveryId(UnsignedInteger.ZERO)
.transfer()
.transferHandle(link2Handle)
.transferDeliveryId(UnsignedInteger.ONE)
- .transferPayloadData("testData2")
+ .transferPayloadData(contents[1])
.transferDeliveryTag(deliveryTag)
.transfer();
@@ -252,6 +251,7 @@
assertThat(disposition2.getFirst(), is(not(equalTo(first))));
}
}
+ assertTestQueueMessages(contents);
}
@Test
@@ -267,10 +267,9 @@
.begin().consumeResponse(Begin.class)
.attachRole(Role.SENDER)
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
- .attachRcvSettleMode(ReceiverSettleMode.SECOND)
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
- .transferPayloadData("testData")
+ .transferPayloadData(getTestName())
.transferRcvSettleMode(ReceiverSettleMode.FIRST)
.transfer()
.consumeResponse()
@@ -279,6 +278,7 @@
assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
assertThat(responseDisposition.getState(), is(instanceOf(Accepted.class)));
}
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
}
@Test
@@ -297,7 +297,7 @@
.attachRcvSettleMode(ReceiverSettleMode.FIRST)
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
- .transferPayloadData("testData")
+ .transferPayloadData(getTestName())
.transferRcvSettleMode(ReceiverSettleMode.SECOND)
.transfer()
.consumeResponse()
@@ -321,7 +321,7 @@
.attachRole(Role.SENDER)
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
.attach()
- .transferPayloadData("testData")
+ .transferPayloadData(getTestName())
.transferSettled(true)
.transfer()
.close()
@@ -336,6 +336,7 @@
interaction.consumeResponse().getLatestResponse(Flow.class);
interaction.consumeResponse().getLatestResponse(Close.class);
}
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
}
@Test
@@ -345,6 +346,7 @@
+ "where the durable header is set to true: if the source allows the rejected outcome then the "
+ "message SHOULD be rejected with the precondition-failed error, otherwise the link MUST be "
+ "detached by the receiver with the same error.")
+ @BrokerSpecific(kind = BrokerAdmin.KIND_BROKER_J)
public void durableTransferWithRejectedOutcome() throws Exception
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
@@ -353,14 +355,13 @@
final Header header = new Header();
header.setDurable(true);
messageEncoder.setHeader(header);
- messageEncoder.addData("foo");
+ messageEncoder.addData(getTestName());
final Disposition receivedDisposition = transport.newInteraction()
.negotiateProtocol().consumeResponse()
.open().consumeResponse(Open.class)
.begin().consumeResponse(Begin.class)
.attachRole(Role.SENDER)
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
- .attachRcvSettleMode(ReceiverSettleMode.SECOND)
.attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL,
Rejected.REJECTED_SYMBOL)
.attach().consumeResponse(Attach.class)
@@ -391,6 +392,7 @@
+ "where the durable header is set to true: if the source allows the rejected outcome then the "
+ "message SHOULD be rejected with the precondition-failed error, otherwise the link MUST be "
+ "detached by the receiver with the same error.")
+ @BrokerSpecific(kind = BrokerAdmin.KIND_BROKER_J)
public void durableTransferWithoutRejectedOutcome() throws Exception
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
@@ -399,14 +401,13 @@
final Header header = new Header();
header.setDurable(true);
messageEncoder.setHeader(header);
- messageEncoder.addData("foo");
+ messageEncoder.addData(getTestName());
final Response<?> response = transport.newInteraction()
.negotiateProtocol().consumeResponse()
.open().consumeResponse(Open.class)
.begin().consumeResponse(Begin.class)
.attachRole(Role.SENDER)
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
- .attachRcvSettleMode(ReceiverSettleMode.SECOND)
.attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
@@ -440,7 +441,7 @@
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void receiveTransferUnsettled() throws Exception
{
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -470,15 +471,16 @@
while (hasMore);
Object data = messageDecoder.getData();
- assertThat(data, Is.is(CoreMatchers.equalTo(TEST_MESSAGE_DATA)));
+ assertThat(data, is(equalTo(getTestName())));
}
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
}
@Test
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void receiveTransferReceiverSettleFirst() throws Exception
{
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -501,9 +503,10 @@
.decodeLatestDelivery();
Object data = interaction.getDecodedLatestDelivery();
- assertThat(data, Is.is(CoreMatchers.equalTo(TEST_MESSAGE_DATA)));
+ assertThat(data, is(equalTo(getTestName())));
interaction.dispositionSettled(true)
+ .dispositionState(new Accepted())
.dispositionRole(Role.RECEIVER)
.disposition();
@@ -516,7 +519,7 @@
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void receiveTransferReceiverSettleSecond() throws Exception
{
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -539,7 +542,7 @@
.decodeLatestDelivery();
Object data = interaction.getDecodedLatestDelivery();
- assertThat(data, Is.is(CoreMatchers.equalTo(TEST_MESSAGE_DATA)));
+ assertThat(data, is(equalTo(getTestName())));
Disposition disposition = interaction.dispositionSettled(false)
.dispositionRole(Role.RECEIVER)
@@ -558,7 +561,7 @@
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void receiveTransferReceiverSettleSecondWithRejectedOutcome() throws Exception
{
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -580,7 +583,7 @@
.flow();
Object data = interaction.receiveDelivery().decodeLatestDelivery().getDecodedLatestDelivery();
- assertThat(data, is(equalTo(TEST_MESSAGE_DATA)));
+ assertThat(data, is(equalTo(getTestName())));
interaction.dispositionSettled(false)
.dispositionRole(Role.RECEIVER)
@@ -601,6 +604,7 @@
}
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
}
@Ignore
@@ -608,7 +612,7 @@
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void receiveTransferReceiverSettleSecondWithImplicitDispositionState() throws Exception
{
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -633,7 +637,7 @@
.decodeLatestDelivery();
Object data = interaction.getDecodedLatestDelivery();
- assertThat(data, Is.is(CoreMatchers.equalTo(TEST_MESSAGE_DATA)));
+ assertThat(data, is(equalTo(getTestName())));
Disposition disposition = interaction.dispositionSettled(false)
.dispositionRole(Role.RECEIVER)
@@ -646,6 +650,7 @@
interaction.consumeResponse(null, Flow.class);
}
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
}
@Test
@@ -653,7 +658,7 @@
+ " non-terminal delivery states to the sender")
public void receiveTransferReceiverIndicatesNonTerminalDeliveryState() throws Exception
{
-
+ String testMessageData;
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
@@ -664,7 +669,7 @@
.getLatestResponse(Open.class);
int negotiatedFrameSize = open.getMaxFrameSize().intValue();
- String testMessageData = Stream.generate(() -> "*").limit(negotiatedFrameSize).collect(Collectors.joining());
+ testMessageData = Stream.generate(() -> "*").limit(negotiatedFrameSize).collect(Collectors.joining());
Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, testMessageData);
@@ -712,14 +717,10 @@
assertThat(messageDecoder.getData(), is(equalTo(testMessageData)));
- Disposition disposition = interaction.dispositionSettled(false)
- .dispositionRole(Role.RECEIVER)
- .dispositionState(new Accepted())
- .disposition().consumeResponse(Disposition.class)
- .getLatestResponse(Disposition.class);
- assertThat(disposition.getSettled(), is(true));
-
- interaction.consumeResponse(null, Flow.class);
+ interaction.dispositionSettled(true)
+ .dispositionRole(Role.RECEIVER)
+ .dispositionState(new Accepted())
+ .disposition().sync();
}
}
@@ -728,7 +729,7 @@
+ " the receiver initiates the attach exchange and the sender supports the desired mode.")
public void receiveTransferSenderSettleModeSettled() throws Exception
{
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -772,8 +773,10 @@
@Test
@SpecificationTest(section = "2.7.5",
description = "[delivery-tag] uniquely identifies the delivery attempt for a given message on this link.")
+ @Ignore("QPID-8346: test relies on receiver-settle-mode=second which is broken")
public void transfersWithDuplicateUnsettledDeliveryTag() throws Exception
{
+ String content1 = getTestName() + "_1";
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Binary deliveryTag = new Binary("testDeliveryTag".getBytes(UTF_8));
@@ -787,6 +790,7 @@
.consumeResponse(Begin.class)
.attachRole(Role.SENDER)
.attachRcvSettleMode(ReceiverSettleMode.SECOND)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
.attach()
.consumeResponse(Attach.class)
.consumeResponse(Flow.class);
@@ -796,12 +800,11 @@
interaction.transferDeliveryId(UnsignedInteger.ZERO)
.transferDeliveryTag(deliveryTag)
- .transferPayloadData("test")
+ .transferPayloadData(content1)
.transfer()
- .sync()
.transferDeliveryTag(deliveryTag)
.transferDeliveryId(UnsignedInteger.ONE)
- .transferPayloadData("test2")
+ .transferPayloadData(getTestName() + "_2")
.transfer()
.sync();
@@ -839,6 +842,7 @@
+ " could be considered unsettled by either end of the link.")
public void deliveryTagCanBeReusedAfterDeliveryIsSettled() throws Exception
{
+ final String[] contents = Utils.createTestMessageContents(2, getTestName());
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Binary deliveryTag = new Binary("testDeliveryTag".getBytes(UTF_8));
@@ -861,22 +865,21 @@
interaction.transferDeliveryId(UnsignedInteger.ZERO)
.transferDeliveryTag(deliveryTag)
- .transferPayloadData("test")
+ .transferPayloadData(contents[0])
.transferSettled(true)
.transfer()
.sync()
.transferDeliveryTag(deliveryTag)
.transferDeliveryId(UnsignedInteger.ONE)
- .transferPayloadData("test2")
+ .transferPayloadData(contents[1])
.transfer()
.sync();
interaction.doCloseConnection();
- assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(2)));
}
+ assertTestQueueMessages(contents);
}
@Test
@@ -914,7 +917,7 @@
assertThat(flow.getLinkCredit().intValue(), is(greaterThan(1)));
final long chunkSize = Math.min(1024 * 1024, maxFrameSize - 100);
- byte[] payloadChunk = createTestPaload(chunkSize);
+ byte[] payloadChunk = createTestPayload(chunkSize);
interaction.transferDeliveryId(UnsignedInteger.ZERO)
.transferDeliveryTag(deliveryTag)
.transferPayloadData(payloadChunk)
@@ -955,6 +958,7 @@
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void transferMultipleDeliveries() throws Exception
{
+ final String[] contents = Utils.createTestMessageContents(3, getTestName());
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction()
@@ -970,15 +974,15 @@
interaction.transferDeliveryId(UnsignedInteger.ZERO)
.transferDeliveryTag(new Binary("A".getBytes(StandardCharsets.UTF_8)))
- .transferPayloadData("test")
+ .transferPayloadData(contents[0])
.transfer()
.transferDeliveryId(UnsignedInteger.ONE)
.transferDeliveryTag(new Binary("B".getBytes(StandardCharsets.UTF_8)))
- .transferPayloadData("test")
+ .transferPayloadData(contents[1])
.transfer()
.transferDeliveryId(UnsignedInteger.valueOf(2))
.transferDeliveryTag(new Binary("C".getBytes(StandardCharsets.UTF_8)))
- .transferPayloadData("test")
+ .transferPayloadData(contents[2])
.transfer();
TreeSet<UnsignedInteger> expectedDeliveryIds = Sets.newTreeSet(Arrays.asList(UnsignedInteger.ZERO,
@@ -989,6 +993,7 @@
// verify that no unexpected performative is received by closing
interaction.doCloseConnection();
}
+ assertTestQueueMessages(contents);
}
@@ -996,6 +1001,7 @@
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void transferMixtureOfTransactionalAndNonTransactionalDeliveries() throws Exception
{
+ final String[] contents = Utils.createTestMessageContents(3, getTestName());
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction().negotiateProtocol().consumeResponse()
@@ -1015,16 +1021,16 @@
interaction.transferDeliveryId(UnsignedInteger.ONE)
.transferDeliveryTag(new Binary("A".getBytes(StandardCharsets.UTF_8)))
- .transferPayloadData("test")
+ .transferPayloadData(contents[0])
.transfer()
.transferDeliveryId(UnsignedInteger.valueOf(2))
.transferDeliveryTag(new Binary("B".getBytes(StandardCharsets.UTF_8)))
- .transferPayloadData("test")
+ .transferPayloadData(contents[1])
.transfer()
.transferDeliveryId(UnsignedInteger.valueOf(3))
.transferDeliveryTag(new Binary("C".getBytes(StandardCharsets.UTF_8)))
.transferTransactionalState(txnState.getCurrentTransactionId())
- .transferPayloadData("test")
+ .transferPayloadData(contents[2])
.transfer();
final Discharge discharge = new Discharge();
@@ -1041,15 +1047,16 @@
UnsignedInteger.valueOf(3),
UnsignedInteger.valueOf(4))));
}
+ assertTestQueueMessages(contents);
}
@Test
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void receiveMultipleDeliveries() throws Exception
{
- int numberOfMessages = 4;
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME,
- IntStream.range(0, 4).mapToObj(i -> TEST_MESSAGE_DATA + "_" + i).toArray(String[]::new));
+ final int numberOfMessages = 4;
+ final String[] contents = Utils.createTestMessageContents(numberOfMessages, getTestName());
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, contents);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -1069,12 +1076,12 @@
.flowHandleFromLinkHandle()
.flow();
- for (int i = 0; i < numberOfMessages; i++)
+ for (int i = 0; i < contents.length; i++)
{
interaction.receiveDelivery(Flow.class).decodeLatestDelivery();
Object data = interaction.getDecodedLatestDelivery();
- assertThat(data, Is.is(CoreMatchers.equalTo(TEST_MESSAGE_DATA + "_" + i)));
- assertThat(interaction.getLatestDeliveryId(), Is.is(equalTo(UnsignedInteger.valueOf(i))));
+ assertThat(data, is(equalTo(contents[i])));
+ assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.valueOf(i))));
}
interaction.dispositionSettled(true)
@@ -1097,7 +1104,7 @@
}
}
- String messageText = TEST_MESSAGE_DATA + "_" + 4;
+ final String messageText = getTestName() + "_" + 4;
Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, messageText);
Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
assertThat(receivedMessage, is(equalTo(messageText)));
@@ -1107,9 +1114,9 @@
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void receiveMixtureOfTransactionalAndNonTransactionalDeliveries() throws Exception
{
- int numberOfMessages = 4;
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME,
- IntStream.range(0, 4).mapToObj(i -> TEST_MESSAGE_DATA + "_" + i).toArray(String[]::new));
+ final int numberOfMessages = 4;
+ final String[] contents = Utils.createTestMessageContents(numberOfMessages, getTestName());
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, contents);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -1130,12 +1137,12 @@
.flowHandleFromLinkHandle()
.flow();
- for (int i = 0; i < numberOfMessages; i++)
+ for (int i = 0; i < contents.length; i++)
{
interaction.receiveDelivery(Flow.class).decodeLatestDelivery();
Object data = interaction.getDecodedLatestDelivery();
- assertThat(data, Is.is(CoreMatchers.equalTo(TEST_MESSAGE_DATA + "_" + i)));
- assertThat(interaction.getLatestDeliveryId(), Is.is(equalTo(UnsignedInteger.valueOf(i))));
+ assertThat(data, is(equalTo(contents[i])));
+ assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.valueOf(i))));
}
final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ONE);
@@ -1166,33 +1173,12 @@
.transferPayloadData(discharge)
.transfer();
- Disposition declareTransactionDisposition = null;
- Flow coordinatorFlow = null;
- do
- {
- interaction.consumeResponse(Disposition.class, Flow.class);
- Response<?> response = interaction.getLatestResponse();
- if (response.getBody() instanceof Disposition)
- {
- declareTransactionDisposition = (Disposition) response.getBody();
- }
- if (response.getBody() instanceof Flow)
- {
- final Flow flowResponse = (Flow) response.getBody();
- if (flowResponse.getHandle().equals(txnState.getHandle()))
- {
- coordinatorFlow = flowResponse;
- }
- }
- } while(declareTransactionDisposition == null || coordinatorFlow == null);
- if (getBrokerAdmin().isQueueDepthSupported())
- {
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
- }
+ final Flow coordinatorFlow = interaction.consume(Flow.class, Disposition.class);
+ assertThat(coordinatorFlow.getHandle(), is(equalTo(txnState.getHandle())));
}
- String messageText = TEST_MESSAGE_DATA + "_" + 4;
+ String messageText = getTestName() + "_" + 4;
Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, messageText);
Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
assertThat(receivedMessage, is(equalTo(messageText)));
@@ -1221,7 +1207,7 @@
while (!expectedDeliveryIds.isEmpty());
}
- private byte[] createTestPaload(final long payloadSize)
+ private byte[] createTestPayload(final long payloadSize)
{
if (payloadSize > 1024*1024*1024)
{
@@ -1229,4 +1215,12 @@
}
return new byte[(int) payloadSize];
}
+
+ private void assertTestQueueMessages(final String[] contents) throws Exception
+ {
+ for (final String content : contents)
+ {
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(content)));
+ }
+ }
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
index 1a146f4..853fdd9 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -26,7 +26,6 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeThat;
import java.net.InetSocketAddress;
import java.util.Collections;
@@ -65,7 +64,6 @@
public class TransactionalTransferTest extends BrokerAdminUsingTestBase
{
- private static final String TEST_MESSAGE_CONTENT = "testMessageContent";
private InetSocketAddress _brokerAddress;
@Before
@@ -105,7 +103,7 @@
.consumeResponse(Flow.class)
.transferHandle(linkHandle)
- .transferPayloadData(TEST_MESSAGE_CONTENT)
+ .transferPayloadData(getTestName())
.transferTransactionalState(txnState.getCurrentTransactionId())
.transfer()
.consumeResponse(Disposition.class)
@@ -119,7 +117,7 @@
interaction.txnDischarge(txnState, false);
Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
- assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
+ assertThat(receivedMessage, is(equalTo(getTestName())));
}
}
@@ -153,7 +151,7 @@
.consumeResponse(Flow.class)
.transferHandle(linkHandle)
- .transferPayloadData(TEST_MESSAGE_CONTENT)
+ .transferPayloadData(getTestName())
.transferTransactionalState(txnState.getCurrentTransactionId())
.transfer()
.consumeResponse(Disposition.class)
@@ -166,8 +164,16 @@
interaction.txnDischarge(txnState, true);
- assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ if (getBrokerAdmin().isQueueDepthSupported())
+ {
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ }
+ else
+ {
+ final String content = getTestName() + "_2";
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content);
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(content)));
+ }
}
}
@@ -202,7 +208,7 @@
.consumeResponse(Flow.class)
.transferHandle(linkHandle)
- .transferPayloadData(TEST_MESSAGE_CONTENT)
+ .transferPayloadData(getTestName())
.transferTransactionalState(txnState.getCurrentTransactionId())
.transfer()
.consumeResponse(Disposition.class)
@@ -220,6 +226,7 @@
interaction.txnDischarge(txnState, false);
}
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
}
@Test
@@ -249,7 +256,7 @@
.consumeResponse(Flow.class)
.transferHandle(linkHandle)
- .transferPayloadData(TEST_MESSAGE_CONTENT)
+ .transferPayloadData(getTestName())
.transferTransactionalState(integerToBinary(Integer.MAX_VALUE))
.transfer()
.consumeResponse()
@@ -265,7 +272,7 @@
+ "wish to associate the outcome of a delivery with a transaction.")
public void receiveTransactionalRetirementReceiverSettleFirst() throws Exception
{
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
@@ -299,7 +306,7 @@
.decodeLatestDelivery();
Object data = interaction.getDecodedLatestDelivery();
- assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+ assertThat(data, is(equalTo(getTestName())));
interaction.dispositionSettled(true)
.dispositionRole(Role.RECEIVER)
@@ -314,7 +321,7 @@
+ "wish to associate the outcome of a delivery with a transaction.")
public void receiveTransactionalRetirementDischargeFail() throws Exception
{
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
@@ -348,7 +355,7 @@
.decodeLatestDelivery();
Object data = interaction.getDecodedLatestDelivery();
- assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+ assertThat(data, is(equalTo(getTestName())));
interaction.dispositionSettled(true)
.dispositionRole(Role.RECEIVER)
@@ -357,7 +364,7 @@
.txnDischarge(txnState, true);
Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
- assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
+ assertThat(receivedMessage, is(equalTo(getTestName())));
}
}
@@ -370,7 +377,7 @@
+ " upon a successful discharge.")
public void receiveTransactionalRetirementDispositionFailsDueToUnknownTransactionId() throws Exception
{
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
@@ -403,7 +410,7 @@
assertThat(deliveryId, is(notNullValue()));
Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
- assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+ assertThat(data, is(equalTo(getTestName())));
Response<?> response = interaction.dispositionSettled(true)
.dispositionRole(Role.RECEIVER)
@@ -414,6 +421,7 @@
.consumeResponse().getLatestResponse();
assertUnknownTransactionIdError(response);
}
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
}
@Ignore("TODO disposition is currently not being sent by Broker")
@@ -422,7 +430,7 @@
+ "wish to associate the outcome of a delivery with a transaction.")
public void receiveTransactionalRetirementReceiverSettleSecond() throws Exception
{
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
@@ -456,7 +464,7 @@
.decodeLatestDelivery();
Object data = interaction.getDecodedLatestDelivery();
- assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+ assertThat(data, is(equalTo(getTestName())));
Disposition settledDisposition = interaction.dispositionSettled(false)
.dispositionRole(Role.RECEIVER)
@@ -483,7 +491,7 @@
+ " anticipated by the controller.")
public void receiveTransactionalAcquisitionReceiverSettleFirst() throws Exception
{
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
@@ -523,7 +531,7 @@
assertThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(txnState.getCurrentTransactionId())));
Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
- assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+ assertThat(data, is(equalTo(getTestName())));
interaction.dispositionSettled(true)
.dispositionRole(Role.RECEIVER)
@@ -531,8 +539,16 @@
.disposition()
.txnDischarge(txnState, false);
- assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ if (getBrokerAdmin().isQueueDepthSupported())
+ {
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ }
+ else
+ {
+ final String content = getTestName() + "_2";
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content);
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(content)));
+ }
}
}
@@ -545,7 +561,7 @@
+ " anticipated by the controller.")
public void receiveTransactionalAcquisitionDischargeFail() throws Exception
{
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
@@ -585,7 +601,7 @@
assertThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(txnState.getCurrentTransactionId())));
Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
- assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+ assertThat(data, is(equalTo(getTestName())));
interaction.dispositionSettled(true)
.dispositionRole(Role.RECEIVER)
@@ -593,8 +609,11 @@
.disposition()
.txnDischarge(txnState, true);
- assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+ if (getBrokerAdmin().isQueueDepthSupported())
+ {
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+ }
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
}
}
@@ -609,7 +628,7 @@
+ " properties map of the flow frame.")
public void receiveTransactionalAcquisitionFlowFailsDueToUnknownTransactionId() throws Exception
{
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
@@ -642,6 +661,7 @@
assertUnknownTransactionIdError(response);
}
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
}
private void assertUnknownTransactionIdError(final Response<?> response)
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
index 774cc00..c81e950 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
@@ -102,7 +102,7 @@
+ "assigning it to an unused handle, and sending an attach frame.")
public void successfulAttachAsReceiver() throws Exception
{
- String queueName = "testQueue";
+ String queueName = BrokerAdmin.TEST_QUEUE_NAME;
getBrokerAdmin().createQueue(queueName);
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
try (FrameTransport transport = new FrameTransport(addr).connect())
@@ -130,7 +130,7 @@
+ " detach the newly created link endpoint.")
public void attachReceiverWithNullTarget() throws Exception
{
- String queueName = "testQueue";
+ String queueName = BrokerAdmin.TEST_QUEUE_NAME;
getBrokerAdmin().createQueue(queueName);
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
try (FrameTransport transport = new FrameTransport(addr).connect())
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
index 0ff378b..1bfa6d3 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
@@ -32,6 +32,7 @@
import org.junit.Test;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
@@ -132,11 +133,37 @@
public void synchronousGet() throws Exception
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, "foo");
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- String data = (String) Utils.receiveMessage(addr, BrokerAdmin.TEST_QUEUE_NAME);
- assertThat(data, is(equalTo("foo")));
+ try (FrameTransport transport = new FrameTransport(addr).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse()
+ .begin().consumeResponse()
+ .attachRole(Role.RECEIVER)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attach().consumeResponse()
+ .flowIncomingWindow(UnsignedInteger.ONE)
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow()
+ .receiveDelivery()
+ .decodeLatestDelivery()
+ .dispositionSettled(true)
+ .dispositionRole(Role.RECEIVER)
+ .dispositionFirst(interaction.getLatestDeliveryId())
+ .dispositionLast(interaction.getLatestDeliveryId())
+ .dispositionState(new Accepted())
+ .disposition()
+ .sync();
+ final Object data = interaction.getDecodedLatestDelivery();
+ assertThat(data, is(equalTo(getTestName())));
+ }
}
@Test
@@ -259,7 +286,7 @@
{
BrokerAdmin brokerAdmin = getBrokerAdmin();
brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- String messageContent = "Test";
+ String messageContent = getTestName();
Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, messageContent);
final InetSocketAddress addr = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
@@ -289,7 +316,8 @@
.getDecodedLatestDelivery();
assertThat(receivedMessageContent, is(equalTo(messageContent)));
- assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.ZERO)));
+ UnsignedInteger firstDeliveryId = interaction.getLatestDeliveryId();
+ assertThat(firstDeliveryId, is(equalTo(UnsignedInteger.ZERO)));
Flow responseFlow = interaction.flowNextIncomingId(UnsignedInteger.ONE)
.flowLinkCredit(UnsignedInteger.ONE)
@@ -301,6 +329,13 @@
assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle)));
assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
+
+ interaction.dispositionSettled(true)
+ .dispositionRole(Role.RECEIVER)
+ .dispositionFirst(firstDeliveryId)
+ .dispositionState(new Accepted())
+ .disposition()
+ .sync();
}
}
@@ -317,10 +352,8 @@
{
BrokerAdmin brokerAdmin = getBrokerAdmin();
brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- String messageContent1 = "Test1";
- String messageContent2 = "Test2";
- String messageContent3 = "Test2";
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, messageContent1, messageContent2, messageContent3);
+ final String[] contents = Utils.createTestMessageContents(3, getTestName());
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, contents);
final InetSocketAddress addr = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
try (FrameTransport transport = new FrameTransport(addr).connect())
@@ -348,8 +381,9 @@
.decodeLatestDelivery()
.getDecodedLatestDelivery();
- assertThat(receivedMessageContent1, is(equalTo(messageContent1)));
- assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.ZERO)));
+ assertThat(receivedMessageContent1, is(equalTo(contents[0])));
+ UnsignedInteger firstDeliveryId = interaction.getLatestDeliveryId();
+ assertThat(firstDeliveryId, is(equalTo(UnsignedInteger.ZERO)));
Object receivedMessageContent2 = interaction.flowIncomingWindow(incomingWindow)
.flowNextIncomingId(UnsignedInteger.ONE)
@@ -360,8 +394,9 @@
.decodeLatestDelivery()
.getDecodedLatestDelivery();
- assertThat(receivedMessageContent2, is(equalTo(messageContent2)));
- assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.ONE)));
+ assertThat(receivedMessageContent2, is(equalTo(contents[1])));
+ UnsignedInteger secondDeliveryId = interaction.getLatestDeliveryId();
+ assertThat(secondDeliveryId, is(equalTo(UnsignedInteger.ONE)));
// send session flow with echo=true to verify that no message is delivered without issuing a credit
Flow responseFlow = interaction.flowNextIncomingId(UnsignedInteger.valueOf(2))
@@ -372,7 +407,16 @@
.consumeResponse().getLatestResponse(Flow.class);
assertThat(responseFlow.getHandle(), is(nullValue()));
+
+ interaction.dispositionSettled(true)
+ .dispositionRole(Role.RECEIVER)
+ .dispositionFirst(firstDeliveryId)
+ .dispositionLast(secondDeliveryId)
+ .dispositionState(new Accepted())
+ .disposition()
+ .sync();
}
+ assertThat(Utils.receiveMessage(addr, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(contents[2])));
}
@Test
@@ -385,9 +429,8 @@
{
BrokerAdmin brokerAdmin = getBrokerAdmin();
brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- String messageContent1 = "Test1";
- String messageContent2 = "Test2";
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, messageContent1, messageContent2);
+ final String[] contents = Utils.createTestMessageContents(2, getTestName());
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, contents);
final InetSocketAddress addr = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
try (FrameTransport transport = new FrameTransport(addr).connect())
@@ -413,7 +456,7 @@
.decodeLatestDelivery()
.getDecodedLatestDelivery();
- assertThat(receivedMessageContent1, is(equalTo(messageContent1)));
+ assertThat(receivedMessageContent1, is(equalTo(contents[0])));
assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.ZERO)));
Flow responseFlow = interaction.flowNextIncomingId(UnsignedInteger.ONE)
@@ -425,7 +468,15 @@
assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle)));
assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
+
+ interaction.dispositionSettled(true)
+ .dispositionRole(Role.RECEIVER)
+ .dispositionFirst(interaction.getLatestDeliveryId())
+ .dispositionState(new Accepted())
+ .disposition()
+ .sync();
}
+ assertThat(Utils.receiveMessage(addr, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(contents[1])));
}
@Test
@@ -438,7 +489,7 @@
{
BrokerAdmin brokerAdmin = getBrokerAdmin();
brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, "Test1");
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
final InetSocketAddress addr = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
try (FrameTransport transport = new FrameTransport(addr).connect())
@@ -466,5 +517,6 @@
assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle)));
assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
}
+ assertThat(Utils.receiveMessage(addr, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
}
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
index 9342fd8..337c129 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
@@ -78,7 +78,6 @@
public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
{
private static final int MIN_MAX_FRAME_SIZE = 512;
- private static final String TEST_MESSAGE_CONTENT = "foo";
private InetSocketAddress _brokerAddress;
private String _originalMmsMessageStorePersistence;
@@ -136,7 +135,7 @@
final Disposition responseDisposition = interaction.transferHandle(linkHandle1)
.transferDeliveryId(UnsignedInteger.ZERO)
.transferDeliveryTag(deliveryTag)
- .transferPayloadData(TEST_MESSAGE_CONTENT)
+ .transferPayloadData(getTestName())
.transfer()
.consumeResponse()
.getLatestResponse(Disposition.class);
@@ -197,7 +196,7 @@
// 2. send enough unsettled deliveries to cause incomplete-unsettled to be true
// assume each delivery requires at least 1 byte, therefore max-frame-size deliveries should be enough
interaction.transferHandle(linkHandle1)
- .transferPayloadData(TEST_MESSAGE_CONTENT);
+ .transferPayloadData(getTestName());
Map<Binary, DeliveryState> localUnsettled = new HashMap<>(open.getMaxFrameSize().intValue());
for (int i = 0; i < open.getMaxFrameSize().intValue(); ++i)
{
@@ -291,7 +290,7 @@
// 2. send enough unsettled deliverys to cause incomplete-unsettled to be true
// assume each delivery requires at least 1 byte, therefore max-frame-size deliveries should be enough
interaction.transferHandle(linkHandle1)
- .transferPayloadData(TEST_MESSAGE_CONTENT);
+ .transferPayloadData(getTestName());
Map<Binary, DeliveryState> localUnsettled = new HashMap<>(open.getMaxFrameSize().intValue());
for (int i = 0; i < open.getMaxFrameSize().intValue(); ++i)
{
@@ -351,7 +350,7 @@
{
for (int i = 0; i < MIN_MAX_FRAME_SIZE; i++)
{
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT + "-" + i);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName() + "-" + i);
}
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
@@ -470,7 +469,7 @@
+ " considered unsettled by the issuing link endpoint.")
public void resumeReceivingLinkEmptyUnsettled() throws Exception
{
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -490,9 +489,11 @@
.decodeLatestDelivery();
Object data = interaction.getDecodedLatestDelivery();
- assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+ assertThat(data, is(equalTo(getTestName())));
interaction.dispositionSettled(true)
+ .dispositionState(new Accepted())
+ .dispositionFirstFromLatestDelivery()
.dispositionRole(Role.RECEIVER)
.disposition();
@@ -518,7 +519,7 @@
+ " considered unsettled by the issuing link endpoint.")
public void resumeReceivingLinkWithSingleUnsettledAccepted() throws Exception
{
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -549,7 +550,7 @@
assertThat(deliveryTag, is(notNullValue()));
assertThat(transfer.getSettled(), is(not(equalTo(true))));
Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
- assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+ assertThat(data, is(equalTo(getTestName())));
Detach detach = interaction.detach().consumeResponse().getLatestResponse(Detach.class);
assertThat(detach.getClosed(), anyOf(nullValue(), equalTo(false)));
@@ -606,7 +607,7 @@
+ " considered unsettled by the issuing link endpoint.")
public void resumeReceivingLinkOneUnsettledWithNoOutcome() throws Exception
{
- Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, getTestName());
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -636,7 +637,7 @@
Binary deliveryTag = transfer.getDeliveryTag();
assertThat(deliveryTag, is(notNullValue()));
Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
- assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+ assertThat(data, is(equalTo(getTestName())));
Detach detach = interaction.detach().consumeResponse(Detach.class).getLatestResponse(Detach.class);
assertThat(detach.getClosed(), anyOf(nullValue(), equalTo(false)));
@@ -693,7 +694,7 @@
final String destination = BrokerAdmin.TEST_QUEUE_NAME;
final Binary deliveryTag = new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8));
- QpidByteBuffer[] messagePayload = Utils.splitPayload("testData1", 2);
+ QpidByteBuffer[] messagePayload = Utils.splitPayload(getTestName(), 2);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{