blob: 94a1249cfded5e66f67e2b117e0ccdd910667a08 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.tests.protocol.v1_0.messaging;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.oneOf;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeThat;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import com.google.common.collect.Sets;
import org.hamcrest.CoreMatchers;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Received;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
import org.apache.qpid.server.protocol.v1_0.type.transport.End;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
import org.apache.qpid.tests.protocol.v1_0.MessageDecoder;
import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
import org.apache.qpid.tests.protocol.v1_0.Utils;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
public class TransferTest extends BrokerAdminUsingTestBase
{
// Payload that the receive-side tests in this class place on the test queue and expect to read back.
private static final String TEST_MESSAGE_DATA = "foo";
// Upper bound (200 * 1024 * 1024 bytes) used by the max-message-size test when deciding whether the
// limit advertised by the peer is small enough to exercise.
private static final long MAX_MAX_MESSAGE_SIZE_WE_ARE_WILLING_TO_TEST = 200 * 1024 * 1024L;
// Address of the broker's anonymous AMQP port; assigned in setUp().
private InetSocketAddress _brokerAddress;
// Value the "qpid.tests.mms.messagestore.persistence" system property had when setUp() ran
// (null if it was unset); tearDown() uses it to restore the property.
private String _originalMmsMessageStorePersistence;
/**
 * Prepares the fixture for each test: remembers the current value of the
 * {@code qpid.tests.mms.messagestore.persistence} system property while forcing it to
 * {@code "false"}, creates the test queue and looks up the broker's anonymous AMQP address.
 */
@Before
public void setUp()
{
    final String persistenceProperty = "qpid.tests.mms.messagestore.persistence";
    // System.setProperty hands back the previous value (null when unset), which tearDown() restores.
    _originalMmsMessageStorePersistence = System.setProperty(persistenceProperty, "false");

    getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
    _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
}
/**
 * Puts the {@code qpid.tests.mms.messagestore.persistence} system property back into the state
 * recorded before the test: it is cleared when no value had been recorded, otherwise the recorded
 * value is reinstated.
 */
@After
public void tearDown()
{
    final String persistenceProperty = "qpid.tests.mms.messagestore.persistence";
    if (_originalMmsMessageStorePersistence == null)
    {
        // The property did not exist before the test, so remove it again.
        System.clearProperty(persistenceProperty);
        return;
    }
    System.setProperty(persistenceProperty, _originalMmsMessageStorePersistence);
}
/**
 * Attaches a sending link, sends a transfer whose handle has been set to {@code null} and checks
 * that the latest response is a {@link Close} carrying a decode error.
 */
@Test
@SpecificationTest(section = "1.3.4",
        description = "Transfer without mandatory fields should result in a decoding error.")
public void emptyTransfer() throws Exception
{
    try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        // Connection, session and sending link.
        final Interaction attachedLink = transport.newInteraction()
                .negotiateProtocol().consumeResponse()
                .open().consumeResponse(Open.class)
                .begin().consumeResponse(Begin.class)
                .attachRole(Role.SENDER)
                .attach().consumeResponse(Attach.class)
                .consumeResponse(Flow.class);

        // The handle is explicitly nulled before the transfer is sent.
        final Close closeFromPeer = attachedLink.transferHandle(null)
                .transfer()
                .consumeResponse()
                .getLatestResponse(Close.class);

        final Error closeError = closeFromPeer.getError();
        assertThat(closeError, is(notNullValue()));
        assertThat(closeError.getCondition(), equalTo(AmqpError.DECODE_ERROR));
    }
}
/**
 * Attaches a sending link to the test queue, sends a transfer whose delivery tag has been set to
 * {@code null} and then consumes a response, passing {@link Detach}, {@link End} and
 * {@link Close} as the accepted response types.
 */
@Test
@SpecificationTest(section = "2.7.5",
        description = "[delivery-tag] MUST be specified for the first transfer "
                      + "[...] and can only be omitted for continuation transfers.")
public void transferWithoutDeliveryTag() throws Exception
{
    try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        final Interaction senderLink = transport.newInteraction()
                .negotiateProtocol().consumeResponse()
                .open().consumeResponse(Open.class)
                .begin().consumeResponse(Begin.class)
                .attachRole(Role.SENDER)
                .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                .attach().consumeResponse(Attach.class)
                .consumeResponse(Flow.class);

        // Single transfer sent with the delivery tag cleared.
        final Interaction afterTransfer = senderLink.transferDeliveryTag(null)
                .transferPayloadData("testData")
                .transfer();

        afterTransfer.consumeResponse(Detach.class, End.class, Close.class);
    }
}
/**
 * Attaches a sending link with an explicit handle, transfers one message on that handle and
 * checks that the disposition received in reply has the receiver role, is settled and carries
 * an {@link Accepted} state.
 */
@Test
@SpecificationTest(section = "2.6.12",
        description = "Transferring A Message.")
public void transferUnsettled() throws Exception
{
    try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        final UnsignedInteger handle = UnsignedInteger.ONE;

        final Interaction attachedLink = transport.newInteraction()
                .negotiateProtocol().consumeResponse()
                .open().consumeResponse(Open.class)
                .begin().consumeResponse(Begin.class)
                .attachRole(Role.SENDER)
                .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                .attachHandle(handle)
                .attach().consumeResponse(Attach.class)
                .consumeResponse(Flow.class);

        final Disposition reply = attachedLink.transferHandle(handle)
                .transferPayloadData("testData")
                .transfer()
                .consumeResponse()
                .getLatestResponse(Disposition.class);

        assertThat(reply.getRole(), is(Role.RECEIVER));
        assertThat(reply.getSettled(), is(Boolean.TRUE));
        assertThat(reply.getState(), is(instanceOf(Accepted.class)));
    }
}
/**
 * Attaches two sending links (handles 1 and 2) within one session and transfers one message on
 * each, both with the same delivery tag but with delivery ids 0 and 1. The first disposition
 * must refer to delivery id 0 or 1; when it covers a single delivery only (no {@code last}, or
 * {@code last} equal to {@code first}) a second disposition is consumed and must start at the
 * other delivery id.
 */
@Test
@SpecificationTest(section = "2.6.12 Transferring A Message",
        description = "The delivery-tag MUST be unique amongst all deliveries"
                      + " that could be considered unsettled by either end of the link.")
public void transferMessagesWithTheSameDeliveryTagOnSeparateLinksBelongingToTheSameSession() throws Exception
{
    try (final FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        final UnsignedInteger firstLinkHandle = UnsignedInteger.ONE;
        final UnsignedInteger secondLinkHandle = UnsignedInteger.valueOf(2);
        final Binary sharedTag = new Binary("deliveryTag".getBytes(UTF_8));

        // Session with the first sending link attached.
        final Interaction firstLinkAttached = transport.newInteraction()
                .negotiateProtocol().consumeResponse()
                .open().consumeResponse(Open.class)
                .begin().consumeResponse(Begin.class)
                .attachName("test1")
                .attachRole(Role.SENDER)
                .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                .attachSndSettleMode(SenderSettleMode.UNSETTLED)
                .attachRcvSettleMode(ReceiverSettleMode.FIRST)
                .attachHandle(firstLinkHandle)
                .attach().consumeResponse(Attach.class)
                .consumeResponse(Flow.class);

        // Second link: only the name and the handle are set explicitly before attaching again.
        final Interaction bothLinksAttached = firstLinkAttached.attachName("test2")
                .attachHandle(secondLinkHandle)
                .attach().consumeResponse(Attach.class)
                .consumeResponse(Flow.class);

        // One delivery per link, both using the shared delivery tag.
        final Interaction interaction = bothLinksAttached.transferHandle(firstLinkHandle)
                .transferPayloadData("testData")
                .transferDeliveryTag(sharedTag)
                .transferDeliveryId(UnsignedInteger.ZERO)
                .transfer()
                .transferHandle(secondLinkHandle)
                .transferDeliveryId(UnsignedInteger.ONE)
                .transferPayloadData("testData2")
                .transferDeliveryTag(sharedTag)
                .transfer();

        final Disposition firstDisposition = interaction.consumeResponse().getLatestResponse(Disposition.class);
        final UnsignedInteger first = firstDisposition.getFirst();
        final UnsignedInteger last = firstDisposition.getLast();

        assertThat(first, anyOf(is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
        assertThat(last, anyOf(nullValue(), is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));

        // A disposition spanning a single delivery must be followed by one for the other delivery.
        final boolean coversSingleDelivery = last == null || first.equals(last);
        if (coversSingleDelivery)
        {
            final Disposition secondDisposition =
                    interaction.consumeResponse().getLatestResponse(Disposition.class);
            assertThat(secondDisposition.getFirst(), anyOf(is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
            assertThat(secondDisposition.getLast(),
                       anyOf(nullValue(), is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
            assertThat(secondDisposition.getFirst(), is(not(equalTo(first))));
        }
    }
}
/**
 * Attaches a sending link with receiver settle mode {@code SECOND}, sends a transfer that
 * specifies receiver settle mode {@code FIRST} and checks that the disposition received in reply
 * has the receiver role, is settled and carries an {@link Accepted} state.
 */
@Test
@SpecificationTest(section = "2.7.5",
        description = "If first, this indicates that the receiver MUST settle the delivery once it has arrived without waiting for the sender to settle first")
public void transferReceiverSettleModeFirst() throws Exception
{
    try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        // Link negotiated with rcv-settle-mode SECOND.
        final Interaction attachedLink = transport.newInteraction()
                .negotiateProtocol().consumeResponse()
                .open().consumeResponse(Open.class)
                .begin().consumeResponse(Begin.class)
                .attachRole(Role.SENDER)
                .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                .attachRcvSettleMode(ReceiverSettleMode.SECOND)
                .attach().consumeResponse(Attach.class)
                .consumeResponse(Flow.class);

        // The individual transfer asks for rcv-settle-mode FIRST.
        final Disposition reply = attachedLink.transferPayloadData("testData")
                .transferRcvSettleMode(ReceiverSettleMode.FIRST)
                .transfer()
                .consumeResponse()
                .getLatestResponse(Disposition.class);

        assertThat(reply.getRole(), is(Role.RECEIVER));
        assertThat(reply.getSettled(), is(Boolean.TRUE));
        assertThat(reply.getState(), is(instanceOf(Accepted.class)));
    }
}
/**
 * Attaches a sending link with receiver settle mode {@code FIRST}, sends a transfer that
 * specifies receiver settle mode {@code SECOND} and checks that the latest response is a
 * {@link Detach} whose error condition is {@code invalid-field}.
 */
@Test
@SpecificationTest(section = "2.7.5",
        description = "If the negotiated link value is first, then it is illegal to set this field to second.")
public void transferReceiverSettleModeCannotBeSecondWhenLinkModeIsFirst() throws Exception
{
    try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        // Link negotiated with rcv-settle-mode FIRST.
        final Interaction attachedLink = transport.newInteraction()
                .negotiateProtocol().consumeResponse()
                .open().consumeResponse(Open.class)
                .begin().consumeResponse(Begin.class)
                .attachRole(Role.SENDER)
                .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                .attachRcvSettleMode(ReceiverSettleMode.FIRST)
                .attach().consumeResponse(Attach.class)
                .consumeResponse(Flow.class);

        // The individual transfer asks for rcv-settle-mode SECOND.
        final Detach detachFromPeer = attachedLink.transferPayloadData("testData")
                .transferRcvSettleMode(ReceiverSettleMode.SECOND)
                .transfer()
                .consumeResponse()
                .getLatestResponse(Detach.class);

        final Error detachError = detachFromPeer.getError();
        assertThat(detachError, is(notNullValue()));
        assertThat(detachError.getCondition(), is(equalTo(AmqpError.INVALID_FIELD)));
    }
}
/**
 * Sends the protocol header, open, begin, attach, one pre-settled transfer and close back to back
 * without consuming any response in between, then reads the responses: first the protocol header
 * bytes, followed by {@link Open}, {@link Begin}, {@link Attach}, {@link Flow} and {@link Close}
 * in that order.
 */
@Test
@SpecificationTest(section = "", description = "Pipelined message send")
public void presettledPipelined() throws Exception
{
    try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        final Interaction interaction = transport.newInteraction();

        // Everything is written before the first response is read.
        interaction.negotiateProtocol()
                   .open()
                   .begin()
                   .attachRole(Role.SENDER)
                   .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                   .attach()
                   .transferPayloadData("testData")
                   .transferSettled(true)
                   .transfer()
                   .close()
                   .sync();

        final byte[] headerResponse = interaction.consumeResponse().getLatestResponse(byte[].class);
        assertThat(headerResponse, is(equalTo("AMQP\0\1\0\0".getBytes(UTF_8))));

        // Remaining responses, in the order they are expected to arrive.
        final List<Class<?>> expectedSequence =
                Arrays.asList(Open.class, Begin.class, Attach.class, Flow.class, Close.class);
        for (final Class<?> expectedType : expectedSequence)
        {
            interaction.consumeResponse().getLatestResponse(expectedType);
        }
    }
}
/**
 * Transfers a message whose header is marked durable over a link whose source outcomes include
 * both accepted and rejected. The resulting disposition must be settled and carry an
 * {@link Outcome}; the expected outcome symbol is accepted when the broker admin reports restart
 * support and rejected otherwise.
 */
@Test
@SpecificationTest(section = "3.2.1",
        description = "Durable messages MUST NOT be lost even if an intermediary is unexpectedly terminated and "
                      + "restarted. A target which is not capable of fulfilling this guarantee MUST NOT accept messages "
                      + "where the durable header is set to true: if the source allows the rejected outcome then the "
                      + "message SHOULD be rejected with the precondition-failed error, otherwise the link MUST be "
                      + "detached by the receiver with the same error.")
public void durableTransferWithRejectedOutcome() throws Exception
{
    try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        // Build a message with the durable flag set.
        final Header durableHeader = new Header();
        durableHeader.setDurable(true);
        final MessageEncoder encoder = new MessageEncoder();
        encoder.setHeader(durableHeader);
        encoder.addData("foo");

        final Interaction attachedLink = transport.newInteraction()
                .negotiateProtocol().consumeResponse()
                .open().consumeResponse(Open.class)
                .begin().consumeResponse(Begin.class)
                .attachRole(Role.SENDER)
                .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                .attachRcvSettleMode(ReceiverSettleMode.SECOND)
                .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL,
                                      Rejected.REJECTED_SYMBOL)
                .attach().consumeResponse(Attach.class)
                .consumeResponse(Flow.class);

        final Disposition reply = attachedLink.transferPayload(encoder.getPayload())
                .transferRcvSettleMode(ReceiverSettleMode.FIRST)
                .transfer()
                .consumeResponse()
                .getLatestResponse(Disposition.class);

        assertThat(reply.getSettled(), is(true));
        assertThat(reply.getState(), is(instanceOf(Outcome.class)));

        final boolean restartSupported = getBrokerAdmin().supportsRestart();
        final Outcome outcome = (Outcome) reply.getState();
        assertThat(outcome.getSymbol(),
                   is(restartSupported ? Accepted.ACCEPTED_SYMBOL : Rejected.REJECTED_SYMBOL));
    }
}
/**
 * Transfers a message whose header is marked durable over a link whose source outcomes contain
 * accepted only. When the broker admin reports restart support the response must be a settled
 * {@link Disposition} with the accepted outcome; otherwise it must be a {@link Detach} whose
 * error condition is {@code precondition-failed}.
 */
@Test
@SpecificationTest(section = "3.2.1",
        description = "Durable messages MUST NOT be lost even if an intermediary is unexpectedly terminated and "
                      + "restarted. A target which is not capable of fulfilling this guarantee MUST NOT accept messages "
                      + "where the durable header is set to true: if the source allows the rejected outcome then the "
                      + "message SHOULD be rejected with the precondition-failed error, otherwise the link MUST be "
                      + "detached by the receiver with the same error.")
public void durableTransferWithoutRejectedOutcome() throws Exception
{
    try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        // Build a message with the durable flag set.
        final Header durableHeader = new Header();
        durableHeader.setDurable(true);
        final MessageEncoder encoder = new MessageEncoder();
        encoder.setHeader(durableHeader);
        encoder.addData("foo");

        final Interaction attachedLink = transport.newInteraction()
                .negotiateProtocol().consumeResponse()
                .open().consumeResponse(Open.class)
                .begin().consumeResponse(Begin.class)
                .attachRole(Role.SENDER)
                .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                .attachRcvSettleMode(ReceiverSettleMode.SECOND)
                .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
                .attach().consumeResponse(Attach.class)
                .consumeResponse(Flow.class);

        final Response<?> response = attachedLink.transferPayload(encoder.getPayload())
                .transferRcvSettleMode(ReceiverSettleMode.FIRST)
                .transfer()
                .consumeResponse()
                .getLatestResponse();

        final boolean restartSupported = getBrokerAdmin().supportsRestart();
        assertThat(response, is(notNullValue()));

        if (!restartSupported)
        {
            // Durability cannot be honoured and rejected is not an allowed outcome: expect a detach.
            assertThat(response.getBody(), is(instanceOf(Detach.class)));
            final Detach detachFromPeer = (Detach) response.getBody();
            assertThat(detachFromPeer.getError(), is(notNullValue()));
            assertThat(detachFromPeer.getError().getCondition(), is(AmqpError.PRECONDITION_FAILED));
            return;
        }

        assertThat(response.getBody(), is(instanceOf(Disposition.class)));
        final Disposition reply = (Disposition) response.getBody();
        assertThat(reply.getSettled(), is(true));
        assertThat(reply.getState(), is(instanceOf(Outcome.class)));
        assertThat(((Outcome) reply.getState()).getSymbol(), is(Accepted.ACCEPTED_SYMBOL));
    }
}
/**
 * Puts one message on the test queue, attaches a receiving link, grants a single link credit and
 * feeds every received {@link Transfer} into a {@link MessageDecoder} until a transfer arrives
 * whose {@code more} flag is not {@code true}. The decoded data must equal the message put on
 * the queue.
 */
@Test
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void receiveTransferUnsettled() throws Exception
{
    getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
    try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        final Interaction receiverLink = transport.newInteraction()
                .negotiateProtocol().consumeResponse()
                .open().consumeResponse()
                .begin().consumeResponse()
                .attachRole(Role.RECEIVER)
                .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
                .attach().consumeResponse();

        // Send a flow with link credit of one.
        final Interaction interaction = receiverLink.flowIncomingWindow(UnsignedInteger.ONE)
                .flowNextIncomingId(UnsignedInteger.ZERO)
                .flowOutgoingWindow(UnsignedInteger.ZERO)
                .flowNextOutgoingId(UnsignedInteger.ZERO)
                .flowLinkCredit(UnsignedInteger.ONE)
                .flowHandleFromLinkHandle()
                .flow();

        final MessageDecoder decoder = new MessageDecoder();
        Transfer fragment;
        do
        {
            fragment = interaction.consumeResponse().getLatestResponse(Transfer.class);
            decoder.addTransfer(fragment);
        }
        // A null "more" flag is treated the same as false.
        while (Boolean.TRUE.equals(fragment.getMore()));

        final Object decoded = decoder.getData();
        assertThat(decoded, is(equalTo(TEST_MESSAGE_DATA)));
    }
}
/**
 * Puts one message on the test queue, attaches a receiving link with receiver settle mode
 * {@code FIRST}, grants a single link credit and receives the delivery. After checking the
 * decoded data it sends a settled disposition and closes the connection.
 */
@Test
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void receiveTransferReceiverSettleFirst() throws Exception
{
    getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
    try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        final Interaction receiverLink = transport.newInteraction()
                .negotiateProtocol().consumeResponse()
                .open().consumeResponse()
                .begin().consumeResponse()
                .attachRole(Role.RECEIVER)
                .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
                .attachRcvSettleMode(ReceiverSettleMode.FIRST)
                .attach().consumeResponse();

        // Send a flow with link credit of one, then receive and decode the delivery.
        final Interaction interaction = receiverLink.flowIncomingWindow(UnsignedInteger.ONE)
                .flowNextIncomingId(UnsignedInteger.ZERO)
                .flowOutgoingWindow(UnsignedInteger.ZERO)
                .flowNextOutgoingId(UnsignedInteger.ZERO)
                .flowLinkCredit(UnsignedInteger.ONE)
                .flowHandleFromLinkHandle()
                .flow()
                .receiveDelivery()
                .decodeLatestDelivery();

        final Object decoded = interaction.getDecodedLatestDelivery();
        assertThat(decoded, is(equalTo(TEST_MESSAGE_DATA)));

        interaction.dispositionSettled(true)
                   .dispositionRole(Role.RECEIVER)
                   .disposition();

        // verify that no unexpected performative is received by closing
        interaction.doCloseConnection();
    }
}
/**
 * Puts one message on the test queue, attaches a receiving link with receiver settle mode
 * {@code SECOND}, grants a single link credit and receives the delivery. It then sends an
 * unsettled {@link Accepted} disposition and checks that the disposition received in reply is
 * settled.
 */
@Test
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void receiveTransferReceiverSettleSecond() throws Exception
{
    getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
    try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        final Interaction receiverLink = transport.newInteraction()
                .negotiateProtocol().consumeResponse()
                .open().consumeResponse()
                .begin().consumeResponse()
                .attachRole(Role.RECEIVER)
                .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
                .attachRcvSettleMode(ReceiverSettleMode.SECOND)
                .attach().consumeResponse();

        // Send a flow with link credit of one, then receive and decode the delivery.
        final Interaction interaction = receiverLink.flowIncomingWindow(UnsignedInteger.ONE)
                .flowNextIncomingId(UnsignedInteger.ZERO)
                .flowOutgoingWindow(UnsignedInteger.ZERO)
                .flowNextOutgoingId(UnsignedInteger.ZERO)
                .flowLinkCredit(UnsignedInteger.ONE)
                .flowHandleFromLinkHandle()
                .flow()
                .receiveDelivery()
                .decodeLatestDelivery();

        final Object decoded = interaction.getDecodedLatestDelivery();
        assertThat(decoded, is(equalTo(TEST_MESSAGE_DATA)));

        // Unsettled accepted disposition from the receiver; the reply must be settled.
        final Interaction afterDisposition = interaction.dispositionSettled(false)
                .dispositionRole(Role.RECEIVER)
                .dispositionState(new Accepted())
                .disposition()
                .consumeResponse(Disposition.class);
        final Disposition reply = afterDisposition.getLatestResponse(Disposition.class);
        assertThat(reply.getSettled(), is(true));

        interaction.consumeResponse(null, Flow.class);
    }
}
/**
 * Puts one message on the test queue, attaches a receiving link with receiver settle mode
 * {@code SECOND} and source outcomes accepted and rejected, grants a single link credit and
 * receives the delivery. It then sends an unsettled {@link Rejected} disposition; if the next
 * response is a {@link Flow} a further response is consumed to obtain the {@link Disposition},
 * which must be settled.
 */
@Test
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void receiveTransferReceiverSettleSecondWithRejectedOutcome() throws Exception
{
    getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
    try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        final Interaction receiverLink = transport.newInteraction()
                .negotiateProtocol().consumeResponse()
                .open().consumeResponse()
                .begin().consumeResponse()
                .attachRole(Role.RECEIVER)
                .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
                .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL)
                .attachRcvSettleMode(ReceiverSettleMode.SECOND)
                .attach().consumeResponse();

        // Send a flow with link credit of one.
        final Interaction interaction = receiverLink.flowIncomingWindow(UnsignedInteger.ONE)
                .flowNextIncomingId(UnsignedInteger.ZERO)
                .flowOutgoingWindow(UnsignedInteger.ZERO)
                .flowNextOutgoingId(UnsignedInteger.ZERO)
                .flowLinkCredit(UnsignedInteger.ONE)
                .flowHandleFromLinkHandle()
                .flow();

        final Object decoded = interaction.receiveDelivery()
                .decodeLatestDelivery()
                .getDecodedLatestDelivery();
        assertThat(decoded, is(equalTo(TEST_MESSAGE_DATA)));

        interaction.dispositionSettled(false)
                   .dispositionRole(Role.RECEIVER)
                   .dispositionState(new Rejected())
                   .disposition()
                   .consumeResponse(Disposition.class, Flow.class);

        // A Flow may arrive ahead of the Disposition; in that case read one more response.
        final Object firstBody = interaction.getLatestResponse().getBody();
        if (firstBody instanceof Flow)
        {
            interaction.consumeResponse(Disposition.class);
        }

        final Disposition reply = interaction.getLatestResponse(Disposition.class);
        assertThat(reply.getSettled(), is(true));

        interaction.consumeResponse(null, Flow.class);
    }
}
/**
 * Currently ignored. Puts one message on the test queue, attaches a receiving link with receiver
 * settle mode {@code SECOND}, an empty source-outcomes list and a {@code null} default outcome,
 * grants a single link credit and receives the delivery. It then sends an unsettled disposition
 * with a {@code null} state and checks that the disposition received in reply is settled.
 */
@Ignore
@Test
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void receiveTransferReceiverSettleSecondWithImplicitDispositionState() throws Exception
{
    getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
    try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        final Interaction receiverLink = transport.newInteraction()
                .negotiateProtocol().consumeResponse()
                .open().consumeResponse()
                .begin().consumeResponse()
                .attachRole(Role.RECEIVER)
                .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
                .attachRcvSettleMode(ReceiverSettleMode.SECOND)
                .attachSourceOutcomes()
                .attachSourceDefaultOutcome(null)
                .attach().consumeResponse();

        // Send a flow with link credit of one, then receive and decode the delivery.
        final Interaction interaction = receiverLink.flowIncomingWindow(UnsignedInteger.ONE)
                .flowNextIncomingId(UnsignedInteger.ZERO)
                .flowOutgoingWindow(UnsignedInteger.ZERO)
                .flowNextOutgoingId(UnsignedInteger.ZERO)
                .flowLinkCredit(UnsignedInteger.ONE)
                .flowHandleFromLinkHandle()
                .flow()
                .receiveDelivery()
                .decodeLatestDelivery();

        final Object decoded = interaction.getDecodedLatestDelivery();
        assertThat(decoded, is(equalTo(TEST_MESSAGE_DATA)));

        // Unsettled disposition that carries no delivery state.
        final Interaction afterDisposition = interaction.dispositionSettled(false)
                .dispositionRole(Role.RECEIVER)
                .dispositionState(null)
                .disposition()
                .consumeResponse(Disposition.class);
        final Disposition reply = afterDisposition.getLatestResponse(Disposition.class);
        assertThat(reply.getSettled(), is(true));

        interaction.consumeResponse(null, Flow.class);
    }
}
/**
 * Opens a connection requesting a max frame size of 4096 and puts a message on the test queue
 * whose body has as many characters as the max frame size reported in the peer's {@link Open}.
 * After receiving a first {@link Transfer} with {@code more} set, the test sends an unsettled
 * disposition with a {@link Received} state, then expects a second transfer with {@code more}
 * false or absent, checks the reassembled data and finally accepts the delivery, expecting a
 * settled disposition in reply.
 */
@Test
@SpecificationTest(section = "2.6.12", description = "[...] the receiving application MAY wish to indicate"
                                                    + " non-terminal delivery states to the sender")
public void receiveTransferReceiverIndicatesNonTerminalDeliveryState() throws Exception
{
    try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        final Interaction interaction = transport.newInteraction();
        final Open peerOpen = interaction.negotiateProtocol().consumeResponse()
                .openMaxFrameSize(UnsignedInteger.valueOf(4096))
                .open().consumeResponse()
                .getLatestResponse(Open.class);

        // Body with one '*' per byte of the max frame size taken from the peer's Open.
        final int frameSize = peerOpen.getMaxFrameSize().intValue();
        final String largeBody = Stream.generate(() -> "*")
                .limit(frameSize)
                .collect(Collectors.joining());
        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, largeBody);

        final Interaction receiverLink = interaction.begin().consumeResponse()
                .attachRole(Role.RECEIVER)
                .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
                .attachRcvSettleMode(ReceiverSettleMode.SECOND)
                .attach().consumeResponse();
        receiverLink.flowIncomingWindow(UnsignedInteger.ONE)
                    .flowNextIncomingId(UnsignedInteger.ZERO)
                    .flowOutgoingWindow(UnsignedInteger.ZERO)
                    .flowNextOutgoingId(UnsignedInteger.ZERO)
                    .flowLinkCredit(UnsignedInteger.ONE)
                    .flowHandleFromLinkHandle()
                    .flow()
                    .sync();

        final MessageDecoder decoder = new MessageDecoder();

        final Transfer firstFragment = interaction.consumeResponse(Transfer.class)
                .getLatestResponse(Transfer.class);
        assertThat(firstFragment.getMore(), is(equalTo(true)));
        decoder.addTransfer(firstFragment);

        final long firstRemaining;
        try (QpidByteBuffer firstPayload = firstFragment.getPayload())
        {
            firstRemaining = firstPayload.remaining();
        }

        // Non-terminal state: section 0, offset one past the size of the first payload.
        final Received progress = new Received();
        progress.setSectionNumber(UnsignedInteger.ZERO);
        progress.setSectionOffset(UnsignedLong.valueOf(firstRemaining + 1));

        interaction.dispositionSettled(false)
                   .dispositionRole(Role.RECEIVER)
                   .dispositionState(progress)
                   .disposition()
                   .sync();

        final Transfer secondFragment = interaction.consumeResponse(Transfer.class)
                .getLatestResponse(Transfer.class);
        assertThat(secondFragment.getMore(), oneOf(false, null));
        decoder.addTransfer(secondFragment);

        assertThat(decoder.getData(), is(equalTo(largeBody)));

        // Terminal accepted outcome; the reply must be settled.
        final Interaction afterAccept = interaction.dispositionSettled(false)
                .dispositionRole(Role.RECEIVER)
                .dispositionState(new Accepted())
                .disposition().consumeResponse(Disposition.class);
        final Disposition reply = afterAccept.getLatestResponse(Disposition.class);
        assertThat(reply.getSettled(), is(true));

        interaction.consumeResponse(null, Flow.class);
    }
}
/**
 * Puts one message on the test queue and attaches a receiving link requesting sender settle mode
 * {@code SETTLED}. The test is skipped (via {@code assumeThat}) unless the {@link Attach}
 * response echoes that mode. It then grants a single link credit, receives the delivery and
 * requires at least one of its transfers to be flagged as settled before closing the connection.
 * Afterwards, when queue depth is supported, the queue must be empty; finally a fresh message is
 * put on the queue and read back through {@code Utils.receiveMessage}.
 */
@Test
@SpecificationTest(section = "2.7.3", description = "The sender SHOULD respect the receiver’s desired settlement mode if"
                                                   + " the receiver initiates the attach exchange and the sender supports the desired mode.")
public void receiveTransferSenderSettleModeSettled() throws Exception
{
    getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
    try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        final Interaction interaction = transport.newInteraction()
                                                 .negotiateProtocol().consumeResponse()
                                                 .open().consumeResponse()
                                                 .begin().consumeResponse()
                                                 .attachRole(Role.RECEIVER)
                                                 .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
                                                 .attachSndSettleMode(SenderSettleMode.SETTLED)
                                                 .attach().consumeResponse(Attach.class);

        // Only meaningful when the peer agreed to send pre-settled.
        Attach attach = interaction.getLatestResponse(Attach.class);
        assumeThat(attach.getSndSettleMode(), is(equalTo(SenderSettleMode.SETTLED)));

        interaction.flowIncomingWindow(UnsignedInteger.ONE)
                   .flowNextIncomingId(UnsignedInteger.ZERO)
                   .flowOutgoingWindow(UnsignedInteger.ZERO)
                   .flowNextOutgoingId(UnsignedInteger.ZERO)
                   .flowLinkCredit(UnsignedInteger.ONE)
                   .flowHandleFromLinkHandle()
                   .flow();

        List<Transfer> transfers = interaction.receiveDelivery().getLatestDelivery();
        // A null "settled" flag counts as not settled; one settled transfer is sufficient.
        final boolean isSettled =
                transfers.stream().anyMatch(transfer -> Boolean.TRUE.equals(transfer.getSettled()));
        assertThat(isSettled, is(true));

        // verify no unexpected performative received by closing the connection
        interaction.doCloseConnection();
    }

    if (getBrokerAdmin().isQueueDepthSupported())
    {
        assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
    }
    getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "test");
    assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo("test")));
}
/**
 * Attaches a sending link with receiver settle mode {@code SECOND}, requires a link credit
 * greater than one and sends two transfers (delivery ids 0 and 1) that share the same delivery
 * tag. Responses are then consumed until an error-carrying frame arrives, whose error must be
 * non-null. Any {@link Disposition} seen before that must be unsettled and must not reference
 * delivery id 1; {@link Flow} frames are skipped and any other body fails the test.
 */
@Test
@SpecificationTest(section = "2.7.5",
        description = "[delivery-tag] uniquely identifies the delivery attempt for a given message on this link.")
public void transfersWithDuplicateUnsettledDeliveryTag() throws Exception
{
    try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        final Binary sharedTag = new Binary("testDeliveryTag".getBytes(UTF_8));

        final Interaction interaction = transport.newInteraction();
        interaction.negotiateProtocol().consumeResponse()
                   .open().consumeResponse(Open.class)
                   .begin().consumeResponse(Begin.class)
                   .attachRole(Role.SENDER)
                   .attachRcvSettleMode(ReceiverSettleMode.SECOND)
                   .attach().consumeResponse(Attach.class)
                   .consumeResponse(Flow.class);

        // Two transfers are about to be sent, so more than one credit is required.
        final Flow initialFlow = interaction.getLatestResponse(Flow.class);
        assertThat(initialFlow.getLinkCredit().intValue(), is(greaterThan(1)));

        interaction.transferDeliveryId(UnsignedInteger.ZERO)
                   .transferDeliveryTag(sharedTag)
                   .transferPayloadData("test")
                   .transfer()
                   .sync()
                   .transferDeliveryTag(sharedTag)
                   .transferDeliveryId(UnsignedInteger.ONE)
                   .transferPayloadData("test2")
                   .transfer()
                   .sync();

        boolean errorSeen = false;
        while (!errorSeen)
        {
            interaction.consumeResponse();
            final Response<?> response = interaction.getLatestResponse();
            assertThat(response, is(notNullValue()));

            final Object body = response.getBody();
            if (body instanceof ErrorCarryingFrameBody)
            {
                // The error frame ends the wait.
                final Error error = ((ErrorCarryingFrameBody) body).getError();
                assertThat(error, is(notNullValue()));
                errorSeen = true;
            }
            else if (body instanceof Disposition)
            {
                // Only the first delivery (id 0) may be acknowledged, and only as unsettled.
                final Disposition disposition = (Disposition) body;
                assertThat(disposition.getSettled(), is(equalTo(false)));
                assertThat(disposition.getFirst(), is(not(equalTo(UnsignedInteger.ONE))));
                assertThat(disposition.getLast(), is(not(equalTo(UnsignedInteger.ONE))));
            }
            else if (!(body instanceof Flow))
            {
                fail("Unexpected response " + body);
            }
        }
    }
}
/**
 * Attaches a sending link to the test queue, requires a link credit greater than one and sends
 * two transfers (delivery ids 0 and 1) with the same delivery tag, the first of them marked as
 * settled. After closing the connection, and provided queue depth is supported (otherwise the
 * test is skipped at that point), the queue must hold two messages.
 */
@Test
@SpecificationTest(section = "2.6.12",
        description = "The delivery-tag MUST be unique amongst all deliveries that"
                      + " could be considered unsettled by either end of the link.")
public void deliveryTagCanBeReusedAfterDeliveryIsSettled() throws Exception
{
    try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        final Binary reusedTag = new Binary("testDeliveryTag".getBytes(UTF_8));

        final Interaction interaction = transport.newInteraction();
        interaction.negotiateProtocol().consumeResponse()
                   .open().consumeResponse(Open.class)
                   .begin().consumeResponse(Begin.class)
                   .attachRole(Role.SENDER)
                   .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                   .attach().consumeResponse(Attach.class)
                   .consumeResponse(Flow.class);

        // Two transfers are about to be sent, so more than one credit is required.
        final Flow initialFlow = interaction.getLatestResponse(Flow.class);
        assertThat(initialFlow.getLinkCredit().intValue(), is(greaterThan(1)));

        // First delivery is sent settled; the second one reuses its tag.
        interaction.transferDeliveryId(UnsignedInteger.ZERO)
                   .transferDeliveryTag(reusedTag)
                   .transferPayloadData("test")
                   .transferSettled(true)
                   .transfer()
                   .sync()
                   .transferDeliveryTag(reusedTag)
                   .transferDeliveryId(UnsignedInteger.ONE)
                   .transferPayloadData("test2")
                   .transfer()
                   .sync();

        interaction.doCloseConnection();

        assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
        final int queueDepth = getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME);
        assertThat(queueDepth, is(equalTo(2)));
    }
}
/**
 * Streams a single multi-frame delivery whose accumulated payload reaches or exceeds the
 * max-message-size advertised in the peer's attach, and then expects the link to be
 * detached with a {@code message-size-exceeded} error. Any disposition seen while waiting
 * for the detach must carry a rejected state with the same error condition.
 * <p>
 * The test is skipped (JUnit assumption) when the peer advertises no max-message-size,
 * or a value that is not strictly between zero and
 * {@code MAX_MAX_MESSAGE_SIZE_WE_ARE_WILLING_TO_TEST}.
 *
 * @throws Exception on any transport or protocol failure
 */
@Test
@SpecificationTest(section = "2.7.3",
        description = "max-message-size: This field indicates the maximum message size supported by the link"
                      + " endpoint. Any attempt to deliver a message larger than this results in a"
                      + " message-size-exceeded link-error. If this field is zero or unset, there is no maximum"
                      + " size imposed by the link endpoint.")
public void exceedMaxMessageSizeLimit() throws Exception
{
    try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        final Binary deliveryTag = new Binary("testDeliveryTag".getBytes(UTF_8));
        final Interaction interaction = transport.newInteraction();

        final Open open = interaction.negotiateProtocol().consumeResponse()
                                     .open().consumeResponse(Open.class)
                                     .getLatestResponse(Open.class);
        // When the peer does not advertise a max-frame-size, Integer.MAX_VALUE is used instead.
        final long maxFrameSize = open.getMaxFrameSize() == null
                ? Integer.MAX_VALUE
                : open.getMaxFrameSize().longValue();

        final Attach attach = interaction.begin().consumeResponse(Begin.class)
                                         .attachRole(Role.SENDER)
                                         .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                                         .attach().consumeResponse(Attach.class)
                                         .getLatestResponse(Attach.class);

        final UnsignedLong maxMessageSizeLimit = attach.getMaxMessageSize();
        assumeThat(maxMessageSizeLimit, is(notNullValue()));
        assumeThat(maxMessageSizeLimit.longValue(),
                   is(both(greaterThan(0L)).and(lessThan(MAX_MAX_MESSAGE_SIZE_WE_ARE_WILLING_TO_TEST))));

        final Flow flow = interaction.consumeResponse(Flow.class)
                                     .getLatestResponse(Flow.class);
        assertThat(flow.getLinkCredit().intValue(), is(greaterThan(1)));

        // Each transfer carries one chunk: at most 1 MiB, and 100 bytes below the frame limit.
        final long chunkSize = Math.min(1024 * 1024, maxFrameSize - 100);
        final byte[] payloadChunk = createTestPaload(chunkSize);
        interaction.transferDeliveryId(UnsignedInteger.ZERO)
                   .transferDeliveryTag(deliveryTag)
                   .transferPayloadData(payloadChunk)
                   .transferSettled(true)
                   .transferMore(true);

        // The running total is kept as a long: the limit it is compared with is a long and
        // "+=" with a long operand would otherwise narrow silently into an int and could wrap.
        long payloadSize = 0;
        while (payloadSize < maxMessageSizeLimit.longValue())
        {
            payloadSize += chunkSize;
            interaction.transfer().sync();
        }

        // Wait for the detach; flows are ignored, dispositions must report the rejection.
        while (true)
        {
            final Response<?> response =
                    interaction.consumeResponse(Flow.class, Disposition.class, Detach.class).getLatestResponse();
            if (response == null)
            {
                continue;
            }

            final Object body = response.getBody();
            if (body instanceof Detach)
            {
                break;
            }
            if (body instanceof Disposition)
            {
                final Disposition disposition = (Disposition) body;
                assertThat(disposition.getState(), is(instanceOf(Rejected.class)));
                final Rejected rejected = (Rejected) disposition.getState();
                assertThat(rejected.getError(), is(notNullValue()));
                assertThat(rejected.getError().getCondition(), is(equalTo(LinkError.MESSAGE_SIZE_EXCEEDED)));
            }
        }

        final Detach detach = interaction.getLatestResponse(Detach.class);
        assertThat(detach.getError(), is(notNullValue()));
        assertThat(detach.getError().getCondition(), is(equalTo(LinkError.MESSAGE_SIZE_EXCEEDED)));
    }
}
/**
 * Sends three deliveries (ids 0, 1 and 2, tags "A", "B" and "C") back to back without
 * waiting in between, then expects dispositions covering all three delivery-ids in order.
 */
@Test
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void transferMultipleDeliveries() throws Exception
{
    try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        final Interaction interaction = transport.newInteraction();
        interaction.negotiateProtocol().consumeResponse()
                   .open().consumeResponse(Open.class)
                   .begin().consumeResponse(Begin.class);
        interaction.attachRole(Role.SENDER)
                   .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                   .attach().consumeResponse(Attach.class)
                   .consumeResponse(Flow.class);

        final int grantedCredit = interaction.getLatestResponse(Flow.class).getLinkCredit().intValue();
        assumeThat("insufficient credit for the test", grantedCredit, is(greaterThan(2)));

        // Delivery-ids and tags, pairwise by index.
        final UnsignedInteger[] deliveryIds =
                {UnsignedInteger.ZERO, UnsignedInteger.ONE, UnsignedInteger.valueOf(2)};
        final String[] tags = {"A", "B", "C"};

        for (int index = 0; index < deliveryIds.length; index++)
        {
            interaction.transferDeliveryId(deliveryIds[index])
                       .transferDeliveryTag(new Binary(tags[index].getBytes(StandardCharsets.UTF_8)))
                       .transferPayloadData("test")
                       .transfer();
        }

        final TreeSet<UnsignedInteger> outstandingIds = Sets.newTreeSet(Arrays.asList(deliveryIds));
        assertDeliveries(interaction, outstandingIds);

        // verify that no unexpected performative is received by closing
        interaction.doCloseConnection();
    }
}
/**
 * Declares a transaction over a coordinator link, sends deliveries 1 and 2 before any
 * transactional state is set on the transfer, sends delivery 3 with the transaction's
 * id as transactional state, discharges the transaction (fail = false) as delivery 4
 * on the coordinator link, and finally expects dispositions for ids 1 to 4 in order.
 */
@Test
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void transferMixtureOfTransactionalAndNonTransactionalDeliveries() throws Exception
{
    try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        final Interaction interaction = transport.newInteraction();
        interaction.negotiateProtocol().consumeResponse()
                   .open().consumeResponse(Open.class)
                   .begin().consumeResponse(Begin.class);
        interaction.attachRole(Role.SENDER)
                   .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                   .attach().consumeResponse(Attach.class)
                   .consumeResponse(Flow.class);

        final int grantedCredit = interaction.getLatestResponse(Flow.class).getLinkCredit().intValue();
        assumeThat("insufficient credit for the test", grantedCredit, is(greaterThan(2)));

        // Coordinator link on handle 1, followed by the transaction declaration.
        final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ONE);
        interaction.txnAttachCoordinatorLink(txnState);
        interaction.txnDeclare(txnState);

        // Deliveries 1 and 2: sent before a transactional state is set on the transfer.
        interaction.transferDeliveryId(UnsignedInteger.ONE)
                   .transferDeliveryTag(new Binary("A".getBytes(StandardCharsets.UTF_8)))
                   .transferPayloadData("test")
                   .transfer();
        interaction.transferDeliveryId(UnsignedInteger.valueOf(2))
                   .transferDeliveryTag(new Binary("B".getBytes(StandardCharsets.UTF_8)))
                   .transferPayloadData("test")
                   .transfer();

        // Delivery 3: carries the current transaction id as its transactional state.
        interaction.transferDeliveryId(UnsignedInteger.valueOf(3))
                   .transferDeliveryTag(new Binary("C".getBytes(StandardCharsets.UTF_8)))
                   .transferTransactionalState(txnState.getCurrentTransactionId())
                   .transferPayloadData("test")
                   .transfer();

        // Delivery 4: discharge of the transaction, sent on the coordinator link's handle.
        final Discharge commit = new Discharge();
        commit.setTxnId(txnState.getCurrentTransactionId());
        commit.setFail(false);
        interaction.transferHandle(txnState.getHandle())
                   .transferDeliveryId(UnsignedInteger.valueOf(4))
                   .transferDeliveryTag(new Binary(("transaction-" + 4).getBytes(StandardCharsets.UTF_8)))
                   .transferPayloadData(commit)
                   .transfer();

        final UnsignedInteger[] expectedIds = {
                UnsignedInteger.ONE,
                UnsignedInteger.valueOf(2),
                UnsignedInteger.valueOf(3),
                UnsignedInteger.valueOf(4)
        };
        assertDeliveries(interaction, Sets.newTreeSet(Arrays.asList(expectedIds)));
    }
}
/**
 * Puts four messages on the test queue, receives them over one receiving link with
 * enough credit for all of them, settles the whole range with a single accepted
 * disposition, and then checks that a message published afterwards is the one a fresh
 * receiver gets.
 */
@Test
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void receiveMultipleDeliveries() throws Exception
{
    int numberOfMessages = 4;
    for (int index = 0; index < numberOfMessages; index++)
    {
        final String payload = TEST_MESSAGE_DATA + "_" + index;
        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, payload);
    }

    try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        final Interaction interaction = transport.newInteraction();

        // Connection, session and receiving link on the test queue.
        interaction.negotiateProtocol().consumeResponse()
                   .open().consumeResponse()
                   .begin().consumeResponse()
                   .attachRole(Role.RECEIVER)
                   .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
                   .attachRcvSettleMode(ReceiverSettleMode.FIRST)
                   .attach().consumeResponse();

        // Grant credit and incoming window for every queued message.
        interaction.flowIncomingWindow(UnsignedInteger.valueOf(numberOfMessages))
                   .flowNextIncomingId(UnsignedInteger.ZERO)
                   .flowOutgoingWindow(UnsignedInteger.ZERO)
                   .flowNextOutgoingId(UnsignedInteger.ZERO)
                   .flowLinkCredit(UnsignedInteger.valueOf(numberOfMessages))
                   .flowHandleFromLinkHandle()
                   .flow();

        // Messages must arrive in publication order with consecutive delivery-ids from 0.
        for (int expectedId = 0; expectedId < numberOfMessages; expectedId++)
        {
            final Object received = interaction.receiveDelivery(Flow.class)
                                               .decodeLatestDelivery()
                                               .getDecodedLatestDelivery();
            assertThat(received, Is.is(CoreMatchers.equalTo(TEST_MESSAGE_DATA + "_" + expectedId)));
            assertThat(interaction.getLatestDeliveryId(),
                       Is.is(equalTo(UnsignedInteger.valueOf(expectedId))));
        }

        // One settled, accepted disposition spanning delivery 0 up to the last one received.
        interaction.dispositionSettled(true)
                   .dispositionRole(Role.RECEIVER)
                   .dispositionFirst(UnsignedInteger.ZERO)
                   .dispositionLast(interaction.getLatestDeliveryId())
                   .dispositionState(new Accepted())
                   .disposition();

        // make sure the disposition is handled by making a drain request
        interaction.flowLinkCredit(UnsignedInteger.ONE)
                   .flowNextIncomingId(UnsignedInteger.valueOf(numberOfMessages))
                   .flowDrain(Boolean.TRUE)
                   .flow()
                   .consumeResponse(Flow.class);

        if (getBrokerAdmin().isQueueDepthSupported())
        {
            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
        }
    }

    // A message published afterwards must be the one handed to a new receiver.
    final String followUpText = TEST_MESSAGE_DATA + "_" + numberOfMessages;
    getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, followUpText);
    final Object followUpReceived = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
    assertThat(followUpReceived, is(equalTo(followUpText)));
}
/**
 * Puts four messages on the test queue and receives them over one link, then settles
 * deliveries 0-1 with a plain accepted disposition and deliveries 2-3 with an accepted
 * outcome wrapped in transactional state. The transaction is discharged (fail = false)
 * on the coordinator link, and the test waits until it has seen both a disposition and
 * a flow for the coordinator handle before checking the queue is empty. Finally a
 * message published afterwards must be the one a fresh receiver gets.
 */
@Test
@SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
public void receiveMixtureOfTransactionalAndNonTransactionalDeliveries() throws Exception
{
    int numberOfMessages = 4;
    for (int index = 0; index < numberOfMessages; index++)
    {
        final String payload = TEST_MESSAGE_DATA + "_" + index;
        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, payload);
    }

    try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
    {
        final Interaction interaction = transport.newInteraction();

        // Connection, session and receiving link (handle 0) on the test queue.
        interaction.negotiateProtocol().consumeResponse()
                   .open().consumeResponse()
                   .begin().consumeResponse()
                   .attachRole(Role.RECEIVER)
                   .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
                   .attachRcvSettleMode(ReceiverSettleMode.FIRST)
                   .attachHandle(UnsignedInteger.ZERO)
                   .attach().consumeResponse();

        // Grant credit and incoming window for every queued message.
        interaction.flowIncomingWindow(UnsignedInteger.valueOf(numberOfMessages))
                   .flowNextIncomingId(UnsignedInteger.ZERO)
                   .flowOutgoingWindow(UnsignedInteger.ZERO)
                   .flowNextOutgoingId(UnsignedInteger.ZERO)
                   .flowLinkCredit(UnsignedInteger.valueOf(numberOfMessages))
                   .flowHandleFromLinkHandle()
                   .flow();

        // Messages must arrive in publication order with consecutive delivery-ids from 0.
        for (int expectedId = 0; expectedId < numberOfMessages; expectedId++)
        {
            final Object received = interaction.receiveDelivery(Flow.class)
                                               .decodeLatestDelivery()
                                               .getDecodedLatestDelivery();
            assertThat(received, Is.is(CoreMatchers.equalTo(TEST_MESSAGE_DATA + "_" + expectedId)));
            assertThat(interaction.getLatestDeliveryId(),
                       Is.is(equalTo(UnsignedInteger.valueOf(expectedId))));
        }

        // Coordinator link on handle 1, followed by the transaction declaration.
        final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ONE);
        interaction.txnAttachCoordinatorLink(txnState);
        interaction.txnDeclare(txnState);

        // Deliveries 0-1: plain accepted outcome.
        interaction.dispositionSettled(true)
                   .dispositionRole(Role.RECEIVER)
                   .dispositionFirst(UnsignedInteger.ZERO)
                   .dispositionLast(UnsignedInteger.ONE)
                   .dispositionState(new Accepted())
                   .disposition();

        // Deliveries 2-3: accepted outcome inside the declared transaction.
        interaction.dispositionSettled(true)
                   .dispositionRole(Role.RECEIVER)
                   .dispositionFirst(UnsignedInteger.valueOf(2))
                   .dispositionLast(UnsignedInteger.valueOf(3))
                   .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
                   .disposition();

        // Discharge the transaction on the coordinator link's handle.
        final Discharge commit = new Discharge();
        commit.setTxnId(txnState.getCurrentTransactionId());
        commit.setFail(false);
        interaction.transferHandle(txnState.getHandle())
                   .transferDeliveryId(UnsignedInteger.valueOf(4))
                   .transferDeliveryTag(new Binary(("transaction-" + 4).getBytes(StandardCharsets.UTF_8)))
                   .transferPayloadData(commit)
                   .transfer();

        // Keep reading until a disposition and a flow for the coordinator handle were both seen.
        Disposition seenDisposition = null;
        Flow seenCoordinatorFlow = null;
        while (seenDisposition == null || seenCoordinatorFlow == null)
        {
            interaction.consumeResponse(Disposition.class, Flow.class);
            final Object body = interaction.getLatestResponse().getBody();
            if (body instanceof Disposition)
            {
                seenDisposition = (Disposition) body;
            }
            if (body instanceof Flow)
            {
                final Flow candidate = (Flow) body;
                final boolean forCoordinatorLink = candidate.getHandle().equals(txnState.getHandle());
                if (forCoordinatorLink)
                {
                    seenCoordinatorFlow = candidate;
                }
            }
        }

        if (getBrokerAdmin().isQueueDepthSupported())
        {
            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
        }
    }

    // A message published afterwards must be the one handed to a new receiver.
    final String followUpText = TEST_MESSAGE_DATA + "_" + 4;
    getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, followUpText);
    final Object followUpReceived = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
    assertThat(followUpReceived, is(equalTo(followUpText)));
}
/**
 * Reads Disposition and Flow responses until every expected delivery-id has been covered
 * by a disposition. Flows are ignored. For each disposition the range first..last (or
 * just first when last is absent) is walked, and each id must equal the smallest id
 * still expected, which is then removed from the set.
 *
 * @param interaction         interaction to read responses from
 * @param expectedDeliveryIds ids still awaiting a disposition; emptied by this method
 * @throws Exception if consuming a response fails
 */
private void assertDeliveries(final Interaction interaction, final TreeSet<UnsignedInteger> expectedDeliveryIds)
        throws Exception
{
    boolean idsOutstanding = true;
    // At least one response is always consumed, even when the set starts out empty.
    while (idsOutstanding)
    {
        final Object body = interaction.consumeResponse(Disposition.class, Flow.class)
                                       .getLatestResponse()
                                       .getBody();
        if (body instanceof Disposition)
        {
            final Disposition disposition = (Disposition) body;
            final long firstId = disposition.getFirst().longValue();
            final long lastId = disposition.getLast() == null
                    ? firstId
                    : disposition.getLast().longValue();

            LongStream.rangeClosed(firstId, lastId).forEach(reportedId -> {
                final UnsignedInteger nextExpected = expectedDeliveryIds.first();
                assertThat(reportedId, is(equalTo(nextExpected.longValue())));
                expectedDeliveryIds.remove(nextExpected);
            });
        }
        idsOutstanding = !expectedDeliveryIds.isEmpty();
    }
}
/**
 * Allocates a zero-filled payload of the requested size.
 *
 * @param payloadSize number of bytes to allocate; must be between 0 and 1 GiB inclusive
 * @return a new byte array of exactly {@code payloadSize} bytes
 * @throws IllegalArgumentException if {@code payloadSize} is negative or larger than 1 GiB
 */
private byte[] createTestPaload(final long payloadSize)
{
    // Reject negative sizes up front instead of failing later with NegativeArraySizeException.
    if (payloadSize < 0)
    {
        throw new IllegalArgumentException(String.format("Payload size (%d) must not be negative", payloadSize));
    }
    if (payloadSize > 1024*1024*1024)
    {
        throw new IllegalArgumentException(String.format("Payload size (%.2f MB) too big", payloadSize / (1024. * 1024.)));
    }
    // Safe narrowing: the value is known to lie within [0, 2^30] here.
    return new byte[(int) payloadSize];
}
}