QPID-8349: Use AMQP 1.0 transport to publish test messages if broker admin does not support putMessageOnQueue
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
index a0eab61..14b66e2 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
@@ -20,14 +20,23 @@
package org.apache.qpid.tests.protocol.v1_0;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assume.assumeThat;
+
import java.net.InetSocketAddress;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.utils.BrokerAdmin;
public class Utils
{
@@ -115,4 +124,42 @@
return result;
}
+
+ public static void putMessageOnQueue(final BrokerAdmin brokerAdmin, final String queueName, final String... message)
+ throws Exception
+ {
+ if (brokerAdmin.isPutMessageOnQueueSupported())
+ {
+ brokerAdmin.putMessageOnQueue(queueName, message);
+ }
+ else
+ {
+ final InetSocketAddress brokerAddress = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ final Flow flow = interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+ .getLatestResponse(Flow.class);
+
+ assumeThat(String.format("insufficient credit (%d) to publish %d messages",
+ flow.getLinkCredit().intValue(),
+ message.length),
+ flow.getLinkCredit().intValue(),
+ is(greaterThan(message.length)));
+ for (String payload : message)
+ {
+ interaction.transferPayloadData(payload)
+ .transferSettled(true)
+ .transfer()
+ .sync();
+ }
+ }
+ }
+ }
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java
index 8529672..6698337 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java
@@ -52,6 +52,7 @@
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
+import org.apache.qpid.tests.protocol.v1_0.Utils;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
import org.apache.qpid.tests.utils.BrokerSpecific;
@@ -180,9 +181,10 @@
@Test
public void transactedReceiverDetachedOnQueueDeletionWhenTransactionInProgress() throws Exception
{
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME,
- TEST_MESSAGE_CONTENT + 1,
- TEST_MESSAGE_CONTENT + 2);
+ Utils.putMessageOnQueue(getBrokerAdmin(),
+ BrokerAdmin.TEST_QUEUE_NAME,
+ TEST_MESSAGE_CONTENT + 1,
+ TEST_MESSAGE_CONTENT + 2);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java
index d8608a4..522fe4b 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java
@@ -47,6 +47,7 @@
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
+import org.apache.qpid.tests.protocol.v1_0.Utils;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
import org.apache.qpid.tests.utils.BrokerSpecific;
@@ -112,7 +113,7 @@
@Test
public void transactionalRetirementTimeout() throws Exception
{
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
index cc377cc..2d48b7e 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
@@ -38,6 +38,7 @@
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.Utils;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -58,8 +59,7 @@
+ " MUST NOT be redelivered to the modifying link endpoint.")
public void modifiedOutcomeWithUndeliverableHere() throws Exception
{
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "message1");
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "message2");
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, "message1", "message2");
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 94a1249..4d6b99f 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -43,6 +43,7 @@
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
@@ -439,7 +440,7 @@
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void receiveTransferUnsettled() throws Exception
{
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -477,7 +478,7 @@
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void receiveTransferReceiverSettleFirst() throws Exception
{
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -515,7 +516,7 @@
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void receiveTransferReceiverSettleSecond() throws Exception
{
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -557,7 +558,7 @@
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void receiveTransferReceiverSettleSecondWithRejectedOutcome() throws Exception
{
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -607,7 +608,7 @@
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void receiveTransferReceiverSettleSecondWithImplicitDispositionState() throws Exception
{
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -665,7 +666,7 @@
int negotiatedFrameSize = open.getMaxFrameSize().intValue();
String testMessageData = Stream.generate(() -> "*").limit(negotiatedFrameSize).collect(Collectors.joining());
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, testMessageData);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, testMessageData);
interaction.begin().consumeResponse()
.attachRole(Role.RECEIVER)
@@ -727,7 +728,7 @@
+ " the receiver initiates the attach exchange and the sender supports the desired mode.")
public void receiveTransferSenderSettleModeSettled() throws Exception
{
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -764,7 +765,7 @@
{
assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
}
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "test");
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, "test");
assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo("test")));
}
@@ -1047,10 +1048,8 @@
public void receiveMultipleDeliveries() throws Exception
{
int numberOfMessages = 4;
- for (int i = 0; i < numberOfMessages; i++)
- {
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA + "_" + i);
- }
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME,
+ IntStream.range(0, 4).mapToObj(i -> TEST_MESSAGE_DATA + "_" + i).toArray(String[]::new));
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -1099,7 +1098,7 @@
}
String messageText = TEST_MESSAGE_DATA + "_" + 4;
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageText);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, messageText);
Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
assertThat(receivedMessage, is(equalTo(messageText)));
}
@@ -1109,10 +1108,8 @@
public void receiveMixtureOfTransactionalAndNonTransactionalDeliveries() throws Exception
{
int numberOfMessages = 4;
- for (int i = 0; i < numberOfMessages; i++)
- {
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA + "_" + i);
- }
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME,
+ IntStream.range(0, 4).mapToObj(i -> TEST_MESSAGE_DATA + "_" + i).toArray(String[]::new));
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -1196,7 +1193,7 @@
}
String messageText = TEST_MESSAGE_DATA + "_" + 4;
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageText);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, messageText);
Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
assertThat(receivedMessage, is(equalTo(messageText)));
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
index bd7c113..32a96b0 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
@@ -59,6 +59,7 @@
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.Utils;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -168,7 +169,7 @@
{
assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "test message");
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, "test message");
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
index 85b5ecd..1a146f4 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -265,7 +265,7 @@
+ "wish to associate the outcome of a delivery with a transaction.")
public void receiveTransactionalRetirementReceiverSettleFirst() throws Exception
{
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
@@ -314,7 +314,7 @@
+ "wish to associate the outcome of a delivery with a transaction.")
public void receiveTransactionalRetirementDischargeFail() throws Exception
{
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
@@ -370,7 +370,7 @@
+ " upon a successful discharge.")
public void receiveTransactionalRetirementDispositionFailsDueToUnknownTransactionId() throws Exception
{
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
@@ -422,7 +422,7 @@
+ "wish to associate the outcome of a delivery with a transaction.")
public void receiveTransactionalRetirementReceiverSettleSecond() throws Exception
{
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
@@ -483,7 +483,7 @@
+ " anticipated by the controller.")
public void receiveTransactionalAcquisitionReceiverSettleFirst() throws Exception
{
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
@@ -545,7 +545,7 @@
+ " anticipated by the controller.")
public void receiveTransactionalAcquisitionDischargeFail() throws Exception
{
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
@@ -609,7 +609,7 @@
+ " properties map of the flow frame.")
public void receiveTransactionalAcquisitionFlowFailsDueToUnknownTransactionId() throws Exception
{
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
index 8d1d93d..0ff378b 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
@@ -132,7 +132,7 @@
public void synchronousGet() throws Exception
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "foo");
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, "foo");
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
String data = (String) Utils.receiveMessage(addr, BrokerAdmin.TEST_QUEUE_NAME);
@@ -260,7 +260,7 @@
BrokerAdmin brokerAdmin = getBrokerAdmin();
brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
String messageContent = "Test";
- brokerAdmin.putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, messageContent);
final InetSocketAddress addr = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
try (FrameTransport transport = new FrameTransport(addr).connect())
@@ -320,9 +320,7 @@
String messageContent1 = "Test1";
String messageContent2 = "Test2";
String messageContent3 = "Test2";
- brokerAdmin.putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent1);
- brokerAdmin.putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent2);
- brokerAdmin.putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent3);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, messageContent1, messageContent2, messageContent3);
final InetSocketAddress addr = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
try (FrameTransport transport = new FrameTransport(addr).connect())
@@ -389,8 +387,7 @@
brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
String messageContent1 = "Test1";
String messageContent2 = "Test2";
- brokerAdmin.putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent1);
- brokerAdmin.putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent2);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, messageContent1, messageContent2);
final InetSocketAddress addr = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
try (FrameTransport transport = new FrameTransport(addr).connect())
@@ -441,7 +438,7 @@
{
BrokerAdmin brokerAdmin = getBrokerAdmin();
brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- brokerAdmin.putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "Test1");
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, "Test1");
final InetSocketAddress addr = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
try (FrameTransport transport = new FrameTransport(addr).connect())
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
index a98d76f..9342fd8 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
@@ -351,7 +351,7 @@
{
for (int i = 0; i < MIN_MAX_FRAME_SIZE; i++)
{
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT + "-" + i);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT + "-" + i);
}
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
@@ -470,7 +470,7 @@
+ " considered unsettled by the issuing link endpoint.")
public void resumeReceivingLinkEmptyUnsettled() throws Exception
{
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -518,7 +518,7 @@
+ " considered unsettled by the issuing link endpoint.")
public void resumeReceivingLinkWithSingleUnsettledAccepted() throws Exception
{
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -606,7 +606,7 @@
+ " considered unsettled by the issuing link endpoint.")
public void resumeReceivingLinkOneUnsettledWithNoOutcome() throws Exception
{
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{