blob: 8f88e35b06f737c2e9e47774b0c16e4fc1d5544b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.client.impl;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.protonj2.client.AdvancedMessage;
import org.apache.qpid.protonj2.client.Client;
import org.apache.qpid.protonj2.client.Connection;
import org.apache.qpid.protonj2.client.ConnectionOptions;
import org.apache.qpid.protonj2.client.Delivery;
import org.apache.qpid.protonj2.client.DeliveryMode;
import org.apache.qpid.protonj2.client.DeliveryState;
import org.apache.qpid.protonj2.client.DistributionMode;
import org.apache.qpid.protonj2.client.DurabilityMode;
import org.apache.qpid.protonj2.client.ErrorCondition;
import org.apache.qpid.protonj2.client.ExpiryPolicy;
import org.apache.qpid.protonj2.client.Message;
import org.apache.qpid.protonj2.client.Receiver;
import org.apache.qpid.protonj2.client.ReceiverOptions;
import org.apache.qpid.protonj2.client.Session;
import org.apache.qpid.protonj2.client.SessionOptions;
import org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosedException;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientIOException;
import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import org.apache.qpid.protonj2.client.exceptions.ClientLinkRemotelyClosedException;
import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
import org.apache.qpid.protonj2.client.test.Wait;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.apache.qpid.protonj2.test.driver.codec.messaging.Modified;
import org.apache.qpid.protonj2.test.driver.codec.messaging.Released;
import org.apache.qpid.protonj2.test.driver.codec.messaging.TerminusDurability;
import org.apache.qpid.protonj2.test.driver.codec.messaging.TerminusExpiryPolicy;
import org.apache.qpid.protonj2.types.messaging.AmqpValue;
import org.apache.qpid.protonj2.types.messaging.Data;
import org.apache.qpid.protonj2.types.messaging.Section;
import org.apache.qpid.protonj2.types.transport.AmqpError;
import org.apache.qpid.protonj2.types.transport.ReceiverSettleMode;
import org.apache.qpid.protonj2.types.transport.Role;
import org.apache.qpid.protonj2.types.transport.SenderSettleMode;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Timeout(20)
public class ReceiverTest extends ImperativeClientTestCase {
private static final Logger LOG = LoggerFactory.getLogger(ReceiverTest.class);
@Test
public void testCreateReceiverAndClose() throws Exception {
doTestCreateReceiverAndCloseOrDetachLink(true);
}
@Test
public void testCreateReceiverAndDetach() throws Exception {
doTestCreateReceiverAndCloseOrDetachLink(false);
}
private void doTestCreateReceiverAndCloseOrDetachLink(boolean close) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().withSource().withDistributionMode(nullValue()).and().respond();
peer.expectFlow().withLinkCredit(10);
peer.expectDetach().withClosed(close).respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
connection.openFuture().get(10, TimeUnit.SECONDS);
Session session = connection.openSession();
session.openFuture().get(10, TimeUnit.SECONDS);
Receiver receiver = session.openReceiver("test-queue");
receiver.openFuture().get(10, TimeUnit.SECONDS);
assertSame(container, receiver.client());
assertSame(connection, receiver.connection());
assertSame(session, receiver.session());
if (close) {
receiver.closeAsync().get(10, TimeUnit.SECONDS);
} else {
receiver.detachAsync().get(10, TimeUnit.SECONDS);
}
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testCreateReceiverAndCloseSync() throws Exception {
doTestCreateReceiverAndCloseOrDetachLinkSync(true);
}
@Test
public void testCreateReceiverAndDetachSync() throws Exception {
doTestCreateReceiverAndCloseOrDetachLinkSync(false);
}
private void doTestCreateReceiverAndCloseOrDetachLinkSync(boolean close) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow().withLinkCredit(10);
peer.expectDetach().withClosed(close).respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
connection.openFuture().get(10, TimeUnit.SECONDS);
Session session = connection.openSession();
session.openFuture().get(10, TimeUnit.SECONDS);
Receiver receiver = session.openReceiver("test-queue");
receiver.openFuture().get(10, TimeUnit.SECONDS);
if (close) {
receiver.close();
} else {
receiver.detach();
}
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testCreateReceiverAndCloseWithErrorSync() throws Exception {
doTestCreateReceiverAndCloseOrDetachWithErrorSync(true);
}
@Test
public void testCreateReceiverAndDetachWithErrorSync() throws Exception {
doTestCreateReceiverAndCloseOrDetachWithErrorSync(false);
}
private void doTestCreateReceiverAndCloseOrDetachWithErrorSync(boolean close) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow();
peer.expectDetach().withError("amqp-resource-deleted", "an error message").withClosed(close).respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Sender test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
connection.openFuture().get(10, TimeUnit.SECONDS);
Session session = connection.openSession();
session.openFuture().get(10, TimeUnit.SECONDS);
Receiver receiver = session.openReceiver("test-queue");
receiver.openFuture().get(10, TimeUnit.SECONDS);
if (close) {
receiver.close(ErrorCondition.create("amqp-resource-deleted", "an error message", null));
} else {
receiver.detach(ErrorCondition.create("amqp-resource-deleted", "an error message", null));
}
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiverOpenRejectedByRemote() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().respond().withNullSource();
peer.expectFlow();
peer.remoteDetach().withErrorCondition(AmqpError.UNAUTHORIZED_ACCESS.toString(), "Cannot read from this address").queue();
peer.expectDetach();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
connection.openFuture().get();
Session session = connection.openSession();
session.openFuture().get();
Receiver receiver = session.openReceiver("test-queue");
try {
receiver.openFuture().get();
fail("Open of receiver should fail due to remote indicating pending close.");
} catch (ExecutionException exe) {
assertNotNull(exe.getCause());
assertTrue(exe.getCause() instanceof ClientLinkRemotelyClosedException);
ClientLinkRemotelyClosedException linkClosed = (ClientLinkRemotelyClosedException) exe.getCause();
assertNotNull(linkClosed.getErrorCondition());
assertEquals(AmqpError.UNAUTHORIZED_ACCESS.toString(), linkClosed.getErrorCondition().condition());
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Should not result in any close being sent now, already closed.
receiver.closeAsync().get();
peer.expectClose().respond();
connection.closeAsync().get();
peer.waitForScriptToComplete(1, TimeUnit.SECONDS);
}
}
@Test
public void testOpenReceiverTimesOutWhenNoAttachResponseReceivedTimeout() throws Exception {
doTestOpenReceiverTimesOutWhenNoAttachResponseReceived(true);
}
@Test
public void testOpenReceiverTimesOutWhenNoAttachResponseReceivedNoTimeout() throws Exception {
doTestOpenReceiverTimesOutWhenNoAttachResponseReceived(false);
}
private void doTestOpenReceiverTimesOutWhenNoAttachResponseReceived(boolean timeout) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue());
peer.expectFlow();
peer.expectDetach();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Receiver test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession().openFuture().get(10, TimeUnit.SECONDS);
Receiver receiver = session.openReceiver("test-queue", new ReceiverOptions().openTimeout(10));
try {
if (timeout) {
receiver.openFuture().get(10, TimeUnit.SECONDS);
} else {
receiver.openFuture().get();
}
fail("Should not complete the open future without an error");
} catch (ExecutionException exe) {
Throwable cause = exe.getCause();
assertTrue(cause instanceof ClientOperationTimedOutException);
}
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testOpenReceiverWaitWithTimeoutFailsWhenConnectionDrops() throws Exception {
doTestOpenReceiverWaitFailsWhenConnectionDrops(true);
}
@Test
public void testOpenReceiverWaitWithNoTimeoutFailsWhenConnectionDrops() throws Exception {
doTestOpenReceiverWaitFailsWhenConnectionDrops(false);
}
private void doTestOpenReceiverWaitFailsWhenConnectionDrops(boolean timeout) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver();
peer.expectFlow();
peer.dropAfterLastHandler(10);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Sender test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
Receiver receiver = session.openReceiver("test-queue");
try {
if (timeout) {
receiver.openFuture().get(10, TimeUnit.SECONDS);
} else {
receiver.openFuture().get();
}
fail("Should not complete the open future without an error");
} catch (ExecutionException exe) {
Throwable cause = exe.getCause();
assertTrue(cause instanceof ClientIOException);
}
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testCloseReceiverTimesOutWhenNoCloseResponseReceivedTimeout() throws Exception {
doTestCloseOrDetachReceiverTimesOutWhenNoCloseResponseReceived(true, true);
}
@Test
public void testCloseReceiverTimesOutWhenNoCloseResponseReceivedNoTimeout() throws Exception {
doTestCloseOrDetachReceiverTimesOutWhenNoCloseResponseReceived(true, false);
}
@Test
public void testDetachReceiverTimesOutWhenNoCloseResponseReceivedTimeout() throws Exception {
doTestCloseOrDetachReceiverTimesOutWhenNoCloseResponseReceived(false, true);
}
@Test
public void testDetachReceiverTimesOutWhenNoCloseResponseReceivedNoTimeout() throws Exception {
doTestCloseOrDetachReceiverTimesOutWhenNoCloseResponseReceived(false, false);
}
private void doTestCloseOrDetachReceiverTimesOutWhenNoCloseResponseReceived(boolean close, boolean timeout) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.expectDetach();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Receiver test started, peer listening on: {}", remoteURI);
Client container = Client.create();
ConnectionOptions options = new ConnectionOptions();
options.closeTimeout(5);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
connection.openFuture().get(10, TimeUnit.SECONDS);
Session session = connection.openSession();
session.openFuture().get(10, TimeUnit.SECONDS);
Receiver receiver = session.openReceiver("test-queue");
receiver.openFuture().get(10, TimeUnit.SECONDS);
try {
if (close) {
if (timeout) {
receiver.closeAsync().get(10, TimeUnit.SECONDS);
} else {
receiver.closeAsync().get();
}
} else {
if (timeout) {
receiver.detachAsync().get(10, TimeUnit.SECONDS);
} else {
receiver.detachAsync().get();
}
}
fail("Should not complete the close or detach future without an error");
} catch (ExecutionException exe) {
Throwable cause = exe.getCause();
assertTrue(cause instanceof ClientOperationTimedOutException);
}
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiverDrainAllOutstanding() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
connection.openFuture().get(5, TimeUnit.SECONDS);
Session session = connection.openSession();
session.openFuture().get(5, TimeUnit.SECONDS);
Receiver receiver = session.openReceiver("test-queue", new ReceiverOptions().creditWindow(0));
receiver.openFuture().get(5, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Add some credit, verify not draining
int credit = 7;
Matcher<Boolean> drainMatcher = anyOf(equalTo(false), nullValue());
peer.expectFlow().withDrain(drainMatcher).withLinkCredit(credit).withDeliveryCount(0);
receiver.addCredit(credit);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Drain all the credit
peer.expectFlow().withDrain(true).withLinkCredit(credit).withDeliveryCount(0)
.respond()
.withDrain(true).withLinkCredit(0).withDeliveryCount(credit);
Future<? extends Receiver> draining = receiver.drain();
draining.get(5, TimeUnit.SECONDS);
// Close things down
peer.expectClose().respond();
connection.closeAsync().get(5, TimeUnit.SECONDS);
peer.waitForScriptToComplete(1, TimeUnit.SECONDS);
}
}
@Test
public void testDrainCompletesWhenReceiverHasNoCredit() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
Receiver receiver = session.openReceiver("test-queue", new ReceiverOptions().creditWindow(0));
receiver.openFuture().get(5, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
Future<? extends Receiver> draining = receiver.drain();
draining.get(5, TimeUnit.SECONDS);
// Close things down
peer.expectClose().respond();
connection.closeAsync().get(5, TimeUnit.SECONDS);
peer.waitForScriptToComplete(1, TimeUnit.SECONDS);
}
}
@Test
public void testDrainAdditionalDrainCallThrowsWhenReceiverStillDraining() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow();
peer.expectFlow().withDrain(true);
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
Receiver receiver = session.openReceiver("test-queue").openFuture().get();
receiver.drain();
try {
receiver.drain().get();
fail("Drain call should fail timeout exceeded.");
} catch (ExecutionException cliEx) {
LOG.debug("Receiver threw error on drain call", cliEx);
assertTrue(cliEx.getCause() instanceof ClientIllegalStateException);
}
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testAddCreditFailsWhileDrainPending() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond().withInitialDeliveryCount(20);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
Receiver receiver = session.openReceiver("test-queue", new ReceiverOptions().creditWindow(0));
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Add some credit, verify not draining
int credit = 7;
Matcher<Boolean> drainMatcher = anyOf(equalTo(false), nullValue());
peer.expectFlow().withDrain(drainMatcher).withLinkCredit(credit);
// Ensure we get the attach response with the initial delivery count.
receiver.openFuture().get().addCredit(credit);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Drain all the credit
peer.expectFlow().withDrain(true).withLinkCredit(credit).withDeliveryCount(20);
peer.expectClose().respond();
Future<? extends Receiver> draining = receiver.drain();
assertFalse(draining.isDone());
try {
receiver.addCredit(1);
fail("Should not allow add credit when drain is pending");
} catch (ClientIllegalStateException ise) {
// Expected
}
connection.closeAsync().get();
peer.waitForScriptToComplete(1, TimeUnit.SECONDS);
}
}
@Test
public void testAddCreditFailsWhenCreditWindowEnabled() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow().withLinkCredit(10);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
Receiver receiver = session.openReceiver("test-queue", new ReceiverOptions().creditWindow(10));
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectClose().respond();
try {
receiver.addCredit(1);
fail("Should not allow add credit when credit window configured");
} catch (ClientIllegalStateException ise) {
// Expected
}
connection.close();
peer.waitForScriptToComplete(1, TimeUnit.SECONDS);
}
}
@Test
public void testCreateDynamicReceiver() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue())
.withSource().withDynamic(true).withAddress((String) null)
.and().respond()
.withSource().withDynamic(true).withAddress("test-dynamic-node");
peer.expectFlow();
peer.expectDetach().respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
connection.openFuture().get(10, TimeUnit.SECONDS);
Session session = connection.openSession();
session.openFuture().get(10, TimeUnit.SECONDS);
Receiver receiver = session.openDynamicReceiver();
receiver.openFuture().get(10, TimeUnit.SECONDS);
assertNotNull(receiver.address(), "Remote should have assigned the address for the dynamic receiver");
assertEquals("test-dynamic-node", receiver.address());
receiver.closeAsync().get(10, TimeUnit.SECONDS);
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testCreateDynamicReceiverWthNodeProperties() throws Exception {
final Map<String, Object> nodeProperties = new HashMap<>();
nodeProperties.put("test-property-1", "one");
nodeProperties.put("test-property-2", "two");
nodeProperties.put("test-property-3", "three");
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue())
.withSource()
.withDynamic(true)
.withAddress((String) null)
.withDynamicNodeProperties(nodeProperties)
.and().respond()
.withSource()
.withDynamic(true)
.withAddress("test-dynamic-node")
.withDynamicNodeProperties(nodeProperties);
peer.expectFlow();
peer.expectDetach().respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
Receiver receiver = session.openDynamicReceiver(nodeProperties);
assertNotNull(receiver.address(), "Remote should have assigned the address for the dynamic receiver");
assertEquals("test-dynamic-node", receiver.address());
receiver.closeAsync().get(10, TimeUnit.SECONDS);
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testCreateDynamicReceiverWithNoCreditWindow() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue())
.withSource().withDynamic(true).withAddress((String) null)
.and().respond()
.withSource().withDynamic(true).withAddress("test-dynamic-node");
peer.expectAttach().ofSender().respond();
peer.expectDetach().respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(0);
Receiver receiver = session.openDynamicReceiver(receiverOptions).openFuture().get();
// Perform another round trip operation to ensure we see that no flow frame was
// sent by the receiver
session.openSender("test");
assertNotNull(receiver.address(), "Remote should have assigned the address for the dynamic receiver");
assertEquals("test-dynamic-node", receiver.address());
receiver.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testDynamicReceiverAddressWaitsForRemoteAttach() throws Exception {
tryReadDynamicReceiverAddress(true);
}
@Test
public void testDynamicReceiverAddressFailsAfterOpenTimeout() throws Exception {
tryReadDynamicReceiverAddress(false);
}
private void tryReadDynamicReceiverAddress(boolean attachResponse) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue())
.withSource().withDynamic(true).withAddress((String) null);
peer.expectFlow();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
ConnectionOptions options = new ConnectionOptions().openTimeout(100);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
connection.openFuture().get();
Session session = connection.openSession();
session.openFuture().get();
Receiver receiver = session.openDynamicReceiver();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
if (attachResponse) {
peer.expectDetach().respond();
peer.respondToLastAttach().withSource().withAddress("test-dynamic-node").and().later(10);
} else {
peer.expectDetach();
}
if (attachResponse) {
assertNotNull(receiver.address(), "Remote should have assigned the address for the dynamic receiver");
assertEquals("test-dynamic-node", receiver.address());
} else {
try {
receiver.address();
fail("Should failed to get address due to no attach response");
} catch (ClientException ex) {
LOG.debug("Caught expected exception from address call", ex);
}
}
try {
receiver.closeAsync().get();
} catch (ExecutionException ex) {
LOG.debug("Caught unexpected exception from close call", ex);
fail("Should not fail to close when connection not closed and detach sent");
}
peer.expectClose().respond();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testCreateReceiverWithQoSOfAtMostOnce() throws Exception {
doTestCreateReceiverWithConfiguredQoS(DeliveryMode.AT_MOST_ONCE);
}
@Test
public void testCreateReceiverWithQoSOfAtLeastOnce() throws Exception {
doTestCreateReceiverWithConfiguredQoS(DeliveryMode.AT_LEAST_ONCE);
}
private void doTestCreateReceiverWithConfiguredQoS(DeliveryMode qos) throws Exception {
byte sndMode = qos == DeliveryMode.AT_MOST_ONCE ? SenderSettleMode.SETTLED.byteValue() : SenderSettleMode.UNSETTLED.byteValue();
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue())
.withSndSettleMode(sndMode)
.withRcvSettleMode(ReceiverSettleMode.FIRST.byteValue())
.respond()
.withSndSettleMode(sndMode)
.withRcvSettleMode(ReceiverSettleMode.FIRST.byteValue());
peer.expectFlow();
peer.expectDetach().respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
connection.openFuture().get(10, TimeUnit.SECONDS);
Session session = connection.openSession();
session.openFuture().get(10, TimeUnit.SECONDS);
ReceiverOptions options = new ReceiverOptions().deliveryMode(qos);
Receiver receiver = session.openReceiver("test-qos", options);
receiver.openFuture().get(10, TimeUnit.SECONDS);
assertEquals("test-qos", receiver.address());
receiver.closeAsync().get(10, TimeUnit.SECONDS);
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiverGetSourceWaitsForRemoteAttach() throws Exception {
tryReadReceiverSource(true);
}
@Test
public void testReceiverGetSourceFailsAfterOpenTimeout() throws Exception {
tryReadReceiverSource(false);
}
private void tryReadReceiverSource(boolean attachResponse) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue());
peer.expectFlow();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
ConnectionOptions options = new ConnectionOptions().openTimeout(100);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
connection.openFuture().get();
Session session = connection.openSession();
session.openFuture().get();
Receiver receiver = session.openReceiver("test-receiver");
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
if (attachResponse) {
peer.expectDetach().respond();
peer.respondToLastAttach().later(10);
} else {
peer.expectDetach();
}
if (attachResponse) {
assertNotNull(receiver.source(), "Remote should have responded with a Source value");
assertEquals("test-receiver", receiver.source().address());
} else {
try {
receiver.source();
fail("Should failed to get remote source due to no attach response");
} catch (ClientException ex) {
LOG.debug("Caught expected exception from blocking call", ex);
}
}
try {
receiver.closeAsync().get();
} catch (ExecutionException ex) {
LOG.debug("Caught unexpected exception from close call", ex);
fail("Should not fail to close when connection not closed and detach sent");
}
peer.expectClose().respond();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiverGetTargetWaitsForRemoteAttach() throws Exception {
tryReadReceiverTarget(true);
}
@Test
public void testReceiverGetTargetFailsAfterOpenTimeout() throws Exception {
tryReadReceiverTarget(false);
}
private void tryReadReceiverTarget(boolean attachResponse) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue());
peer.expectFlow();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
ConnectionOptions options = new ConnectionOptions().openTimeout(100);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
connection.openFuture().get();
Session session = connection.openSession();
session.openFuture().get();
Receiver receiver = session.openReceiver("test-receiver");
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
if (attachResponse) {
peer.expectDetach().respond();
peer.respondToLastAttach().later(10);
} else {
peer.expectDetach();
}
if (attachResponse) {
assertNotNull(receiver.target(), "Remote should have responded with a Target value");
} else {
try {
receiver.target();
fail("Should failed to get remote source due to no attach response");
} catch (ClientException ex) {
LOG.debug("Caught expected exception from blocking call", ex);
}
}
try {
receiver.closeAsync().get();
} catch (ExecutionException ex) {
LOG.debug("Caught unexpected exception from close call", ex);
fail("Should not fail to close when connection not closed and detach sent");
}
peer.expectClose().respond();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiverGetRemotePropertiesWaitsForRemoteAttach() throws Exception {
tryReadReceiverRemoteProperties(true);
}
@Test
public void testReceiverGetRemotePropertiesFailsAfterOpenTimeout() throws Exception {
tryReadReceiverRemoteProperties(false);
}
private void tryReadReceiverRemoteProperties(boolean attachResponse) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue());
peer.expectFlow();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
session.openFuture().get();
ReceiverOptions options = new ReceiverOptions().openTimeout(100);
Receiver receiver = session.openReceiver("test-receiver", options);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
Map<String, Object> expectedProperties = new HashMap<>();
expectedProperties.put("TEST", "test-property");
if (attachResponse) {
peer.expectDetach().respond();
peer.respondToLastAttach().withPropertiesMap(expectedProperties).later(10);
} else {
peer.expectDetach();
}
if (attachResponse) {
assertNotNull(receiver.properties(), "Remote should have responded with a remote properties value");
assertEquals(expectedProperties, receiver.properties());
} else {
try {
receiver.properties();
fail("Should failed to get remote state due to no attach response");
} catch (ClientException ex) {
LOG.debug("Caught expected exception from blocking call", ex);
}
}
try {
receiver.closeAsync().get();
} catch (ExecutionException ex) {
LOG.debug("Caught unexpected exception from close call", ex);
fail("Should not fail to close when connection not closed and detach sent");
}
peer.expectClose().respond();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiverGetRemoteOfferedCapabilitiesWaitsForRemoteAttach() throws Exception {
tryReadReceiverRemoteOfferedCapabilities(true);
}
@Test
public void testReceiverGetRemoteOfferedCapabilitiesFailsAfterOpenTimeout() throws Exception {
tryReadReceiverRemoteOfferedCapabilities(false);
}
private void tryReadReceiverRemoteOfferedCapabilities(boolean attachResponse) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue());
peer.expectFlow();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
ConnectionOptions options = new ConnectionOptions().openTimeout(100);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
connection.openFuture().get();
Session session = connection.openSession();
session.openFuture().get();
Receiver receiver = session.openReceiver("test-receiver");
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
if (attachResponse) {
peer.expectDetach().respond();
peer.respondToLastAttach().withOfferedCapabilities("QUEUE").later(10);
} else {
peer.expectDetach();
}
if (attachResponse) {
assertNotNull(receiver.offeredCapabilities(), "Remote should have responded with a remote offered Capabilities value");
assertEquals(1, receiver.offeredCapabilities().length);
assertEquals("QUEUE", receiver.offeredCapabilities()[0]);
} else {
try {
receiver.offeredCapabilities();
fail("Should failed to get remote state due to no attach response");
} catch (ClientException ex) {
LOG.debug("Caught expected exception from blocking call", ex);
}
}
try {
receiver.closeAsync().get();
} catch (ExecutionException ex) {
LOG.debug("Caught unexpected exception from close call", ex);
fail("Should not fail to close when connection not closed and detach sent");
}
peer.expectClose().respond();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiverGetRemoteDesiredCapabilitiesWaitsForRemoteAttach() throws Exception {
tryReadReceiverRemoteDesiredCapabilities(true);
}
@Test
public void testReceiverGetRemoteDesiredCapabilitiesFailsAfterOpenTimeout() throws Exception {
tryReadReceiverRemoteDesiredCapabilities(false);
}
private void tryReadReceiverRemoteDesiredCapabilities(boolean attachResponse) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue());
peer.expectFlow();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
ConnectionOptions options = new ConnectionOptions().openTimeout(100);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
connection.openFuture().get();
Session session = connection.openSession();
session.openFuture().get();
Receiver receiver = session.openReceiver("test-receiver");
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
if (attachResponse) {
peer.expectDetach().respond();
peer.respondToLastAttach().withDesiredCapabilities("Error-Free").later(10);
} else {
peer.expectDetach();
}
if (attachResponse) {
assertNotNull(receiver.desiredCapabilities(), "Remote should have responded with a remote desired Capabilities value");
assertEquals(1, receiver.desiredCapabilities().length);
assertEquals("Error-Free", receiver.desiredCapabilities()[0]);
} else {
try {
receiver.desiredCapabilities();
fail("Should failed to get remote state due to no attach response");
} catch (ClientException ex) {
LOG.debug("Caught expected exception from blocking call", ex);
}
}
try {
receiver.closeAsync().get();
} catch (ExecutionException ex) {
LOG.debug("Caught unexpected exception from close call", ex);
fail("Should not fail to close when connection not closed and detach sent");
}
peer.expectClose().respond();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testBlockingReceiveCancelledWhenReceiverClosed() throws Exception {
doTestBlockingReceiveCancelledWhenReceiverClosedOrDetached(true);
}
@Test
public void testBlockingReceiveCancelledWhenReceiverDetached() throws Exception {
doTestBlockingReceiveCancelledWhenReceiverClosedOrDetached(false);
}
public void doTestBlockingReceiveCancelledWhenReceiverClosedOrDetached(boolean close) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
ReceiverOptions options = new ReceiverOptions().creditWindow(0);
Receiver receiver = session.openReceiver("test-queue", options);
receiver.openFuture().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectFlow().withLinkCredit(10);
peer.execute(() -> {
if (close) {
receiver.closeAsync();
} else {
receiver.detachAsync();
}
}).queue();
peer.expectDetach().withClosed(close).respond();
peer.expectClose().respond();
receiver.addCredit(10);
try {
receiver.receive();
fail("Should throw to indicate that receiver was closed");
} catch (ClientException ise) {
}
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testBlockingReceiveCancelledWhenReceiverRemotelyClosed() throws Exception {
doTestBlockingReceiveCancelledWhenReceiverRemotelyClosedOrDetached(true);
}
@Test
public void testBlockingReceiveCancelledWhenReceiverRemotelyDetached() throws Exception {
doTestBlockingReceiveCancelledWhenReceiverRemotelyClosedOrDetached(false);
}
public void doTestBlockingReceiveCancelledWhenReceiverRemotelyClosedOrDetached(boolean close) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow().withLinkCredit(10);
peer.remoteDetach().withClosed(close)
.withErrorCondition(AmqpError.RESOURCE_DELETED.toString(), "Address was manually deleted")
.afterDelay(10).queue();
peer.expectDetach().withClosed(close);
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
Receiver receiver = session.openReceiver("test-queue");
receiver.openFuture().get();
try {
receiver.receive();
fail("Client should throw to indicate remote closed the receiver forcibly.");
} catch (ClientIllegalStateException ise) {
}
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testCloseReceiverWithErrorCondition() throws Exception {
doTestCloseOrDetachWithErrorCondition(true);
}
@Test
public void testDetachReceiverWithErrorCondition() throws Exception {
doTestCloseOrDetachWithErrorCondition(false);
}
public void doTestCloseOrDetachWithErrorCondition(boolean close) throws Exception {
final String condition = "amqp:link:detach-forced";
final String description = "something bad happened.";
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.expectDetach().withClosed(close).withError(condition, description).respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
final Receiver receiver = session.openReceiver("test-queue");
receiver.openFuture().get();
if (close) {
receiver.closeAsync(ErrorCondition.create(condition, description, null));
} else {
receiver.detachAsync(ErrorCondition.create(condition, description, null));
}
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testOpenReceiverWithLinCapabilities() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue())
.withSource().withCapabilities("queue").and()
.respond();
peer.expectFlow();
peer.expectDetach().respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Receiver test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession().openFuture().get(10, TimeUnit.SECONDS);
ReceiverOptions receiverOptions = new ReceiverOptions();
receiverOptions.sourceOptions().capabilities("queue");
Receiver receiver = session.openReceiver("test-queue", receiverOptions);
receiver.openFuture().get();
receiver.close();
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiveMessageInSplitTransferFrames() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
final Receiver receiver = session.openReceiver("test-queue");
receiver.openFuture().get();
final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));
final byte[] slice1 = Arrays.copyOfRange(payload, 0, 2);
final byte[] slice2 = Arrays.copyOfRange(payload, 2, 4);
final byte[] slice3 = Arrays.copyOfRange(payload, 4, payload.length);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(true)
.withMessageFormat(0)
.withPayload(slice1).now();
assertNull(receiver.tryReceive());
peer.remoteTransfer().withHandle(0)
.withMore(true)
.withMessageFormat(0)
.withPayload(slice2).now();
assertNull(receiver.tryReceive());
peer.remoteTransfer().withHandle(0)
.withMore(false)
.withMessageFormat(0)
.withPayload(slice3).now();
peer.expectDisposition().withSettled(true).withState().accepted();
peer.expectDetach().respond();
peer.expectClose().respond();
Delivery delivery = receiver.receive();
assertNotNull(delivery);
Message<?> received = delivery.message();
assertNotNull(received);
assertTrue(received.body() instanceof String);
String value = (String) received.body();
assertEquals("Hello World", value);
delivery.accept();
receiver.closeAsync();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiverHandlesAbortedSplitFrameTransfer() throws Exception {
final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(true)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
final Receiver receiver = session.openReceiver("test-queue");
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
assertNull(receiver.receive(15, TimeUnit.MILLISECONDS));
peer.expectDetach().respond();
peer.expectClose().respond();
peer.remoteTransfer().withHandle(0)
.withMore(false)
.withAborted(true)
.withMessageFormat(0)
.withPayload(payload).now();
assertNull(receiver.receive(15, TimeUnit.MILLISECONDS));
receiver.closeAsync();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiverAddCreditOnAbortedTransferWhenNeeded() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
ReceiverOptions options = new ReceiverOptions();
options.creditWindow(1);
final Receiver receiver = session.openReceiver("test-queue", options);
receiver.openFuture().get();
final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(true)
.withMessageFormat(0)
.withPayload(payload).now();
assertNull(receiver.tryReceive());
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(1)
.withMessageFormat(0)
.withMore(false)
.withPayload(payload).queue();
peer.expectDisposition().withSettled(true).withState().accepted();
peer.expectFlow();
peer.expectDetach().respond();
peer.expectClose().respond();
// Send final aborted transfer to complete first transfer and allow next to commence.
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withMore(false)
.withAborted(true)
.withMessageFormat(0)
.withPayload(payload).now();
Delivery delivery = receiver.receive();
assertNotNull(delivery);
Message<?> received = delivery.message();
assertNotNull(received);
assertTrue(received.body() instanceof String);
String value = (String) received.body();
assertEquals("Hello World", value);
receiver.closeAsync();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiverHandlesAbortedSplitFrameTransferAndReplenishesCredit() throws Exception {
final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow().withLinkCredit(1);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(true)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
ReceiverOptions options = new ReceiverOptions();
options.creditWindow(1);
final Receiver receiver = session.openReceiver("test-queue", options);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
assertNull(receiver.receive(15, TimeUnit.MILLISECONDS));
// Credit window is one and next transfer signals aborted so receiver should
// top-up the credit window to allow more transfers to arrive.
peer.expectFlow().withLinkCredit(1);
peer.expectDetach().respond();
peer.expectClose().respond();
// Abort the delivery which should result in a credit top-up.
peer.remoteTransfer().withHandle(0)
.withMore(false)
.withAborted(true)
.withMessageFormat(0)
.withPayload(payload).now();
assertNull(receiver.receive(15, TimeUnit.MILLISECONDS));
receiver.closeAsync();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiveCallFailsWhenReceiverPreviouslyClosed() throws Exception {
doTestReceiveCallFailsWhenReceiverDetachedOrClosed(true);
}
@Test
public void testReceiveCallFailsWhenReceiverPreviouslyDetached() throws Exception {
doTestReceiveCallFailsWhenReceiverDetachedOrClosed(false);
}
private void doTestReceiveCallFailsWhenReceiverDetachedOrClosed(boolean close) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow().withLinkCredit(10);
peer.expectDetach().withClosed(close).respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
Receiver receiver = session.openReceiver("test-queue").openFuture().get();
if (close) {
receiver.closeAsync();
} else {
receiver.detachAsync();
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
try {
receiver.receive();
fail("Receive call should fail when link closed or detached.");
} catch (ClientIllegalStateException cliEx) {
LOG.debug("Receiver threw error on receive call", cliEx);
}
peer.expectClose().respond();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiveBlockedForMessageFailsWhenConnectionRemotelyClosed() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow().withLinkCredit(10);
peer.remoteClose().withErrorCondition(AmqpError.RESOURCE_DELETED.toString(), "Connection was deleted").afterDelay(25).queue();
peer.expectClose();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Receiver test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
Receiver receiver = session.openReceiver("test-queue");
receiver.openFuture().get();
try {
receiver.receive();
fail("Receive should have failed when Connection remotely closed.");
} catch (ClientConnectionRemotelyClosedException cliEx) {
// Expected
}
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testTimedReceiveBlockedForMessageFailsWhenConnectionRemotelyClosed() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow().withLinkCredit(10);
peer.remoteClose().withErrorCondition(AmqpError.RESOURCE_DELETED.toString(), "Connection was deleted").afterDelay(25).queue();
peer.expectClose();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Receiver test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
Receiver receiver = session.openReceiver("test-queue");
receiver.openFuture().get();
try {
receiver.receive(10, TimeUnit.SECONDS);
fail("Receive should have failed when Connection remotely closed.");
} catch (ClientConnectionRemotelyClosedException cliEx) {
// Expected send to throw indicating that the remote closed the connection
}
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiveTimedCallFailsWhenReceiverClosed() throws Exception {
doTestReceiveTimedCallFailsWhenReceiverDetachedOrClosed(true);
}
@Test
public void testReceiveTimedCallFailsWhenReceiverDetached() throws Exception {
doTestReceiveTimedCallFailsWhenReceiverDetachedOrClosed(false);
}
private void doTestReceiveTimedCallFailsWhenReceiverDetachedOrClosed(boolean close) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow().withLinkCredit(10);
peer.expectDetach().withClosed(close).respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
Receiver receiver = session.openReceiver("test-queue").openFuture().get();
if (close) {
receiver.closeAsync();
} else {
receiver.detachAsync();
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
try {
receiver.receive(60, TimeUnit.SECONDS);
fail("Receive call should fail when link closed or detached.");
} catch (ClientIllegalStateException cliEx) {
LOG.debug("Receiver threw error on receive call", cliEx);
}
peer.expectClose().respond();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testDrainFutureSignalsFailureWhenReceiverClosed() throws Exception {
doTestDrainFutureSignalsFailureWhenReceiverClosedOrDetached(true);
}
@Test
public void testDrainFutureSignalsFailureWhenReceiverDetached() throws Exception {
doTestDrainFutureSignalsFailureWhenReceiverClosedOrDetached(false);
}
private void doTestDrainFutureSignalsFailureWhenReceiverClosedOrDetached(boolean close) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow().withLinkCredit(10);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
Receiver receiver = session.openReceiver("test-queue").openFuture().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectFlow().withDrain(true).withLinkCredit(10);
peer.execute(() -> {
if (close) {
receiver.closeAsync();
} else {
receiver.detachAsync();
}
}).queue();
peer.expectDetach().withClosed(close).respond();
peer.expectClose().respond();
try {
receiver.drain().get(10, TimeUnit.SECONDS);
fail("Drain call should fail when link closed or detached.");
} catch (ExecutionException cliEx) {
LOG.debug("Receiver threw error on drain call", cliEx);
assertTrue(cliEx.getCause() instanceof ClientException);
}
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testDrainFutureSignalsFailureWhenReceiverRemotelyClosed() throws Exception {
doTestDrainFutureSignalsFailureWhenReceiverRemotelyClosedOrDetached(true);
}
@Test
public void testDrainFutureSignalsFailureWhenReceiverRemotelyDetached() throws Exception {
doTestDrainFutureSignalsFailureWhenReceiverRemotelyClosedOrDetached(false);
}
private void doTestDrainFutureSignalsFailureWhenReceiverRemotelyClosedOrDetached(boolean close) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow().withLinkCredit(10);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
Receiver receiver = session.openReceiver("test-queue").openFuture().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectFlow().withDrain(true).withLinkCredit(10);
peer.remoteDetach().withClosed(close)
.withErrorCondition(AmqpError.RESOURCE_DELETED.toString(), "Address was manually deleted").queue();
peer.expectDetach().withClosed(close);
peer.expectClose().respond();
try {
receiver.drain().get(10, TimeUnit.SECONDS);
fail("Drain call should fail when link closed or detached.");
} catch (ExecutionException cliEx) {
LOG.debug("Receiver threw error on drain call", cliEx);
assertTrue(cliEx.getCause() instanceof ClientException);
}
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testDrainFutureSignalsFailureWhenSessionRemotelyClosed() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow();
peer.expectFlow().withDrain(true);
peer.remoteEnd().withErrorCondition(AmqpError.RESOURCE_DELETED.toString(), "Session was closed").afterDelay(5).queue();
peer.expectEnd();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
Receiver receiver = session.openReceiver("test-queue").openFuture().get();
try {
receiver.drain().get(10, TimeUnit.SECONDS);
fail("Drain call should fail when session closed by remote.");
} catch (ExecutionException cliEx) {
LOG.debug("Receiver threw error on drain call", cliEx);
assertTrue(cliEx.getCause() instanceof ClientException);
}
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testDrainFutureSignalsFailureWhenConnectionDrops() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow();
peer.expectFlow().withDrain(true);
peer.dropAfterLastHandler();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
Receiver receiver = session.openReceiver("test-queue").openFuture().get();
try {
receiver.drain().get(10, TimeUnit.SECONDS);
fail("Drain call should fail when the connection drops.");
} catch (ExecutionException cliEx) {
LOG.debug("Receiver threw error on drain call", cliEx);
assertTrue(cliEx.getCause() instanceof ClientException);
}
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testDrainFutureSignalsFailureWhenDrainTimeoutExceeded() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow();
peer.expectFlow().withDrain(true);
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
ReceiverOptions receiverOptions = new ReceiverOptions().drainTimeout(15);
Receiver receiver = session.openReceiver("test-queue", receiverOptions).openFuture().get();
try {
receiver.drain().get();
fail("Drain call should fail timeout exceeded.");
} catch (ExecutionException cliEx) {
LOG.debug("Receiver threw error on drain call", cliEx);
assertTrue(cliEx.getCause() instanceof ClientOperationTimedOutException);
}
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testDrainFutureSignalsFailureWhenSessionDrainTimeoutExceeded() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow();
peer.expectFlow().withDrain(true);
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
SessionOptions sessionOptions = new SessionOptions().drainTimeout(20);
Session session = connection.openSession(sessionOptions);
Receiver receiver = session.openReceiver("test-queue").openFuture().get();
try {
receiver.drain().get();
fail("Drain call should fail timeout exceeded.");
} catch (ExecutionException cliEx) {
LOG.debug("Receiver threw error on drain call", cliEx);
assertTrue(cliEx.getCause() instanceof ClientOperationTimedOutException);
}
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testDrainFutureSignalsFailureWhenConnectionDrainTimeoutExceeded() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow();
peer.expectFlow().withDrain(true);
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
ConnectionOptions connectionOptions = new ConnectionOptions().drainTimeout(20);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
Session session = connection.openSession();
Receiver receiver = session.openReceiver("test-queue").openFuture().get();
try {
receiver.drain().get();
fail("Drain call should fail timeout exceeded.");
} catch (ExecutionException cliEx) {
LOG.debug("Receiver threw error on drain call", cliEx);
assertTrue(cliEx.getCause() instanceof ClientOperationTimedOutException);
}
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testBlockedReceiveThrowsConnectionRemotelyClosedError() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().withSource().withAddress("test").and().respond();
peer.expectFlow();
peer.dropAfterLastHandler(25);
peer.start();
URI remoteURI = peer.getServerURI();
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
Receiver receiver = session.openReceiver("test");
try {
receiver.receive();
fail("Receive should fail with remotely closed error after remote drops");
} catch (ClientConnectionRemotelyClosedException cliEx) {
}
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testDeliveryRefusesRawStreamAfterMessage() throws Exception {
final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow().withLinkCredit(10);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withSettled(true)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
Receiver receiver = session.openReceiver("test-queue");
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectClose().respond();
Delivery delivery = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(delivery);
Message<String> message = delivery.message();
assertNotNull(message);
try {
delivery.rawInputStream();
fail("Should not be able to use the inputstream once message is requested");
} catch (ClientIllegalStateException cliEx) {
// Expected
}
receiver.closeAsync();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testDeliveryRefusesRawStreamAfterAnnotations() throws Exception {
final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow().withLinkCredit(10);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withSettled(true)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
Receiver receiver = session.openReceiver("test-queue");
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectClose().respond();
Delivery delivery = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(delivery);
assertNull(delivery.annotations());
try {
delivery.rawInputStream();
fail("Should not be able to use the inputstream once message is requested");
} catch (ClientIllegalStateException cliEx) {
// Expected
}
receiver.closeAsync();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testDeliveryRefusesMessageDecodeOnceRawInputStreamIsRequested() throws Exception {
final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow().withLinkCredit(10);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withSettled(true)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
Receiver receiver = session.openReceiver("test-queue");
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectClose().respond();
Delivery delivery = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(delivery);
InputStream stream = delivery.rawInputStream();
assertNotNull(stream);
assertEquals(payload.length, stream.available());
byte[] bytesRead = new byte[payload.length];
assertEquals(payload.length, stream.read(bytesRead));
assertArrayEquals(payload, bytesRead);
try {
delivery.message();
fail("Should not be able to use the message API once raw stream is requested");
} catch (ClientIllegalStateException cliEx) {
// Expected
}
try {
delivery.annotations();
fail("Should not be able to use the annotations API once raw stream is requested");
} catch (ClientIllegalStateException cliEx) {
// Expected
}
receiver.closeAsync();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiveDeliveryWithMultipleDataSections() throws Exception {
final Data section1 = new Data(new byte[] { 0, 1, 2, 3 });
final Data section2 = new Data(new byte[] { 0, 1, 2, 3 });
final Data section3 = new Data(new byte[] { 0, 1, 2, 3 });
final byte[] payload = createEncodedMessage(section1, section2, section3);
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow().withLinkCredit(10);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withSettled(true)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
Receiver receiver = session.openReceiver("test-queue");
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectClose().respond();
Delivery delivery = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(delivery);
AdvancedMessage<?> message = delivery.message().toAdvancedMessage();
assertNotNull(message);
assertEquals(3, message.bodySections().size());
List<Section<?>> section = new ArrayList<>(message.bodySections());
assertEquals(section1, section.get(0));
assertEquals(section2, section.get(1));
assertEquals(section3, section.get(2));
receiver.closeAsync();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testSessionWindowExpandedAsIncomingFramesArrive() throws Exception {
final byte[] payload1 = new byte[255];
final byte[] payload2 = new byte[255];
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().withMaxFrameSize(1024).respond();
peer.expectBegin().withIncomingWindow(1).respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow().withIncomingWindow(1).withLinkCredit(10);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(true)
.withSettled(true)
.withMessageFormat(0)
.withPayload(payload1).queue();
peer.expectFlow().withIncomingWindow(1).withLinkCredit(10);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withSettled(true)
.withMessageFormat(0)
.withPayload(payload2).queue();
peer.expectFlow().withIncomingWindow(1).withLinkCredit(9);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
ConnectionOptions options = new ConnectionOptions().maxFrameSize(1024);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
SessionOptions sessionOpts = new SessionOptions().incomingCapacity(1024);
Session session = connection.openSession(sessionOpts);
Receiver receiver = session.openReceiver("test-queue");
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectClose().respond();
Delivery delivery = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(delivery);
InputStream stream = delivery.rawInputStream();
assertNotNull(stream);
assertEquals(payload1.length + payload2.length, stream.available());
byte[] bytesRead = new byte[payload1.length + payload2.length];
assertEquals(payload1.length + payload2.length, stream.read(bytesRead));
receiver.closeAsync();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testCannotReadFromStreamDeliveredBeforeConnectionDrop() throws Exception {
final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.dropAfterLastHandler();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final Receiver receiver = connection.openReceiver("test-queue");
final Delivery delivery = receiver.receive();
peer.waitForScriptToComplete();
assertNotNull(delivery);
// Data already read so it will be already available for read.
assertNotEquals(-1, delivery.rawInputStream().read());
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testCreateReceiverWithDefaultSourceAndTargetOptions() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver()
.withSource().withAddress("test-queue")
.withDistributionMode(nullValue())
.withDefaultTimeout()
.withDurable(TerminusDurability.NONE)
.withExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH)
.withDefaultOutcome(new Modified().setDeliveryFailed(true))
.withCapabilities(nullValue())
.withFilter(nullValue())
.withOutcomes("amqp:accepted:list", "amqp:rejected:list", "amqp:released:list", "amqp:modified:list")
.also()
.withTarget().withAddress(notNullValue())
.withCapabilities(nullValue())
.withDurable(nullValue())
.withExpiryPolicy(nullValue())
.withDefaultTimeout()
.withDynamic(anyOf(nullValue(), equalTo(false)))
.withDynamicNodeProperties(nullValue())
.and().respond();
peer.expectFlow().withLinkCredit(10);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
Receiver receiver = session.openReceiver("test-queue").openFuture().get();
receiver.close();
session.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testCreateReceiverWithUserConfiguredSourceAndTargetOptions() throws Exception {
final Map<String, Object> filtersToObject = new HashMap<>();
filtersToObject.put("x-opt-filter", "a = b");
final Map<String, String> filters = new HashMap<>();
filters.put("x-opt-filter", "a = b");
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver()
.withSource().withAddress("test-queue")
.withDistributionMode("copy")
.withTimeout(128)
.withDurable(TerminusDurability.UNSETTLED_STATE)
.withExpiryPolicy(TerminusExpiryPolicy.CONNECTION_CLOSE)
.withDefaultOutcome(new Released())
.withCapabilities("QUEUE")
.withFilter(filtersToObject)
.withOutcomes("amqp:accepted:list", "amqp:rejected:list")
.also()
.withTarget().withAddress(notNullValue())
.withCapabilities("QUEUE")
.withDurable(TerminusDurability.CONFIGURATION)
.withExpiryPolicy(TerminusExpiryPolicy.SESSION_END)
.withTimeout(42)
.withDynamic(anyOf(nullValue(), equalTo(false)))
.withDynamicNodeProperties(nullValue())
.and().respond();
peer.expectFlow().withLinkCredit(10);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
ReceiverOptions receiverOptions = new ReceiverOptions();
receiverOptions.sourceOptions().capabilities("QUEUE");
receiverOptions.sourceOptions().distributionMode(DistributionMode.COPY);
receiverOptions.sourceOptions().timeout(128);
receiverOptions.sourceOptions().durabilityMode(DurabilityMode.UNSETTLED_STATE);
receiverOptions.sourceOptions().expiryPolicy(ExpiryPolicy.CONNECTION_CLOSE);
receiverOptions.sourceOptions().defaultOutcome(DeliveryState.released());
receiverOptions.sourceOptions().filters(filters);
receiverOptions.sourceOptions().outcomes(DeliveryState.Type.ACCEPTED, DeliveryState.Type.REJECTED);
receiverOptions.targetOptions().capabilities("QUEUE");
receiverOptions.targetOptions().durabilityMode(DurabilityMode.CONFIGURATION);
receiverOptions.targetOptions().expiryPolicy(ExpiryPolicy.SESSION_CLOSE);
receiverOptions.targetOptions().timeout(42);
Receiver receiver = session.openReceiver("test-queue", receiverOptions).openFuture().get();
receiver.close();
session.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testOpenDurableReceiver() throws Exception {
final String address = "test-topic";
final String subscriptionName = "mySubscriptionName";
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver()
.withName(subscriptionName)
.withSource()
.withAddress(address)
.withDurable(TerminusDurability.UNSETTLED_STATE)
.withExpiryPolicy(TerminusExpiryPolicy.NEVER)
.withDistributionMode("copy")
.and().respond();
peer.expectFlow();
peer.expectDetach().respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
Receiver receiver = session.openDurableReceiver(address, subscriptionName);
receiver.openFuture().get();
receiver.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiverCreditReplenishedAfterSyncReceiveAutoAccept() throws Exception {
doTestReceiverCreditReplenishedAfterSyncReceive(true);
}
@Test
public void testReceiverCreditReplenishedAfterSyncReceiveManualAccept() throws Exception {
doTestReceiverCreditReplenishedAfterSyncReceive(false);
}
public void doTestReceiverCreditReplenishedAfterSyncReceive(boolean autoAccept) throws Exception {
byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow().withLinkCredit(10);
for (int i = 0; i < 10; ++i) {
peer.remoteTransfer().withDeliveryId(i)
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
}
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
ReceiverOptions options = new ReceiverOptions();
options.autoAccept(autoAccept);
options.creditWindow(10);
Receiver receiver = connection.openReceiver("test-receiver", options);
Wait.waitFor(() -> receiver.queuedDeliveries() == 10);
peer.waitForScriptToComplete();
if (autoAccept)
{
peer.expectDisposition().withFirst(0);
peer.expectDisposition().withFirst(1);
}
// Consume messages 1 and 2 which should not provoke credit replenishment
// as there are still 8 outstanding which is above the 70% mark
assertNotNull(receiver.receive()); // #1
assertNotNull(receiver.receive()); // #2
peer.waitForScriptToComplete();
peer.expectAttach().ofSender().respond();
peer.expectDetach().respond();
if (autoAccept)
{
peer.expectDisposition().withFirst(2);
}
peer.expectFlow().withLinkCredit(3);
// Ensure that no additional frames from last receive overlap with this one
connection.openSender("test").openFuture().get().close();
// Now consume message 3 which will trip the replenish barrier and the
// credit should be updated to reflect that we still have 7 queued
assertNotNull(receiver.receive()); // #3
peer.waitForScriptToComplete();
if (autoAccept)
{
peer.expectDisposition().withFirst(3);
peer.expectDisposition().withFirst(4);
}
// Consume messages 4 and 5 which should not provoke credit replenishment
// as there are still 5 outstanding plus the credit we sent last time
// which is above the 70% mark
assertNotNull(receiver.receive()); // #4
assertNotNull(receiver.receive()); // #5
peer.waitForScriptToComplete();
peer.expectAttach().ofSender().respond();
peer.expectDetach().respond();
if (autoAccept)
{
peer.expectDisposition().withFirst(5);
}
peer.expectFlow().withLinkCredit(6);
// Ensure that no additional frames from last receive overlap with this one
connection.openSender("test").openFuture().get().close();
// Consume number 6 which means we only have 4 outstanding plus the three
// that we sent last time we flowed which is 70% of possible prefetch so
// we should flow to top off credit which would be 6 since we have four
// still pending
assertNotNull(receiver.receive()); // #6
peer.waitForScriptToComplete();
if (autoAccept)
{
peer.expectDisposition().withFirst(6);
peer.expectDisposition().withFirst(7);
}
// Consume deliveries 7 and 8 which should not flow as we should be
// above the threshold of 70% since we would now have 2 outstanding
// and 6 credits on the link
assertNotNull(receiver.receive()); // #7
assertNotNull(receiver.receive()); // #8
peer.waitForScriptToComplete();
if (autoAccept)
{
peer.expectDisposition().withFirst(8);
peer.expectDisposition().withFirst(9);
}
// Now consume 9 and 10 but we still shouldn't flow more credit because
// the link credit is above the 50% mark for overall credit windowing.
assertNotNull(receiver.receive()); // #9
assertNotNull(receiver.receive()); // #10
peer.waitForScriptToComplete();
peer.expectClose().respond();
connection.close();
peer.waitForScriptToComplete();
}
}
}