/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.qpid.protonj2.client.impl;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.qpid.protonj2.client.Client;
import org.apache.qpid.protonj2.client.Connection;
import org.apache.qpid.protonj2.client.ConnectionOptions;
import org.apache.qpid.protonj2.client.DeliveryMode;
import org.apache.qpid.protonj2.client.Message;
import org.apache.qpid.protonj2.client.OutputStreamOptions;
import org.apache.qpid.protonj2.client.Receiver;
import org.apache.qpid.protonj2.client.ReceiverOptions;
import org.apache.qpid.protonj2.client.SenderOptions;
import org.apache.qpid.protonj2.client.Session;
import org.apache.qpid.protonj2.client.StreamSender;
import org.apache.qpid.protonj2.client.StreamSenderMessage;
import org.apache.qpid.protonj2.client.StreamSenderOptions;
import org.apache.qpid.protonj2.client.StreamTracker;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException;
import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
import org.apache.qpid.protonj2.client.test.Wait;
import org.apache.qpid.protonj2.engine.DeliveryTagGenerator;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.ApplicationPropertiesMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.DeliveryAnnotationsMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.FooterMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.HeaderMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.MessageAnnotationsMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.PropertiesMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.transport.TransferPayloadCompositeMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpValueMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedCompositingDataSectionMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedDataMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedPartialDataSectionMatcher;
import org.apache.qpid.protonj2.types.DeliveryTag;
import org.apache.qpid.protonj2.types.messaging.AmqpValue;
import org.apache.qpid.protonj2.types.messaging.Data;
import org.apache.qpid.protonj2.types.messaging.Footer;
import org.apache.qpid.protonj2.types.messaging.Header;
import org.apache.qpid.protonj2.types.messaging.Section;
import org.apache.qpid.protonj2.types.transport.Role;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Timeout(20)
public class StreamSenderTest extends ImperativeClientTestCase {

    private static final Logger LOG = LoggerFactory.getLogger(StreamSenderTest.class);

    @Test
    public void testSendWhenCreditIsAvailable() throws Exception {
        doTestSendWhenCreditIsAvailable(false, false);
    }

    @Test
    public void testTrySendWhenCreditIsAvailable() throws Exception {
        doTestSendWhenCreditIsAvailable(true, false);
    }

    @Test
    public void testSendWhenCreditIsAvailableWithDeliveryAnnotations() throws Exception {
        doTestSendWhenCreditIsAvailable(false, true);
    }

    @Test
    public void testTrySendWhenCreditIsAvailableWithDeliveryAnnotations() throws Exception {
        doTestSendWhenCreditIsAvailable(true, true);
    }

    private void doTestSendWhenCreditIsAvailable(boolean trySend, boolean addDeliveryAnnotations) throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withDeliveryCount(0)
                             .withLinkCredit(10)
                             .withIncomingWindow(1024)
                             .withOutgoingWindow(10)
                             .withNextIncomingId(0)
                             .withNextOutgoingId(1).queue();
            peer.expectBegin().respond();
            peer.expectAttach().ofReceiver().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Sender test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSender sender = connection.openStreamSender("test-queue");
            sender.openFuture().get(10, TimeUnit.SECONDS);

            // This ensures that the flow to sender is processed before we try-send
            Receiver receiver = connection.openReceiver("test-queue", new ReceiverOptions().creditWindow(0));
            receiver.openFuture().get(10, TimeUnit.SECONDS);

            Map<String, Object> deliveryAnnotations = new HashMap<>();
            deliveryAnnotations.put("da1", 1);
            deliveryAnnotations.put("da2", 2);
            deliveryAnnotations.put("da3", 3);
            DeliveryAnnotationsMatcher daMatcher = new DeliveryAnnotationsMatcher(true);
            daMatcher.withEntry("da1", Matchers.equalTo(1));
            daMatcher.withEntry("da2", Matchers.equalTo(2));
            daMatcher.withEntry("da3", Matchers.equalTo(3));
            EncodedAmqpValueMatcher bodyMatcher = new EncodedAmqpValueMatcher("Hello World");
            TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
            if (addDeliveryAnnotations) {
                payloadMatcher.setDeliveryAnnotationsMatcher(daMatcher);
            }
            payloadMatcher.setMessageContentMatcher(bodyMatcher);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withNonNullPayload();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            Message<String> message = Message.create("Hello World");

            final StreamTracker tracker;
            if (trySend) {
                if (addDeliveryAnnotations) {
                    tracker = sender.trySend(message, deliveryAnnotations);
                } else {
                    tracker = sender.trySend(message);
                }
            } else {
                if (addDeliveryAnnotations) {
                    tracker = sender.send(message, deliveryAnnotations);
                } else {
                    tracker = sender.send(message);
                }
            }

            assertNotNull(tracker);

            sender.closeAsync().get(10, TimeUnit.SECONDS);

            connection.closeAsync().get(10, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testOpenStreamSenderWithLinCapabilities() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.SENDER.getValue())
                               .withTarget().withCapabilities("queue").and()
                               .respond();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("StreamSender test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSenderOptions senderOptions = new StreamSenderOptions();
            senderOptions.targetOptions().capabilities("queue");
            StreamSender sender = connection.openStreamSender("test-queue", senderOptions);

            sender.openFuture().get();
            sender.close();

            connection.closeAsync().get(10, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testOpenStreamSenderAppliesDefaultSessionOutgoingWindow() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.SENDER.getValue())
                               .withTarget().withCapabilities("queue").and()
                               .respond();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("StreamSender test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSenderOptions senderOptions = new StreamSenderOptions();
            senderOptions.targetOptions().capabilities("queue");
            ClientStreamSender sender = (ClientStreamSender) connection.openStreamSender("test-queue", senderOptions);

            assertEquals(StreamSenderOptions.DEFAULT_PENDING_WRITES_BUFFER_SIZE, sender.protonLink().getSession().getOutgoingCapacity());

            sender.openFuture().get();
            sender.close();

            connection.closeAsync().get(10, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testOpenStreamSenderAppliesConfiguredSessionOutgoingWindow() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.SENDER.getValue())
                               .withTarget().withCapabilities("queue").and()
                               .respond();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();
            peer.start();

            final int PENDING_WRITES_BUFFER_SIZE = StreamSenderOptions.DEFAULT_PENDING_WRITES_BUFFER_SIZE / 2;

            URI remoteURI = peer.getServerURI();

            LOG.info("StreamSender test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSenderOptions senderOptions = new StreamSenderOptions().pendingWritesBufferSize(PENDING_WRITES_BUFFER_SIZE);
            senderOptions.targetOptions().capabilities("queue");
            ClientStreamSender sender = (ClientStreamSender) connection.openStreamSender("test-queue", senderOptions);

            assertEquals(PENDING_WRITES_BUFFER_SIZE, sender.protonLink().getSession().getOutgoingCapacity());

            sender.openFuture().get();
            sender.close();

            connection.closeAsync().get(10, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testSendCustomMessageWithMultipleAmqpValueSections() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectBegin().respond(); // Hidden session for stream sender
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(10).queue();
            peer.expectAttach().respond();  // Open a receiver to ensure sender link has processed
            peer.expectFlow();              // the inbound flow frame we sent previously before send.
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Sender test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
            Session session = connection.openSession().openFuture().get();

            StreamSenderOptions options = new StreamSenderOptions();
            options.deliveryMode(DeliveryMode.AT_MOST_ONCE);
            options.writeBufferSize(Integer.MAX_VALUE);

            StreamSender sender = connection.openStreamSender("test-qos", options);

            // Create a custom message format send context and ensure that no early buffer writes take place
            StreamSenderMessage message = sender.beginMessage();

            assertEquals(sender, message.sender());
            assertNull(message.tracker());

            assertEquals(Header.DEFAULT_PRIORITY, message.priority());
            assertEquals(Header.DEFAULT_DELIVERY_COUNT, message.deliveryCount());
            assertEquals(Header.DEFAULT_FIRST_ACQUIRER, message.firstAcquirer());
            assertEquals(Header.DEFAULT_TIME_TO_LIVE, message.timeToLive());
            assertEquals(Header.DEFAULT_DURABILITY, message.durable());

            message.messageFormat(17);

            // Gates send on remote flow having been sent and received
            session.openReceiver("dummy").openFuture().get();

            HeaderMatcher headerMatcher = new HeaderMatcher(true);
            headerMatcher.withDurable(true);
            headerMatcher.withPriority((byte) 1);
            headerMatcher.withTtl(65535);
            headerMatcher.withFirstAcquirer(true);
            headerMatcher.withDeliveryCount(2);
            // Note: This is a specification violation but could be used by other message formats
            //       and we don't attempt to enforce at the Send Context what users write
            EncodedAmqpValueMatcher bodyMatcher1 = new EncodedAmqpValueMatcher("one", true);
            EncodedAmqpValueMatcher bodyMatcher2 = new EncodedAmqpValueMatcher("two", true);
            EncodedAmqpValueMatcher bodyMatcher3 = new EncodedAmqpValueMatcher("three", false);
            TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
            payloadMatcher.setHeadersMatcher(headerMatcher);
            payloadMatcher.addMessageContentMatcher(bodyMatcher1);
            payloadMatcher.addMessageContentMatcher(bodyMatcher2);
            payloadMatcher.addMessageContentMatcher(bodyMatcher3);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withMore(false).withMessageFormat(17).withPayload(payloadMatcher).accept();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            // Populate all Header values
            Header header = new Header();
            header.setDurable(true);
            header.setPriority((byte) 1);
            header.setTimeToLive(65535);
            header.setFirstAcquirer(true);
            header.setDeliveryCount(2);

            message.header(header);
            message.addBodySection(new AmqpValue<>("one"));
            message.addBodySection(new AmqpValue<>("two"));
            message.addBodySection(new AmqpValue<>("three"));

            message.complete();

            assertNotNull(message.tracker());
            assertEquals(17, message.messageFormat());
            Wait.assertTrue(() -> message.tracker().settlementFuture().isDone());
            assertTrue(message.tracker().settlementFuture().get().settled());
            assertThrows(ClientIllegalStateException.class, () -> message.addBodySection(new AmqpValue<>("three")));
            assertThrows(ClientIllegalStateException.class, () -> message.body());
            assertThrows(ClientIllegalStateException.class, () -> message.rawOutputStream());
            assertThrows(ClientIllegalStateException.class, () -> message.abort());

            sender.closeAsync().get(10, TimeUnit.SECONDS);

            connection.closeAsync().get(10, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testClearBodySectionsIsNoOpForStreamSenderMessage() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectBegin().respond(); // Hidden session for stream sender
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(10).queue();
            peer.expectAttach().respond();  // Open a receiver to ensure sender link has processed
            peer.expectFlow();              // the inbound flow frame we sent previously before send.
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Sender test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
            Session session = connection.openSession().openFuture().get();

            StreamSenderOptions options = new StreamSenderOptions();
            options.deliveryMode(DeliveryMode.AT_MOST_ONCE);
            options.writeBufferSize(Integer.MAX_VALUE);

            StreamSender sender = connection.openStreamSender("test-qos", options);

            // Create a custom message format send context and ensure that no early buffer writes take place
            StreamSenderMessage message = sender.beginMessage();

            message.messageFormat(17);

            // Gates send on remote flow having been sent and received
            session.openReceiver("dummy").openFuture().get();

            EncodedAmqpValueMatcher bodyMatcher1 = new EncodedAmqpValueMatcher("one", true);
            TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
            payloadMatcher.addMessageContentMatcher(bodyMatcher1);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withMore(false).withMessageFormat(17).withPayload(payloadMatcher).accept();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            message.addBodySection(new AmqpValue<>("one"));
            message.clearBodySections();
            message.forEachBodySection((section) -> {
                // No sections retained so this should never run.
                throw new RuntimeException();
            });

            assertNotNull(message.bodySections());
            assertTrue(message.bodySections().isEmpty());

            message.complete();

            assertTrue(message.tracker().settlementFuture().isDone());
            assertTrue(message.tracker().settlementFuture().get().settled());
            assertThrows(ClientIllegalStateException.class, () -> message.body());
            assertThrows(ClientIllegalStateException.class, () -> message.rawOutputStream());

            sender.closeAsync().get(10, TimeUnit.SECONDS);

            connection.closeAsync().get(10, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testMessageFormatCannotBeModifiedAfterBodyWritesStart() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond(); // Hidden session for stream sender
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(10).queue();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Sender test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();

            StreamSender sender = connection.openStreamSender("test-qos");
            StreamSenderMessage message = sender.beginMessage();

            sender.openFuture().get();

            message.durable(true);
            message.messageFormat(17);
            message.body();

            try {
                message.messageFormat(16);
                fail("Should not be able to modify message format after body writes started");
            } catch (ClientIllegalStateException ex) {
                // Expected
            } catch (Exception unexpected) {
                fail("Failed test due to message format set throwing unexpected error: " + unexpected);
            }

            message.abort();

            assertThrows(ClientIllegalStateException.class, () -> message.complete());

            sender.closeAsync().get(10, TimeUnit.SECONDS);
            connection.closeAsync().get(10, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testCannotCreateNewStreamingMessageWhileCurrentInstanceIsIncomplete() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond(); // Hidden session for stream sender
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(10).queue();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Sender test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();

            StreamSender sender = connection.openStreamSender("test-qos").openFuture().get();
            StreamSenderMessage message = sender.beginMessage();

            try {
                sender.beginMessage();
                fail("Should not be able create a new streaming sender message before last one is completed.");
            } catch (ClientIllegalStateException ex) {
                // Expected
            }

            message.abort();

            assertThrows(ClientIllegalStateException.class, () -> message.complete());

            sender.closeAsync().get(10, TimeUnit.SECONDS);
            connection.closeAsync().get(10, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testCannotAssignAnOutputStreamToTheMessageBody() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond(); // Hidden session for stream sender
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(10).queue();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Sender test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();

            StreamSender sender = connection.openStreamSender("test-qos").openFuture().get();
            StreamSenderMessage message = sender.beginMessage();

            try {
                message.body(new ByteArrayOutputStream());
                fail("Should not be able assign an output stream to the message body");
            } catch (ClientUnsupportedOperationException ex) {
                // Expected
            }

            message.abort();

            assertThrows(ClientIllegalStateException.class, () -> message.complete());

            sender.closeAsync().get(10, TimeUnit.SECONDS);
            connection.closeAsync().get(10, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testCannotModifyMessagePreambleAfterWritesHaveStarted() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond(); // Hidden session for stream sender
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(10).queue();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Sender test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();

            StreamSender sender = connection.openStreamSender("test-qos").openFuture().get();
            StreamSenderMessage message = sender.beginMessage();

            message.durable(true);
            message.messageId("test");
            message.annotation("key", "value");
            message.property("key", "value");
            message.body();

            try {
                message.durable(false);
                fail("Should not be able to modify message preamble after body writes started");
            } catch (ClientIllegalStateException ex) {
                // Expected
            }

            try {
                message.messageId("test1");
                fail("Should not be able to modify message preamble after body writes started");
            } catch (ClientIllegalStateException ex) {
                // Expected
            }

            try {
                message.annotation("key1", "value");
                fail("Should not be able to modify message preamble after body writes started");
            } catch (ClientIllegalStateException ex) {
                // Expected
            }

            try {
                message.property("key", "value");
                fail("Should not be able to modify message preamble after body writes started");
            } catch (ClientIllegalStateException ex) {
                // Expected
            }

            message.abort();

            sender.closeAsync().get(10, TimeUnit.SECONDS);
            connection.closeAsync().get(10, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testCreateStream() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(1).queue();
            peer.expectTransfer().withMore(false).withNullPayload();
            peer.expectDetach().withClosed(true).respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSender sender = connection.openStreamSender("test-qos");
            StreamSenderMessage tracker = sender.beginMessage();

            OutputStreamOptions options = new OutputStreamOptions();
            OutputStream stream = tracker.body(options);

            assertNotNull(stream);

            sender.openFuture().get();

            stream.close();

            sender.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testOutputStreamOptionsEnforcesValidBodySizeValues() throws Exception {
        OutputStreamOptions options = new OutputStreamOptions();

        options.bodyLength(1024);
        options.bodyLength(Integer.MAX_VALUE);

        assertThrows(IllegalArgumentException.class, () -> options.bodyLength(-1));
    }

    @Test
    public void testFlushWithSetNonBodySectionsThenClose() throws Exception {
        doTestNonBodySectionWrittenWhenNoWritesToStream(true);
    }

    @Test
    public void testCloseWithSetNonBodySections() throws Exception {
        doTestNonBodySectionWrittenWhenNoWritesToStream(false);
    }

    private void doTestNonBodySectionWrittenWhenNoWritesToStream(boolean flushBeforeClose) throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(1).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSender sender = connection.openStreamSender("test-queue");
            StreamSenderMessage message = sender.beginMessage();

            // Populate all Header values
            Header header = new Header();
            header.setDurable(true);
            header.setPriority((byte) 1);
            header.setTimeToLive(65535);
            header.setFirstAcquirer(true);
            header.setDeliveryCount(2);

            message.header(header);

            OutputStreamOptions options = new OutputStreamOptions();
            OutputStream stream = message.body(options);

            HeaderMatcher headerMatcher = new HeaderMatcher(true);
            headerMatcher.withDurable(true);
            headerMatcher.withPriority((byte) 1);
            headerMatcher.withTtl(65535);
            headerMatcher.withFirstAcquirer(true);
            headerMatcher.withDeliveryCount(2);
            TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
            payloadMatcher.setHeadersMatcher(headerMatcher);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            if (flushBeforeClose) {
                peer.expectTransfer().withMore(true).withPayload(payloadMatcher);
                peer.expectTransfer().withMore(false).withNullPayload()
                                     .respond()
                                     .withSettled(true).withState().accepted();
            } else {
                peer.expectTransfer().withMore(false).withPayload(payloadMatcher)
                                     .respond()
                                     .withSettled(true).withState().accepted();
            }
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            // Once flush is called than anything in the buffer is written regardless of
            // there being any actual stream writes.  Default close action is to complete
            // the delivery.
            if (flushBeforeClose) {
                stream.flush();
            }
            stream.close();

            message.tracker().awaitSettlement(10, TimeUnit.SECONDS);

            sender.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testFlushAfterFirstWriteEncodesAMQPHeaderAndMessageBuffer() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(1).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSender sender = connection.openStreamSender("test-queue");
            StreamSenderMessage message = sender.beginMessage();

            // Populate all Header values
            Header header = new Header();
            header.setDurable(true);
            header.setPriority((byte) 1);
            header.setTimeToLive(65535);
            header.setFirstAcquirer(true);
            header.setDeliveryCount(2);

            message.header(header);

            OutputStreamOptions options = new OutputStreamOptions();
            OutputStream stream = message.body(options);

            HeaderMatcher headerMatcher = new HeaderMatcher(true);
            headerMatcher.withDurable(true);
            headerMatcher.withPriority((byte) 1);
            headerMatcher.withTtl(65535);
            headerMatcher.withFirstAcquirer(true);
            headerMatcher.withDeliveryCount(2);
            EncodedDataMatcher dataMatcher = new EncodedDataMatcher(new byte[] { 0, 1, 2, 3 });
            TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
            payloadMatcher.setHeadersMatcher(headerMatcher);
            payloadMatcher.setMessageContentMatcher(dataMatcher);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withMore(true).withPayload(payloadMatcher);
            peer.expectTransfer().withMore(false).withNullPayload();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            // Stream won't output until some body bytes are written since the buffer was not
            // filled by the header write.  Then the close will complete the stream message.
            stream.write(new byte[] { 0, 1, 2, 3 });
            stream.flush();
            stream.close();

            sender.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testAutoFlushAfterSingleWriteExceedsConfiguredBufferLimit() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(1).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSender sender = connection.openStreamSender("test-queue", new StreamSenderOptions().writeBufferSize(512));
            StreamSenderMessage tracker = sender.beginMessage();

            final byte[] payload = new byte[512];
            Arrays.fill(payload, (byte) 16);

            // Populate all Header values
            Header header = new Header();
            header.setDurable(true);
            header.setPriority((byte) 1);
            header.setTimeToLive(65535);
            header.setFirstAcquirer(true);
            header.setDeliveryCount(2);

            tracker.header(header);

            OutputStreamOptions options = new OutputStreamOptions();
            OutputStream stream = tracker.body(options);

            HeaderMatcher headerMatcher = new HeaderMatcher(true);
            headerMatcher.withDurable(true);
            headerMatcher.withPriority((byte) 1);
            headerMatcher.withTtl(65535);
            headerMatcher.withFirstAcquirer(true);
            headerMatcher.withDeliveryCount(2);
            EncodedDataMatcher dataMatcher = new EncodedDataMatcher(payload);
            TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
            payloadMatcher.setHeadersMatcher(headerMatcher);
            payloadMatcher.setMessageContentMatcher(dataMatcher);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withPayload(payloadMatcher).withMore(true);

            // Stream won't output until some body bytes are written.
            stream.write(payload);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withNullPayload().withMore(false).accept();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            stream.close();

            sender.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testAutoFlushDuringWriteThatExceedConfiguredBufferLimit() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(1).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSender sender = connection.openStreamSender("test-queue", new StreamSenderOptions().writeBufferSize(256));
            StreamSenderMessage tracker = sender.beginMessage();

            final byte[] payload = new byte[1024];
            Arrays.fill(payload, 0, 256, (byte) 1);
            Arrays.fill(payload, 256, 512, (byte) 2);
            Arrays.fill(payload, 512, 768, (byte) 3);
            Arrays.fill(payload, 768, 1024, (byte) 4);

            final byte[] payload1 = new byte[256];
            Arrays.fill(payload1, (byte) 1);
            final byte[] payload2 = new byte[256];
            Arrays.fill(payload2, (byte) 2);
            final byte[] payload3 = new byte[256];
            Arrays.fill(payload3, (byte) 3);
            final byte[] payload4 = new byte[256];
            Arrays.fill(payload4, (byte) 4);

            // Populate all Header values
            Header header = new Header();
            header.setDurable(true);
            header.setPriority((byte) 1);
            header.setTimeToLive(65535);
            header.setFirstAcquirer(true);
            header.setDeliveryCount(2);

            tracker.header(header);

            OutputStreamOptions options = new OutputStreamOptions();
            OutputStream stream = tracker.body(options);

            HeaderMatcher headerMatcher = new HeaderMatcher(true);
            headerMatcher.withDurable(true);
            headerMatcher.withPriority((byte) 1);
            headerMatcher.withTtl(65535);
            headerMatcher.withFirstAcquirer(true);
            headerMatcher.withDeliveryCount(2);
            EncodedDataMatcher dataMatcher1 = new EncodedDataMatcher(payload1);
            TransferPayloadCompositeMatcher payloadMatcher1 = new TransferPayloadCompositeMatcher();
            payloadMatcher1.setHeadersMatcher(headerMatcher);
            payloadMatcher1.setMessageContentMatcher(dataMatcher1);

            EncodedDataMatcher dataMatcher2 = new EncodedDataMatcher(payload2);
            TransferPayloadCompositeMatcher payloadMatcher2 = new TransferPayloadCompositeMatcher();
            payloadMatcher2.setMessageContentMatcher(dataMatcher2);

            EncodedDataMatcher dataMatcher3 = new EncodedDataMatcher(payload3);
            TransferPayloadCompositeMatcher payloadMatcher3 = new TransferPayloadCompositeMatcher();
            payloadMatcher3.setMessageContentMatcher(dataMatcher3);

            EncodedDataMatcher dataMatcher4 = new EncodedDataMatcher(payload4);
            TransferPayloadCompositeMatcher payloadMatcher4 = new TransferPayloadCompositeMatcher();
            payloadMatcher4.setMessageContentMatcher(dataMatcher4);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withPayload(payloadMatcher1).withMore(true);
            peer.expectTransfer().withPayload(payloadMatcher2).withMore(true);
            peer.expectTransfer().withPayload(payloadMatcher3).withMore(true);
            peer.expectTransfer().withPayload(payloadMatcher4).withMore(true);

            // Stream won't output until some body bytes are written.
            stream.write(payload);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withNullPayload().withMore(false).accept();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            stream.close();

            sender.close();
            connection.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testAutoFlushDuringWriteThatExceedConfiguredBufferLimitSessionCreditLimitOnTransfer() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSender sender = connection.openStreamSender("test-queue", new StreamSenderOptions().writeBufferSize(256));
            StreamSenderMessage tracker = sender.beginMessage();

            final byte[] payload = new byte[1024];
            Arrays.fill(payload, 0, 256, (byte) 1);
            Arrays.fill(payload, 256, 512, (byte) 2);
            Arrays.fill(payload, 512, 768, (byte) 3);
            Arrays.fill(payload, 768, 1024, (byte) 4);

            final byte[] payload1 = new byte[256];
            Arrays.fill(payload1, (byte) 1);
            final byte[] payload2 = new byte[256];
            Arrays.fill(payload2, (byte) 2);
            final byte[] payload3 = new byte[256];
            Arrays.fill(payload3, (byte) 3);
            final byte[] payload4 = new byte[256];
            Arrays.fill(payload4, (byte) 4);

            // Populate all Header values
            Header header = new Header();
            header.setDurable(true);
            header.setPriority((byte) 1);
            header.setTimeToLive(65535);
            header.setFirstAcquirer(true);
            header.setDeliveryCount(2);

            tracker.header(header);

            OutputStreamOptions options = new OutputStreamOptions();
            OutputStream stream = tracker.body(options);

            HeaderMatcher headerMatcher = new HeaderMatcher(true);
            headerMatcher.withDurable(true);
            headerMatcher.withPriority((byte) 1);
            headerMatcher.withTtl(65535);
            headerMatcher.withFirstAcquirer(true);
            headerMatcher.withDeliveryCount(2);
            EncodedDataMatcher dataMatcher1 = new EncodedDataMatcher(payload1);
            TransferPayloadCompositeMatcher payloadMatcher1 = new TransferPayloadCompositeMatcher();
            payloadMatcher1.setHeadersMatcher(headerMatcher);
            payloadMatcher1.setMessageContentMatcher(dataMatcher1);

            EncodedDataMatcher dataMatcher2 = new EncodedDataMatcher(payload2);
            TransferPayloadCompositeMatcher payloadMatcher2 = new TransferPayloadCompositeMatcher();
            payloadMatcher2.setMessageContentMatcher(dataMatcher2);

            EncodedDataMatcher dataMatcher3 = new EncodedDataMatcher(payload3);
            TransferPayloadCompositeMatcher payloadMatcher3 = new TransferPayloadCompositeMatcher();
            payloadMatcher3.setMessageContentMatcher(dataMatcher3);

            EncodedDataMatcher dataMatcher4 = new EncodedDataMatcher(payload4);
            TransferPayloadCompositeMatcher payloadMatcher4 = new TransferPayloadCompositeMatcher();
            payloadMatcher4.setMessageContentMatcher(dataMatcher4);

            final CountDownLatch sendComplete = new CountDownLatch(1);
            final AtomicBoolean sendFailed = new AtomicBoolean();
            // Stream won't output until some body bytes are written.
            ForkJoinPool.commonPool().execute(() -> {
                try {
                    stream.write(payload);
                } catch (IOException e) {
                    LOG.info("send failed with error: ", e);
                    sendFailed.set(true);
                } finally {
                    sendComplete.countDown();
                }
            });

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withPayload(payloadMatcher1).withMore(true);
            peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(2).withLinkCredit(10).queue();
            peer.expectTransfer().withPayload(payloadMatcher2).withMore(true);
            peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(3).withLinkCredit(10).queue();
            peer.expectTransfer().withPayload(payloadMatcher3).withMore(true);
            peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(4).withLinkCredit(10).queue();
            peer.expectTransfer().withPayload(payloadMatcher4).withMore(true);
            peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(5).withLinkCredit(10).queue();
            peer.expectTransfer().withNullPayload().withMore(false).accept();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            // Initiate the above script of transfers and flows
            peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(1).withLinkCredit(10).now();

            assertTrue(sendComplete.await(10, TimeUnit.SECONDS));

            stream.close();

            assertFalse(sendFailed.get());

            sender.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testCloseAfterSingleWriteEncodesAndCompletesTransferWhenNoStreamSizeConfigured() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(1).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSender sender = connection.openStreamSender("test-queue");
            StreamSenderMessage tracker = sender.beginMessage();

            // Populate all Header values
            Header header = new Header();
            header.setDurable(true);
            header.setPriority((byte) 1);
            header.setTimeToLive(65535);
            header.setFirstAcquirer(true);
            header.setDeliveryCount(2);

            tracker.header(header);

            OutputStreamOptions options = new OutputStreamOptions();
            OutputStream stream = tracker.body(options);

            HeaderMatcher headerMatcher = new HeaderMatcher(true);
            headerMatcher.withDurable(true);
            headerMatcher.withPriority((byte) 1);
            headerMatcher.withTtl(65535);
            headerMatcher.withFirstAcquirer(true);
            headerMatcher.withDeliveryCount(2);
            EncodedDataMatcher dataMatcher = new EncodedDataMatcher(new byte[] { 0, 1, 2, 3 });
            TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
            payloadMatcher.setHeadersMatcher(headerMatcher);
            payloadMatcher.setMessageContentMatcher(dataMatcher);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withPayload(payloadMatcher).withMore(false).accept();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            // Stream won't output until some body bytes are written.
            stream.write(new byte[] { 0, 1, 2, 3 });
            stream.close();

            sender.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testFlushAfterSecondWriteDoesNotEncodeAMQPHeaderFromConfiguration() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(1).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSender sender = connection.openStreamSender("test-queue");
            StreamSenderMessage tracker = sender.beginMessage();

            // Populate all Header values
            Header header = new Header();
            header.setDurable(true);
            header.setPriority((byte) 1);
            header.setTimeToLive(65535);
            header.setFirstAcquirer(true);
            header.setDeliveryCount(2);

            tracker.header(header);

            OutputStreamOptions options = new OutputStreamOptions();
            OutputStream stream = tracker.body(options);

            HeaderMatcher headerMatcher = new HeaderMatcher(true);
            headerMatcher.withDurable(true);
            headerMatcher.withPriority((byte) 1);
            headerMatcher.withTtl(65535);
            headerMatcher.withFirstAcquirer(true);
            headerMatcher.withDeliveryCount(2);
            EncodedDataMatcher dataMatcher1 = new EncodedDataMatcher(new byte[] { 0, 1, 2, 3 });
            TransferPayloadCompositeMatcher payloadMatcher1 = new TransferPayloadCompositeMatcher();
            payloadMatcher1.setHeadersMatcher(headerMatcher);
            payloadMatcher1.setMessageContentMatcher(dataMatcher1);

            // Second flush expectation
            EncodedDataMatcher dataMatcher2 = new EncodedDataMatcher(new byte[] { 4, 5, 6, 7 });
            TransferPayloadCompositeMatcher payloadMatcher2 = new TransferPayloadCompositeMatcher();
            payloadMatcher2.setMessageContentMatcher(dataMatcher2);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withPayload(payloadMatcher1).withMore(true);
            peer.expectTransfer().withPayload(payloadMatcher2).withMore(true);
            peer.expectTransfer().withNullPayload().withMore(false).accept();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            // Stream won't output until some body bytes are written.
            stream.write(new byte[] { 0, 1, 2, 3 });
            stream.flush();

            // Next write should only be a single Data section
            stream.write(new byte[] { 4, 5, 6, 7 });
            stream.flush();

            // Final Transfer that completes the Delivery
            stream.close();

            sender.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testIncompleteStreamClosureCausesTransferAbort() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(1).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSender sender = connection.openStreamSender("test-queue");
            StreamSenderMessage tracker = sender.beginMessage();

            final byte[] payload = new byte[] { 0, 1, 2, 3 };

            // Populate all Header values
            Header header = new Header();
            header.setDurable(true);
            header.setPriority((byte) 1);
            header.setDeliveryCount(1);

            tracker.header(header);

            OutputStreamOptions options = new OutputStreamOptions().bodyLength(8192);
            OutputStream stream = tracker.body(options);

            HeaderMatcher headerMatcher = new HeaderMatcher(true);
            headerMatcher.withDurable(true);
            headerMatcher.withPriority((byte) 1);
            headerMatcher.withDeliveryCount(1);
            EncodedPartialDataSectionMatcher partialDataMatcher = new EncodedPartialDataSectionMatcher(8192, payload);
            TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
            payloadMatcher.setHeadersMatcher(headerMatcher);
            payloadMatcher.setMessageContentMatcher(partialDataMatcher);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withPayload(payloadMatcher);
            peer.expectTransfer().withAborted(true).withNullPayload();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            stream.write(payload);
            stream.flush();

            // Stream should abort the send now since the configured size wasn't sent.
            stream.close();

            sender.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testIncompleteStreamClosureWithNoWritesAbortsTransfer() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(1).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSender sender = connection.openStreamSender("test-queue");
            StreamSenderMessage message = sender.beginMessage();

            // Populate all Header values
            Header header = new Header();
            header.setDurable(true);
            header.setPriority((byte) 1);
            header.setDeliveryCount(1);

            message.header(header);

            OutputStreamOptions options = new OutputStreamOptions().bodyLength(8192).completeSendOnClose(false);
            OutputStream stream = message.body(options);

            HeaderMatcher headerMatcher = new HeaderMatcher(true);
            headerMatcher.withDurable(true);
            headerMatcher.withPriority((byte) 1);
            headerMatcher.withDeliveryCount(1);
            TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
            payloadMatcher.setHeadersMatcher(headerMatcher);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            // This should abort the transfer as we might have triggered output upon create when the
            // preamble was written.
            stream.close();

            assertTrue(message.aborted());

            // Should have no affect.
            message.abort();

            sender.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testCompleteStreamClosureCausesTransferCompleted() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(3).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSender sender = connection.openStreamSender("test-queue");
            StreamSenderMessage tracker = sender.beginMessage();

            final byte[] payload1 = new byte[] { 0, 1, 2, 3, 4, 5 };
            final byte[] payload2 = new byte[] { 6, 7, 8, 9, 10, 11, 12, 13, 14 };
            final byte[] payload3 = new byte[] { 15 };

            final int payloadSize = payload1.length + payload2.length + payload3.length;

            // Populate all Header values
            Header header = new Header();
            header.setDurable(true);
            header.setPriority((byte) 1);
            header.setDeliveryCount(1);

            tracker.header(header);

            // Populate message application properties
            tracker.property("ap1", 1);
            tracker.property("ap2", 2);
            tracker.property("ap3", 3);

            OutputStreamOptions options = new OutputStreamOptions().bodyLength(payloadSize);
            OutputStream stream = tracker.body(options);

            HeaderMatcher headerMatcher = new HeaderMatcher(true);
            headerMatcher.withDurable(true);
            headerMatcher.withPriority((byte) 1);
            headerMatcher.withDeliveryCount(1);
            ApplicationPropertiesMatcher apMatcher = new ApplicationPropertiesMatcher(true);
            apMatcher.withEntry("ap1", Matchers.equalTo(1));
            apMatcher.withEntry("ap2", Matchers.equalTo(2));
            apMatcher.withEntry("ap3", Matchers.equalTo(3));
            EncodedPartialDataSectionMatcher partialDataMatcher = new EncodedPartialDataSectionMatcher(payloadSize, payload1);
            TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
            payloadMatcher.setHeadersMatcher(headerMatcher);
            payloadMatcher.setMessageContentMatcher(partialDataMatcher);
            payloadMatcher.setApplicationPropertiesMatcher(apMatcher);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withPayload(payloadMatcher);

            stream.write(payload1);
            stream.flush();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            partialDataMatcher = new EncodedPartialDataSectionMatcher(payload2);
            payloadMatcher = new TransferPayloadCompositeMatcher();
            payloadMatcher.setMessageContentMatcher(partialDataMatcher);
            peer.expectTransfer().withMore(true).withPayload(partialDataMatcher);

            stream.write(payload2);
            stream.flush();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            partialDataMatcher = new EncodedPartialDataSectionMatcher(payload3);
            payloadMatcher = new TransferPayloadCompositeMatcher();
            payloadMatcher.setMessageContentMatcher(partialDataMatcher);
            peer.expectTransfer().withMore(false).withPayload(partialDataMatcher).accept();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            stream.write(payload3);
            stream.flush();

            // Stream should already be completed so no additional frames should be written.
            stream.close();

            sender.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testRawOutputStreamFromMessageWritesUnmodifiedBytes() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(1).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSender sender = connection.openStreamSender("test-queue");
            StreamSenderMessage message = sender.beginMessage();

            OutputStream stream = message.rawOutputStream();

            // Only one writer at a time can exist
            assertThrows(ClientIllegalStateException.class, () -> message.rawOutputStream());
            assertThrows(ClientIllegalStateException.class, () -> message.body());

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withMore(true).withPayload(new byte[] { 0, 1, 2, 3 });
            peer.expectTransfer().withMore(false).withNullPayload();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            stream.write(new byte[] { 0, 1, 2, 3 });
            stream.flush();
            stream.close();

            sender.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamSenderMessageWithDeliveryAnnotations() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(10).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Sender test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();

            // Populate delivery annotations
            final Map<String, Object> deliveryAnnotations = new HashMap<>();
            deliveryAnnotations.put("da1", 1);
            deliveryAnnotations.put("da2", 2);
            deliveryAnnotations.put("da3", 3);

            StreamSender sender = connection.openStreamSender("test-queue");
            StreamSenderMessage message = sender.beginMessage(deliveryAnnotations);

            final byte[] payload = new byte[] { 0, 1, 2, 3, 4, 5 };

            HeaderMatcher headerMatcher = new HeaderMatcher(true);
            headerMatcher.withDurable(true);
            headerMatcher.withPriority((byte) 1);
            headerMatcher.withTtl(65535);
            headerMatcher.withFirstAcquirer(true);
            headerMatcher.withDeliveryCount(2);
            PropertiesMatcher propertiesMatcher = new PropertiesMatcher(true);
            propertiesMatcher.withMessageId("ID:12345");
            propertiesMatcher.withUserId("user".getBytes(StandardCharsets.UTF_8));
            propertiesMatcher.withTo("the-management");
            propertiesMatcher.withSubject("amqp");
            propertiesMatcher.withReplyTo("the-minions");
            propertiesMatcher.withCorrelationId("abc");
            propertiesMatcher.withContentEncoding("application/json");
            propertiesMatcher.withContentType("gzip");
            propertiesMatcher.withAbsoluteExpiryTime(123);
            propertiesMatcher.withCreationTime(1);
            propertiesMatcher.withGroupId("disgruntled");
            propertiesMatcher.withGroupSequence(8192);
            propertiesMatcher.withReplyToGroupId("/dev/null");
            DeliveryAnnotationsMatcher daMatcher = new DeliveryAnnotationsMatcher(true);
            daMatcher.withEntry("da1", Matchers.equalTo(1));
            daMatcher.withEntry("da2", Matchers.equalTo(2));
            daMatcher.withEntry("da3", Matchers.equalTo(3));
            MessageAnnotationsMatcher maMatcher = new MessageAnnotationsMatcher(true);
            maMatcher.withEntry("ma1", Matchers.equalTo(1));
            maMatcher.withEntry("ma2", Matchers.equalTo(2));
            maMatcher.withEntry("ma3", Matchers.equalTo(3));
            ApplicationPropertiesMatcher apMatcher = new ApplicationPropertiesMatcher(true);
            apMatcher.withEntry("ap1", Matchers.equalTo(1));
            apMatcher.withEntry("ap2", Matchers.equalTo(2));
            apMatcher.withEntry("ap3", Matchers.equalTo(3));
            EncodedDataMatcher bodyMatcher = new EncodedDataMatcher(payload);
            TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
            payloadMatcher.setHeadersMatcher(headerMatcher);
            payloadMatcher.setDeliveryAnnotationsMatcher(daMatcher);
            payloadMatcher.setMessageAnnotationsMatcher(maMatcher);
            payloadMatcher.setPropertiesMatcher(propertiesMatcher);
            payloadMatcher.setApplicationPropertiesMatcher(apMatcher);
            payloadMatcher.setMessageContentMatcher(bodyMatcher);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withPayload(payloadMatcher).withMore(false).accept();

            // Populate all Header values
            message.durable(true);
            assertEquals(true, message.durable());
            message.priority((byte) 1);
            assertEquals(1, message.priority());
            message.timeToLive(65535);
            assertEquals(65535, message.timeToLive());
            message.firstAcquirer(true);
            assertTrue(message.firstAcquirer());
            message.deliveryCount(2);
            assertEquals(2, message.deliveryCount());
            // Populate message annotations
            assertFalse(message.hasAnnotations());
            assertFalse(message.hasAnnotation("ma1"));
            message.annotation("ma1", 1);
            assertTrue(message.hasAnnotation("ma1"));
            assertEquals(1, message.annotation("ma1"));
            message.annotation("ma2", 2);
            assertEquals(2, message.annotation("ma2"));
            message.annotation("ma3", 3);
            assertEquals(3, message.annotation("ma3"));
            assertTrue(message.hasAnnotations());
            // Populate all Properties values
            message.messageId("ID:12345");
            assertEquals("ID:12345", message.messageId());
            message.userId("user".getBytes(StandardCharsets.UTF_8));
            assertArrayEquals("user".getBytes(StandardCharsets.UTF_8), message.userId());
            message.to("the-management");
            assertEquals("the-management", message.to());
            message.subject("amqp");
            assertEquals("amqp", message.subject());
            message.replyTo("the-minions");
            assertEquals("the-minions", message.replyTo());
            message.correlationId("abc");
            assertEquals("abc", message.correlationId());
            message.contentEncoding("application/json");
            assertEquals("application/json", message.contentEncoding());
            message.contentType("gzip");
            assertEquals("gzip", message.contentType());
            message.absoluteExpiryTime(123);
            assertEquals(123, message.absoluteExpiryTime());
            message.creationTime(1);
            assertEquals(1, message.creationTime());
            message.groupId("disgruntled");
            assertEquals("disgruntled", message.groupId());
            message.groupSequence(8192);
            assertEquals(8192, message.groupSequence());
            message.replyToGroupId("/dev/null");
            assertEquals("/dev/null", message.replyToGroupId());
            // Populate message application properties
            assertFalse(message.hasProperties());
            assertFalse(message.hasProperty("ma1"));
            message.property("ap1", 1);
            assertEquals(1, message.property("ap1"));
            assertTrue(message.hasProperty("ap1"));
            message.property("ap2", 2);
            assertEquals(2, message.property("ap2"));
            message.property("ap3", 3);
            assertEquals(3, message.property("ap3"));
            assertTrue(message.hasProperties());

            OutputStream stream = message.body();

            stream.write(payload);
            stream.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            assertNotNull(message.tracker());
            Wait.assertTrue(() -> message.tracker().settlementFuture().isDone());
            assertTrue(message.tracker().settlementFuture().get().settled());

            sender.closeAsync().get(10, TimeUnit.SECONDS);

            connection.closeAsync().get(10, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamSenderWritesFooterAfterStreamClosed() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(10).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Sender test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
            StreamSender sender = connection.openStreamSender("test-queue");
            StreamSenderMessage message = sender.beginMessage();

            final byte[] payload = new byte[] { 0, 1, 2, 3, 4, 5 };

            // First frame should include only the bits up to the body
            HeaderMatcher headerMatcher = new HeaderMatcher(true);
            headerMatcher.withDurable(true);
            headerMatcher.withPriority((byte) 1);
            headerMatcher.withTtl(65535);
            headerMatcher.withFirstAcquirer(true);
            headerMatcher.withDeliveryCount(2);
            ApplicationPropertiesMatcher apMatcher = new ApplicationPropertiesMatcher(true);
            apMatcher.withEntry("ap1", Matchers.equalTo(1));
            apMatcher.withEntry("ap2", Matchers.equalTo(2));
            apMatcher.withEntry("ap3", Matchers.equalTo(3));
            FooterMatcher footerMatcher = new FooterMatcher(false);
            footerMatcher.withEntry("f1", Matchers.equalTo(1));
            footerMatcher.withEntry("f2", Matchers.equalTo(2));
            footerMatcher.withEntry("f3", Matchers.equalTo(3));
            EncodedDataMatcher bodyMatcher = new EncodedDataMatcher(payload, true);
            TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
            payloadMatcher.setHeadersMatcher(headerMatcher);
            payloadMatcher.setApplicationPropertiesMatcher(apMatcher);
            payloadMatcher.setMessageContentMatcher(bodyMatcher);
            payloadMatcher.setFootersMatcher(footerMatcher);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withPayload(payloadMatcher).withMore(false).accept();

            // Populate all Header values
            message.durable(true);
            message.priority((byte) 1);
            message.timeToLive(65535);
            message.firstAcquirer(true);
            message.deliveryCount(2);
            // Populate message application properties
            message.property("ap1", 1);
            message.property("ap2", 2);
            message.property("ap3", 3);
            // Populate message footers
            assertFalse(message.hasFooters());
            assertFalse(message.hasFooter("f1"));
            message.footer("f1", 1);
            message.footer("f2", 2);
            message.footer("f3", 3);
            assertTrue(message.hasFooter("f1"));
            assertTrue(message.hasFooters());

            OutputStreamOptions bodyOptions = new OutputStreamOptions().completeSendOnClose(true);
            OutputStream stream = message.body(bodyOptions);

            assertThrows(ClientUnsupportedOperationException.class, () -> message.encode(Collections.emptyMap()));

            stream.write(payload);
            stream.close();

            assertThrows(ClientIllegalStateException.class, () -> message.footer(new Footer(Collections.emptyMap())));

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            assertNotNull(message.tracker());
            Wait.assertTrue(() -> message.tracker().settlementFuture().isDone());
            assertTrue(message.tracker().settlementFuture().get().settled());

            sender.closeAsync().get(10, TimeUnit.SECONDS);

            connection.closeAsync().get(10, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamSenderWritesFooterAfterMessageCompleted() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(10).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Sender test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
            StreamSender sender = connection.openStreamSender("test-queue");
            StreamSenderMessage message = sender.beginMessage();

            final byte[] payload = new byte[] { 0, 1, 2, 3, 4, 5 };

            // First frame should include only the bits up to the body
            HeaderMatcher headerMatcher = new HeaderMatcher(true);
            headerMatcher.withDurable(true);
            headerMatcher.withPriority((byte) 1);
            headerMatcher.withTtl(65535);
            headerMatcher.withFirstAcquirer(true);
            headerMatcher.withDeliveryCount(2);
            ApplicationPropertiesMatcher apMatcher = new ApplicationPropertiesMatcher(true);
            apMatcher.withEntry("ap1", Matchers.equalTo(1));
            apMatcher.withEntry("ap2", Matchers.equalTo(2));
            apMatcher.withEntry("ap3", Matchers.equalTo(3));
            EncodedDataMatcher bodyMatcher = new EncodedDataMatcher(payload);
            TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
            payloadMatcher.setHeadersMatcher(headerMatcher);
            payloadMatcher.setApplicationPropertiesMatcher(apMatcher);
            payloadMatcher.setMessageContentMatcher(bodyMatcher);

            // Second Frame should contains the appended footers
            FooterMatcher footerMatcher = new FooterMatcher(false);
            footerMatcher.withEntry("f1", Matchers.equalTo(1));
            footerMatcher.withEntry("f2", Matchers.equalTo(2));
            footerMatcher.withEntry("f3", Matchers.equalTo(3));
            TransferPayloadCompositeMatcher payloadFooterMatcher = new TransferPayloadCompositeMatcher();
            payloadFooterMatcher.setFootersMatcher(footerMatcher);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withPayload(payloadMatcher).withMore(true);
            peer.expectTransfer().withPayload(payloadFooterMatcher).withMore(false).accept();

            // Populate all Header values
            message.durable(true);
            message.priority((byte) 1);
            message.timeToLive(65535);
            message.firstAcquirer(true);
            message.deliveryCount(2);
            // Populate message application properties
            message.property("ap1", 1);
            message.property("ap2", 2);
            message.property("ap3", 3);

            OutputStreamOptions bodyOptions = new OutputStreamOptions().completeSendOnClose(false);
            OutputStream stream = message.body(bodyOptions);

            stream.write(payload);
            stream.close();

            // Populate message footers
            message.footer("f1", 1);
            message.footer("f2", 2);
            message.footer("f3", 3);

            message.complete();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            assertNotNull(message.tracker());
            Wait.assertTrue(() -> message.tracker().settlementFuture().isDone());
            assertTrue(message.tracker().settlementFuture().get().settled());

            sender.closeAsync().get(10, TimeUnit.SECONDS);

            connection.closeAsync().get(10, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testAutoFlushDuringMessageSendThatExceedConfiguredBufferLimitSessionCreditLimitOnTransfer() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            ConnectionOptions options = new ConnectionOptions().maxFrameSize(1024);
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
            StreamSender sender = connection.openStreamSender("test-queue");

            final byte[] payload = new byte[4800];
            Arrays.fill(payload, (byte) 1);

            final AtomicBoolean sendFailed = new AtomicBoolean();
            ForkJoinPool.commonPool().execute(() -> {
                try {
                    sender.send(Message.create(payload));
                } catch (Exception e) {
                    LOG.info("send failed with error: ", e);
                    sendFailed.set(true);
                }
            });

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(1).withLinkCredit(10).now();
            peer.expectTransfer().withNonNullPayload().withMore(true);
            peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(2).withLinkCredit(10).queue();
            peer.expectTransfer().withNonNullPayload().withMore(true);
            peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(3).withLinkCredit(10).queue();
            peer.expectTransfer().withNonNullPayload().withMore(true);
            peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(4).withLinkCredit(10).queue();
            peer.expectTransfer().withNonNullPayload().withMore(true);
            peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(5).withLinkCredit(10).queue();
            peer.expectTransfer().withNonNullPayload().withMore(false).accept();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            assertFalse(sendFailed.get());

            sender.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testConcurrentMessageSendOnlyBlocksForInitialSendInProgress() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(2).queue();
            peer.expectBegin().respond();
            peer.expectAttach().ofReceiver().respond();
            peer.expectFlow();
            peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
            peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSender sender = connection.openStreamSender("test-queue");
            sender.openFuture().get();

            // Ensure that sender gets its flow before the sends are triggered.
            connection.openReceiver("test-queue").openFuture().get();

            final byte[] payload = new byte[1024];
            Arrays.fill(payload, (byte) 1);

            // One should block on the send waiting for the others send to finish
            // otherwise they should not care about concurrency of sends.

            final AtomicBoolean sendFailed = new AtomicBoolean();
            ForkJoinPool.commonPool().execute(() -> {
                try {
                    LOG.info("Test send 1 is preparing to fire:");
                    StreamTracker tracker = sender.send(Message.create(payload));
                    tracker.awaitSettlement(10, TimeUnit.SECONDS);
                } catch (Exception e) {
                    LOG.info("Test send 1 failed with error: ", e);
                    sendFailed.set(true);
                }
            });

            ForkJoinPool.commonPool().execute(() -> {
                try {
                    LOG.info("Test send 2 is preparing to fire:");
                    StreamTracker tracker = sender.send(Message.create(payload));
                    tracker.awaitSettlement(10, TimeUnit.SECONDS);
                } catch (Exception e) {
                    LOG.info("Test send 2 failed with error: ", e);
                    sendFailed.set(true);
                }
            });

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            assertFalse(sendFailed.get());

            sender.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testConcurrentMessageSendsBlocksBehindSendWaitingForCredit() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSender sender = connection.openStreamSender("test-queue");

            final byte[] payload = new byte[1024];
            Arrays.fill(payload, (byte) 1);

            final CountDownLatch send1Started = new CountDownLatch(1);
            final CountDownLatch send2Completed = new CountDownLatch(1);

            final AtomicBoolean sendFailed = new AtomicBoolean();
            ForkJoinPool.commonPool().execute(() -> {
                try {
                    LOG.info("Test send 1 is preparing to fire:");
                    ForkJoinPool.commonPool().execute(() -> send1Started.countDown());
                    sender.send(Message.create(payload));
                } catch (Exception e) {
                    LOG.info("Test send 1 failed with error: ", e);
                    sendFailed.set(true);
                }
            });

            ForkJoinPool.commonPool().execute(() -> {
                try {
                    assertTrue(send1Started.await(10, TimeUnit.SECONDS));
                    LOG.info("Test send 2 is preparing to fire:");
                    StreamTracker tracker = sender.send(Message.create(payload));
                    tracker.awaitSettlement(10, TimeUnit.SECONDS);
                    send2Completed.countDown();
                } catch (Exception e) {
                    LOG.info("Test send 2 failed with error: ", e);
                    sendFailed.set(true);
                }
            });

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(1).withLinkCredit(1).now();
            peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
            peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(1).withNextIncomingId(2).withLinkCredit(1).queue();
            peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();

            assertTrue(send2Completed.await(10, TimeUnit.SECONDS));

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            assertFalse(sendFailed.get());

            sender.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testConcurrentMessageSendWaitingOnSplitFramedSendToCompleteIsSentAfterCreditUpdated() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            ConnectionOptions options = new ConnectionOptions().maxFrameSize(1024);
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
            StreamSender sender = connection.openStreamSender("test-queue");

            final byte[] payload = new byte[1536];
            Arrays.fill(payload, (byte) 1);

            final CountDownLatch send1Started = new CountDownLatch(1);
            final CountDownLatch send2Completed = new CountDownLatch(1);

            final AtomicBoolean sendFailed = new AtomicBoolean();
            ForkJoinPool.commonPool().execute(() -> {
                try {
                    LOG.info("Test send 1 is preparing to fire:");
                    ForkJoinPool.commonPool().execute(() -> send1Started.countDown());
                    sender.send(Message.create(payload));
                } catch (Exception e) {
                    LOG.info("Test send 1 failed with error: ", e);
                    sendFailed.set(true);
                }
            });

            ForkJoinPool.commonPool().execute(() -> {
                try {
                    assertTrue(send1Started.await(10, TimeUnit.SECONDS));
                    LOG.info("Test send 2 is preparing to fire:");
                    StreamTracker tracker = sender.send(Message.create(payload));
                    tracker.awaitSettlement(10, TimeUnit.SECONDS);
                    send2Completed.countDown();
                } catch (Exception e) {
                    LOG.info("Test send 2 failed with error: ", e);
                    sendFailed.set(true);
                }
            });

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(1).withLinkCredit(1).now();
            peer.expectTransfer().withNonNullPayload().withMore(true);
            peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(2).withLinkCredit(1).queue();
            peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
            peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(1).withNextIncomingId(3).withLinkCredit(1).queue();
            peer.expectTransfer().withNonNullPayload().withMore(true);
            peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(1).withNextIncomingId(4).withLinkCredit(1).queue();
            peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();

            assertTrue(send2Completed.await(10, TimeUnit.SECONDS));

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            assertFalse(sendFailed.get());

            sender.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testMessageSendWhileStreamSendIsOpenShouldBlock() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(1).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();
            final byte[] payload = new byte[1536];
            Arrays.fill(payload, (byte) 1);

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSender sender = connection.openStreamSender("test-queue");
            StreamSenderMessage message = sender.beginMessage();
            OutputStreamOptions options = new OutputStreamOptions().bodyLength(8192).completeSendOnClose(false);
            OutputStream stream = message.body(options);

            final CountDownLatch sendStarted = new CountDownLatch(1);
            final CountDownLatch sendCompleted = new CountDownLatch(1);
            final AtomicBoolean sendFailed = new AtomicBoolean();

            ForkJoinPool.commonPool().execute(() -> {
                try {
                    LOG.info("Test send 1 is preparing to fire:");
                    sendStarted.countDown();
                    sender.send(Message.create(payload));
                    sendCompleted.countDown();
                } catch (Exception e) {
                    LOG.info("Test send 1 failed with error: ", e);
                    sendFailed.set(true);
                }
            });

            EncodedDataMatcher bodyMatcher = new EncodedDataMatcher(payload);
            TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
            payloadMatcher.setMessageContentMatcher(bodyMatcher);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withPayload(payloadMatcher).accept();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            assertTrue(sendStarted.await(10, TimeUnit.SECONDS));

            // This should abort the streamed send as we provided a size for the body.
            stream.close();
            assertTrue(message.aborted());
            assertTrue(sendCompleted.await(100, TimeUnit.SECONDS));
            assertThrows(ClientIllegalStateException.class, () -> message.rawOutputStream());
            assertThrows(ClientIllegalStateException.class, () -> message.body());

            sender.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamSenderSessionCannotCreateNewResources() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSender sender = connection.openStreamSender("test-queue");

            assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openReceiver("test"));
            assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openReceiver("test", new ReceiverOptions()));
            assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openDurableReceiver("test", "test"));
            assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openDurableReceiver("test", "test", new ReceiverOptions()));
            assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openDynamicReceiver());
            assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openDynamicReceiver(new HashMap<>()));
            assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openDynamicReceiver(new ReceiverOptions()));
            assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openDynamicReceiver(new HashMap<>(), new ReceiverOptions()));
            assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openSender("test"));
            assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openSender("test", new SenderOptions()));
            assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openAnonymousSender());
            assertThrows(ClientUnsupportedOperationException.class, () -> sender.session().openAnonymousSender(new SenderOptions()));

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            sender.close();
            connection.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testStreamMessageWaitingOnCreditWritesWhileCompleteSendWaitsInQueue() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSender sender = connection.openStreamSender("test-queue");
            StreamSenderMessage tracker = sender.beginMessage();
            OutputStream stream = tracker.body();

            final byte[] payload1 = new byte[256];
            Arrays.fill(payload1, (byte) 1);
            final byte[] payload2 = new byte[256];
            Arrays.fill(payload2, (byte) 2);
            final byte[] payload3 = new byte[256];
            Arrays.fill(payload3, (byte) 3);

            EncodedDataMatcher dataMatcher1 = new EncodedDataMatcher(payload1);
            TransferPayloadCompositeMatcher payloadMatcher1 = new TransferPayloadCompositeMatcher();
            payloadMatcher1.setMessageContentMatcher(dataMatcher1);

            EncodedDataMatcher dataMatcher2 = new EncodedDataMatcher(payload2);
            TransferPayloadCompositeMatcher payloadMatcher2 = new TransferPayloadCompositeMatcher();
            payloadMatcher2.setMessageContentMatcher(dataMatcher2);

            EncodedDataMatcher dataMatcher3 = new EncodedDataMatcher(payload3);
            TransferPayloadCompositeMatcher payloadMatcher3 = new TransferPayloadCompositeMatcher();
            payloadMatcher3.setMessageContentMatcher(dataMatcher3);

            final AtomicBoolean sendFailed = new AtomicBoolean();
            final CountDownLatch streamSend1Complete = new CountDownLatch(1);
            // Stream won't output until some body bytes are written.
            ForkJoinPool.commonPool().execute(() -> {
                try {
                    stream.write(payload1);
                    stream.flush();
                } catch (IOException e) {
                    LOG.info("send failed with error: ", e);
                    sendFailed.set(true);
                } finally {
                    streamSend1Complete.countDown();
                }
            });

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withPayload(payloadMatcher1).withMore(true);
            // Now trigger the next send by granting credit for payload 1
            peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(1).withLinkCredit(10).now();

            assertTrue(streamSend1Complete.await(5, TimeUnit.SECONDS), "Stream sender completed first send");
            assertFalse(sendFailed.get());

            final CountDownLatch sendStarted = new CountDownLatch(1);
            final CountDownLatch sendCompleted = new CountDownLatch(1);

            ForkJoinPool.commonPool().execute(() -> {
                try {
                    LOG.info("Test send 1 is preparing to fire:");
                    sendStarted.countDown();
                    sender.send(Message.create(payload3));
                } catch (Exception e) {
                    LOG.info("Test send 1 failed with error: ", e);
                    sendFailed.set(true);
                } finally {
                    sendCompleted.countDown();
                }
            });

            assertTrue(sendStarted.await(10, TimeUnit.SECONDS));

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withPayload(payloadMatcher2).withMore(true);
            // Queue a flow that will allow send by granting credit for payload 3 via sender.send
            peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(3).withLinkCredit(10).queue();
            // Now trigger the next send by granting credit for payload 2
            peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(2).withLinkCredit(10).now();

            stream.write(payload2);
            stream.flush();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withNullPayload().withMore(false).accept();
            peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(4).withLinkCredit(10).queue();
            peer.expectTransfer().withPayload(payloadMatcher3).withMore(false);
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            stream.close();

            assertTrue(sendCompleted.await(100, TimeUnit.SECONDS));
            assertFalse(sendFailed.get());

            sender.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testWriteToCreditLimitFramesOfMessagePayloadOneBytePerWrite() throws Exception {
        final int WRITE_COUNT = 10;

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withIncomingWindow(WRITE_COUNT).withNextIncomingId(0).withLinkCredit(WRITE_COUNT).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSender sender = connection.openStreamSender("test-queue");
            StreamSenderMessage tracker = sender.beginMessage();
            OutputStream stream = tracker.body();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

            final byte[][] payloads = new byte[WRITE_COUNT][256];
            for (int i = 0; i < WRITE_COUNT; ++i) {
                payloads[i] = new byte[256];
                Arrays.fill(payloads[i], (byte)(i + 1));
            }

            for (int i = 0; i < WRITE_COUNT; ++i) {
                EncodedDataMatcher dataMatcher = new EncodedDataMatcher(payloads[i]);
                TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
                payloadMatcher.setMessageContentMatcher(dataMatcher);

                peer.expectTransfer().withPayload(payloadMatcher).withMore(true);
            }

            for (int i = 0; i < WRITE_COUNT; ++i) {
                for (byte value : payloads[i]) {
                    stream.write(value);
                }
                stream.flush();
            }

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withNullPayload().withMore(false).accept();

            // grant one more credit for the complete to arrive.
            peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(WRITE_COUNT).withLinkCredit(1).later(10);

            stream.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            sender.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testWriteToCreditLimitFramesOfMessagePayload() throws Exception {
        final int WRITE_COUNT = 10;

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withIncomingWindow(WRITE_COUNT).withNextIncomingId(0).withLinkCredit(WRITE_COUNT).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSender sender = connection.openStreamSender("test-queue");
            StreamSenderMessage tracker = sender.beginMessage();
            OutputStream stream = tracker.body();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

            final byte[][] payloads = new byte[WRITE_COUNT][256];
            for (int i = 0; i < WRITE_COUNT; ++i) {
                payloads[i] = new byte[256];
                Arrays.fill(payloads[i], (byte)(i + 1));
            }

            for (int i = 0; i < WRITE_COUNT; ++i) {
                EncodedDataMatcher dataMatcher = new EncodedDataMatcher(payloads[i]);
                TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
                payloadMatcher.setMessageContentMatcher(dataMatcher);

                peer.expectTransfer().withPayload(payloadMatcher).withMore(true);
            }

            for (int i = 0; i < WRITE_COUNT; ++i) {
                stream.write(payloads[i]);
                stream.flush();
            }

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withNullPayload().withMore(false).accept();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            // grant one more credit for the complete to arrive.
            peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(WRITE_COUNT).withLinkCredit(1).now();

            stream.close();

            sender.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testStreamMessageFlushFailsAfterConnectionDropped() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(1).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSender sender = connection.openStreamSender("test-queue");
            StreamSenderMessage message = sender.beginMessage();

            OutputStream stream = message.body();

            EncodedDataMatcher dataMatcher1 = new EncodedDataMatcher(new byte[] { 0, 1, 2, 3 });
            TransferPayloadCompositeMatcher payloadMatcher1 = new TransferPayloadCompositeMatcher();
            payloadMatcher1.setMessageContentMatcher(dataMatcher1);

            EncodedDataMatcher dataMatcher2 = new EncodedDataMatcher(new byte[] { 4, 5, 6, 7 });
            TransferPayloadCompositeMatcher payloadMatcher2 = new TransferPayloadCompositeMatcher();
            payloadMatcher2.setMessageContentMatcher(dataMatcher2);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withPayload(payloadMatcher1).withMore(true);
            peer.expectTransfer().withPayload(payloadMatcher2).withMore(true);
            peer.dropAfterLastHandler();

            // Write two then after connection drops the message should fail on future writes
            stream.write(new byte[] { 0, 1, 2, 3 });
            stream.flush();
            stream.write(new byte[] { 4, 5, 6, 7 });
            stream.flush();

            peer.waitForScriptToComplete();

            // Next write should fail as connection should have dropped.
            stream.write(new byte[] { 8, 9, 10, 11 });

            try {
                stream.flush();
                fail("Should not be able to flush after connection drop");
            } catch (IOException ioe) {
                assertTrue(ioe.getCause() instanceof ClientException);
            }

            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testStreamMessageCloseThatFlushesFailsAfterConnectionDropped() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(1).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSender sender = connection.openStreamSender("test-queue");
            StreamSenderMessage message = sender.beginMessage();

            OutputStream stream = message.body();

            EncodedDataMatcher dataMatcher1 = new EncodedDataMatcher(new byte[] { 0, 1, 2, 3 });
            TransferPayloadCompositeMatcher payloadMatcher1 = new TransferPayloadCompositeMatcher();
            payloadMatcher1.setMessageContentMatcher(dataMatcher1);

            EncodedDataMatcher dataMatcher2 = new EncodedDataMatcher(new byte[] { 4, 5, 6, 7 });
            TransferPayloadCompositeMatcher payloadMatcher2 = new TransferPayloadCompositeMatcher();
            payloadMatcher2.setMessageContentMatcher(dataMatcher2);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withPayload(payloadMatcher1).withMore(true);
            peer.expectTransfer().withPayload(payloadMatcher2).withMore(true);
            peer.dropAfterLastHandler();

            // Write two then after connection drops the message should fail on future writes
            stream.write(new byte[] { 0, 1, 2, 3 });
            stream.flush();
            stream.write(new byte[] { 4, 5, 6, 7 });
            stream.flush();

            peer.waitForScriptToComplete();

            // Next write should fail as connection should have dropped.
            stream.write(new byte[] { 8, 9, 10, 11 });

            try {
                stream.close();
                fail("Should not be able to close after connection drop");
            } catch (IOException ioe) {
                assertTrue(ioe.getCause() instanceof ClientException);
            }

            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testStreamMessageWriteThatFlushesFailsAfterConnectionDropped() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(1).queue();
            peer.dropAfterLastHandler();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSenderOptions options = new StreamSenderOptions().writeBufferSize(1024);
            StreamSender sender = connection.openStreamSender("test-queue", options);
            StreamSenderMessage message = sender.beginMessage();

            byte[] payload = new byte[65535];
            Arrays.fill(payload, (byte) 65);
            OutputStreamOptions streamOptions = new OutputStreamOptions().bodyLength(65535);
            OutputStream stream = message.body(streamOptions);

            peer.waitForScriptToComplete();

            try {
                stream.write(payload);
                fail("Should not be able to write section after connection drop");
            } catch (IOException ioe) {
                assertTrue(ioe.getCause() instanceof ClientException);
            }

            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    void testStreamMessageSendFromByteArrayInputStreamWithoutBodySizeSet() throws Exception {
        doTestStreamMessageSendFromByteArrayInputStream(false);
    }

    @Test
    void testStreamMessageSendFromByteArrayInputStreamWithBodySizeSet() throws Exception {
        doTestStreamMessageSendFromByteArrayInputStream(false);
    }

    private void doTestStreamMessageSendFromByteArrayInputStream(boolean setBodySize) throws Exception {
        final Random random = new Random(System.nanoTime());
        final byte[] array = new byte[4096];
        final ByteArrayInputStream bytesIn = new ByteArrayInputStream(array);

        // Populate the array with something other than zeros.
        random.nextBytes(array);

        EncodedCompositingDataSectionMatcher matcher = new EncodedCompositingDataSectionMatcher(array);

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(100).queue();
            for (int i = 0; i < (array.length / 1023); ++i) {
                peer.expectTransfer().withDeliveryId(0)
                                     .withMore(true)
                                     .withPayload(matcher);
            }
            // A small number of trailing bytes will be transmitted in the final frame.
            peer.expectTransfer().withDeliveryId(0)
                                 .withMore(false)
                                 .withPayload(matcher);
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamSenderOptions options = new StreamSenderOptions().writeBufferSize(1023);
            StreamSender sender = connection.openStreamSender("test-queue", options);
            StreamSenderMessage tracker = sender.beginMessage();

            final OutputStream stream;

            if (setBodySize) {
                stream = tracker.body(new OutputStreamOptions().bodyLength(array.length));
            } else {
                stream = tracker.body();
            }

            try {
                bytesIn.transferTo(stream);
            } finally {
                // Ensure any trailing bytes get written and transfer marked as done.
                stream.close();
            }

            peer.waitForScriptToComplete();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            sender.close();
            connection.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testBatchAddBodySectionsWritesEach() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectBegin().respond(); // Hidden session for stream sender
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(10).queue();
            peer.expectAttach().respond();  // Open a receiver to ensure sender link has processed
            peer.expectFlow();              // the inbound flow frame we sent previously before send.
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Sender test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
            Session session = connection.openSession().openFuture().get();

            StreamSenderOptions options = new StreamSenderOptions();
            options.deliveryMode(DeliveryMode.AT_MOST_ONCE);
            options.writeBufferSize(Integer.MAX_VALUE);

            StreamSender sender = connection.openStreamSender("test-qos", options);

            // Create a custom message format send context and ensure that no early buffer writes take place
            StreamSenderMessage message = sender.beginMessage();

            assertEquals(Header.DEFAULT_PRIORITY, message.priority());
            assertEquals(Header.DEFAULT_DELIVERY_COUNT, message.deliveryCount());
            assertEquals(Header.DEFAULT_FIRST_ACQUIRER, message.firstAcquirer());
            assertEquals(Header.DEFAULT_TIME_TO_LIVE, message.timeToLive());
            assertEquals(Header.DEFAULT_DURABILITY, message.durable());

            // Gates send on remote flow having been sent and received
            session.openReceiver("dummy").openFuture().get();

            HeaderMatcher headerMatcher = new HeaderMatcher(true);
            headerMatcher.withDurable(true);
            headerMatcher.withPriority((byte) 1);
            headerMatcher.withTtl(65535);
            headerMatcher.withFirstAcquirer(true);
            headerMatcher.withDeliveryCount(2);
            EncodedDataMatcher data1Matcher = new EncodedDataMatcher(new byte[] { 0, 1, 2, 3 }, true);
            EncodedDataMatcher data2Matcher = new EncodedDataMatcher(new byte[] { 4, 5, 6, 7 }, true);
            EncodedDataMatcher data3Matcher = new EncodedDataMatcher(new byte[] { 8, 9, 0, 1 });
            TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
            payloadMatcher.setHeadersMatcher(headerMatcher);
            payloadMatcher.addMessageContentMatcher(data1Matcher);
            payloadMatcher.addMessageContentMatcher(data2Matcher);
            payloadMatcher.addMessageContentMatcher(data3Matcher);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withMore(false).withPayload(payloadMatcher).accept();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            // Populate all Header values
            Header header = new Header();
            header.setDurable(true);
            header.setPriority((byte) 1);
            header.setTimeToLive(65535);
            header.setFirstAcquirer(true);
            header.setDeliveryCount(2);

            List<Section<?>> sections = new ArrayList<>(3);
            sections.add(new Data(new byte[] { 0, 1, 2, 3 }));
            sections.add(new Data(new byte[] { 4, 5, 6, 7 }));
            sections.add(new Data(new byte[] { 8, 9, 0, 1 }));

            message.header(header);
            message.bodySections(sections);

            message.complete();

            assertEquals(message, message.complete()); // Should no-op at this point
            Wait.assertTrue(() ->message.tracker().settlementFuture().isDone());
            assertTrue(message.tracker().settlementFuture().get().settled());

            sender.closeAsync().get(10, TimeUnit.SECONDS);

            connection.closeAsync().get(10, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    private static DeliveryTagGenerator customTagGenerator() {
        return new DeliveryTagGenerator() {

            private int count = 1;

            @Override
            public DeliveryTag nextTag() {
                switch (count++) {
                    case 1:
                        return new DeliveryTag.ProtonDeliveryTag(new byte[] { 1, 1, 1 });
                    case 2:
                        return new DeliveryTag.ProtonDeliveryTag(new byte[] { 2, 2, 2 });
                    case 3:
                        return new DeliveryTag.ProtonDeliveryTag(new byte[] { 3, 3, 3 });
                    default:
                        throw new UnsupportedOperationException("Only supports creating three tags");
                }
            }
        };
    }

    @Test
    public void testSendeUsesCustomDeliveryTagGeneratorConfiguration() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond(); // Hidden session for stream sender
            peer.expectAttach().ofSender().respond();
            peer.remoteFlow().withLinkCredit(10).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Sender test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();

            StreamSenderOptions options = new StreamSenderOptions().deliveryMode(DeliveryMode.AT_LEAST_ONCE)
                                                                   .autoSettle(false)
                                                                   .deliveryTagGeneratorSupplier(StreamSenderTest::customTagGenerator);
            StreamSender sender = connection.openStreamSender("test-tags", options).openFuture().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectTransfer().withNonNullPayload()
                                 .withDeliveryTag(new byte[] {1, 1, 1}).respond().withSettled(true).withState().accepted();
            peer.expectTransfer().withNonNullPayload()
                                 .withDeliveryTag(new byte[] {2, 2, 2}).respond().withSettled(true).withState().accepted();
            peer.expectTransfer().withNonNullPayload()
                                 .withDeliveryTag(new byte[] {3, 3, 3}).respond().withSettled(true).withState().accepted();
            peer.expectDetach().respond();
            peer.expectEnd().respond(); // From hidden stream sender session
            peer.expectClose().respond();

            final Message<String> message = Message.create("Hello World");
            final StreamTracker tracker1 = sender.send(message);
            final StreamTracker tracker2 = sender.send(message);
            final StreamTracker tracker3 = sender.send(message);

            assertNotNull(tracker1);
            assertNotNull(tracker1.settlementFuture().get().settled());
            assertNotNull(tracker2);
            assertNotNull(tracker2.settlementFuture().get().settled());
            assertNotNull(tracker3);
            assertNotNull(tracker3.settlementFuture().get().settled());

            sender.closeAsync().get(10, TimeUnit.SECONDS);

            connection.closeAsync().get(10, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testCannotCreateSenderWhenTagGeneratorReturnsNull() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond(); // Hidden session for stream sender
            peer.expectClose().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Sender test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();

            StreamSenderOptions options = new StreamSenderOptions().deliveryMode(DeliveryMode.AT_LEAST_ONCE)
                                                                   .autoSettle(false)
                                                                   .deliveryTagGeneratorSupplier(() -> null);
            try {
                connection.openStreamSender("test-tags", options).openFuture().get();
                fail("Should not create a sender if the tag generator is not supplied");
            } catch (ClientException cliEx) {
                // Expected
            }

            connection.closeAsync().get(10, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }
}
