blob: 40023e383a2221770c2748a377d8b8c8ba2b6ecf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.client.impl;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.protonj2.client.Client;
import org.apache.qpid.protonj2.client.Connection;
import org.apache.qpid.protonj2.client.ConnectionOptions;
import org.apache.qpid.protonj2.client.DeliveryMode;
import org.apache.qpid.protonj2.client.DeliveryState;
import org.apache.qpid.protonj2.client.DistributionMode;
import org.apache.qpid.protonj2.client.DurabilityMode;
import org.apache.qpid.protonj2.client.ErrorCondition;
import org.apache.qpid.protonj2.client.ExpiryPolicy;
import org.apache.qpid.protonj2.client.Message;
import org.apache.qpid.protonj2.client.Receiver;
import org.apache.qpid.protonj2.client.ReceiverOptions;
import org.apache.qpid.protonj2.client.Sender;
import org.apache.qpid.protonj2.client.SenderOptions;
import org.apache.qpid.protonj2.client.Session;
import org.apache.qpid.protonj2.client.Source;
import org.apache.qpid.protonj2.client.Target;
import org.apache.qpid.protonj2.client.Tracker;
import org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosedException;
import org.apache.qpid.protonj2.client.exceptions.ClientDeliveryStateException;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientLinkRedirectedException;
import org.apache.qpid.protonj2.client.exceptions.ClientLinkRemotelyClosedException;
import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
import org.apache.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedException;
import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException;
import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
import org.apache.qpid.protonj2.engine.DeliveryTagGenerator;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.apache.qpid.protonj2.test.driver.codec.messaging.Released;
import org.apache.qpid.protonj2.test.driver.codec.messaging.TerminusDurability;
import org.apache.qpid.protonj2.test.driver.codec.messaging.TerminusExpiryPolicy;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.DeliveryAnnotationsMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.transport.TransferPayloadCompositeMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpValueMatcher;
import org.apache.qpid.protonj2.types.DeliveryTag;
import org.apache.qpid.protonj2.types.transport.AmqpError;
import org.apache.qpid.protonj2.types.transport.LinkError;
import org.apache.qpid.protonj2.types.transport.ReceiverSettleMode;
import org.apache.qpid.protonj2.types.transport.Role;
import org.apache.qpid.protonj2.types.transport.SenderSettleMode;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Timeout(20)
public class SenderTest extends ImperativeClientTestCase {
// Class logger; the tests use it to report the test peer's address and progress.
private static final Logger LOG = LoggerFactory.getLogger(SenderTest.class);
/**
 * Runs the asynchronous create-then-end sender scenario in its "close" variant.
 */
@Test
public void testCreateSenderAndClose() throws Exception {
    final boolean close = true;
    doTestCreateSenderAndCloseOrDetach(close);
}
/**
 * Runs the asynchronous create-then-end sender scenario in its "detach" variant.
 */
@Test
public void testCreateSenderAndDetach() throws Exception {
    final boolean close = false;
    doTestCreateSenderAndCloseOrDetach(close);
}
/**
 * Opens a connection, session and sender against a scripted test peer, then ends the
 * sender link through the asynchronous API and waits on the returned future.
 *
 * @param close {@code true} to close the sender, {@code false} to detach it; the scripted
 *              peer expects the closed flag of the Detach to carry the same value.
 */
private void doTestCreateSenderAndCloseOrDetach(boolean close) throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        // Script the frames the peer expects to see, in order, before starting it.
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        peer.expectDetach().withClosed(close).respond();
        peer.expectClose().respond();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", serverUri);

        final Client client = Client.create();
        final Connection connection = client.connect(serverUri.getHost(), serverUri.getPort());
        connection.openFuture().get(10, TimeUnit.SECONDS);

        final Session session = connection.openSession();
        session.openFuture().get(10, TimeUnit.SECONDS);

        final Sender sender = session.openSender("test-queue");
        sender.openFuture().get(10, TimeUnit.SECONDS);

        if (!close) {
            sender.detachAsync().get(10, TimeUnit.SECONDS);
        } else {
            sender.closeAsync().get(10, TimeUnit.SECONDS);
        }

        connection.closeAsync().get(10, TimeUnit.SECONDS);

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Runs the blocking create-then-end sender scenario in its "close" variant.
 */
@Test
public void testCreateSenderAndCloseSync() throws Exception {
    final boolean close = true;
    doTestCreateSenderAndCloseOrDetachSync(close);
}
/**
 * Runs the blocking create-then-end sender scenario in its "detach" variant.
 */
@Test
public void testCreateSenderAndDetachSync() throws Exception {
    final boolean close = false;
    doTestCreateSenderAndCloseOrDetachSync(close);
}
/**
 * Opens a connection, session and sender against a scripted test peer, then ends the
 * sender link using the blocking {@code close()} / {@code detach()} calls.
 *
 * @param close {@code true} to close the sender, {@code false} to detach it; the scripted
 *              peer expects the closed flag of the Detach to carry the same value.
 */
private void doTestCreateSenderAndCloseOrDetachSync(boolean close) throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        // Script the frames the peer expects to see, in order, before starting it.
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        peer.expectDetach().withClosed(close).respond();
        peer.expectClose().respond();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", serverUri);

        final Client client = Client.create();
        final Connection connection = client.connect(serverUri.getHost(), serverUri.getPort());
        connection.openFuture().get(10, TimeUnit.SECONDS);

        final Session session = connection.openSession();
        session.openFuture().get(10, TimeUnit.SECONDS);

        final Sender sender = session.openSender("test-queue");
        sender.openFuture().get(10, TimeUnit.SECONDS);

        if (!close) {
            sender.detach();
        } else {
            sender.close();
        }

        connection.closeAsync().get(10, TimeUnit.SECONDS);

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Runs the blocking end-with-error sender scenario in its "close" variant.
 */
@Test
public void testCreateSenderAndCloseWithErrorSync() throws Exception {
    final boolean close = true;
    doTestCreateSenderAndCloseOrDetachWithErrorSync(close);
}
/**
 * Runs the blocking end-with-error sender scenario in its "detach" variant.
 */
@Test
public void testCreateSenderAndDetachWithErrorSync() throws Exception {
    final boolean close = false;
    doTestCreateSenderAndCloseOrDetachWithErrorSync(close);
}
/**
 * Opens a sender against a scripted test peer and then ends it with an error condition
 * using the blocking API. The peer script expects the Detach to carry that same error
 * condition and description.
 *
 * @param close {@code true} to close the sender, {@code false} to detach it; the scripted
 *              peer expects the closed flag of the Detach to carry the same value.
 */
private void doTestCreateSenderAndCloseOrDetachWithErrorSync(boolean close) throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        peer.expectDetach().withError("amqp-resource-deleted", "an error message").withClosed(close).respond();
        peer.expectClose().respond();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", serverUri);

        final Client client = Client.create();
        final Connection connection = client.connect(serverUri.getHost(), serverUri.getPort());
        connection.openFuture().get(10, TimeUnit.SECONDS);

        final Session session = connection.openSession();
        session.openFuture().get(10, TimeUnit.SECONDS);

        final Sender sender = session.openSender("test-queue");
        sender.openFuture().get(10, TimeUnit.SECONDS);

        // The same error is used for either way of ending the link.
        final ErrorCondition error = ErrorCondition.create("amqp-resource-deleted", "an error message", null);

        if (!close) {
            sender.detach(error);
        } else {
            sender.close(error);
        }

        connection.closeAsync().get(10, TimeUnit.SECONDS);

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Verifies that the sender's open future fails with a
 * {@link ClientLinkRemotelyClosedException} carrying the remote error condition when the
 * peer answers the attach with a null target and then detaches the link with an error.
 */
@Test
public void testSenderOpenRejectedByRemote() throws Exception {
    final String expectedCondition = AmqpError.UNAUTHORIZED_ACCESS.toString();

    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().respond().withNullTarget();
        peer.remoteDetach().withErrorCondition(expectedCondition, "Cannot read from this address").queue();
        peer.expectDetach();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", serverUri);

        final Client client = Client.create();
        final Connection connection = client.connect(serverUri.getHost(), serverUri.getPort());
        connection.openFuture().get(10, TimeUnit.SECONDS);

        final Session session = connection.openSession();
        session.openFuture().get(10, TimeUnit.SECONDS);

        final Sender sender = session.openSender("test-queue");

        try {
            sender.openFuture().get(10, TimeUnit.SECONDS);
            fail("Open of sender should fail due to remote indicating pending close.");
        } catch (ExecutionException exe) {
            final Throwable cause = exe.getCause();

            assertNotNull(cause);
            assertTrue(cause instanceof ClientLinkRemotelyClosedException);

            final ClientLinkRemotelyClosedException linkClosed = (ClientLinkRemotelyClosedException) cause;

            assertNotNull(linkClosed.getErrorCondition());
            assertEquals(expectedCondition, linkClosed.getErrorCondition().condition());
        }

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

        // The link is already closed at this point, so closing the sender again
        // should not put another close on the wire.
        sender.closeAsync().get(10, TimeUnit.SECONDS);

        peer.expectClose().respond();

        connection.closeAsync().get(10, TimeUnit.SECONDS);

        peer.waitForScriptToComplete(1, TimeUnit.SECONDS);
    }
}
/**
 * Verifies that opening a sender fails with a {@link ClientLinkRedirectedException} when
 * the peer answers the attach with a null target and then closes the link with a
 * {@link LinkError#REDIRECT} error condition, and that the exception exposes every
 * redirect detail supplied in the error's info map.
 */
@Test
public void testRemotelyCloseSenderLinkWithRedirect() throws Exception {
    final String redirectVhost = "vhost";
    final String redirectNetworkHost = "localhost";
    final String redirectAddress = "redirect-queue";
    final int redirectPort = 5677;
    final String redirectScheme = "wss";
    final String redirectPath = "/websockets";

    // Redirect details that the peer attaches to the error condition of its Detach.
    final Map<String, Object> errorInfo = new HashMap<>();
    errorInfo.put(ClientConstants.OPEN_HOSTNAME.toString(), redirectVhost);
    errorInfo.put(ClientConstants.NETWORK_HOST.toString(), redirectNetworkHost);
    errorInfo.put(ClientConstants.PORT.toString(), redirectPort);
    errorInfo.put(ClientConstants.SCHEME.toString(), redirectScheme);
    errorInfo.put(ClientConstants.PATH.toString(), redirectPath);
    errorInfo.put(ClientConstants.ADDRESS.toString(), redirectAddress);

    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond().withNullTarget();
        peer.remoteDetach().withClosed(true)
                           .withErrorCondition(LinkError.REDIRECT.toString(), "Not accepting links here", errorInfo).queue();
        peer.expectDetach();
        peer.expectClose().respond();
        peer.start();

        URI remoteURI = peer.getServerURI();

        LOG.info("Test started, peer listening on: {}", remoteURI);

        Client container = Client.create();
        Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
        Session session = connection.openSession();
        Sender sender = session.openSender("test-queue");

        try {
            sender.openFuture().get();
            fail("Should not be able to create sender since the remote is redirecting.");
        } catch (ExecutionException ex) {
            // Only a failed open future is expected here; anything else (for example an
            // interrupt) propagates and fails the test with its real cause.
            LOG.debug("Received expected exception from sender open: {}", ex.getMessage());
            final Throwable cause = ex.getCause();

            assertTrue(cause instanceof ClientLinkRedirectedException);

            final ClientLinkRedirectedException linkRedirect = (ClientLinkRedirectedException) cause;

            assertEquals(redirectVhost, linkRedirect.getHostname());
            assertEquals(redirectNetworkHost, linkRedirect.getNetworkHost());
            assertEquals(redirectPort, linkRedirect.getPort());
            assertEquals(redirectScheme, linkRedirect.getScheme());
            assertEquals(redirectPath, linkRedirect.getPath());
            assertEquals(redirectAddress, linkRedirect.getAddress());

            final URI redirect = linkRedirect.getRedirectionURI();

            assertEquals(redirectNetworkHost, redirect.getHost());
            assertEquals(redirectPort, redirect.getPort());
            assertEquals(redirectScheme, redirect.getScheme());
            assertEquals(redirectPath, redirect.getPath());
        }

        connection.close();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Runs the open-timeout scenario while waiting on the open future with a bounded wait.
 */
@Test
public void testOpenSenderTimesOutWhenNoAttachResponseReceivedTimeout() throws Exception {
    final boolean timeout = true;
    doTestOpenSenderTimesOutWhenNoAttachResponseReceived(timeout);
}
/**
 * Runs the open-timeout scenario while waiting on the open future with an unbounded wait.
 */
@Test
public void testOpenSenderTimesOutWhenNoAttachResponseReceivedNoTimeout() throws Exception {
    final boolean timeout = false;
    doTestOpenSenderTimesOutWhenNoAttachResponseReceived(timeout);
}
/**
 * Opens a sender configured with an open timeout of 10 against a peer that never answers
 * the attach, and checks that the open future fails with a
 * {@link ClientOperationTimedOutException}.
 *
 * @param timeout {@code true} to wait on the open future with a bounded
 *                {@code get(20, SECONDS)}, {@code false} to use the unbounded {@code get()}.
 */
private void doTestOpenSenderTimesOutWhenNoAttachResponseReceived(boolean timeout) throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        // No respond() here: the attach is deliberately left unanswered.
        peer.expectAttach().ofSender();
        peer.expectDetach();
        peer.expectClose().respond();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", serverUri);

        final Client client = Client.create();
        final Connection connection = client.connect(serverUri.getHost(), serverUri.getPort());
        final Session session = connection.openSession().openFuture().get();
        final SenderOptions senderOptions = new SenderOptions().openTimeout(10);
        final Sender sender = session.openSender("test-queue", senderOptions);

        try {
            if (!timeout) {
                sender.openFuture().get();
            } else {
                sender.openFuture().get(20, TimeUnit.SECONDS);
            }

            fail("Should not complete the open future without an error");
        } catch (ExecutionException exe) {
            assertTrue(exe.getCause() instanceof ClientOperationTimedOutException);
        }

        LOG.info("Closing connection after waiting for sender open");

        connection.closeAsync().get(10, TimeUnit.SECONDS);

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Runs the connection-drop-during-open scenario with a bounded wait on the open future.
 */
@Test
public void testOpenSenderWaitWithTimeoutFailsWhenConnectionDrops() throws Exception {
    final boolean timeout = true;
    doTestOpenSenderWaitFailsWhenConnectionDrops(timeout);
}
/**
 * Runs the connection-drop-during-open scenario with an unbounded wait on the open future.
 */
@Test
public void testOpenSenderWaitWithNoTimeoutFailsWhenConnectionDrops() throws Exception {
    final boolean timeout = false;
    doTestOpenSenderWaitFailsWhenConnectionDrops(timeout);
}
/**
 * Opens a sender against a peer that leaves the attach unanswered and is scripted to drop
 * the connection after its last handler, and checks that waiting on the sender's open
 * future fails with a {@link ClientConnectionRemotelyClosedException}.
 *
 * @param timeout {@code true} to wait on the open future with a bounded
 *                {@code get(10, SECONDS)}, {@code false} to use the unbounded {@code get()}.
 */
private void doTestOpenSenderWaitFailsWhenConnectionDrops(boolean timeout) throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        // No respond() here: the attach is deliberately left unanswered.
        peer.expectAttach().ofSender();
        peer.dropAfterLastHandler(10);
        peer.start();

        URI remoteURI = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", remoteURI);

        Client container = Client.create();
        Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
        Session session = connection.openSession();
        Sender sender = session.openSender("test-queue");

        // NOTE(review): presumably gives the peer's delayed drop a chance to fire
        // before the wait below begins - confirm.
        Thread.sleep(10);

        try {
            if (timeout) {
                sender.openFuture().get(10, TimeUnit.SECONDS);
            } else {
                sender.openFuture().get();
            }

            fail("Should not complete the open future without an error");
        } catch (ExecutionException exe) {
            Throwable cause = exe.getCause();
            // The cause fills the "{}" placeholder; the exception goes last so SLF4J logs
            // its stack trace. Passing the exception as the only argument leaves the
            // placeholder unfilled.
            LOG.trace("Caught exception caused by: {}", cause, exe);
            assertTrue(cause instanceof ClientConnectionRemotelyClosedException);
        }

        connection.closeAsync().get(10, TimeUnit.SECONDS);

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Runs the unanswered-detach scenario: sender closed, bounded wait on the returned future.
 */
@Test
public void testCloseSenderTimesOutWhenNoCloseResponseReceivedTimeout() throws Exception {
    final boolean close = true;
    final boolean timeout = true;
    doTestCloseOrDetachSenderTimesOutWhenNoCloseResponseReceived(close, timeout);
}
/**
 * Runs the unanswered-detach scenario: sender closed, unbounded wait on the returned future.
 */
@Test
public void testCloseSenderTimesOutWhenNoCloseResponseReceivedNoTimeout() throws Exception {
    final boolean close = true;
    final boolean timeout = false;
    doTestCloseOrDetachSenderTimesOutWhenNoCloseResponseReceived(close, timeout);
}
/**
 * Runs the unanswered-detach scenario: sender detached, bounded wait on the returned future.
 */
@Test
public void testDetachSenderTimesOutWhenNoCloseResponseReceivedTimeout() throws Exception {
    final boolean close = false;
    final boolean timeout = true;
    doTestCloseOrDetachSenderTimesOutWhenNoCloseResponseReceived(close, timeout);
}
/**
 * Runs the unanswered-detach scenario: sender detached, unbounded wait on the returned future.
 */
@Test
public void testDetachSenderTimesOutWhenNoCloseResponseReceivedNoTimeout() throws Exception {
    final boolean close = false;
    final boolean timeout = false;
    doTestCloseOrDetachSenderTimesOutWhenNoCloseResponseReceived(close, timeout);
}
/**
 * Opens a sender on a connection configured with a close timeout of 10, then closes or
 * detaches it against a peer that never answers the Detach, and checks that the returned
 * future fails with a {@link ClientOperationTimedOutException}.
 *
 * @param close   {@code true} to close the sender, {@code false} to detach it.
 * @param timeout {@code true} to wait on the future with a bounded
 *                {@code get(10, SECONDS)}, {@code false} to use the unbounded {@code get()}.
 */
private void doTestCloseOrDetachSenderTimesOutWhenNoCloseResponseReceived(boolean close, boolean timeout) throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        // No respond() here: the detach is deliberately left unanswered.
        peer.expectDetach();
        peer.expectClose().respond();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", serverUri);

        final Client client = Client.create();
        final ConnectionOptions connectionOptions = new ConnectionOptions();
        connectionOptions.closeTimeout(10);

        final Connection connection = client.connect(serverUri.getHost(), serverUri.getPort(), connectionOptions);
        final Session session = connection.openSession();
        final Sender sender = session.openSender("test-queue");
        sender.openFuture().get(10, TimeUnit.SECONDS);

        try {
            if (close && timeout) {
                sender.closeAsync().get(10, TimeUnit.SECONDS);
            } else if (close) {
                sender.closeAsync().get();
            } else if (timeout) {
                sender.detachAsync().get(10, TimeUnit.SECONDS);
            } else {
                sender.detachAsync().get();
            }

            fail("Should not complete the close or detach future without an error");
        } catch (ExecutionException exe) {
            assertTrue(exe.getCause() instanceof ClientOperationTimedOutException);
        }

        connection.closeAsync().get(10, TimeUnit.SECONDS);

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Verifies that {@code send} throws a {@link ClientSendTimedOutException} when the
 * connection is configured with a send timeout of 1 and the scripted peer never sends a
 * flow granting credit.
 */
@Test
public void testSendTimesOutWhenNoCreditIssued() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        peer.expectDetach().respond();
        peer.expectClose().respond();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", serverUri);

        final Client client = Client.create();
        final ConnectionOptions connectionOptions = new ConnectionOptions();
        connectionOptions.sendTimeout(1);

        final Connection connection = client.connect(serverUri.getHost(), serverUri.getPort(), connectionOptions);
        final Session session = connection.openSession();
        final Sender sender = session.openSender("test-queue");
        sender.openFuture().get(10, TimeUnit.SECONDS);

        final Message<String> payload = Message.create("Hello World");

        try {
            sender.send(payload);
            fail("Should throw a send timed out exception");
        } catch (ClientSendTimedOutException expected) {
            // This is the outcome under test, nothing further to check.
        }

        sender.closeAsync().get(10, TimeUnit.SECONDS);
        connection.closeAsync().get(10, TimeUnit.SECONDS);

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Verifies that a {@code send} issued while the sender has no credit completes, rather
 * than timing out, once the peer delivers a delayed flow granting one credit. The
 * connection is configured with a send timeout of 200 and the flow is scheduled with
 * {@code later(30)}.
 */
@Test
public void testSendCompletesWhenCreditEventuallyOffered() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        peer.start();

        URI remoteURI = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", remoteURI);

        Client container = Client.create();
        ConnectionOptions options = new ConnectionOptions();
        options.sendTimeout(200);
        Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);

        Session session = connection.openSession();
        Sender sender = session.openSender("test-queue");
        sender.openFuture().get(10, TimeUnit.SECONDS);

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

        // Expect a transfer but only after the flow which is delayed to allow the
        // client time to block on credit.
        peer.expectTransfer().withNonNullPayload();
        peer.remoteFlow().withDeliveryCount(0)
                         .withLinkCredit(1)
                         .withIncomingWindow(1024)
                         .withOutgoingWindow(10)
                         .withNextIncomingId(0)
                         .withNextOutgoingId(1).later(30);
        peer.expectDetach().respond();
        peer.expectClose().respond();

        Message<String> message = Message.create("Hello World");

        try {
            LOG.debug("Attempting send with sender: {}", sender);
            sender.send(message);
        } catch (ClientSendTimedOutException ex) {
            // Keep the original exception as the cause so the failure report shows
            // where the timeout was raised.
            fail("Should not throw a send timed out exception", ex);
        }

        sender.closeAsync().get(10, TimeUnit.SECONDS);
        connection.closeAsync().get(10, TimeUnit.SECONDS);

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Runs the send-with-credit scenario using {@code send} and no delivery annotations.
 */
@Test
public void testSendWhenCreditIsAvailable() throws Exception {
    final boolean trySend = false;
    final boolean addDeliveryAnnotations = false;
    doTestSendWhenCreditIsAvailable(trySend, addDeliveryAnnotations);
}
/**
 * Runs the send-with-credit scenario using {@code trySend} and no delivery annotations.
 */
@Test
public void testTrySendWhenCreditIsAvailable() throws Exception {
    final boolean trySend = true;
    final boolean addDeliveryAnnotations = false;
    doTestSendWhenCreditIsAvailable(trySend, addDeliveryAnnotations);
}
/**
 * Runs the send-with-credit scenario using {@code send} with delivery annotations.
 */
@Test
public void testSendWhenCreditIsAvailableWithDeliveryAnnotations() throws Exception {
    final boolean trySend = false;
    final boolean addDeliveryAnnotations = true;
    doTestSendWhenCreditIsAvailable(trySend, addDeliveryAnnotations);
}
/**
 * Runs the send-with-credit scenario using {@code trySend} with delivery annotations.
 */
@Test
public void testTrySendWhenCreditIsAvailableWithDeliveryAnnotations() throws Exception {
    final boolean trySend = true;
    final boolean addDeliveryAnnotations = true;
    doTestSendWhenCreditIsAvailable(trySend, addDeliveryAnnotations);
}
/**
 * Sends a single "Hello World" message on a sender that has been granted credit and
 * checks that a tracker is returned. The peer is scripted to expect a transfer whose
 * payload matches the message body and, when requested, the delivery annotations.
 *
 * @param trySend                {@code true} to use {@code trySend}, {@code false} to use {@code send}.
 * @param addDeliveryAnnotations {@code true} to pass delivery annotations with the message
 *                               and have the peer match them on the transfer.
 */
private void doTestSendWhenCreditIsAvailable(boolean trySend, boolean addDeliveryAnnotations) throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        peer.remoteFlow().withDeliveryCount(0)
                         .withLinkCredit(10)
                         .withIncomingWindow(1024)
                         .withOutgoingWindow(10)
                         .withNextIncomingId(0)
                         .withNextOutgoingId(1).queue();
        peer.expectAttach().ofReceiver().respond();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", serverUri);

        final Client client = Client.create();
        final Connection connection = client.connect(serverUri.getHost(), serverUri.getPort());
        final Session session = connection.openSession();
        final Sender sender = session.openSender("test-queue");
        sender.openFuture().get(10, TimeUnit.SECONDS);

        // Opening a receiver afterwards makes sure the flow sent to the sender has
        // been processed before the send is attempted.
        final Receiver receiver = session.openReceiver("test-queue", new ReceiverOptions().creditWindow(0));
        receiver.openFuture().get(10, TimeUnit.SECONDS);

        // Annotations handed to the client on send.
        final Map<String, Object> deliveryAnnotations = new HashMap<>();
        deliveryAnnotations.put("da1", 1);
        deliveryAnnotations.put("da2", 2);
        deliveryAnnotations.put("da3", 3);

        // Matchers the peer applies to the incoming transfer payload.
        final DeliveryAnnotationsMatcher annotationsMatcher = new DeliveryAnnotationsMatcher(true);
        annotationsMatcher.withEntry("da1", Matchers.equalTo(1));
        annotationsMatcher.withEntry("da2", Matchers.equalTo(2));
        annotationsMatcher.withEntry("da3", Matchers.equalTo(3));

        final EncodedAmqpValueMatcher contentMatcher = new EncodedAmqpValueMatcher("Hello World");
        final TransferPayloadCompositeMatcher transferMatcher = new TransferPayloadCompositeMatcher();
        if (addDeliveryAnnotations) {
            transferMatcher.setDeliveryAnnotationsMatcher(annotationsMatcher);
        }
        transferMatcher.setMessageContentMatcher(contentMatcher);

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        peer.expectTransfer().withPayload(transferMatcher);
        peer.expectDetach().respond();
        peer.expectClose().respond();

        final Message<String> payload = Message.create("Hello World");

        final Tracker tracker;
        if (trySend && addDeliveryAnnotations) {
            tracker = sender.trySend(payload, deliveryAnnotations);
        } else if (trySend) {
            tracker = sender.trySend(payload);
        } else if (addDeliveryAnnotations) {
            tracker = sender.send(payload, deliveryAnnotations);
        } else {
            tracker = sender.send(payload);
        }

        assertNotNull(tracker);

        sender.closeAsync().get(10, TimeUnit.SECONDS);
        connection.closeAsync().get(10, TimeUnit.SECONDS);

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Verifies that {@code trySend} returns {@code null} when the scripted peer has not sent
 * any flow granting credit to the sender.
 */
@Test
public void testTrySendWhenNoCreditAvailable() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        peer.expectDetach().respond();
        peer.expectClose().respond();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", serverUri);

        final Client client = Client.create();
        final ConnectionOptions connectionOptions = new ConnectionOptions();
        connectionOptions.sendTimeout(1);

        final Connection connection = client.connect(serverUri.getHost(), serverUri.getPort(), connectionOptions);
        final Session session = connection.openSession();
        final Sender sender = session.openSender("test-queue");
        sender.openFuture().get(10, TimeUnit.SECONDS);

        final Message<String> payload = Message.create("Hello World");

        assertNull(sender.trySend(payload));

        sender.closeAsync().get(10, TimeUnit.SECONDS);
        connection.closeAsync().get(10, TimeUnit.SECONDS);

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Runs the configured-QoS sender scenario with {@link DeliveryMode#AT_MOST_ONCE}.
 */
@Test
public void testCreateSenderWithQoSOfAtMostOnce() throws Exception {
    final DeliveryMode qos = DeliveryMode.AT_MOST_ONCE;
    doTestCreateSenderWithConfiguredQoS(qos);
}
/**
 * Runs the configured-QoS sender scenario with {@link DeliveryMode#AT_LEAST_ONCE}.
 */
@Test
public void testCreateSenderWithQoSOfAtLeastOnce() throws Exception {
    final DeliveryMode qos = DeliveryMode.AT_LEAST_ONCE;
    doTestCreateSenderWithConfiguredQoS(qos);
}
/**
 * Opens a sender with the given delivery mode and relies on the scripted peer to check
 * the settle modes on the attach: sender settle mode SETTLED for
 * {@link DeliveryMode#AT_MOST_ONCE} and UNSETTLED otherwise, with receiver settle mode
 * FIRST in both cases.
 *
 * @param qos the delivery mode to configure on the sender options.
 */
private void doTestCreateSenderWithConfiguredQoS(DeliveryMode qos) throws Exception {
    final byte sndMode;
    if (qos == DeliveryMode.AT_MOST_ONCE) {
        sndMode = SenderSettleMode.SETTLED.byteValue();
    } else {
        sndMode = SenderSettleMode.UNSETTLED.byteValue();
    }

    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender()
                           .withSndSettleMode(sndMode)
                           .withRcvSettleMode(ReceiverSettleMode.FIRST.byteValue())
                           .respond()
                           .withSndSettleMode(sndMode)
                           .withRcvSettleMode(ReceiverSettleMode.FIRST.byteValue());
        peer.expectDetach().respond();
        peer.expectClose().respond();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", serverUri);

        final Client client = Client.create();
        final Connection connection = client.connect(serverUri.getHost(), serverUri.getPort());
        final Session session = connection.openSession();
        final SenderOptions senderOptions = new SenderOptions().deliveryMode(qos);
        final Sender sender = session.openSender("test-qos", senderOptions);
        sender.openFuture().get();

        assertEquals("test-qos", sender.address());

        // The sender close is not awaited; only the connection close is.
        sender.closeAsync();
        connection.closeAsync().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Runs the remote-settles scenario on a default sender using {@code send}.
 */
@Test
public void testSendAutoSettlesOnceRemoteSettles() throws Exception {
    final boolean trySend = false;
    doTestSentMessageGetsAutoSettledAfterRemoteSettles(trySend);
}
/**
 * Runs the remote-settles scenario on a default sender using {@code trySend}.
 */
@Test
public void testTrySendAutoSettlesOnceRemoteSettles() throws Exception {
    final boolean trySend = true;
    doTestSentMessageGetsAutoSettledAfterRemoteSettles(trySend);
}
/**
 * Sends one message on a sender opened with default options and has the peer answer the
 * transfer with a settled, accepted disposition. Checks that the tracker's settlement
 * future completes with a non-null value and that the remote state is ACCEPTED.
 *
 * @param trySend {@code true} to use {@code trySend}, {@code false} to use {@code send}.
 */
private void doTestSentMessageGetsAutoSettledAfterRemoteSettles(boolean trySend) throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        peer.remoteFlow().withDeliveryCount(0)
                         .withLinkCredit(10)
                         .withIncomingWindow(1024)
                         .withOutgoingWindow(10)
                         .withNextIncomingId(0)
                         .withNextOutgoingId(1).queue();
        peer.expectAttach().ofReceiver().respond();
        peer.start();

        URI remoteURI = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", remoteURI);

        Client container = Client.create();
        Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
        connection.openFuture().get(10, TimeUnit.SECONDS);

        Session session = connection.openSession();
        Sender sender = session.openSender("test-queue");
        sender.openFuture().get(10, TimeUnit.SECONDS);

        // This ensures that the flow to sender is processed before we try-send
        Receiver receiver = session.openReceiver("test-queue", new ReceiverOptions().creditWindow(0));
        receiver.openFuture().get(10, TimeUnit.SECONDS);

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        peer.expectTransfer().withNonNullPayload()
                             .respond()
                             .withSettled(true).withState().accepted();
        peer.expectDetach().respond();
        peer.expectClose().respond();

        Message<String> message = Message.create("Hello World");

        final Tracker tracker;
        if (trySend) {
            tracker = sender.trySend(message);
        } else {
            tracker = sender.send(message);
        }

        assertNotNull(tracker);
        assertNotNull(tracker.settlementFuture().get(5, TimeUnit.SECONDS));
        // JUnit expects (expected, actual); this order keeps failure messages accurate.
        assertEquals(DeliveryState.Type.ACCEPTED, tracker.remoteState().getType());

        // The sender close is not awaited; only the connection close is.
        sender.closeAsync();
        connection.closeAsync().get(10, TimeUnit.SECONDS);

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Runs the remote-settles scenario with auto settle disabled, using {@code send}.
 */
@Test
public void testSendDoesNotAutoSettlesOnceRemoteSettlesIfAutoSettleOff() throws Exception {
    final boolean trySend = false;
    doTestSentMessageNotAutoSettledAfterRemoteSettles(trySend);
}
/**
 * Runs the remote-settles scenario with auto settle disabled, using {@code trySend}.
 */
@Test
public void testTrySendDoesNotAutoSettlesOnceRemoteSettlesIfAutoSettleOff() throws Exception {
    final boolean trySend = true;
    doTestSentMessageNotAutoSettledAfterRemoteSettles(trySend);
}
/**
 * Sends one message on a sender opened with {@code autoSettle(false)} and has the peer
 * answer the transfer with a settled, accepted disposition. Checks that the tracker
 * reports the remote settlement and ACCEPTED remote state while its local state stays
 * {@code null} and it remains locally unsettled.
 *
 * @param trySend {@code true} to use {@code trySend}, {@code false} to use {@code send}.
 */
private void doTestSentMessageNotAutoSettledAfterRemoteSettles(boolean trySend) throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        peer.remoteFlow().withDeliveryCount(0)
                         .withLinkCredit(10)
                         .withIncomingWindow(1024)
                         .withOutgoingWindow(10)
                         .withNextIncomingId(0)
                         .withNextOutgoingId(1).queue();
        peer.expectAttach().ofReceiver().respond();
        peer.start();

        URI remoteURI = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", remoteURI);

        Client container = Client.create();
        Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
        Session session = connection.openSession();
        Sender sender = session.openSender("test-queue", new SenderOptions().autoSettle(false));
        sender.openFuture().get(10, TimeUnit.SECONDS);

        // This ensures that the flow to sender is processed before we try-send
        Receiver receiver = session.openReceiver("test-queue", new ReceiverOptions().creditWindow(0));
        receiver.openFuture().get(10, TimeUnit.SECONDS);

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        peer.expectTransfer().withNonNullPayload()
                             .respond()
                             .withSettled(true).withState().accepted();
        peer.expectDetach().respond();
        peer.expectClose().respond();

        Message<String> message = Message.create("Hello World");

        final Tracker tracker;
        if (trySend) {
            tracker = sender.trySend(message);
        } else {
            tracker = sender.send(message);
        }

        assertNotNull(tracker);
        assertTrue(tracker.settlementFuture().get(5, TimeUnit.SECONDS).remoteSettled());
        // JUnit expects (expected, actual); this order keeps failure messages accurate.
        assertEquals(DeliveryState.Type.ACCEPTED, tracker.remoteState().getType());
        assertNull(tracker.state());
        assertFalse(tracker.settled());

        // The sender close is not awaited; only the connection close is.
        sender.closeAsync();
        connection.closeAsync().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Runs the pre-settled send scenario using {@code send}.
 */
@Test
public void testSenderSendingSettledCompletesTrackerAcknowledgeFuture() throws Exception {
    final boolean trySend = false;
    doTestSenderSendingSettledCompletesTrackerAcknowledgeFuture(trySend);
}
/**
 * Runs the pre-settled send scenario using {@code trySend}.
 */
@Test
public void testSenderTrySendingSettledCompletesTrackerAcknowledgeFuture() throws Exception {
    final boolean trySend = true;
    doTestSenderSendingSettledCompletesTrackerAcknowledgeFuture(trySend);
}
/**
 * Opens a sender configured for {@link DeliveryMode#AT_MOST_ONCE}, sends a single message and
 * checks that the returned tracker's settlement future is already done, reports local settlement
 * and does not report remote settlement.
 *
 * @param trySend
 *      when {@code true} the message is sent using {@code trySend}, otherwise using {@code send}
 *
 * @throws Exception if the scripted exchange with the test peer fails
 */
private void doTestSenderSendingSettledCompletesTrackerAcknowledgeFuture(boolean trySend) throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        // Script the connection, session and a pre-settled sender link.
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender()
                           .withSenderSettleModeSettled()
                           .withReceiverSettlesFirst()
                           .respond()
                           .withSenderSettleModeSettled()
                           .withReceivervSettlesFirst();
        peer.remoteFlow().withLinkCredit(10).queue();
        // A receiver is opened afterwards so that the sender link is known to have
        // processed the inbound flow frame queued above before the send happens.
        peer.expectAttach().respond();
        peer.expectFlow();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", serverUri);

        final String host = serverUri.getHost();
        final int port = serverUri.getPort();

        final Client client = Client.create();
        final Connection connection = client.connect(host, port).openFuture().get();
        final Session session = connection.openSession().openFuture().get();
        final Sender sender = session.openSender(
            "test-qos", new SenderOptions().deliveryMode(DeliveryMode.AT_MOST_ONCE));

        assertEquals("test-qos", sender.address());

        session.openReceiver("dummy").openFuture().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        peer.expectTransfer().withNonNullPayload();
        peer.expectDetach().respond();
        peer.expectClose().respond();

        final Message<String> message = Message.create("Hello World");
        final Tracker tracker = trySend ? sender.trySend(message) : sender.send(message);

        assertNotNull(tracker);
        assertTrue(tracker.settlementFuture().isDone());
        assertTrue(tracker.settlementFuture().get().settled());
        assertFalse(tracker.settlementFuture().get().remoteSettled());

        sender.closeAsync().get(10, TimeUnit.SECONDS);
        connection.closeAsync().get(10, TimeUnit.SECONDS);

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Verifies that three consecutive sends on the same sender arrive at the peer carrying the
 * delivery tags {@code {0}}, {@code {1}} and {@code {2}} in that order, and that each returned
 * tracker observes the settlement the peer sends back.
 *
 * @throws Exception if the scripted exchange with the test peer fails
 */
@Test
public void testSenderIncrementsTransferTagOnEachSend() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        peer.remoteFlow().withLinkCredit(10).queue();
        peer.start();

        URI remoteURI = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", remoteURI);

        Client container = Client.create();
        Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
        Session session = connection.openSession().openFuture().get();
        SenderOptions options = new SenderOptions().deliveryMode(DeliveryMode.AT_LEAST_ONCE).autoSettle(false);
        Sender sender = session.openSender("test-tags", options).openFuture().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        // Each expected transfer must carry the next tag value; the peer answers every
        // one of them with a settled, accepted disposition.
        peer.expectTransfer().withNonNullPayload()
                             .withDeliveryTag(new byte[] {0}).respond().withSettled(true).withState().accepted();
        peer.expectTransfer().withNonNullPayload()
                             .withDeliveryTag(new byte[] {1}).respond().withSettled(true).withState().accepted();
        peer.expectTransfer().withNonNullPayload()
                             .withDeliveryTag(new byte[] {2}).respond().withSettled(true).withState().accepted();
        peer.expectDetach().respond();
        peer.expectClose().respond();

        final Message<String> message = Message.create("Hello World");
        final Tracker tracker1 = sender.send(message);
        final Tracker tracker2 = sender.send(message);
        final Tracker tracker3 = sender.send(message);

        // The previous assertNotNull(...settled()) checks boxed a primitive boolean and could
        // never fail. Assert the remote settlement that the scripted dispositions produce and
        // bound the wait so a missing disposition fails the test instead of hanging it.
        assertNotNull(tracker1);
        assertTrue(tracker1.settlementFuture().get(5, TimeUnit.SECONDS).remoteSettled());
        assertNotNull(tracker2);
        assertTrue(tracker2.settlementFuture().get(5, TimeUnit.SECONDS).remoteSettled());
        assertNotNull(tracker3);
        assertTrue(tracker3.settlementFuture().get(5, TimeUnit.SECONDS).remoteSettled());

        sender.closeAsync().get(10, TimeUnit.SECONDS);
        connection.closeAsync().get(10, TimeUnit.SECONDS);

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Verifies that a sender opened with {@link DeliveryMode#AT_MOST_ONCE} emits three transfers that
 * are already settled and carry an empty delivery tag, and that each returned tracker reports the
 * delivery as settled.
 *
 * @throws Exception if the scripted exchange with the test peer fails
 */
@Test
public void testSenderSendsSettledInAtLeastOnceMode() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        peer.remoteFlow().withLinkCredit(10).queue();
        peer.start();

        URI remoteURI = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", remoteURI);

        Client container = Client.create();
        Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
        Session session = connection.openSession().openFuture().get();
        SenderOptions options = new SenderOptions().deliveryMode(DeliveryMode.AT_MOST_ONCE).autoSettle(false);
        Sender sender = session.openSender("test-tags", options).openFuture().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        // Every transfer is expected pre-settled with an empty tag; the peer sends no disposition.
        peer.expectTransfer().withNonNullPayload()
                             .withDeliveryTag(new byte[] {}).withSettled(true);
        peer.expectTransfer().withNonNullPayload()
                             .withDeliveryTag(new byte[] {}).withSettled(true);
        peer.expectTransfer().withNonNullPayload()
                             .withDeliveryTag(new byte[] {}).withSettled(true);
        peer.expectDetach().respond();
        peer.expectClose().respond();

        final Message<String> message = Message.create("Hello World");
        final Tracker tracker1 = sender.send(message);
        final Tracker tracker2 = sender.send(message);
        final Tracker tracker3 = sender.send(message);

        // The previous assertNotNull(...settled()) checks boxed a primitive boolean and could
        // never fail; assert the settled state of each tracker instead.
        assertNotNull(tracker1);
        assertTrue(tracker1.settlementFuture().get().settled());
        assertNotNull(tracker2);
        assertTrue(tracker2.settlementFuture().get().settled());
        assertNotNull(tracker3);
        assertTrue(tracker3.settlementFuture().get().settled());

        sender.closeAsync().get();
        connection.closeAsync().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Verifies that {@code openAnonymousSender} throws {@code ClientUnsupportedOperationException}
 * once the session is open and the peer's Open response carried no offered capabilities.
 *
 * @throws Exception if the scripted exchange with the test peer fails
 */
@Test
public void testCreateAnonymousSenderWhenRemoteDoesNotOfferSupportForIt() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectClose().respond();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Test started, peer listening on: {}", serverUri);

        final String host = serverUri.getHost();
        final int port = serverUri.getPort();

        final Client client = Client.create();
        final Connection connection = client.connect(host, port);
        final Session session = connection.openSession().openFuture().get();

        try {
            session.openAnonymousSender();
            fail("Should not be able to open an anonymous sender when remote does not offer anonymous relay");
        } catch (ClientUnsupportedOperationException expected) {
            LOG.info("Caught expected error: ", expected);
        }

        connection.closeAsync();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Creates an anonymous sender before the peer has answered Open and Begin, then injects those
 * responses without any offered capabilities and verifies that a send on the anonymous sender
 * throws {@code ClientUnsupportedOperationException}.
 *
 * @throws Exception if the scripted exchange with the test peer fails
 */
@Test
public void testCreateAnonymousSenderBeforeKnowingRemoteDoesNotOfferSupportForIt() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        // No responses are scripted here; they are injected manually further down.
        peer.expectSASLAnonymousConnect();
        peer.expectOpen();
        peer.expectBegin();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Test started, peer listening on: {}", serverUri);

        final String host = serverUri.getHost();
        final int port = serverUri.getPort();

        final Client client = Client.create();
        final Connection connection = client.connect(host, port);
        final Session session = connection.openSession();
        final Sender anonymousSender = session.openAnonymousSender();

        final Message<String> message = Message.create("Hello World").to("my-queue");

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        peer.remoteOpen().now();
        peer.respondToLastBegin().now();
        peer.expectClose().respond();

        try {
            anonymousSender.send(message);
            fail("Should not be able to open an anonymous sender when remote does not offer anonymous relay");
        } catch (ClientUnsupportedOperationException expected) {
            LOG.info("Caught expected error: ", expected);
        }

        connection.closeAsync();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Verifies that sender options passed to {@code openAnonymousSender} are reflected in the Attach
 * seen by the peer: the link is attached with the settled sender settle mode, receiver settles
 * first, and a target whose address is null.
 *
 * @throws Exception if the scripted exchange with the test peer fails
 */
@Test
public void testCreateAnonymousSenderAppliesOptions() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond().withOfferedCapabilities("ANONYMOUS-RELAY");
        peer.expectBegin().respond();
        peer.expectAttach().ofSender()
                           .withSenderSettleModeSettled()
                           .withReceiverSettlesFirst()
                           .withTarget().withAddress(Matchers.nullValue()).and()
                           .respond();
        peer.expectClose().respond();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Test started, peer listening on: {}", serverUri);

        final String host = serverUri.getHost();
        final int port = serverUri.getPort();

        final Client client = Client.create();
        final Connection connection = client.connect(host, port);
        final Session session = connection.openSession();

        final SenderOptions atMostOnce = new SenderOptions().deliveryMode(DeliveryMode.AT_MOST_ONCE);
        final Sender anonymousSender = session.openAnonymousSender(atMostOnce);

        anonymousSender.openFuture().get();

        connection.closeAsync();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Creates an anonymous sender while the peer is still withholding its Open and Begin responses,
 * then injects an Open offering {@code ANONYMOUS-RELAY} and expects the sender Attach (with a
 * null target address) to arrive only after that point.
 *
 * @throws Exception if the scripted exchange with the test peer fails
 */
@Test
public void testAnonymousSenderOpenHeldUntilConnectionOpenedAndSupportConfirmed() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen();
        peer.expectBegin();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Test started, peer listening on: {}", serverUri);

        final String host = serverUri.getHost();
        final int port = serverUri.getPort();

        final Client client = Client.create();
        final Connection connection = client.connect(host, port);
        final Session session = connection.openSession();
        final Sender sender = session.openAnonymousSender();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

        // The attach is only expected once the held Open and Begin responses are injected.
        peer.expectAttach().ofSender()
                           .withTarget().withAddress(Matchers.nullValue()).and()
                           .respond();
        peer.expectClose().respond();

        // Release the held responses so that the client can proceed.
        peer.remoteOpen().withOfferedCapabilities("ANONYMOUS-RELAY").now();
        peer.respondToLastBegin().now();

        try {
            sender.openFuture().get();
        } catch (ExecutionException openFailure) {
            fail("Open of Sender failed waiting for response: " + openFailure.getCause());
        }

        connection.closeAsync();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Runs the remote properties scenario with the peer answering the sender Attach.
 *
 * @throws Exception if the shared scenario fails
 */
@Test
public void testSenderGetRemotePropertiesWaitsForRemoteAttach() throws Exception {
    final boolean peerRespondsToAttach = true;

    tryReadSenderRemoteProperties(peerRespondsToAttach);
}
/**
 * Runs the remote properties scenario with the peer never answering the sender Attach.
 *
 * @throws Exception if the shared scenario fails
 */
@Test
public void testSenderGetRemotePropertiesFailsAfterOpenTimeout() throws Exception {
    final boolean peerRespondsToAttach = false;

    tryReadSenderRemoteProperties(peerRespondsToAttach);
}
/**
 * Opens a sender with an open timeout of 75 set on its {@code SenderOptions} and reads its remote
 * properties while the peer's Attach response is still outstanding.
 *
 * @param attachResponse
 *      when {@code true} the peer sends a delayed Attach response carrying a properties map and
 *      the read must return that map; when {@code false} no response is sent and the read must
 *      throw a {@code ClientException}
 *
 * @throws Exception if the scripted exchange with the test peer fails
 */
private void tryReadSenderRemoteProperties(boolean attachResponse) throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Connect test started, peer listening on: {}", serverUri);

        final String host = serverUri.getHost();
        final int port = serverUri.getPort();

        final Client client = Client.create();
        final Connection connection = client.connect(host, port);
        connection.openFuture().get();

        final Session session = connection.openSession();
        session.openFuture().get();

        final Sender sender = session.openSender("test-sender", new SenderOptions().openTimeout(75));

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

        final Map<String, Object> expectedProperties = new HashMap<>();
        expectedProperties.put("TEST", "test-property");

        if (attachResponse) {
            peer.expectDetach().respond();
            peer.respondToLastAttach().withPropertiesMap(expectedProperties).later(10);

            assertNotNull(sender.properties(), "Remote should have responded with a remote properties value");
            assertEquals(expectedProperties, sender.properties());
        } else {
            peer.expectDetach();

            try {
                sender.properties();
                fail("Should failed to get remote state due to no attach response");
            } catch (ClientException expected) {
                LOG.debug("Caught expected exception from blocking call", expected);
            }
        }

        try {
            sender.closeAsync().get();
        } catch (ExecutionException closeFailure) {
            LOG.debug("Caught unexpected exception from close call", closeFailure);
            fail("Should not fail to close when connection not closed and detach sent.");
        }

        LOG.debug("*** Test read remote properties ***");

        peer.expectClose().respond();
        connection.closeAsync().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Runs the remote offered capabilities scenario with the peer answering the sender Attach.
 *
 * @throws Exception if the shared scenario fails
 */
@Test
public void testGetRemoteOfferedCapabilitiesWaitsForRemoteAttach() throws Exception {
    final boolean peerRespondsToAttach = true;

    tryReadRemoteOfferedCapabilities(peerRespondsToAttach);
}
/**
 * Runs the remote offered capabilities scenario with the peer never answering the sender Attach.
 *
 * @throws Exception if the shared scenario fails
 */
@Test
public void testGetRemoteOfferedCapabilitiesFailsAfterOpenTimeout() throws Exception {
    final boolean peerRespondsToAttach = false;

    tryReadRemoteOfferedCapabilities(peerRespondsToAttach);
}
/**
 * Opens a sender on a connection whose {@code ConnectionOptions} carry an open timeout of 75 and
 * reads the sender's remote offered capabilities while the peer's Attach response is outstanding.
 *
 * @param attachResponse
 *      when {@code true} the peer sends a delayed Attach response offering {@code QUEUE} and the
 *      read must return exactly that capability; when {@code false} no response is sent and the
 *      read must throw a {@code ClientException}
 *
 * @throws Exception if the scripted exchange with the test peer fails
 */
private void tryReadRemoteOfferedCapabilities(boolean attachResponse) throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Connect test started, peer listening on: {}", serverUri);

        final String host = serverUri.getHost();
        final int port = serverUri.getPort();

        final Client client = Client.create();
        final ConnectionOptions connectionOptions = new ConnectionOptions().openTimeout(75);
        final Connection connection = client.connect(host, port, connectionOptions);
        connection.openFuture().get();

        final Session session = connection.openSession();
        session.openFuture().get();

        final Sender sender = session.openSender("test-sender");

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

        if (attachResponse) {
            peer.expectDetach().respond();
            peer.respondToLastAttach().withOfferedCapabilities("QUEUE").later(10);

            assertNotNull(sender.offeredCapabilities(), "Remote should have responded with a remote offered Capabilities value");
            assertEquals(1, sender.offeredCapabilities().length);
            assertEquals("QUEUE", sender.offeredCapabilities()[0]);
        } else {
            peer.expectDetach();

            try {
                sender.offeredCapabilities();
                fail("Should failed to get remote state due to no attach response");
            } catch (ClientException expected) {
                LOG.debug("Caught expected exception from blocking call", expected);
            }
        }

        try {
            sender.closeAsync().get();
        } catch (ExecutionException closeFailure) {
            LOG.debug("Caught unexpected exception from close call", closeFailure);
            fail("Should not fail to close when connection not closed and detach sent.");
        }

        peer.expectClose().respond();
        connection.closeAsync().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Runs the remote desired capabilities scenario with the peer answering the sender Attach.
 *
 * @throws Exception if the shared scenario fails
 */
@Test
public void testGetRemoteDesiredCapabilitiesWaitsForRemoteAttach() throws Exception {
    final boolean peerRespondsToAttach = true;

    tryReadRemoteDesiredCapabilities(peerRespondsToAttach);
}
/**
 * Runs the remote desired capabilities scenario with the peer never answering the sender Attach.
 *
 * @throws Exception if the shared scenario fails
 */
@Test
public void testGetRemoteDesiredCapabilitiesFailsAfterOpenTimeout() throws Exception {
    final boolean peerRespondsToAttach = false;

    tryReadRemoteDesiredCapabilities(peerRespondsToAttach);
}
/**
 * Opens a sender on a connection whose {@code ConnectionOptions} carry an open timeout of 75 and
 * reads the sender's remote desired capabilities while the peer's Attach response is outstanding.
 *
 * @param attachResponse
 *      when {@code true} the peer sends a delayed Attach response desiring {@code Error-Free} and
 *      the read must return exactly that capability; when {@code false} no response is sent and
 *      the read must throw a {@code ClientException}
 *
 * @throws Exception if the scripted exchange with the test peer fails
 */
private void tryReadRemoteDesiredCapabilities(boolean attachResponse) throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Connect test started, peer listening on: {}", serverUri);

        final String host = serverUri.getHost();
        final int port = serverUri.getPort();

        final Client client = Client.create();
        final ConnectionOptions connectionOptions = new ConnectionOptions().openTimeout(75);
        final Connection connection = client.connect(host, port, connectionOptions);
        connection.openFuture().get();

        final Session session = connection.openSession();
        session.openFuture().get();

        final Sender sender = session.openSender("test-sender");

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

        if (attachResponse) {
            peer.expectDetach().respond();
            peer.respondToLastAttach().withDesiredCapabilities("Error-Free").later(10);

            assertNotNull(sender.desiredCapabilities(), "Remote should have responded with a remote desired Capabilities value");
            assertEquals(1, sender.desiredCapabilities().length);
            assertEquals("Error-Free", sender.desiredCapabilities()[0]);
        } else {
            peer.expectDetach();

            try {
                sender.desiredCapabilities();
                fail("Should failed to get remote state due to no attach response");
            } catch (ClientException expected) {
                LOG.debug("Caught expected exception from blocking call", expected);
            }
        }

        try {
            sender.closeAsync().get();
        } catch (ExecutionException closeFailure) {
            LOG.debug("Caught unexpected exception from close call", closeFailure);
            fail("Should not fail to close when connection not closed and detach sent.");
        }

        peer.expectClose().respond();
        connection.closeAsync().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Verifies that capabilities set on the sender's target options appear on the target of the
 * Attach received by the peer.
 *
 * @throws Exception if the scripted exchange with the test peer fails
 */
@Test
public void testOpenSenderWithLinCapabilities() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().withRole(Role.SENDER.getValue())
                           .withTarget().withCapabilities("queue").and()
                           .respond();
        peer.expectDetach().respond();
        peer.expectClose().respond();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Receiver test started, peer listening on: {}", serverUri);

        final String host = serverUri.getHost();
        final int port = serverUri.getPort();

        final Client client = Client.create();
        final Connection connection = client.connect(host, port);
        final Session session = connection.openSession().openFuture().get(10, TimeUnit.SECONDS);

        final SenderOptions withQueueCapability = new SenderOptions();
        withQueueCapability.targetOptions().capabilities("queue");

        final Sender sender = session.openSender("test-queue", withQueueCapability);

        sender.openFuture().get();
        sender.close();

        connection.closeAsync().get(10, TimeUnit.SECONDS);

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Runs the error condition scenario using {@code closeAsync} on the sender.
 *
 * @throws Exception if the shared scenario fails
 */
@Test
public void testCloseSenderWithErrorCondition() throws Exception {
    final boolean closeLink = true;

    doTestCloseOrDetachWithErrorCondition(closeLink);
}
/**
 * Runs the error condition scenario using {@code detachAsync} on the sender.
 *
 * @throws Exception if the shared scenario fails
 */
@Test
public void testDetachSenderWithErrorCondition() throws Exception {
    final boolean closeLink = false;

    doTestCloseOrDetachWithErrorCondition(closeLink);
}
/**
 * Opens a sender and then closes or detaches it with an error condition, expecting the peer to
 * receive a Detach whose closed flag matches the operation and which carries that condition and
 * description.
 *
 * @param close
 *      when {@code true} the sender is closed, otherwise it is detached
 *
 * @throws Exception if the scripted exchange with the test peer fails
 */
public void doTestCloseOrDetachWithErrorCondition(boolean close) throws Exception {
    final String condition = "amqp:link:detach-forced";
    final String description = "something bad happened.";

    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        peer.expectDetach().withClosed(close).withError(condition, description).respond();
        peer.expectClose().respond();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Test started, peer listening on: {}", serverUri);

        final String host = serverUri.getHost();
        final int port = serverUri.getPort();

        final Client client = Client.create();
        final Connection connection = client.connect(host, port);
        final Session session = connection.openSession();
        final Sender sender = session.openSender("test-sender");

        sender.openFuture().get();

        final ErrorCondition error = ErrorCondition.create(condition, description, null);

        if (!close) {
            sender.detachAsync(error);
        } else {
            sender.closeAsync(error);
        }

        connection.closeAsync().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Sends as many messages as the peer granted credit for (20), waiting for the settlement of each
 * one before sending the next, and expects the transfers to arrive unsettled with sequential
 * delivery ids starting at zero.
 *
 * @throws Exception if the scripted exchange with the test peer fails
 */
@Test
public void testSendMultipleMessages() throws Exception {
    final int credit = 20;

    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        peer.remoteFlow().withDeliveryCount(0).withLinkCredit(credit).queue();
        peer.expectAttach().ofReceiver().respond();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", serverUri);

        final String host = serverUri.getHost();
        final int port = serverUri.getPort();

        final Client client = Client.create();
        final Connection connection = client.connect(host, port);
        final Session session = connection.openSession();
        final Sender sender = session.openSender("test-queue");

        sender.openFuture().get();

        // Opening a receiver guarantees the sender has processed the flow before sending.
        final Receiver receiver = session.openReceiver("test-queue", new ReceiverOptions().creditWindow(0));
        receiver.openFuture().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

        final List<Tracker> trackers = new ArrayList<>();

        for (int deliveryId = 0; deliveryId < credit; deliveryId++) {
            peer.expectTransfer().withDeliveryId(deliveryId)
                                 .withNonNullPayload()
                                 .withSettled(false)
                                 .respond()
                                 .withSettled(true)
                                 .withState().accepted();
        }

        peer.expectDetach().respond();
        peer.expectClose().respond();

        final Message<String> message = Message.create("Hello World");

        for (int sent = 0; sent < credit; sent++) {
            final Tracker tracker = sender.send(message);

            trackers.add(tracker);
            tracker.settlementFuture().get();
        }

        assertEquals(credit, trackers.size());

        sender.closeAsync().get();
        connection.closeAsync().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Verifies that a send issued on a sender that was never granted credit fails with
 * {@code ClientResourceRemotelyClosedException} when the peer detaches the link with an error
 * shortly after attaching it.
 *
 * @throws Exception if the scripted exchange with the test peer fails
 */
@Test
public void testSendBlockedForCreditFailsWhenLinkRemotelyClosed() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        // No flow is scripted, so the sender has no credit when the delayed detach arrives.
        peer.remoteDetach().withErrorCondition(AmqpError.RESOURCE_DELETED.toString(), "Link was deleted").afterDelay(25).queue();
        peer.expectDetach();
        peer.expectClose().respond();
        peer.start();

        URI remoteURI = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", remoteURI);

        Client container = Client.create();
        Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
        Session session = connection.openSession();
        Sender sender = session.openSender("test-queue");
        sender.openFuture().get();

        Message<String> message = Message.create("Hello World");

        try {
            sender.send(message);
            // The failure text previously claimed a timeout was expected, which does not
            // match the remotely-closed error this test actually requires.
            fail("Send should have failed when Link remotely closed.");
        } catch (ClientResourceRemotelyClosedException cliEx) {
            // Expected send to throw indicating that the remote closed the link
        }

        connection.closeAsync().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Verifies that a send issued on a sender that was never granted credit fails with
 * {@code ClientResourceRemotelyClosedException} when the peer ends the session with an error
 * shortly after the link is attached.
 *
 * @throws Exception if the scripted exchange with the test peer fails
 */
@Test
public void testSendBlockedForCreditFailsWhenSessionRemotelyClosed() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        // No flow is scripted, so the sender has no credit when the delayed end arrives.
        peer.remoteEnd().withErrorCondition(AmqpError.RESOURCE_DELETED.toString(), "Session was deleted").afterDelay(25).queue();
        peer.expectEnd();
        peer.expectClose().respond();
        peer.start();

        URI remoteURI = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", remoteURI);

        Client container = Client.create();
        Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
        Session session = connection.openSession();
        Sender sender = session.openSender("test-queue");
        sender.openFuture().get();

        Message<String> message = Message.create("Hello World");

        try {
            sender.send(message);
            // The failure text previously claimed a timeout was expected, which does not
            // match the remotely-closed error this test actually requires.
            fail("Send should have failed when Session remotely closed.");
        } catch (ClientResourceRemotelyClosedException cliEx) {
            // Expected send to throw indicating that the remote closed the session
        }

        connection.closeAsync().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Verifies that a send issued on a sender that was never granted credit fails with
 * {@code ClientConnectionRemotelyClosedException} when the peer closes the connection with an
 * error shortly after the link is attached.
 *
 * @throws Exception if the scripted exchange with the test peer fails
 */
@Test
public void testSendBlockedForCreditFailsWhenConnectionRemotelyClosed() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        peer.remoteClose()
            .withErrorCondition(AmqpError.RESOURCE_DELETED.toString(), "Connection was deleted")
            .afterDelay(25)
            .queue();
        peer.expectClose();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", serverUri);

        final String host = serverUri.getHost();
        final int port = serverUri.getPort();

        final Client client = Client.create();
        final Connection connection = client.connect(host, port);
        final Session session = connection.openSession();
        final Sender sender = session.openSender("test-queue");

        sender.openFuture().get();

        final Message<String> message = Message.create("Hello World");

        try {
            sender.send(message);
            fail("Send should have failed when Connection remotely closed.");
        } catch (ClientConnectionRemotelyClosedException expected) {
            // The remote close must surface to the caller as this exception type.
        }

        connection.closeAsync().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Verifies that a send issued on a sender that was never granted credit fails with
 * {@code ClientConnectionRemotelyClosedException} when the peer drops the connection shortly
 * after the link is attached.
 *
 * @throws Exception if the scripted exchange with the test peer fails
 */
@Test
public void testSendBlockedForCreditFailsWhenConnectionDrops() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        // No flow is scripted, so the sender has no credit when the connection is dropped.
        peer.dropAfterLastHandler(25);
        peer.start();

        URI remoteURI = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", remoteURI);

        Client container = Client.create();
        Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
        Session session = connection.openSession();
        Sender sender = session.openSender("test-queue");
        sender.openFuture().get();

        Message<String> message = Message.create("Hello World");

        try {
            sender.send(message);
            // The failure text previously claimed a timeout was expected, which does not
            // match the remotely-closed error this test actually requires.
            fail("Send should have failed when Connection dropped.");
        } catch (ClientConnectionRemotelyClosedException cliEx) {
            // Expected send to throw indicating that the remote closed unexpectedly
        }

        connection.closeAsync().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Waits for the connection's disconnected handler to fire after the peer drops the connection,
 * then verifies that both {@code send} and {@code trySend} throw
 * {@code ClientConnectionRemotelyClosedException}.
 *
 * @throws Exception if the scripted exchange with the test peer fails
 */
@Test
public void testSendAfterConnectionDropsThrowsConnectionRemotelyClosedError() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender()
                           .withTarget().withAddress("test").and()
                           .respond();
        peer.dropAfterLastHandler(25);
        peer.start();

        // Released by the disconnected handler once the client notices the drop.
        final CountDownLatch dropped = new CountDownLatch(1);

        final ConnectionOptions connectionOptions = new ConnectionOptions();
        connectionOptions.disconnectedHandler((conn, disconnectEvent) -> dropped.countDown());

        final URI serverUri = peer.getServerURI();
        final Message<String> message = Message.create("test-message");

        final Client client = Client.create();
        final Connection connection =
            client.connect(serverUri.getHost(), serverUri.getPort(), connectionOptions);
        final Session session = connection.openSession();
        final Sender sender = session.openSender("test");

        assertTrue(dropped.await(10, TimeUnit.SECONDS));

        try {
            sender.send(message);
            fail("Send should fail with remotely closed error after remote drops");
        } catch (ClientConnectionRemotelyClosedException expected) {
            // Expected
        }

        try {
            sender.trySend(message);
            fail("trySend should fail with remotely closed error after remote drops");
        } catch (ClientConnectionRemotelyClosedException expected) {
            // Expected
        }

        connection.closeAsync().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Sends a message that the peer never settles, waits for the peer script (which ends by dropping
 * the connection) to complete, and then verifies that the tracker's settlement future fails with
 * an {@code ExecutionException} caused by {@code ClientConnectionRemotelyClosedException}.
 *
 * @throws Exception if the scripted exchange with the test peer fails
 */
@Test
public void testAwaitSettlementFutureFailedAfterConnectionDropped() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender()
                           .withTarget().withAddress("test").and()
                           .respond();
        peer.remoteFlow().withLinkCredit(1).queue();
        peer.expectTransfer();
        peer.dropAfterLastHandler();
        peer.start();

        final URI serverUri = peer.getServerURI();
        final Message<String> message = Message.create("test-message");

        final Client client = Client.create();
        final Connection connection = client.connect(serverUri.getHost(), serverUri.getPort());
        final Session session = connection.openSession();
        final Sender sender = session.openSender("test");

        Tracker tracker = null;

        try {
            tracker = sender.send(message);
        } catch (ClientConnectionRemotelyClosedException unexpected) {
            fail("Send not should fail with remotely closed error after remote drops");
        }

        // Once the script has completed the connection has been dropped, so the
        // settlement future is checked after the drop rather than while waiting on it.
        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

        try {
            tracker.settlementFuture().get();
            fail("Wait for settlement should fail with remotely closed error after remote drops");
        } catch (ExecutionException settlementFailure) {
            assertTrue(settlementFailure.getCause() instanceof ClientConnectionRemotelyClosedException);
        }

        connection.closeAsync().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Sends a message that the peer never settles and verifies that {@code awaitSettlement} on the
 * tracker throws {@code ClientConnectionRemotelyClosedException} once the peer drops the
 * connection after a short delay.
 *
 * @throws Exception if the scripted exchange with the test peer fails
 */
@Test
public void testAwaitSettlementFailedOnConnectionDropped() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender()
                           .withTarget().withAddress("test").and()
                           .respond();
        peer.remoteFlow().withLinkCredit(1).queue();
        peer.expectTransfer();
        peer.dropAfterLastHandler(30);
        peer.start();

        final URI serverUri = peer.getServerURI();
        final Message<String> message = Message.create("test-message");

        final Client client = Client.create();
        final Connection connection = client.connect(serverUri.getHost(), serverUri.getPort());
        final Session session = connection.openSession();
        final Sender sender = session.openSender("test");

        Tracker tracker = null;

        try {
            tracker = sender.send(message);
        } catch (ClientConnectionRemotelyClosedException unexpected) {
            fail("Send should not fail with remotely closed error after remote drops");
        }

        // Usually the await begins before the delayed drop happens, which exercises
        // the path where the drop releases an already waiting caller.
        try {
            tracker.awaitSettlement();
            fail("Wait for settlement should fail with remotely closed error after remote drops");
        } catch (ClientConnectionRemotelyClosedException expected) {
            // Expected
        }

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Issues a send on a sender that is never granted credit and verifies that it throws
 * {@code ClientConnectionRemotelyClosedException} when the peer drops the connection shortly
 * after the link is attached.
 *
 * @throws Exception if the scripted exchange with the test peer fails
 */
@Test
public void testBlockedSendThrowsConnectionRemotelyClosedError() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender()
                           .withTarget().withAddress("test").and()
                           .respond();
        peer.dropAfterLastHandler(25);
        peer.start();

        final URI serverUri = peer.getServerURI();
        final Message<String> message = Message.create("test-message");

        final Client client = Client.create();
        final Connection connection = client.connect(serverUri.getHost(), serverUri.getPort());
        final Session session = connection.openSession();
        final Sender sender = session.openSender("test");

        try {
            sender.send(message);
            fail("Send should fail with remotely closed error after remote drops");
        } catch (ClientConnectionRemotelyClosedException expected) {
            // Expected
        }

        connection.closeAsync().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Sends a 4800 byte payload over a connection limited to a max frame size of 1024 while the peer
 * grants an incoming window of one frame at a time, expecting the message to arrive as four
 * 1024 byte transfer frames flagged {@code more} followed by a final transfer.
 *
 * @throws Exception if the scripted exchange with the test peer fails
 */
@Test
void testAutoFlushDuringWriteThatExceedConfiguredBufferLimitSessionCreditLimitOnTransfer() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().withNextOutgoingId(0).respond();
        peer.expectAttach().ofSender().respond();
        peer.start();

        URI remoteURI = peer.getServerURI();

        LOG.info("Test started, peer listening on: {}", remoteURI);

        Client container = Client.create();
        ConnectionOptions options = new ConnectionOptions().maxFrameSize(1024);
        Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
        Sender sender = connection.openSender("test-queue");

        final byte[] payload = new byte[4800];
        Arrays.fill(payload, (byte) 1);

        final AtomicBoolean sendFailed = new AtomicBoolean();

        // The send runs on another thread so that this thread can keep scripting the peer.
        ForkJoinPool.commonPool().execute(() -> {
            try {
                sender.send(Message.create(payload));
            } catch (Exception e) {
                LOG.info("send failed with error: ", e);
                sendFailed.set(true);
            }
        });

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        // Each received frame is answered with a flow that re-opens the window by one frame.
        peer.expectTransfer().withNonNullPayload().withMore(true).withFrameSize(1024);
        peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(1).withLinkCredit(10).queue();
        peer.expectTransfer().withNonNullPayload().withMore(true).withFrameSize(1024);
        peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(2).withLinkCredit(10).queue();
        peer.expectTransfer().withNonNullPayload().withMore(true).withFrameSize(1024);
        peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(3).withLinkCredit(10).queue();
        peer.expectTransfer().withNonNullPayload().withMore(true).withFrameSize(1024);
        peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(4).withLinkCredit(10).queue();
        peer.expectTransfer().withNonNullPayload().withMore(false).accept();

        // Grant the credit to start meeting the above expectations
        peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(0).withLinkCredit(10).now();

        // Bounded to the same 5 seconds used by the other script waits; the previous
        // 500 second limit would stall the suite for minutes if the send regressed.
        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        peer.expectDetach().respond();
        peer.expectClose().respond();

        assertFalse(sendFailed.get());

        sender.closeAsync().get();
        connection.closeAsync().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Sends a 4800 byte message over a connection limited to 1024 byte frames while the
 * peer keeps rolling its incoming window forward with queued flow frames. The peer
 * script expects four partial transfers followed by a final transfer, and the send
 * performed on the common pool must not fail.
 *
 * @throws Exception if the scripted peer or the client reports an unexpected error.
 */
@Test
void testAutoFlushDuringWriteWithRollingIncomingWindowUpdates() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().withNextOutgoingId(0).respond();
        peer.expectAttach().ofSender().respond();
        peer.start();

        URI remoteURI = peer.getServerURI();

        LOG.info("Test started, peer listening on: {}", remoteURI);

        Client container = Client.create();
        ConnectionOptions options = new ConnectionOptions().maxFrameSize(1024);
        Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
        Sender sender = connection.openSender("test-queue");

        final byte[] payload = new byte[4800];
        Arrays.fill(payload, (byte) 1);

        final AtomicBoolean sendFailed = new AtomicBoolean();

        // The send runs on a pool thread so this thread stays free to drive the peer
        // script; no credit has been granted yet at this point.
        ForkJoinPool.commonPool().execute(() -> {
            try {
                sender.send(Message.create(payload));
            } catch (Exception e) {
                LOG.info("send failed with error: ", e);
                sendFailed.set(true);
            }
        });

        // Credit should be refilled as transfers arrive rather than being exhausted on
        // each incoming transfer with the send left awaiting more credit.
        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        peer.expectTransfer().withNonNullPayload().withMore(true);
        peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(2).withLinkCredit(10).queue();
        peer.expectTransfer().withNonNullPayload().withMore(true);
        peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(3).withLinkCredit(10).queue();
        peer.expectTransfer().withNonNullPayload().withMore(true);
        peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(4).withLinkCredit(10).queue();
        peer.expectTransfer().withNonNullPayload().withMore(true);
        peer.expectTransfer().withNonNullPayload().withMore(false).accept();

        // Grant the credit to start meeting the above expectations
        peer.remoteFlow().withIncomingWindow(2).withNextIncomingId(0).withLinkCredit(10).now();

        // Same bounded wait as every other phase of this test, so a stalled send fails
        // the test quickly instead of hanging the build for minutes.
        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        peer.expectDetach().respond();
        peer.expectClose().respond();

        assertFalse(sendFailed.get());

        sender.closeAsync().get();
        connection.closeAsync().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Submits two sends against the same sender from the common pool after the peer has
 * granted two credits, and checks that both transfers reach the peer and neither send
 * reports a failure.
 *
 * @throws Exception if the scripted peer or the client reports an unexpected error.
 */
@Test
void testConcurrentSendOnlyBlocksForInitialSendInProgress() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        peer.remoteFlow().withLinkCredit(2).queue();
        peer.expectAttach().ofReceiver().respond();
        peer.expectFlow();
        peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
        peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Test started, peer listening on: {}", serverUri);

        final Client client = Client.create();
        final Connection connection = client.connect(serverUri.getHost(), serverUri.getPort());
        final Sender sender = connection.openSender("test-queue").openFuture().get();

        // Opening a receiver afterwards ensures the sender has seen its flow before
        // either send is triggered.
        connection.openReceiver("test-queue").openFuture().get();

        final byte[] body = new byte[1024];
        Arrays.fill(body, (byte) 1);

        // One send may block while the other is in progress; beyond that the two
        // sends should be indifferent to running concurrently.
        final AtomicBoolean anySendFailed = new AtomicBoolean();

        final Runnable firstSend = () -> {
            try {
                LOG.info("Test send 1 is preparing to fire:");
                Tracker firstTracker = sender.send(Message.create(body));
                firstTracker.awaitSettlement(10, TimeUnit.SECONDS);
            } catch (Exception e) {
                LOG.info("Test send 1 failed with error: ", e);
                anySendFailed.set(true);
            }
        };

        final Runnable secondSend = () -> {
            try {
                LOG.info("Test send 2 is preparing to fire:");
                Tracker secondTracker = sender.send(Message.create(body));
                secondTracker.awaitSettlement(10, TimeUnit.SECONDS);
            } catch (Exception e) {
                LOG.info("Test send 2 failed with error: ", e);
                anySendFailed.set(true);
            }
        };

        ForkJoinPool.commonPool().execute(firstSend);
        ForkJoinPool.commonPool().execute(secondSend);

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        peer.expectDetach().respond();
        peer.expectClose().respond();

        assertFalse(anySendFailed.get());

        sender.closeAsync().get();
        connection.closeAsync().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Starts two sends on the common pool before the peer has granted any credit, then
 * grants credit one unit at a time and expects two complete transfers, each settled
 * as accepted. The test passes once the second send's tracker reports settlement and
 * no send has recorded a failure.
 *
 * @throws Exception if the scripted peer or the client reports an unexpected error.
 */
@Test
void testConcurrentSendBlocksBehindSendWaitingForCredit() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
// Only the attach is scripted up front: no flow is sent, so the sender has no credit yet.
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Sender sender = connection.openSender("test-queue").openFuture().get();
final byte[] payload = new byte[1024];
Arrays.fill(payload, (byte) 1);
// send1Started gates the second task; send2Completed tells the test thread that
// the second send has been settled.
final CountDownLatch send1Started = new CountDownLatch(1);
final CountDownLatch send2Completed = new CountDownLatch(1);
final AtomicBoolean sendFailed = new AtomicBoolean();
ForkJoinPool.commonPool().execute(() -> {
try {
LOG.info("Test send 1 is preparing to fire:");
// NOTE(review): the latch is released from a separate pool task, presumably so
// that this thread is already inside send() by the time send 2 starts - confirm.
ForkJoinPool.commonPool().execute(() -> send1Started.countDown());
sender.send(Message.create(payload));
} catch (Exception e) {
LOG.info("Test send 1 failed with error: ", e);
sendFailed.set(true);
}
});
ForkJoinPool.commonPool().execute(() -> {
try {
// An assertion failure here is an Error and is not caught below; the test thread
// would then time out waiting on send2Completed instead.
assertTrue(send1Started.await(10, TimeUnit.SECONDS));
LOG.info("Test send 2 is preparing to fire:");
Tracker tracker = sender.send(Message.create(payload));
tracker.awaitSettlement(10, TimeUnit.SECONDS);
send2Completed.countDown();
} catch (Exception e) {
LOG.info("Test send 2 failed with error: ", e);
sendFailed.set(true);
}
});
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Grant a single credit immediately, then queue one more credit to be sent after
// the first transfer arrives so the second send can proceed.
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(1).withLinkCredit(1).now();
peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(1).withNextIncomingId(2).withLinkCredit(1).queue();
peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
assertTrue(send2Completed.await(10, TimeUnit.SECONDS));
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectClose().respond();
assertFalse(sendFailed.get());
sender.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
/**
 * Starts two sends of a 1536 byte payload on a connection limited to 1024 byte
 * frames before any credit is granted. The peer script expects each message as a
 * partial transfer followed by a final transfer, with flow updates interleaved, and
 * the test passes once the second send's tracker reports settlement and no send has
 * recorded a failure.
 *
 * @throws Exception if the scripted peer or the client reports an unexpected error.
 */
@Test
void testConcurrentSendWaitingOnSplitFramedSendToCompleteIsSentAfterCreditUpdated() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
// Only the attach is scripted up front: no flow is sent, so the sender has no credit yet.
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
// The frame size limit is smaller than the payload below, so the script expects
// each message to arrive as more than one transfer.
ConnectionOptions options = new ConnectionOptions().maxFrameSize(1024);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
Sender sender = connection.openSender("test-queue");
final byte[] payload = new byte[1536];
Arrays.fill(payload, (byte) 1);
// send1Started gates the second task; send2Completed tells the test thread that
// the second send has been settled.
final CountDownLatch send1Started = new CountDownLatch(1);
final CountDownLatch send2Completed = new CountDownLatch(1);
final AtomicBoolean sendFailed = new AtomicBoolean();
ForkJoinPool.commonPool().execute(() -> {
try {
LOG.info("Test send 1 is preparing to fire:");
// NOTE(review): the latch is released from a separate pool task, presumably so
// that this thread is already inside send() by the time send 2 starts - confirm.
ForkJoinPool.commonPool().execute(() -> send1Started.countDown());
sender.send(Message.create(payload));
} catch (Exception e) {
LOG.info("Test send 1 failed with error: ", e);
sendFailed.set(true);
}
});
ForkJoinPool.commonPool().execute(() -> {
try {
// An assertion failure here is an Error and is not caught below; the test thread
// would then time out waiting on send2Completed instead.
assertTrue(send1Started.await(10, TimeUnit.SECONDS));
LOG.info("Test send 2 is preparing to fire:");
Tracker tracker = sender.send(Message.create(payload));
tracker.awaitSettlement(10, TimeUnit.SECONDS);
send2Completed.countDown();
} catch (Exception e) {
LOG.info("Test send 2 failed with error: ", e);
sendFailed.set(true);
}
});
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// First flow is sent immediately; each later flow is queued to follow the transfer
// scripted just before it. The delivery count only advances after a message's
// final (more=false) transfer.
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(1).withLinkCredit(1).now();
peer.expectTransfer().withNonNullPayload().withMore(true);
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(2).withLinkCredit(1).queue();
peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(1).withNextIncomingId(3).withLinkCredit(1).queue();
peer.expectTransfer().withNonNullPayload().withMore(true);
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(1).withNextIncomingId(4).withLinkCredit(1).queue();
peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
assertTrue(send2Completed.await(10, TimeUnit.SECONDS));
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectClose().respond();
assertFalse(sendFailed.get());
sender.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
/**
 * Opens a sender without any custom options and lets the scripted peer verify the
 * source and target fields carried on the attach frame.
 *
 * @throws Exception if the scripted peer or the client reports an unexpected error.
 */
@Test
public void testCreateSenderWithDefaultSourceAndTargetOptions() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        // The attach expectation pins every source / target field the test cares about.
        peer.expectAttach().ofSender()
            .withSource().withAddress(notNullValue())
                         .withDistributionMode(nullValue())
                         .withDefaultTimeout()
                         .withDurable(TerminusDurability.NONE)
                         .withExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH)
                         .withDefaultOutcome(nullValue())
                         .withCapabilities(nullValue())
                         .withFilter(nullValue())
                         .withOutcomes("amqp:accepted:list", "amqp:rejected:list", "amqp:released:list", "amqp:modified:list")
                         .also()
            .withTarget().withAddress("test-queue")
                         .withCapabilities(nullValue())
                         .withDurable(nullValue())
                         .withExpiryPolicy(nullValue())
                         .withDefaultTimeout()
                         .withDynamic(anyOf(nullValue(), equalTo(false)))
                         .withDynamicNodeProperties(nullValue())
                         .and().respond();
        peer.expectDetach().respond();
        peer.expectEnd().respond();
        peer.expectClose().respond();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Test started, peer listening on: {}", serverUri);

        final Client client = Client.create();
        final Connection connection = client.connect(serverUri.getHost(), serverUri.getPort());
        final Session session = connection.openSession();
        final Sender defaultSender = session.openSender("test-queue").openFuture().get();

        // Tear down in link, session, connection order to match the scripted
        // detach / end / close expectations.
        defaultSender.close();
        session.close();
        connection.close();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Configures every source and target option on {@link SenderOptions} and lets the
 * scripted peer verify that the attach frame carries the configured values.
 *
 * @throws Exception if the scripted peer or the client reports an unexpected error.
 */
@Test
public void testCreateSenderWithUserConfiguredSourceAndTargetOptions() throws Exception {
    // Filter map in the form the peer expects to see on the wire.
    final Map<String, Object> expectedFilters = new HashMap<>();
    expectedFilters.put("x-opt-filter", "a = b");

    // Filter map in the form handed to the client's source options.
    final Map<String, String> configuredFilters = new HashMap<>();
    configuredFilters.put("x-opt-filter", "a = b");

    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender()
            .withSource().withAddress(notNullValue())
                         .withDistributionMode("copy")
                         .withTimeout(128)
                         .withDurable(TerminusDurability.UNSETTLED_STATE)
                         .withExpiryPolicy(TerminusExpiryPolicy.CONNECTION_CLOSE)
                         .withDefaultOutcome(new Released())
                         .withCapabilities("QUEUE")
                         .withFilter(expectedFilters)
                         .withOutcomes("amqp:accepted:list", "amqp:rejected:list")
                         .also()
            .withTarget().withAddress("test-queue")
                         .withCapabilities("QUEUE")
                         .withDurable(TerminusDurability.CONFIGURATION)
                         .withExpiryPolicy(TerminusExpiryPolicy.SESSION_END)
                         .withTimeout(42)
                         .withDynamic(anyOf(nullValue(), equalTo(false)))
                         .withDynamicNodeProperties(nullValue())
                         .and().respond();
        peer.expectDetach().respond();
        peer.expectEnd().respond();
        peer.expectClose().respond();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Test started, peer listening on: {}", serverUri);

        final Client client = Client.create();
        final Connection connection = client.connect(serverUri.getHost(), serverUri.getPort());
        final Session session = connection.openSession();

        final SenderOptions customOptions = new SenderOptions();

        // Source terminus configuration
        customOptions.sourceOptions().capabilities("QUEUE");
        customOptions.sourceOptions().distributionMode(DistributionMode.COPY);
        customOptions.sourceOptions().timeout(128);
        customOptions.sourceOptions().durabilityMode(DurabilityMode.UNSETTLED_STATE);
        customOptions.sourceOptions().expiryPolicy(ExpiryPolicy.CONNECTION_CLOSE);
        customOptions.sourceOptions().defaultOutcome(DeliveryState.released());
        customOptions.sourceOptions().filters(configuredFilters);
        customOptions.sourceOptions().outcomes(DeliveryState.Type.ACCEPTED, DeliveryState.Type.REJECTED);

        // Target terminus configuration
        customOptions.targetOptions().capabilities("QUEUE");
        customOptions.targetOptions().durabilityMode(DurabilityMode.CONFIGURATION);
        customOptions.targetOptions().expiryPolicy(ExpiryPolicy.SESSION_CLOSE);
        customOptions.targetOptions().timeout(42);

        final Sender configuredSender = session.openSender("test-queue", customOptions).openFuture().get();

        configuredSender.close();
        session.close();
        connection.close();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Sends one message that the peer settles as accepted and checks that
 * {@code awaitAccepted()} returns with the tracker remotely settled and accepted.
 *
 * @throws Exception if the scripted peer or the client reports an unexpected error.
 */
@Test
void testWaitForAcceptedReturnsOnRemoteAcceptance() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        peer.remoteFlow().withLinkCredit(2).queue();
        peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Test started, peer listening on: {}", serverUri);

        final Client client = Client.create();
        final Connection connection = client.connect(serverUri.getHost(), serverUri.getPort());
        final Sender sender = connection.openSender("test-queue").openFuture().get();

        final Tracker sent = sender.send(Message.create("Hello World"));

        // Waits without a timeout for the peer's accepted disposition.
        sent.awaitAccepted();

        assertTrue(sent.remoteSettled());
        assertTrue(sent.remoteState().isAccepted());

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        peer.expectDetach().respond();
        peer.expectClose().respond();

        sender.closeAsync().get();
        connection.closeAsync().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Sends one message that the peer settles as rejected and checks that
 * {@code awaitAccepted} throws {@link ClientDeliveryStateException} while the tracker
 * reports a remotely settled, non-accepted state.
 *
 * @throws Exception if the scripted peer or the client reports an unexpected error.
 */
@Test
void testWaitForAcceptanceFailsIfRemoteSendsRejected() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        peer.remoteFlow().withLinkCredit(2).queue();
        peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().rejected();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Test started, peer listening on: {}", serverUri);

        final Client client = Client.create();
        final Connection connection = client.connect(serverUri.getHost(), serverUri.getPort());
        final Sender sender = connection.openSender("test-queue").openFuture().get();

        final Tracker sent = sender.send(Message.create("Hello World"));

        try {
            sent.awaitAccepted(10, TimeUnit.SECONDS);
            fail("Should not succeed since remote sent something other than Accepted");
        } catch (ClientDeliveryStateException expected) {
            // The rejected outcome must surface as a delivery state error.
        }

        assertTrue(sent.remoteSettled());
        assertFalse(sent.remoteState().isAccepted());

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        peer.expectDetach().respond();
        peer.expectClose().respond();

        sender.closeAsync().get();
        connection.closeAsync().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Sends one message that the peer settles without providing any delivery state and
 * checks that {@code awaitAccepted} throws {@link ClientDeliveryStateException} while
 * the tracker reports remote settlement and a {@code null} remote state.
 *
 * @throws Exception if the scripted peer or the client reports an unexpected error.
 */
@Test
void testWaitForAcceptanceFailsIfRemoteSendsNoDisposition() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        peer.remoteFlow().withLinkCredit(2).queue();
        // The response settles the delivery but carries no outcome.
        peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true);
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Test started, peer listening on: {}", serverUri);

        final Client client = Client.create();
        final Connection connection = client.connect(serverUri.getHost(), serverUri.getPort());
        final Sender sender = connection.openSender("test-queue").openFuture().get();

        final Tracker sent = sender.send(Message.create("Hello World"));

        try {
            sent.awaitAccepted(10, TimeUnit.SECONDS);
            fail("Should not succeed since remote sent something other than Accepted");
        } catch (ClientDeliveryStateException expected) {
            // Settlement without an accepted outcome must surface as a delivery state error.
        }

        assertTrue(sent.remoteSettled());
        assertNull(sent.remoteState());

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        peer.expectDetach().respond();
        peer.expectClose().respond();

        sender.closeAsync().get();
        connection.closeAsync().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Opens a sender with an explicit link name and lets the scripted peer verify that
 * the attach frame carries that name.
 *
 * @throws Exception if the scripted peer or the client reports an unexpected error.
 */
@Test
public void testSenderLinkNameOptionAppliedWhenSet() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().withName("custom-link-name").respond();
        peer.expectDetach().respond();
        peer.expectClose().respond();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", serverUri);

        final Client client = Client.create();
        final Connection connection = client.connect(serverUri.getHost(), serverUri.getPort());
        final Session session = connection.openSession();

        final SenderOptions namedLinkOptions = new SenderOptions().linkName("custom-link-name");
        final Sender namedSender = session.openSender("test-queue", namedLinkOptions);

        // Wait for the attach exchange to finish before closing the link again.
        namedSender.openFuture().get();
        namedSender.close();

        connection.close();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Has the peer answer the attach with a fully populated source and checks that the
 * values exposed by {@code sender.source()} match what the peer sent.
 *
 * @throws Exception if the scripted peer or the client reports an unexpected error.
 */
@Test
public void testInspectRemoteSourceMatchesValuesSent() throws Exception {
    final Map<String, Object> expectedFilters = new HashMap<>();
    expectedFilters.put("filter-1", "value1");

    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        // The attach response carries the source values asserted on below.
        peer.expectAttach().ofSender().respond().withSource().withOutcomes("Accepted", "Released")
                                                             .withCapabilities("Queue")
                                                             .withDistributionMode("COPY")
                                                             .withDynamic(false)
                                                             .withExpiryPolicy(TerminusExpiryPolicy.SESSION_END)
                                                             .withDurability(TerminusDurability.UNSETTLED_STATE)
                                                             .withDefaultOutcome(Released.getInstance())
                                                             .withTimeout(Integer.MAX_VALUE)
                                                             .withFilterMap(expectedFilters)
                                                             .withAddress("test-queue");
        peer.expectDetach().respond();
        peer.expectClose().respond();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", serverUri);

        final Client client = Client.create();
        final Connection connection = client.connect(serverUri.getHost(), serverUri.getPort());
        final Session session = connection.openSession();
        final Sender sender = session.openSender("test-queue");

        final Source source = sender.source();

        assertTrue(source.outcomes().contains(DeliveryState.Type.ACCEPTED));
        assertTrue(source.capabilities().contains("Queue"));
        assertEquals("test-queue", source.address());
        assertFalse(source.dynamic());
        assertNull(source.dynamicNodeProperties());
        assertEquals(DistributionMode.COPY, source.distributionMode());
        assertEquals(DeliveryState.released(), source.defaultOutcome());
        assertEquals(Integer.MAX_VALUE, source.timeout());
        assertEquals(DurabilityMode.UNSETTLED_STATE, source.durabilityMode());
        assertEquals(ExpiryPolicy.SESSION_CLOSE, source.expiryPolicy());
        assertEquals(expectedFilters, source.filters());

        sender.close();
        connection.close();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Has the peer answer the attach with a populated target and checks that the values
 * exposed by {@code sender.target()} match what the peer sent.
 *
 * @throws Exception if the scripted peer or the client reports an unexpected error.
 */
@Test
public void testInspectRemoteTargetMatchesValuesSent() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        // The attach response carries the target values asserted on below.
        peer.expectAttach().ofSender().respond().withTarget().withCapabilities("Queue")
                                                             .withDynamic(false)
                                                             .withExpiryPolicy(TerminusExpiryPolicy.SESSION_END)
                                                             .withDurability(TerminusDurability.UNSETTLED_STATE)
                                                             .withTimeout(Integer.MAX_VALUE)
                                                             .withAddress("test-queue");
        peer.expectDetach().respond();
        peer.expectClose().respond();
        peer.start();

        URI remoteURI = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", remoteURI);

        Client container = Client.create();
        Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
        Session session = connection.openSession();
        Sender sender = session.openSender("test-queue");

        Target remoteTarget = sender.target();

        assertTrue(remoteTarget.capabilities().contains("Queue"));
        assertEquals("test-queue", remoteTarget.address());
        assertFalse(remoteTarget.dynamic());
        assertEquals(Integer.MAX_VALUE, remoteTarget.timeout());
        assertEquals(DurabilityMode.UNSETTLED_STATE, remoteTarget.durabilityMode());
        assertEquals(ExpiryPolicy.SESSION_CLOSE, remoteTarget.expiryPolicy());

        sender.close();
        connection.close();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Creates a tag generator that hands out the three fixed tags {@code {1, 1, 1}},
 * {@code {2, 2, 2}} and {@code {3, 3, 3}} in that order.
 *
 * @return a new generator whose fourth and later requests throw
 *         {@link UnsupportedOperationException}.
 */
private static DeliveryTagGenerator customTagGenerator() {
    return new DeliveryTagGenerator() {

        private int count = 1;

        @Override
        public DeliveryTag nextTag() {
            // The counter advances on every call, including calls that end up throwing.
            final int current = count++;

            if (current < 1 || current > 3) {
                throw new UnsupportedOperationException("Only supports creating three tags");
            }

            final byte fill = (byte) current;

            return new DeliveryTag.ProtonDeliveryTag(new byte[] { fill, fill, fill });
        }
    };
}
/**
 * Opens a sender configured with {@code customTagGenerator} as its delivery tag
 * generator supplier and lets the scripted peer verify that three consecutive
 * transfers carry the tags {@code {1, 1, 1}}, {@code {2, 2, 2}} and {@code {3, 3, 3}}.
 * Each transfer is answered with a settled, accepted disposition, which the test
 * confirms on the matching tracker.
 *
 * @throws Exception if the scripted peer or the client reports an unexpected error.
 */
@Test
public void testSenderUsesCustomDeliveryTagGeneratorConfiguration() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectAttach().ofSender().respond();
        peer.remoteFlow().withLinkCredit(10).queue();
        peer.start();

        URI remoteURI = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", remoteURI);

        Client container = Client.create();
        Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
        Session session = connection.openSession().openFuture().get();
        SenderOptions options = new SenderOptions().deliveryMode(DeliveryMode.AT_LEAST_ONCE)
                                                   .autoSettle(false)
                                                   .deliveryTagGeneratorSupplier(SenderTest::customTagGenerator);
        Sender sender = session.openSender("test-tags", options).openFuture().get();

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        peer.expectTransfer().withNonNullPayload()
                             .withDeliveryTag(new byte[] {1, 1, 1}).respond().withSettled(true).withState().accepted();
        peer.expectTransfer().withNonNullPayload()
                             .withDeliveryTag(new byte[] {2, 2, 2}).respond().withSettled(true).withState().accepted();
        peer.expectTransfer().withNonNullPayload()
                             .withDeliveryTag(new byte[] {3, 3, 3}).respond().withSettled(true).withState().accepted();
        peer.expectDetach().respond();
        peer.expectClose().respond();

        final Message<String> message = Message.create("Hello World");
        final Tracker tracker1 = sender.send(message);
        final Tracker tracker2 = sender.send(message);
        final Tracker tracker3 = sender.send(message);

        // assertNotNull on the boolean returned by settled() can never fail, so assert
        // the state the scripted dispositions actually produce: remote settlement.
        // Auto settle is disabled, hence the remote rather than the local state.
        assertNotNull(tracker1);
        assertTrue(tracker1.settlementFuture().get().remoteSettled());
        assertNotNull(tracker2);
        assertTrue(tracker2.settlementFuture().get().remoteSettled());
        assertNotNull(tracker3);
        assertTrue(tracker3.settlementFuture().get().remoteSettled());

        sender.closeAsync().get(10, TimeUnit.SECONDS);
        connection.closeAsync().get(10, TimeUnit.SECONDS);

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
/**
 * Configures a delivery tag generator supplier that yields {@code null} and checks
 * that opening a sender with it fails with a {@link ClientException}. The peer script
 * expects no attach, only the connection close.
 *
 * @throws Exception if the scripted peer or the client reports an unexpected error.
 */
@Test
public void testCannotCreateSenderWhenTagGeneratorReturnsNull() throws Exception {
    try (ProtonTestServer peer = new ProtonTestServer()) {
        peer.expectSASLAnonymousConnect();
        peer.expectOpen().respond();
        peer.expectBegin().respond();
        peer.expectClose().respond();
        peer.start();

        final URI serverUri = peer.getServerURI();

        LOG.info("Sender test started, peer listening on: {}", serverUri);

        final Client client = Client.create();
        final Connection connection = client.connect(serverUri.getHost(), serverUri.getPort()).openFuture().get();
        final Session session = connection.openSession().openFuture().get();

        final SenderOptions nullGeneratorOptions = new SenderOptions().deliveryMode(DeliveryMode.AT_LEAST_ONCE)
                                                                      .autoSettle(false)
                                                                      .deliveryTagGeneratorSupplier(() -> null);

        try {
            session.openSender("test-tags", nullGeneratorOptions).openFuture().get();
            fail("Should not create a sender if the tag generator is not supplied");
        } catch (ClientException expected) {
            // Sender creation must be refused when no tag generator is available.
        }

        connection.closeAsync().get(10, TimeUnit.SECONDS);

        peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
    }
}
}