blob: 4577b694ddee9f79fec598dbe60d021478dc1811 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.client.impl;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.protonj2.client.Client;
import org.apache.qpid.protonj2.client.Connection;
import org.apache.qpid.protonj2.client.ConnectionOptions;
import org.apache.qpid.protonj2.client.DeliveryMode;
import org.apache.qpid.protonj2.client.Message;
import org.apache.qpid.protonj2.client.OutputStreamOptions;
import org.apache.qpid.protonj2.client.Receiver;
import org.apache.qpid.protonj2.client.ReceiverOptions;
import org.apache.qpid.protonj2.client.SenderOptions;
import org.apache.qpid.protonj2.client.Session;
import org.apache.qpid.protonj2.client.StreamSender;
import org.apache.qpid.protonj2.client.StreamSenderMessage;
import org.apache.qpid.protonj2.client.StreamSenderOptions;
import org.apache.qpid.protonj2.client.StreamTracker;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException;
import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
import org.apache.qpid.protonj2.client.test.Wait;
import org.apache.qpid.protonj2.engine.DeliveryTagGenerator;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.ApplicationPropertiesMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.DeliveryAnnotationsMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.FooterMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.HeaderMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.MessageAnnotationsMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.PropertiesMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.transport.TransferPayloadCompositeMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpValueMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedCompositingDataSectionMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedDataMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedPartialDataSectionMatcher;
import org.apache.qpid.protonj2.types.DeliveryTag;
import org.apache.qpid.protonj2.types.messaging.AmqpValue;
import org.apache.qpid.protonj2.types.messaging.Data;
import org.apache.qpid.protonj2.types.messaging.Footer;
import org.apache.qpid.protonj2.types.messaging.Header;
import org.apache.qpid.protonj2.types.messaging.Section;
import org.apache.qpid.protonj2.types.transport.Role;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Timeout(20)
public class StreamSenderTest extends ImperativeClientTestCase {
private static final Logger LOG = LoggerFactory.getLogger(StreamSenderTest.class);
@Test
public void testSendWhenCreditIsAvailable() throws Exception {
doTestSendWhenCreditIsAvailable(false, false);
}
@Test
public void testTrySendWhenCreditIsAvailable() throws Exception {
doTestSendWhenCreditIsAvailable(true, false);
}
@Test
public void testSendWhenCreditIsAvailableWithDeliveryAnnotations() throws Exception {
doTestSendWhenCreditIsAvailable(false, true);
}
@Test
public void testTrySendWhenCreditIsAvailableWithDeliveryAnnotations() throws Exception {
doTestSendWhenCreditIsAvailable(true, true);
}
private void doTestSendWhenCreditIsAvailable(boolean trySend, boolean addDeliveryAnnotations) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).queue();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Sender test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSender sender = connection.openStreamSender("test-queue");
sender.openFuture().get(10, TimeUnit.SECONDS);
// This ensures that the flow to sender is processed before we try-send
Receiver receiver = connection.openReceiver("test-queue", new ReceiverOptions().creditWindow(0));
receiver.openFuture().get(10, TimeUnit.SECONDS);
Map<String, Object> deliveryAnnotations = new HashMap<>();
deliveryAnnotations.put("da1", 1);
deliveryAnnotations.put("da2", 2);
deliveryAnnotations.put("da3", 3);
DeliveryAnnotationsMatcher daMatcher = new DeliveryAnnotationsMatcher(true);
daMatcher.withEntry("da1", Matchers.equalTo(1));
daMatcher.withEntry("da2", Matchers.equalTo(2));
daMatcher.withEntry("da3", Matchers.equalTo(3));
EncodedAmqpValueMatcher bodyMatcher = new EncodedAmqpValueMatcher("Hello World");
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
if (addDeliveryAnnotations) {
payloadMatcher.setDeliveryAnnotationsMatcher(daMatcher);
}
payloadMatcher.setMessageContentMatcher(bodyMatcher);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withNonNullPayload();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
Message<String> message = Message.create("Hello World");
final StreamTracker tracker;
if (trySend) {
if (addDeliveryAnnotations) {
tracker = sender.trySend(message, deliveryAnnotations);
} else {
tracker = sender.trySend(message);
}
} else {
if (addDeliveryAnnotations) {
tracker = sender.send(message, deliveryAnnotations);
} else {
tracker = sender.send(message);
}
}
assertNotNull(tracker);
sender.closeAsync().get(10, TimeUnit.SECONDS);
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testOpenStreamSenderWithLinCapabilities() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue())
.withTarget().withCapabilities("queue").and()
.respond();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("StreamSender test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSenderOptions senderOptions = new StreamSenderOptions();
senderOptions.targetOptions().capabilities("queue");
StreamSender sender = connection.openStreamSender("test-queue", senderOptions);
sender.openFuture().get();
sender.close();
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testOpenStreamSenderAppliesDefaultSessionOutgoingWindow() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue())
.withTarget().withCapabilities("queue").and()
.respond();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("StreamSender test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSenderOptions senderOptions = new StreamSenderOptions();
senderOptions.targetOptions().capabilities("queue");
ClientStreamSender sender = (ClientStreamSender) connection.openStreamSender("test-queue", senderOptions);
assertEquals(StreamSenderOptions.DEFAULT_PENDING_WRITES_BUFFER_SIZE, sender.protonLink().getSession().getOutgoingCapacity());
sender.openFuture().get();
sender.close();
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testOpenStreamSenderAppliesConfiguredSessionOutgoingWindow() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue())
.withTarget().withCapabilities("queue").and()
.respond();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
peer.start();
final int PENDING_WRITES_BUFFER_SIZE = StreamSenderOptions.DEFAULT_PENDING_WRITES_BUFFER_SIZE / 2;
URI remoteURI = peer.getServerURI();
LOG.info("StreamSender test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSenderOptions senderOptions = new StreamSenderOptions().pendingWritesBufferSize(PENDING_WRITES_BUFFER_SIZE);
senderOptions.targetOptions().capabilities("queue");
ClientStreamSender sender = (ClientStreamSender) connection.openStreamSender("test-queue", senderOptions);
assertEquals(PENDING_WRITES_BUFFER_SIZE, sender.protonLink().getSession().getOutgoingCapacity());
sender.openFuture().get();
sender.close();
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testSendCustomMessageWithMultipleAmqpValueSections() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectBegin().respond(); // Hidden session for stream sender
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(10).queue();
peer.expectAttach().respond(); // Open a receiver to ensure sender link has processed
peer.expectFlow(); // the inbound flow frame we sent previously before send.
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Sender test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
Session session = connection.openSession().openFuture().get();
StreamSenderOptions options = new StreamSenderOptions();
options.deliveryMode(DeliveryMode.AT_MOST_ONCE);
options.writeBufferSize(Integer.MAX_VALUE);
StreamSender sender = connection.openStreamSender("test-qos", options);
// Create a custom message format send context and ensure that no early buffer writes take place
StreamSenderMessage message = sender.beginMessage();
assertEquals(sender, message.sender());
assertNull(message.tracker());
assertEquals(Header.DEFAULT_PRIORITY, message.priority());
assertEquals(Header.DEFAULT_DELIVERY_COUNT, message.deliveryCount());
assertEquals(Header.DEFAULT_FIRST_ACQUIRER, message.firstAcquirer());
assertEquals(Header.DEFAULT_TIME_TO_LIVE, message.timeToLive());
assertEquals(Header.DEFAULT_DURABILITY, message.durable());
message.messageFormat(17);
// Gates send on remote flow having been sent and received
session.openReceiver("dummy").openFuture().get();
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.withDurable(true);
headerMatcher.withPriority((byte) 1);
headerMatcher.withTtl(65535);
headerMatcher.withFirstAcquirer(true);
headerMatcher.withDeliveryCount(2);
// Note: This is a specification violation but could be used by other message formats
// and we don't attempt to enforce at the Send Context what users write
EncodedAmqpValueMatcher bodyMatcher1 = new EncodedAmqpValueMatcher("one", true);
EncodedAmqpValueMatcher bodyMatcher2 = new EncodedAmqpValueMatcher("two", true);
EncodedAmqpValueMatcher bodyMatcher3 = new EncodedAmqpValueMatcher("three", false);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.setHeadersMatcher(headerMatcher);
payloadMatcher.addMessageContentMatcher(bodyMatcher1);
payloadMatcher.addMessageContentMatcher(bodyMatcher2);
payloadMatcher.addMessageContentMatcher(bodyMatcher3);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withMore(false).withMessageFormat(17).withPayload(payloadMatcher).accept();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
// Populate all Header values
Header header = new Header();
header.setDurable(true);
header.setPriority((byte) 1);
header.setTimeToLive(65535);
header.setFirstAcquirer(true);
header.setDeliveryCount(2);
message.header(header);
message.addBodySection(new AmqpValue<>("one"));
message.addBodySection(new AmqpValue<>("two"));
message.addBodySection(new AmqpValue<>("three"));
message.complete();
assertNotNull(message.tracker());
assertEquals(17, message.messageFormat());
Wait.assertTrue(() -> message.tracker().settlementFuture().isDone());
assertTrue(message.tracker().settlementFuture().get().settled());
assertThrows(ClientIllegalStateException.class, () -> message.addBodySection(new AmqpValue<>("three")));
assertThrows(ClientIllegalStateException.class, () -> message.body());
assertThrows(ClientIllegalStateException.class, () -> message.rawOutputStream());
assertThrows(ClientIllegalStateException.class, () -> message.abort());
sender.closeAsync().get(10, TimeUnit.SECONDS);
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testClearBodySectionsIsNoOpForStreamSenderMessage() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectBegin().respond(); // Hidden session for stream sender
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(10).queue();
peer.expectAttach().respond(); // Open a receiver to ensure sender link has processed
peer.expectFlow(); // the inbound flow frame we sent previously before send.
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Sender test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
Session session = connection.openSession().openFuture().get();
StreamSenderOptions options = new StreamSenderOptions();
options.deliveryMode(DeliveryMode.AT_MOST_ONCE);
options.writeBufferSize(Integer.MAX_VALUE);
StreamSender sender = connection.openStreamSender("test-qos", options);
// Create a custom message format send context and ensure that no early buffer writes take place
StreamSenderMessage message = sender.beginMessage();
message.messageFormat(17);
// Gates send on remote flow having been sent and received
session.openReceiver("dummy").openFuture().get();
EncodedAmqpValueMatcher bodyMatcher1 = new EncodedAmqpValueMatcher("one", true);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.addMessageContentMatcher(bodyMatcher1);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withMore(false).withMessageFormat(17).withPayload(payloadMatcher).accept();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
message.addBodySection(new AmqpValue<>("one"));
message.clearBodySections();
message.forEachBodySection((section) -> {
// No sections retained so this should never run.
throw new RuntimeException();
});
assertNotNull(message.bodySections());
assertTrue(message.bodySections().isEmpty());
message.complete();
assertTrue(message.tracker().settlementFuture().isDone());
assertTrue(message.tracker().settlementFuture().get().settled());
assertThrows(ClientIllegalStateException.class, () -> message.body());
assertThrows(ClientIllegalStateException.class, () -> message.rawOutputStream());
sender.closeAsync().get(10, TimeUnit.SECONDS);
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testMessageFormatCannotBeModifiedAfterBodyWritesStart() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond(); // Hidden session for stream sender
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(10).queue();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Sender test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
StreamSender sender = connection.openStreamSender("test-qos");
StreamSenderMessage message = sender.beginMessage();
sender.openFuture().get();
message.durable(true);
message.messageFormat(17);
message.body();
try {
message.messageFormat(16);
fail("Should not be able to modify message format after body writes started");
} catch (ClientIllegalStateException ex) {
// Expected
} catch (Exception unexpected) {
fail("Failed test due to message format set throwing unexpected error: " + unexpected);
}
message.abort();
assertThrows(ClientIllegalStateException.class, () -> message.complete());
sender.closeAsync().get(10, TimeUnit.SECONDS);
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testCannotCreateNewStreamingMessageWhileCurrentInstanceIsIncomplete() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond(); // Hidden session for stream sender
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(10).queue();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Sender test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
StreamSender sender = connection.openStreamSender("test-qos").openFuture().get();
StreamSenderMessage message = sender.beginMessage();
try {
sender.beginMessage();
fail("Should not be able create a new streaming sender message before last one is completed.");
} catch (ClientIllegalStateException ex) {
// Expected
}
message.abort();
assertThrows(ClientIllegalStateException.class, () -> message.complete());
sender.closeAsync().get(10, TimeUnit.SECONDS);
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testCannotAssignAnOutputStreamToTheMessageBody() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond(); // Hidden session for stream sender
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(10).queue();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Sender test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
StreamSender sender = connection.openStreamSender("test-qos").openFuture().get();
StreamSenderMessage message = sender.beginMessage();
try {
message.body(new ByteArrayOutputStream());
fail("Should not be able assign an output stream to the message body");
} catch (ClientUnsupportedOperationException ex) {
// Expected
}
message.abort();
assertThrows(ClientIllegalStateException.class, () -> message.complete());
sender.closeAsync().get(10, TimeUnit.SECONDS);
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testCannotModifyMessagePreambleAfterWritesHaveStarted() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond(); // Hidden session for stream sender
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(10).queue();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Sender test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
StreamSender sender = connection.openStreamSender("test-qos").openFuture().get();
StreamSenderMessage message = sender.beginMessage();
message.durable(true);
message.messageId("test");
message.annotation("key", "value");
message.property("key", "value");
message.body();
try {
message.durable(false);
fail("Should not be able to modify message preamble after body writes started");
} catch (ClientIllegalStateException ex) {
// Expected
}
try {
message.messageId("test1");
fail("Should not be able to modify message preamble after body writes started");
} catch (ClientIllegalStateException ex) {
// Expected
}
try {
message.annotation("key1", "value");
fail("Should not be able to modify message preamble after body writes started");
} catch (ClientIllegalStateException ex) {
// Expected
}
try {
message.property("key", "value");
fail("Should not be able to modify message preamble after body writes started");
} catch (ClientIllegalStateException ex) {
// Expected
}
message.abort();
sender.closeAsync().get(10, TimeUnit.SECONDS);
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testCreateStream() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(1).queue();
peer.expectTransfer().withMore(false).withNullPayload();
peer.expectDetach().withClosed(true).respond();
peer.expectEnd().respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSender sender = connection.openStreamSender("test-qos");
StreamSenderMessage tracker = sender.beginMessage();
OutputStreamOptions options = new OutputStreamOptions();
OutputStream stream = tracker.body(options);
assertNotNull(stream);
sender.openFuture().get();
stream.close();
sender.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testOutputStreamOptionsEnforcesValidBodySizeValues() throws Exception {
OutputStreamOptions options = new OutputStreamOptions();
options.bodyLength(1024);
options.bodyLength(Integer.MAX_VALUE);
assertThrows(IllegalArgumentException.class, () -> options.bodyLength(-1));
}
@Test
public void testFlushWithSetNonBodySectionsThenClose() throws Exception {
doTestNonBodySectionWrittenWhenNoWritesToStream(true);
}
@Test
public void testCloseWithSetNonBodySections() throws Exception {
doTestNonBodySectionWrittenWhenNoWritesToStream(false);
}
private void doTestNonBodySectionWrittenWhenNoWritesToStream(boolean flushBeforeClose) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(1).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSender sender = connection.openStreamSender("test-queue");
StreamSenderMessage message = sender.beginMessage();
// Populate all Header values
Header header = new Header();
header.setDurable(true);
header.setPriority((byte) 1);
header.setTimeToLive(65535);
header.setFirstAcquirer(true);
header.setDeliveryCount(2);
message.header(header);
OutputStreamOptions options = new OutputStreamOptions();
OutputStream stream = message.body(options);
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.withDurable(true);
headerMatcher.withPriority((byte) 1);
headerMatcher.withTtl(65535);
headerMatcher.withFirstAcquirer(true);
headerMatcher.withDeliveryCount(2);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.setHeadersMatcher(headerMatcher);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
if (flushBeforeClose) {
peer.expectTransfer().withMore(true).withPayload(payloadMatcher);
peer.expectTransfer().withMore(false).withNullPayload()
.respond()
.withSettled(true).withState().accepted();
} else {
peer.expectTransfer().withMore(false).withPayload(payloadMatcher)
.respond()
.withSettled(true).withState().accepted();
}
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
// Once flush is called than anything in the buffer is written regardless of
// there being any actual stream writes. Default close action is to complete
// the delivery.
if (flushBeforeClose) {
stream.flush();
}
stream.close();
message.tracker().awaitSettlement(10, TimeUnit.SECONDS);
sender.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testFlushAfterFirstWriteEncodesAMQPHeaderAndMessageBuffer() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(1).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSender sender = connection.openStreamSender("test-queue");
StreamSenderMessage message = sender.beginMessage();
// Populate all Header values
Header header = new Header();
header.setDurable(true);
header.setPriority((byte) 1);
header.setTimeToLive(65535);
header.setFirstAcquirer(true);
header.setDeliveryCount(2);
message.header(header);
OutputStreamOptions options = new OutputStreamOptions();
OutputStream stream = message.body(options);
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.withDurable(true);
headerMatcher.withPriority((byte) 1);
headerMatcher.withTtl(65535);
headerMatcher.withFirstAcquirer(true);
headerMatcher.withDeliveryCount(2);
EncodedDataMatcher dataMatcher = new EncodedDataMatcher(new byte[] { 0, 1, 2, 3 });
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.setHeadersMatcher(headerMatcher);
payloadMatcher.setMessageContentMatcher(dataMatcher);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withMore(true).withPayload(payloadMatcher);
peer.expectTransfer().withMore(false).withNullPayload();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
// Stream won't output until some body bytes are written since the buffer was not
// filled by the header write. Then the close will complete the stream message.
stream.write(new byte[] { 0, 1, 2, 3 });
stream.flush();
stream.close();
sender.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testAutoFlushAfterSingleWriteExceedsConfiguredBufferLimit() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(1).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSender sender = connection.openStreamSender("test-queue", new StreamSenderOptions().writeBufferSize(512));
StreamSenderMessage tracker = sender.beginMessage();
final byte[] payload = new byte[512];
Arrays.fill(payload, (byte) 16);
// Populate all Header values
Header header = new Header();
header.setDurable(true);
header.setPriority((byte) 1);
header.setTimeToLive(65535);
header.setFirstAcquirer(true);
header.setDeliveryCount(2);
tracker.header(header);
OutputStreamOptions options = new OutputStreamOptions();
OutputStream stream = tracker.body(options);
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.withDurable(true);
headerMatcher.withPriority((byte) 1);
headerMatcher.withTtl(65535);
headerMatcher.withFirstAcquirer(true);
headerMatcher.withDeliveryCount(2);
EncodedDataMatcher dataMatcher = new EncodedDataMatcher(payload);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.setHeadersMatcher(headerMatcher);
payloadMatcher.setMessageContentMatcher(dataMatcher);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withPayload(payloadMatcher).withMore(true);
// Stream won't output until some body bytes are written.
stream.write(payload);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withNullPayload().withMore(false).accept();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
stream.close();
sender.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testAutoFlushDuringWriteThatExceedConfiguredBufferLimit() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(1).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSender sender = connection.openStreamSender("test-queue", new StreamSenderOptions().writeBufferSize(256));
StreamSenderMessage tracker = sender.beginMessage();
final byte[] payload = new byte[1024];
Arrays.fill(payload, 0, 256, (byte) 1);
Arrays.fill(payload, 256, 512, (byte) 2);
Arrays.fill(payload, 512, 768, (byte) 3);
Arrays.fill(payload, 768, 1024, (byte) 4);
final byte[] payload1 = new byte[256];
Arrays.fill(payload1, (byte) 1);
final byte[] payload2 = new byte[256];
Arrays.fill(payload2, (byte) 2);
final byte[] payload3 = new byte[256];
Arrays.fill(payload3, (byte) 3);
final byte[] payload4 = new byte[256];
Arrays.fill(payload4, (byte) 4);
// Populate all Header values
Header header = new Header();
header.setDurable(true);
header.setPriority((byte) 1);
header.setTimeToLive(65535);
header.setFirstAcquirer(true);
header.setDeliveryCount(2);
tracker.header(header);
OutputStreamOptions options = new OutputStreamOptions();
OutputStream stream = tracker.body(options);
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.withDurable(true);
headerMatcher.withPriority((byte) 1);
headerMatcher.withTtl(65535);
headerMatcher.withFirstAcquirer(true);
headerMatcher.withDeliveryCount(2);
EncodedDataMatcher dataMatcher1 = new EncodedDataMatcher(payload1);
TransferPayloadCompositeMatcher payloadMatcher1 = new TransferPayloadCompositeMatcher();
payloadMatcher1.setHeadersMatcher(headerMatcher);
payloadMatcher1.setMessageContentMatcher(dataMatcher1);
EncodedDataMatcher dataMatcher2 = new EncodedDataMatcher(payload2);
TransferPayloadCompositeMatcher payloadMatcher2 = new TransferPayloadCompositeMatcher();
payloadMatcher2.setMessageContentMatcher(dataMatcher2);
EncodedDataMatcher dataMatcher3 = new EncodedDataMatcher(payload3);
TransferPayloadCompositeMatcher payloadMatcher3 = new TransferPayloadCompositeMatcher();
payloadMatcher3.setMessageContentMatcher(dataMatcher3);
EncodedDataMatcher dataMatcher4 = new EncodedDataMatcher(payload4);
TransferPayloadCompositeMatcher payloadMatcher4 = new TransferPayloadCompositeMatcher();
payloadMatcher4.setMessageContentMatcher(dataMatcher4);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withPayload(payloadMatcher1).withMore(true);
peer.expectTransfer().withPayload(payloadMatcher2).withMore(true);
peer.expectTransfer().withPayload(payloadMatcher3).withMore(true);
peer.expectTransfer().withPayload(payloadMatcher4).withMore(true);
// Stream won't output until some body bytes are written.
stream.write(payload);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withNullPayload().withMore(false).accept();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
stream.close();
sender.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testAutoFlushDuringWriteThatExceedConfiguredBufferLimitSessionCreditLimitOnTransfer() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSender sender = connection.openStreamSender("test-queue", new StreamSenderOptions().writeBufferSize(256));
StreamSenderMessage tracker = sender.beginMessage();
final byte[] payload = new byte[1024];
Arrays.fill(payload, 0, 256, (byte) 1);
Arrays.fill(payload, 256, 512, (byte) 2);
Arrays.fill(payload, 512, 768, (byte) 3);
Arrays.fill(payload, 768, 1024, (byte) 4);
final byte[] payload1 = new byte[256];
Arrays.fill(payload1, (byte) 1);
final byte[] payload2 = new byte[256];
Arrays.fill(payload2, (byte) 2);
final byte[] payload3 = new byte[256];
Arrays.fill(payload3, (byte) 3);
final byte[] payload4 = new byte[256];
Arrays.fill(payload4, (byte) 4);
// Populate all Header values
Header header = new Header();
header.setDurable(true);
header.setPriority((byte) 1);
header.setTimeToLive(65535);
header.setFirstAcquirer(true);
header.setDeliveryCount(2);
tracker.header(header);
OutputStreamOptions options = new OutputStreamOptions();
OutputStream stream = tracker.body(options);
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.withDurable(true);
headerMatcher.withPriority((byte) 1);
headerMatcher.withTtl(65535);
headerMatcher.withFirstAcquirer(true);
headerMatcher.withDeliveryCount(2);
EncodedDataMatcher dataMatcher1 = new EncodedDataMatcher(payload1);
TransferPayloadCompositeMatcher payloadMatcher1 = new TransferPayloadCompositeMatcher();
payloadMatcher1.setHeadersMatcher(headerMatcher);
payloadMatcher1.setMessageContentMatcher(dataMatcher1);
EncodedDataMatcher dataMatcher2 = new EncodedDataMatcher(payload2);
TransferPayloadCompositeMatcher payloadMatcher2 = new TransferPayloadCompositeMatcher();
payloadMatcher2.setMessageContentMatcher(dataMatcher2);
EncodedDataMatcher dataMatcher3 = new EncodedDataMatcher(payload3);
TransferPayloadCompositeMatcher payloadMatcher3 = new TransferPayloadCompositeMatcher();
payloadMatcher3.setMessageContentMatcher(dataMatcher3);
EncodedDataMatcher dataMatcher4 = new EncodedDataMatcher(payload4);
TransferPayloadCompositeMatcher payloadMatcher4 = new TransferPayloadCompositeMatcher();
payloadMatcher4.setMessageContentMatcher(dataMatcher4);
final CountDownLatch sendComplete = new CountDownLatch(1);
final AtomicBoolean sendFailed = new AtomicBoolean();
// Stream won't output until some body bytes are written.
ForkJoinPool.commonPool().execute(() -> {
try {
stream.write(payload);
} catch (IOException e) {
LOG.info("send failed with error: ", e);
sendFailed.set(true);
} finally {
sendComplete.countDown();
}
});
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withPayload(payloadMatcher1).withMore(true);
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(2).withLinkCredit(10).queue();
peer.expectTransfer().withPayload(payloadMatcher2).withMore(true);
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(3).withLinkCredit(10).queue();
peer.expectTransfer().withPayload(payloadMatcher3).withMore(true);
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(4).withLinkCredit(10).queue();
peer.expectTransfer().withPayload(payloadMatcher4).withMore(true);
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(5).withLinkCredit(10).queue();
peer.expectTransfer().withNullPayload().withMore(false).accept();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
// Initiate the above script of transfers and flows
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(1).withLinkCredit(10).now();
assertTrue(sendComplete.await(10, TimeUnit.SECONDS));
stream.close();
assertFalse(sendFailed.get());
sender.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testCloseAfterSingleWriteEncodesAndCompletesTransferWhenNoStreamSizeConfigured() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(1).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSender sender = connection.openStreamSender("test-queue");
StreamSenderMessage tracker = sender.beginMessage();
// Populate all Header values
Header header = new Header();
header.setDurable(true);
header.setPriority((byte) 1);
header.setTimeToLive(65535);
header.setFirstAcquirer(true);
header.setDeliveryCount(2);
tracker.header(header);
OutputStreamOptions options = new OutputStreamOptions();
OutputStream stream = tracker.body(options);
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.withDurable(true);
headerMatcher.withPriority((byte) 1);
headerMatcher.withTtl(65535);
headerMatcher.withFirstAcquirer(true);
headerMatcher.withDeliveryCount(2);
EncodedDataMatcher dataMatcher = new EncodedDataMatcher(new byte[] { 0, 1, 2, 3 });
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.setHeadersMatcher(headerMatcher);
payloadMatcher.setMessageContentMatcher(dataMatcher);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withPayload(payloadMatcher).withMore(false).accept();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
// Stream won't output until some body bytes are written.
stream.write(new byte[] { 0, 1, 2, 3 });
stream.close();
sender.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testFlushAfterSecondWriteDoesNotEncodeAMQPHeaderFromConfiguration() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(1).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSender sender = connection.openStreamSender("test-queue");
StreamSenderMessage tracker = sender.beginMessage();
// Populate all Header values
Header header = new Header();
header.setDurable(true);
header.setPriority((byte) 1);
header.setTimeToLive(65535);
header.setFirstAcquirer(true);
header.setDeliveryCount(2);
tracker.header(header);
OutputStreamOptions options = new OutputStreamOptions();
OutputStream stream = tracker.body(options);
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.withDurable(true);
headerMatcher.withPriority((byte) 1);
headerMatcher.withTtl(65535);
headerMatcher.withFirstAcquirer(true);
headerMatcher.withDeliveryCount(2);
EncodedDataMatcher dataMatcher1 = new EncodedDataMatcher(new byte[] { 0, 1, 2, 3 });
TransferPayloadCompositeMatcher payloadMatcher1 = new TransferPayloadCompositeMatcher();
payloadMatcher1.setHeadersMatcher(headerMatcher);
payloadMatcher1.setMessageContentMatcher(dataMatcher1);
// Second flush expectation
EncodedDataMatcher dataMatcher2 = new EncodedDataMatcher(new byte[] { 4, 5, 6, 7 });
TransferPayloadCompositeMatcher payloadMatcher2 = new TransferPayloadCompositeMatcher();
payloadMatcher2.setMessageContentMatcher(dataMatcher2);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withPayload(payloadMatcher1).withMore(true);
peer.expectTransfer().withPayload(payloadMatcher2).withMore(true);
peer.expectTransfer().withNullPayload().withMore(false).accept();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
// Stream won't output until some body bytes are written.
stream.write(new byte[] { 0, 1, 2, 3 });
stream.flush();
// Next write should only be a single Data section
stream.write(new byte[] { 4, 5, 6, 7 });
stream.flush();
// Final Transfer that completes the Delivery
stream.close();
sender.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testIncompleteStreamClosureCausesTransferAbort() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(1).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSender sender = connection.openStreamSender("test-queue");
StreamSenderMessage tracker = sender.beginMessage();
final byte[] payload = new byte[] { 0, 1, 2, 3 };
// Populate all Header values
Header header = new Header();
header.setDurable(true);
header.setPriority((byte) 1);
header.setDeliveryCount(1);
tracker.header(header);
OutputStreamOptions options = new OutputStreamOptions().bodyLength(8192);
OutputStream stream = tracker.body(options);
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.withDurable(true);
headerMatcher.withPriority((byte) 1);
headerMatcher.withDeliveryCount(1);
EncodedPartialDataSectionMatcher partialDataMatcher = new EncodedPartialDataSectionMatcher(8192, payload);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.setHeadersMatcher(headerMatcher);
payloadMatcher.setMessageContentMatcher(partialDataMatcher);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withPayload(payloadMatcher);
peer.expectTransfer().withAborted(true).withNullPayload();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
stream.write(payload);
stream.flush();
// Stream should abort the send now since the configured size wasn't sent.
stream.close();
sender.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testIncompleteStreamClosureWithNoWritesAbortsTransfer() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(1).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSender sender = connection.openStreamSender("test-queue");
StreamSenderMessage message = sender.beginMessage();
// Populate all Header values
Header header = new Header();
header.setDurable(true);
header.setPriority((byte) 1);
header.setDeliveryCount(1);
message.header(header);
OutputStreamOptions options = new OutputStreamOptions().bodyLength(8192).completeSendOnClose(false);
OutputStream stream = message.body(options);
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.withDurable(true);
headerMatcher.withPriority((byte) 1);
headerMatcher.withDeliveryCount(1);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.setHeadersMatcher(headerMatcher);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
// This should abort the transfer as we might have triggered output upon create when the
// preamble was written.
stream.close();
assertTrue(message.aborted());
// Should have no affect.
message.abort();
sender.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testCompleteStreamClosureCausesTransferCompleted() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(3).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSender sender = connection.openStreamSender("test-queue");
StreamSenderMessage tracker = sender.beginMessage();
final byte[] payload1 = new byte[] { 0, 1, 2, 3, 4, 5 };
final byte[] payload2 = new byte[] { 6, 7, 8, 9, 10, 11, 12, 13, 14 };
final byte[] payload3 = new byte[] { 15 };
final int payloadSize = payload1.length + payload2.length + payload3.length;
// Populate all Header values
Header header = new Header();
header.setDurable(true);
header.setPriority((byte) 1);
header.setDeliveryCount(1);
tracker.header(header);
// Populate message application properties
tracker.property("ap1", 1);
tracker.property("ap2", 2);
tracker.property("ap3", 3);
OutputStreamOptions options = new OutputStreamOptions().bodyLength(payloadSize);
OutputStream stream = tracker.body(options);
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.withDurable(true);
headerMatcher.withPriority((byte) 1);
headerMatcher.withDeliveryCount(1);
ApplicationPropertiesMatcher apMatcher = new ApplicationPropertiesMatcher(true);
apMatcher.withEntry("ap1", Matchers.equalTo(1));
apMatcher.withEntry("ap2", Matchers.equalTo(2));
apMatcher.withEntry("ap3", Matchers.equalTo(3));
EncodedPartialDataSectionMatcher partialDataMatcher = new EncodedPartialDataSectionMatcher(payloadSize, payload1);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.setHeadersMatcher(headerMatcher);
payloadMatcher.setMessageContentMatcher(partialDataMatcher);
payloadMatcher.setApplicationPropertiesMatcher(apMatcher);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withPayload(payloadMatcher);
stream.write(payload1);
stream.flush();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
partialDataMatcher = new EncodedPartialDataSectionMatcher(payload2);
payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.setMessageContentMatcher(partialDataMatcher);
peer.expectTransfer().withMore(true).withPayload(partialDataMatcher);
stream.write(payload2);
stream.flush();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
partialDataMatcher = new EncodedPartialDataSectionMatcher(payload3);
payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.setMessageContentMatcher(partialDataMatcher);
peer.expectTransfer().withMore(false).withPayload(partialDataMatcher).accept();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
stream.write(payload3);
stream.flush();
// Stream should already be completed so no additional frames should be written.
stream.close();
sender.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testRawOutputStreamFromMessageWritesUnmodifiedBytes() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(1).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSender sender = connection.openStreamSender("test-queue");
StreamSenderMessage message = sender.beginMessage();
OutputStream stream = message.rawOutputStream();
// Only one writer at a time can exist
assertThrows(ClientIllegalStateException.class, () -> message.rawOutputStream());
assertThrows(ClientIllegalStateException.class, () -> message.body());
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withMore(true).withPayload(new byte[] { 0, 1, 2, 3 });
peer.expectTransfer().withMore(false).withNullPayload();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
stream.write(new byte[] { 0, 1, 2, 3 });
stream.flush();
stream.close();
sender.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamSenderMessageWithDeliveryAnnotations() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(10).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Sender test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
// Populate delivery annotations
final Map<String, Object> deliveryAnnotations = new HashMap<>();
deliveryAnnotations.put("da1", 1);
deliveryAnnotations.put("da2", 2);
deliveryAnnotations.put("da3", 3);
StreamSender sender = connection.openStreamSender("test-queue");
StreamSenderMessage message = sender.beginMessage(deliveryAnnotations);
final byte[] payload = new byte[] { 0, 1, 2, 3, 4, 5 };
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.withDurable(true);
headerMatcher.withPriority((byte) 1);
headerMatcher.withTtl(65535);
headerMatcher.withFirstAcquirer(true);
headerMatcher.withDeliveryCount(2);
PropertiesMatcher propertiesMatcher = new PropertiesMatcher(true);
propertiesMatcher.withMessageId("ID:12345");
propertiesMatcher.withUserId("user".getBytes(StandardCharsets.UTF_8));
propertiesMatcher.withTo("the-management");
propertiesMatcher.withSubject("amqp");
propertiesMatcher.withReplyTo("the-minions");
propertiesMatcher.withCorrelationId("abc");
propertiesMatcher.withContentEncoding("application/json");
propertiesMatcher.withContentType("gzip");
propertiesMatcher.withAbsoluteExpiryTime(123);
propertiesMatcher.withCreationTime(1);
propertiesMatcher.withGroupId("disgruntled");
propertiesMatcher.withGroupSequence(8192);
propertiesMatcher.withReplyToGroupId("/dev/null");
DeliveryAnnotationsMatcher daMatcher = new DeliveryAnnotationsMatcher(true);
daMatcher.withEntry("da1", Matchers.equalTo(1));
daMatcher.withEntry("da2", Matchers.equalTo(2));
daMatcher.withEntry("da3", Matchers.equalTo(3));
MessageAnnotationsMatcher maMatcher = new MessageAnnotationsMatcher(true);
maMatcher.withEntry("ma1", Matchers.equalTo(1));
maMatcher.withEntry("ma2", Matchers.equalTo(2));
maMatcher.withEntry("ma3", Matchers.equalTo(3));
ApplicationPropertiesMatcher apMatcher = new ApplicationPropertiesMatcher(true);
apMatcher.withEntry("ap1", Matchers.equalTo(1));
apMatcher.withEntry("ap2", Matchers.equalTo(2));
apMatcher.withEntry("ap3", Matchers.equalTo(3));
EncodedDataMatcher bodyMatcher = new EncodedDataMatcher(payload);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.setHeadersMatcher(headerMatcher);
payloadMatcher.setDeliveryAnnotationsMatcher(daMatcher);
payloadMatcher.setMessageAnnotationsMatcher(maMatcher);
payloadMatcher.setPropertiesMatcher(propertiesMatcher);
payloadMatcher.setApplicationPropertiesMatcher(apMatcher);
payloadMatcher.setMessageContentMatcher(bodyMatcher);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withPayload(payloadMatcher).withMore(false).accept();
// Populate all Header values
message.durable(true);
assertEquals(true, message.durable());
message.priority((byte) 1);
assertEquals(1, message.priority());
message.timeToLive(65535);
assertEquals(65535, message.timeToLive());
message.firstAcquirer(true);
assertTrue(message.firstAcquirer());
message.deliveryCount(2);
assertEquals(2, message.deliveryCount());
// Populate message annotations
assertFalse(message.hasAnnotations());
assertFalse(message.hasAnnotation("ma1"));
message.annotation("ma1", 1);
assertTrue(message.hasAnnotation("ma1"));
assertEquals(1, message.annotation("ma1"));
message.annotation("ma2", 2);
assertEquals(2, message.annotation("ma2"));
message.annotation("ma3", 3);
assertEquals(3, message.annotation("ma3"));
assertTrue(message.hasAnnotations());
// Populate all Properties values
message.messageId("ID:12345");
assertEquals("ID:12345", message.messageId());
message.userId("user".getBytes(StandardCharsets.UTF_8));
assertArrayEquals("user".getBytes(StandardCharsets.UTF_8), message.userId());
message.to("the-management");
assertEquals("the-management", message.to());
message.subject("amqp");
assertEquals("amqp", message.subject());
message.replyTo("the-minions");
assertEquals("the-minions", message.replyTo());
message.correlationId("abc");
assertEquals("abc", message.correlationId());
message.contentEncoding("application/json");
assertEquals("application/json", message.contentEncoding());
message.contentType("gzip");
assertEquals("gzip", message.contentType());
message.absoluteExpiryTime(123);
assertEquals(123, message.absoluteExpiryTime());
message.creationTime(1);
assertEquals(1, message.creationTime());
message.groupId("disgruntled");
assertEquals("disgruntled", message.groupId());
message.groupSequence(8192);
assertEquals(8192, message.groupSequence());
message.replyToGroupId("/dev/null");
assertEquals("/dev/null", message.replyToGroupId());
// Populate message application properties
assertFalse(message.hasProperties());
assertFalse(message.hasProperty("ma1"));
message.property("ap1", 1);
assertEquals(1, message.property("ap1"));
assertTrue(message.hasProperty("ap1"));
message.property("ap2", 2);
assertEquals(2, message.property("ap2"));
message.property("ap3", 3);
assertEquals(3, message.property("ap3"));
assertTrue(message.hasProperties());
OutputStream stream = message.body();
stream.write(payload);
stream.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
assertNotNull(message.tracker());
Wait.assertTrue(() -> message.tracker().settlementFuture().isDone());
assertTrue(message.tracker().settlementFuture().get().settled());
sender.closeAsync().get(10, TimeUnit.SECONDS);
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamSenderWritesFooterAfterStreamClosed() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(10).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Sender test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
StreamSender sender = connection.openStreamSender("test-queue");
StreamSenderMessage message = sender.beginMessage();
final byte[] payload = new byte[] { 0, 1, 2, 3, 4, 5 };
// First frame should include only the bits up to the body
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.withDurable(true);
headerMatcher.withPriority((byte) 1);
headerMatcher.withTtl(65535);
headerMatcher.withFirstAcquirer(true);
headerMatcher.withDeliveryCount(2);
ApplicationPropertiesMatcher apMatcher = new ApplicationPropertiesMatcher(true);
apMatcher.withEntry("ap1", Matchers.equalTo(1));
apMatcher.withEntry("ap2", Matchers.equalTo(2));
apMatcher.withEntry("ap3", Matchers.equalTo(3));
FooterMatcher footerMatcher = new FooterMatcher(false);
footerMatcher.withEntry("f1", Matchers.equalTo(1));
footerMatcher.withEntry("f2", Matchers.equalTo(2));
footerMatcher.withEntry("f3", Matchers.equalTo(3));
EncodedDataMatcher bodyMatcher = new EncodedDataMatcher(payload, true);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.setHeadersMatcher(headerMatcher);
payloadMatcher.setApplicationPropertiesMatcher(apMatcher);
payloadMatcher.setMessageContentMatcher(bodyMatcher);
payloadMatcher.setFootersMatcher(footerMatcher);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withPayload(payloadMatcher).withMore(false).accept();
// Populate all Header values
message.durable(true);
message.priority((byte) 1);
message.timeToLive(65535);
message.firstAcquirer(true);
message.deliveryCount(2);
// Populate message application properties
message.property("ap1", 1);
message.property("ap2", 2);
message.property("ap3", 3);
// Populate message footers
assertFalse(message.hasFooters());
assertFalse(message.hasFooter("f1"));
message.footer("f1", 1);
message.footer("f2", 2);
message.footer("f3", 3);
assertTrue(message.hasFooter("f1"));
assertTrue(message.hasFooters());
OutputStreamOptions bodyOptions = new OutputStreamOptions().completeSendOnClose(true);
OutputStream stream = message.body(bodyOptions);
assertThrows(ClientUnsupportedOperationException.class, () -> message.encode(Collections.emptyMap()));
stream.write(payload);
stream.close();
assertThrows(ClientIllegalStateException.class, () -> message.footer(new Footer(Collections.emptyMap())));
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
assertNotNull(message.tracker());
Wait.assertTrue(() -> message.tracker().settlementFuture().isDone());
assertTrue(message.tracker().settlementFuture().get().settled());
sender.closeAsync().get(10, TimeUnit.SECONDS);
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamSenderWritesFooterAfterMessageCompleted() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(10).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Sender test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
StreamSender sender = connection.openStreamSender("test-queue");
StreamSenderMessage message = sender.beginMessage();
final byte[] payload = new byte[] { 0, 1, 2, 3, 4, 5 };
// First frame should include only the bits up to the body
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.withDurable(true);
headerMatcher.withPriority((byte) 1);
headerMatcher.withTtl(65535);
headerMatcher.withFirstAcquirer(true);
headerMatcher.withDeliveryCount(2);
ApplicationPropertiesMatcher apMatcher = new ApplicationPropertiesMatcher(true);
apMatcher.withEntry("ap1", Matchers.equalTo(1));
apMatcher.withEntry("ap2", Matchers.equalTo(2));
apMatcher.withEntry("ap3", Matchers.equalTo(3));
EncodedDataMatcher bodyMatcher = new EncodedDataMatcher(payload);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.setHeadersMatcher(headerMatcher);
payloadMatcher.setApplicationPropertiesMatcher(apMatcher);
payloadMatcher.setMessageContentMatcher(bodyMatcher);
// Second Frame should contains the appended footers
FooterMatcher footerMatcher = new FooterMatcher(false);
footerMatcher.withEntry("f1", Matchers.equalTo(1));
footerMatcher.withEntry("f2", Matchers.equalTo(2));
footerMatcher.withEntry("f3", Matchers.equalTo(3));
TransferPayloadCompositeMatcher payloadFooterMatcher = new TransferPayloadCompositeMatcher();
payloadFooterMatcher.setFootersMatcher(footerMatcher);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withPayload(payloadMatcher).withMore(true);
peer.expectTransfer().withPayload(payloadFooterMatcher).withMore(false).accept();
// Populate all Header values
message.durable(true);
message.priority((byte) 1);
message.timeToLive(65535);
message.firstAcquirer(true);
message.deliveryCount(2);
// Populate message application properties
message.property("ap1", 1);
message.property("ap2", 2);
message.property("ap3", 3);
OutputStreamOptions bodyOptions = new OutputStreamOptions().completeSendOnClose(false);
OutputStream stream = message.body(bodyOptions);
stream.write(payload);
stream.close();
// Populate message footers
message.footer("f1", 1);
message.footer("f2", 2);
message.footer("f3", 3);
message.complete();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
assertNotNull(message.tracker());
Wait.assertTrue(() -> message.tracker().settlementFuture().isDone());
assertTrue(message.tracker().settlementFuture().get().settled());
sender.closeAsync().get(10, TimeUnit.SECONDS);
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testAutoFlushDuringMessageSendThatExceedConfiguredBufferLimitSessionCreditLimitOnTransfer() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
ConnectionOptions options = new ConnectionOptions().maxFrameSize(1024);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
StreamSender sender = connection.openStreamSender("test-queue");
final byte[] payload = new byte[4800];
Arrays.fill(payload, (byte) 1);
final AtomicBoolean sendFailed = new AtomicBoolean();
ForkJoinPool.commonPool().execute(() -> {
try {
sender.send(Message.create(payload));
} catch (Exception e) {
LOG.info("send failed with error: ", e);
sendFailed.set(true);
}
});
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(1).withLinkCredit(10).now();
peer.expectTransfer().withNonNullPayload().withMore(true);
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(2).withLinkCredit(10).queue();
peer.expectTransfer().withNonNullPayload().withMore(true);
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(3).withLinkCredit(10).queue();
peer.expectTransfer().withNonNullPayload().withMore(true);
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(4).withLinkCredit(10).queue();
peer.expectTransfer().withNonNullPayload().withMore(true);
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(5).withLinkCredit(10).queue();
peer.expectTransfer().withNonNullPayload().withMore(false).accept();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
assertFalse(sendFailed.get());
sender.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testConcurrentMessageSendOnlyBlocksForInitialSendInProgress() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(2).queue();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow();
peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSender sender = connection.openStreamSender("test-queue");
sender.openFuture().get();
// Ensure that sender gets its flow before the sends are triggered.
connection.openReceiver("test-queue").openFuture().get();
final byte[] payload = new byte[1024];
Arrays.fill(payload, (byte) 1);
// One should block on the send waiting for the others send to finish
// otherwise they should not care about concurrency of sends.
final AtomicBoolean sendFailed = new AtomicBoolean();
ForkJoinPool.commonPool().execute(() -> {
try {
LOG.info("Test send 1 is preparing to fire:");
StreamTracker tracker = sender.send(Message.create(payload));
tracker.awaitSettlement(10, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.info("Test send 1 failed with error: ", e);
sendFailed.set(true);
}
});
ForkJoinPool.commonPool().execute(() -> {
try {
LOG.info("Test send 2 is preparing to fire:");
StreamTracker tracker = sender.send(Message.create(payload));
tracker.awaitSettlement(10, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.info("Test send 2 failed with error: ", e);
sendFailed.set(true);
}
});
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
assertFalse(sendFailed.get());
sender.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testConcurrentMessageSendsBlocksBehindSendWaitingForCredit() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSender sender = connection.openStreamSender("test-queue");
final byte[] payload = new byte[1024];
Arrays.fill(payload, (byte) 1);
final CountDownLatch send1Started = new CountDownLatch(1);
final CountDownLatch send2Completed = new CountDownLatch(1);
final AtomicBoolean sendFailed = new AtomicBoolean();
ForkJoinPool.commonPool().execute(() -> {
try {
LOG.info("Test send 1 is preparing to fire:");
ForkJoinPool.commonPool().execute(() -> send1Started.countDown());
sender.send(Message.create(payload));
} catch (Exception e) {
LOG.info("Test send 1 failed with error: ", e);
sendFailed.set(true);
}
});
ForkJoinPool.commonPool().execute(() -> {
try {
assertTrue(send1Started.await(10, TimeUnit.SECONDS));
LOG.info("Test send 2 is preparing to fire:");
StreamTracker tracker = sender.send(Message.create(payload));
tracker.awaitSettlement(10, TimeUnit.SECONDS);
send2Completed.countDown();
} catch (Exception e) {
LOG.info("Test send 2 failed with error: ", e);
sendFailed.set(true);
}
});
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(1).withLinkCredit(1).now();
peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(1).withNextIncomingId(2).withLinkCredit(1).queue();
peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
assertTrue(send2Completed.await(10, TimeUnit.SECONDS));
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
assertFalse(sendFailed.get());
sender.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testConcurrentMessageSendWaitingOnSplitFramedSendToCompleteIsSentAfterCreditUpdated() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
ConnectionOptions options = new ConnectionOptions().maxFrameSize(1024);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
StreamSender sender = connection.openStreamSender("test-queue");
final byte[] payload = new byte[1536];
Arrays.fill(payload, (byte) 1);
final CountDownLatch send1Started = new CountDownLatch(1);
final CountDownLatch send2Completed = new CountDownLatch(1);
final AtomicBoolean sendFailed = new AtomicBoolean();
ForkJoinPool.commonPool().execute(() -> {
try {
LOG.info("Test send 1 is preparing to fire:");
ForkJoinPool.commonPool().execute(() -> send1Started.countDown());
sender.send(Message.create(payload));
} catch (Exception e) {
LOG.info("Test send 1 failed with error: ", e);
sendFailed.set(true);
}
});
ForkJoinPool.commonPool().execute(() -> {
try {
assertTrue(send1Started.await(10, TimeUnit.SECONDS));
LOG.info("Test send 2 is preparing to fire:");
StreamTracker tracker = sender.send(Message.create(payload));
tracker.awaitSettlement(10, TimeUnit.SECONDS);
send2Completed.countDown();
} catch (Exception e) {
LOG.info("Test send 2 failed with error: ", e);
sendFailed.set(true);
}
});
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(1).withLinkCredit(1).now();
peer.expectTransfer().withNonNullPayload().withMore(true);
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(2).withLinkCredit(1).queue();
peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(1).withNextIncomingId(3).withLinkCredit(1).queue();
peer.expectTransfer().withNonNullPayload().withMore(true);
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(1).withNextIncomingId(4).withLinkCredit(1).queue();
peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
assertTrue(send2Completed.await(10, TimeUnit.SECONDS));
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
assertFalse(sendFailed.get());
sender.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testMessageSendWhileStreamSendIsOpenShouldBlock() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(1).queue();
peer.start();
URI remoteURI = peer.getServerURI();
final byte[] payload = new byte[1536];
Arrays.fill(payload, (byte) 1);
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSender sender = connection.openStreamSender("test-queue");
StreamSenderMessage message = sender.beginMessage();
OutputStreamOptions options = new OutputStreamOptions().bodyLength(8192).completeSendOnClose(false);
OutputStream stream = message.body(options);
final CountDownLatch sendStarted = new CountDownLatch(1);
final CountDownLatch sendCompleted = new CountDownLatch(1);
final AtomicBoolean sendFailed = new AtomicBoolean();
ForkJoinPool.commonPool().execute(() -> {
try {
LOG.info("Test send 1 is preparing to fire:");
sendStarted.countDown();
sender.send(Message.create(payload));
sendCompleted.countDown();
} catch (Exception e) {
LOG.info("Test send 1 failed with error: ", e);
sendFailed.set(true);
}
});
EncodedDataMatcher bodyMatcher = new EncodedDataMatcher(payload);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.setMessageContentMatcher(bodyMatcher);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withPayload(payloadMatcher).accept();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
assertTrue(sendStarted.await(10, TimeUnit.SECONDS));
// This should abort the streamed send as we provided a size for the body.
stream.close();
assertTrue(message.aborted());
assertTrue(sendCompleted.await(100, TimeUnit.SECONDS));
assertThrows(ClientIllegalStateException.class, () -> message.rawOutputStream());
assertThrows(ClientIllegalStateException.class, () -> message.body());
sender.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamSenderSessionCannotCreateNewResources() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSender sender = connection.openStreamSender("test-queue");
assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openReceiver("test"));
assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openReceiver("test", new ReceiverOptions()));
assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openDurableReceiver("test", "test"));
assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openDurableReceiver("test", "test", new ReceiverOptions()));
assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openDynamicReceiver());
assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openDynamicReceiver(new HashMap<>()));
assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openDynamicReceiver(new ReceiverOptions()));
assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openDynamicReceiver(new HashMap<>(), new ReceiverOptions()));
assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openSender("test"));
assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openSender("test", new SenderOptions()));
assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openAnonymousSender());
assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openAnonymousSender(new SenderOptions()));
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
sender.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testStreamMessageWaitingOnCreditWritesWhileCompleteSendWaitsInQueue() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSender sender = connection.openStreamSender("test-queue");
StreamSenderMessage tracker = sender.beginMessage();
OutputStream stream = tracker.body();
final byte[] payload1 = new byte[256];
Arrays.fill(payload1, (byte) 1);
final byte[] payload2 = new byte[256];
Arrays.fill(payload2, (byte) 2);
final byte[] payload3 = new byte[256];
Arrays.fill(payload3, (byte) 3);
EncodedDataMatcher dataMatcher1 = new EncodedDataMatcher(payload1);
TransferPayloadCompositeMatcher payloadMatcher1 = new TransferPayloadCompositeMatcher();
payloadMatcher1.setMessageContentMatcher(dataMatcher1);
EncodedDataMatcher dataMatcher2 = new EncodedDataMatcher(payload2);
TransferPayloadCompositeMatcher payloadMatcher2 = new TransferPayloadCompositeMatcher();
payloadMatcher2.setMessageContentMatcher(dataMatcher2);
EncodedDataMatcher dataMatcher3 = new EncodedDataMatcher(payload3);
TransferPayloadCompositeMatcher payloadMatcher3 = new TransferPayloadCompositeMatcher();
payloadMatcher3.setMessageContentMatcher(dataMatcher3);
final AtomicBoolean sendFailed = new AtomicBoolean();
final CountDownLatch streamSend1Complete = new CountDownLatch(1);
// Stream won't output until some body bytes are written.
ForkJoinPool.commonPool().execute(() -> {
try {
stream.write(payload1);
stream.flush();
} catch (IOException e) {
LOG.info("send failed with error: ", e);
sendFailed.set(true);
} finally {
streamSend1Complete.countDown();
}
});
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withPayload(payloadMatcher1).withMore(true);
// Now trigger the next send by granting credit for payload 1
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(1).withLinkCredit(10).now();
assertTrue(streamSend1Complete.await(5, TimeUnit.SECONDS), "Stream sender completed first send");
assertFalse(sendFailed.get());
final CountDownLatch sendStarted = new CountDownLatch(1);
final CountDownLatch sendCompleted = new CountDownLatch(1);
ForkJoinPool.commonPool().execute(() -> {
try {
LOG.info("Test send 1 is preparing to fire:");
sendStarted.countDown();
sender.send(Message.create(payload3));
} catch (Exception e) {
LOG.info("Test send 1 failed with error: ", e);
sendFailed.set(true);
} finally {
sendCompleted.countDown();
}
});
assertTrue(sendStarted.await(10, TimeUnit.SECONDS));
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withPayload(payloadMatcher2).withMore(true);
// Queue a flow that will allow send by granting credit for payload 3 via sender.send
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(3).withLinkCredit(10).queue();
// Now trigger the next send by granting credit for payload 2
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(2).withLinkCredit(10).now();
stream.write(payload2);
stream.flush();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withNullPayload().withMore(false).accept();
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(4).withLinkCredit(10).queue();
peer.expectTransfer().withPayload(payloadMatcher3).withMore(false);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
stream.close();
assertTrue(sendCompleted.await(100, TimeUnit.SECONDS));
assertFalse(sendFailed.get());
sender.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testWriteToCreditLimitFramesOfMessagePayloadOneBytePerWrite() throws Exception {
final int WRITE_COUNT = 10;
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withIncomingWindow(WRITE_COUNT).withNextIncomingId(0).withLinkCredit(WRITE_COUNT).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSender sender = connection.openStreamSender("test-queue");
StreamSenderMessage tracker = sender.beginMessage();
OutputStream stream = tracker.body();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
final byte[][] payloads = new byte[WRITE_COUNT][256];
for (int i = 0; i < WRITE_COUNT; ++i) {
payloads[i] = new byte[256];
Arrays.fill(payloads[i], (byte)(i + 1));
}
for (int i = 0; i < WRITE_COUNT; ++i) {
EncodedDataMatcher dataMatcher = new EncodedDataMatcher(payloads[i]);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.setMessageContentMatcher(dataMatcher);
peer.expectTransfer().withPayload(payloadMatcher).withMore(true);
}
for (int i = 0; i < WRITE_COUNT; ++i) {
for (byte value : payloads[i]) {
stream.write(value);
}
stream.flush();
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withNullPayload().withMore(false).accept();
// grant one more credit for the complete to arrive.
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(WRITE_COUNT).withLinkCredit(1).later(10);
stream.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
sender.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testWriteToCreditLimitFramesOfMessagePayload() throws Exception {
final int WRITE_COUNT = 10;
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withIncomingWindow(WRITE_COUNT).withNextIncomingId(0).withLinkCredit(WRITE_COUNT).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSender sender = connection.openStreamSender("test-queue");
StreamSenderMessage tracker = sender.beginMessage();
OutputStream stream = tracker.body();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
final byte[][] payloads = new byte[WRITE_COUNT][256];
for (int i = 0; i < WRITE_COUNT; ++i) {
payloads[i] = new byte[256];
Arrays.fill(payloads[i], (byte)(i + 1));
}
for (int i = 0; i < WRITE_COUNT; ++i) {
EncodedDataMatcher dataMatcher = new EncodedDataMatcher(payloads[i]);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.setMessageContentMatcher(dataMatcher);
peer.expectTransfer().withPayload(payloadMatcher).withMore(true);
}
for (int i = 0; i < WRITE_COUNT; ++i) {
stream.write(payloads[i]);
stream.flush();
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withNullPayload().withMore(false).accept();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
// grant one more credit for the complete to arrive.
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(WRITE_COUNT).withLinkCredit(1).now();
stream.close();
sender.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testStreamMessageFlushFailsAfterConnectionDropped() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(1).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSender sender = connection.openStreamSender("test-queue");
StreamSenderMessage message = sender.beginMessage();
OutputStream stream = message.body();
EncodedDataMatcher dataMatcher1 = new EncodedDataMatcher(new byte[] { 0, 1, 2, 3 });
TransferPayloadCompositeMatcher payloadMatcher1 = new TransferPayloadCompositeMatcher();
payloadMatcher1.setMessageContentMatcher(dataMatcher1);
EncodedDataMatcher dataMatcher2 = new EncodedDataMatcher(new byte[] { 4, 5, 6, 7 });
TransferPayloadCompositeMatcher payloadMatcher2 = new TransferPayloadCompositeMatcher();
payloadMatcher2.setMessageContentMatcher(dataMatcher2);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withPayload(payloadMatcher1).withMore(true);
peer.expectTransfer().withPayload(payloadMatcher2).withMore(true);
peer.dropAfterLastHandler();
// Write two then after connection drops the message should fail on future writes
stream.write(new byte[] { 0, 1, 2, 3 });
stream.flush();
stream.write(new byte[] { 4, 5, 6, 7 });
stream.flush();
peer.waitForScriptToComplete();
// Next write should fail as connection should have dropped.
stream.write(new byte[] { 8, 9, 10, 11 });
try {
stream.flush();
fail("Should not be able to flush after connection drop");
} catch (IOException ioe) {
assertTrue(ioe.getCause() instanceof ClientException);
}
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testStreamMessageCloseThatFlushesFailsAfterConnectionDropped() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(1).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSender sender = connection.openStreamSender("test-queue");
StreamSenderMessage message = sender.beginMessage();
OutputStream stream = message.body();
EncodedDataMatcher dataMatcher1 = new EncodedDataMatcher(new byte[] { 0, 1, 2, 3 });
TransferPayloadCompositeMatcher payloadMatcher1 = new TransferPayloadCompositeMatcher();
payloadMatcher1.setMessageContentMatcher(dataMatcher1);
EncodedDataMatcher dataMatcher2 = new EncodedDataMatcher(new byte[] { 4, 5, 6, 7 });
TransferPayloadCompositeMatcher payloadMatcher2 = new TransferPayloadCompositeMatcher();
payloadMatcher2.setMessageContentMatcher(dataMatcher2);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withPayload(payloadMatcher1).withMore(true);
peer.expectTransfer().withPayload(payloadMatcher2).withMore(true);
peer.dropAfterLastHandler();
// Write two then after connection drops the message should fail on future writes
stream.write(new byte[] { 0, 1, 2, 3 });
stream.flush();
stream.write(new byte[] { 4, 5, 6, 7 });
stream.flush();
peer.waitForScriptToComplete();
// Next write should fail as connection should have dropped.
stream.write(new byte[] { 8, 9, 10, 11 });
try {
stream.close();
fail("Should not be able to close after connection drop");
} catch (IOException ioe) {
assertTrue(ioe.getCause() instanceof ClientException);
}
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testStreamMessageWriteThatFlushesFailsAfterConnectionDropped() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(1).queue();
peer.dropAfterLastHandler();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSenderOptions options = new StreamSenderOptions().writeBufferSize(1024);
StreamSender sender = connection.openStreamSender("test-queue", options);
StreamSenderMessage message = sender.beginMessage();
byte[] payload = new byte[65535];
Arrays.fill(payload, (byte) 65);
OutputStreamOptions streamOptions = new OutputStreamOptions().bodyLength(65535);
OutputStream stream = message.body(streamOptions);
peer.waitForScriptToComplete();
try {
stream.write(payload);
fail("Should not be able to write section after connection drop");
} catch (IOException ioe) {
assertTrue(ioe.getCause() instanceof ClientException);
}
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
void testStreamMessageSendFromByteArrayInputStreamWithoutBodySizeSet() throws Exception {
doTestStreamMessageSendFromByteArrayInputStream(false);
}
@Test
void testStreamMessageSendFromByteArrayInputStreamWithBodySizeSet() throws Exception {
doTestStreamMessageSendFromByteArrayInputStream(false);
}
private void doTestStreamMessageSendFromByteArrayInputStream(boolean setBodySize) throws Exception {
final Random random = new Random(System.nanoTime());
final byte[] array = new byte[4096];
final ByteArrayInputStream bytesIn = new ByteArrayInputStream(array);
// Populate the array with something other than zeros.
random.nextBytes(array);
EncodedCompositingDataSectionMatcher matcher = new EncodedCompositingDataSectionMatcher(array);
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(100).queue();
for (int i = 0; i < (array.length / 1023); ++i) {
peer.expectTransfer().withDeliveryId(0)
.withMore(true)
.withPayload(matcher);
}
// A small number of trailing bytes will be transmitted in the final frame.
peer.expectTransfer().withDeliveryId(0)
.withMore(false)
.withPayload(matcher);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamSenderOptions options = new StreamSenderOptions().writeBufferSize(1023);
StreamSender sender = connection.openStreamSender("test-queue", options);
StreamSenderMessage tracker = sender.beginMessage();
final OutputStream stream;
if (setBodySize) {
stream = tracker.body(new OutputStreamOptions().bodyLength(array.length));
} else {
stream = tracker.body();
}
try {
bytesIn.transferTo(stream);
} finally {
// Ensure any trailing bytes get written and transfer marked as done.
stream.close();
}
peer.waitForScriptToComplete();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
sender.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testBatchAddBodySectionsWritesEach() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectBegin().respond(); // Hidden session for stream sender
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(10).queue();
peer.expectAttach().respond(); // Open a receiver to ensure sender link has processed
peer.expectFlow(); // the inbound flow frame we sent previously before send.
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Sender test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
Session session = connection.openSession().openFuture().get();
StreamSenderOptions options = new StreamSenderOptions();
options.deliveryMode(DeliveryMode.AT_MOST_ONCE);
options.writeBufferSize(Integer.MAX_VALUE);
StreamSender sender = connection.openStreamSender("test-qos", options);
// Create a custom message format send context and ensure that no early buffer writes take place
StreamSenderMessage message = sender.beginMessage();
assertEquals(Header.DEFAULT_PRIORITY, message.priority());
assertEquals(Header.DEFAULT_DELIVERY_COUNT, message.deliveryCount());
assertEquals(Header.DEFAULT_FIRST_ACQUIRER, message.firstAcquirer());
assertEquals(Header.DEFAULT_TIME_TO_LIVE, message.timeToLive());
assertEquals(Header.DEFAULT_DURABILITY, message.durable());
// Gates send on remote flow having been sent and received
session.openReceiver("dummy").openFuture().get();
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.withDurable(true);
headerMatcher.withPriority((byte) 1);
headerMatcher.withTtl(65535);
headerMatcher.withFirstAcquirer(true);
headerMatcher.withDeliveryCount(2);
EncodedDataMatcher data1Matcher = new EncodedDataMatcher(new byte[] { 0, 1, 2, 3 }, true);
EncodedDataMatcher data2Matcher = new EncodedDataMatcher(new byte[] { 4, 5, 6, 7 }, true);
EncodedDataMatcher data3Matcher = new EncodedDataMatcher(new byte[] { 8, 9, 0, 1 });
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.setHeadersMatcher(headerMatcher);
payloadMatcher.addMessageContentMatcher(data1Matcher);
payloadMatcher.addMessageContentMatcher(data2Matcher);
payloadMatcher.addMessageContentMatcher(data3Matcher);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withMore(false).withPayload(payloadMatcher).accept();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
// Populate all Header values
Header header = new Header();
header.setDurable(true);
header.setPriority((byte) 1);
header.setTimeToLive(65535);
header.setFirstAcquirer(true);
header.setDeliveryCount(2);
List<Section<?>> sections = new ArrayList<>(3);
sections.add(new Data(new byte[] { 0, 1, 2, 3 }));
sections.add(new Data(new byte[] { 4, 5, 6, 7 }));
sections.add(new Data(new byte[] { 8, 9, 0, 1 }));
message.header(header);
message.bodySections(sections);
message.complete();
assertEquals(message, message.complete()); // Should no-op at this point
Wait.assertTrue(() ->message.tracker().settlementFuture().isDone());
assertTrue(message.tracker().settlementFuture().get().settled());
sender.closeAsync().get(10, TimeUnit.SECONDS);
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
private static DeliveryTagGenerator customTagGenerator() {
return new DeliveryTagGenerator() {
private int count = 1;
@Override
public DeliveryTag nextTag() {
switch (count++) {
case 1:
return new DeliveryTag.ProtonDeliveryTag(new byte[] { 1, 1, 1 });
case 2:
return new DeliveryTag.ProtonDeliveryTag(new byte[] { 2, 2, 2 });
case 3:
return new DeliveryTag.ProtonDeliveryTag(new byte[] { 3, 3, 3 });
default:
throw new UnsupportedOperationException("Only supports creating three tags");
}
}
};
}
@Test
public void testSenderUsesCustomDeliveryTagGeneratorConfiguration() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond(); // Hidden session for stream sender
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(10).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Sender test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
StreamSenderOptions options = new StreamSenderOptions().deliveryMode(DeliveryMode.AT_LEAST_ONCE)
.autoSettle(false)
.deliveryTagGeneratorSupplier(StreamSenderTest::customTagGenerator);
StreamSender sender = connection.openStreamSender("test-tags", options).openFuture().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withNonNullPayload()
.withDeliveryTag(new byte[] {1, 1, 1}).respond().withSettled(true).withState().accepted();
peer.expectTransfer().withNonNullPayload()
.withDeliveryTag(new byte[] {2, 2, 2}).respond().withSettled(true).withState().accepted();
peer.expectTransfer().withNonNullPayload()
.withDeliveryTag(new byte[] {3, 3, 3}).respond().withSettled(true).withState().accepted();
peer.expectDetach().respond();
peer.expectEnd().respond(); // From hidden stream sender session
peer.expectClose().respond();
final Message<String> message = Message.create("Hello World");
final StreamTracker tracker1 = sender.send(message);
final StreamTracker tracker2 = sender.send(message);
final StreamTracker tracker3 = sender.send(message);
assertNotNull(tracker1);
assertNotNull(tracker1.settlementFuture().get().settled());
assertNotNull(tracker2);
assertNotNull(tracker2.settlementFuture().get().settled());
assertNotNull(tracker3);
assertNotNull(tracker3.settlementFuture().get().settled());
sender.closeAsync().get(10, TimeUnit.SECONDS);
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testCannotCreateSenderWhenTagGeneratorReturnsNull() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond(); // Hidden session for stream sender
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Sender test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
StreamSenderOptions options = new StreamSenderOptions().deliveryMode(DeliveryMode.AT_LEAST_ONCE)
.autoSettle(false)
.deliveryTagGeneratorSupplier(() -> null);
try {
connection.openStreamSender("test-tags", options).openFuture().get();
fail("Should not create a sender if the tag generator is not supplied");
} catch (ClientException cliEx) {
// Expected
}
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
}