/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.qpid.protonj2.client.impl;

import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.qpid.protonj2.client.Client;
import org.apache.qpid.protonj2.client.Connection;
import org.apache.qpid.protonj2.client.ConnectionOptions;
import org.apache.qpid.protonj2.client.DeliveryState;
import org.apache.qpid.protonj2.client.ErrorCondition;
import org.apache.qpid.protonj2.client.Receiver;
import org.apache.qpid.protonj2.client.ReceiverOptions;
import org.apache.qpid.protonj2.client.SenderOptions;
import org.apache.qpid.protonj2.client.StreamDelivery;
import org.apache.qpid.protonj2.client.StreamReceiver;
import org.apache.qpid.protonj2.client.StreamReceiverMessage;
import org.apache.qpid.protonj2.client.StreamReceiverOptions;
import org.apache.qpid.protonj2.client.exceptions.ClientDeliveryAbortedException;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import org.apache.qpid.protonj2.client.exceptions.ClientLinkRemotelyClosedException;
import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException;
import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
import org.apache.qpid.protonj2.client.test.Wait;
import org.apache.qpid.protonj2.codec.EncodingCodes;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.apache.qpid.protonj2.test.driver.codec.messaging.Accepted;
import org.apache.qpid.protonj2.test.driver.codec.messaging.DeliveryAnnotations;
import org.apache.qpid.protonj2.types.Binary;
import org.apache.qpid.protonj2.types.Symbol;
import org.apache.qpid.protonj2.types.UnsignedInteger;
import org.apache.qpid.protonj2.types.messaging.AmqpSequence;
import org.apache.qpid.protonj2.types.messaging.AmqpValue;
import org.apache.qpid.protonj2.types.messaging.ApplicationProperties;
import org.apache.qpid.protonj2.types.messaging.Data;
import org.apache.qpid.protonj2.types.messaging.Footer;
import org.apache.qpid.protonj2.types.messaging.Header;
import org.apache.qpid.protonj2.types.messaging.MessageAnnotations;
import org.apache.qpid.protonj2.types.messaging.Properties;
import org.apache.qpid.protonj2.types.transport.Role;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests the {@link StreamReceiver} implementation
 */
@Timeout(20)
class StreamReceiverTest extends ImperativeClientTestCase {

    private static final Logger LOG = LoggerFactory.getLogger(StreamReceiverTest.class);

    @Test
    public void testCreateReceiverAndClose() throws Exception {
        doTestCreateReceiverAndCloseOrDetachLink(true);
    }

    @Test
    public void testCreateReceiverAndDetach() throws Exception {
        doTestCreateReceiverAndCloseOrDetachLink(false);
    }

    private void doTestCreateReceiverAndCloseOrDetachLink(boolean close) throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofReceiver().respond();
            peer.expectFlow().withLinkCredit(10);
            peer.expectDetach().withClosed(close).respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamReceiver receiver = connection.openStreamReceiver("test-queue");
            receiver.openFuture().get(10, TimeUnit.SECONDS);

            assertSame(container, receiver.client());
            assertSame(connection, receiver.connection());

            if (close) {
                receiver.closeAsync().get(10, TimeUnit.SECONDS);
            } else {
                receiver.detachAsync().get(10, TimeUnit.SECONDS);
            }

            connection.closeAsync().get(10, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testCreateReceiverAndCloseSync() throws Exception {
        doTestCreateReceiverAndCloseOrDetachSyncLink(true);
    }

    @Test
    public void testCreateReceiverAndDetachSync() throws Exception {
        doTestCreateReceiverAndCloseOrDetachSyncLink(false);
    }

    private void doTestCreateReceiverAndCloseOrDetachSyncLink(boolean close) throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofReceiver().respond();
            peer.expectFlow().withLinkCredit(10);
            peer.expectDetach().withClosed(close).respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamReceiver receiver = connection.openStreamReceiver("test-queue");
            receiver.openFuture().get(10, TimeUnit.SECONDS);

            if (close) {
                receiver.close();
            } else {
                receiver.detach();
            }

            connection.closeAsync().get(10, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testCreateReceiverAndCloseWithErrorSync() throws Exception {
        doTestCreateReceiverAndCloseOrDetachWithErrorSync(true);
    }

    @Test
    public void testCreateReceiverAndDetachWithErrorSync() throws Exception {
        doTestCreateReceiverAndCloseOrDetachWithErrorSync(false);
    }

    private void doTestCreateReceiverAndCloseOrDetachWithErrorSync(boolean close) throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofReceiver().respond();
            peer.expectFlow();
            peer.expectDetach().withError("amqp-resource-deleted", "an error message").withClosed(close).respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Sender test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamReceiver receiver = connection.openStreamReceiver("test-queue");
            receiver.openFuture().get(10, TimeUnit.SECONDS);

            if (close) {
                receiver.close(ErrorCondition.create("amqp-resource-deleted", "an error message", null));
            } else {
                receiver.detach(ErrorCondition.create("amqp-resource-deleted", "an error message", null));
            }

            connection.closeAsync().get(10, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testCreateReceiverAndCloseWithErrorAsync() throws Exception {
        doTestCreateReceiverAndCloseOrDetachWithErrorAsync(true);
    }

    @Test
    public void testCreateReceiverAndDetachWithErrorAsync() throws Exception {
        doTestCreateReceiverAndCloseOrDetachWithErrorAsync(false);
    }

    private void doTestCreateReceiverAndCloseOrDetachWithErrorAsync(boolean close) throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofReceiver().respond();
            peer.expectFlow();
            peer.expectDetach().withError("amqp-resource-deleted", "an error message").withClosed(close).respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Sender test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamReceiver receiver = connection.openStreamReceiver("test-queue");
            receiver.openFuture().get(10, TimeUnit.SECONDS);

            if (close) {
                receiver.closeAsync(ErrorCondition.create("amqp-resource-deleted", "an error message", null)).get();
            } else {
                receiver.detachAsync(ErrorCondition.create("amqp-resource-deleted", "an error message", null)).get();
            }

            connection.closeAsync().get(10, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamReceiverConfiguresSessionCapacity_1() throws Exception {
        // Read buffer is always halved by connection when creating new session for the stream
        doTestStreamReceiverSessionCapacity(100_000, 200_000, 1);
    }

    @Test
    public void testStreamReceiverConfiguresSessionCapacity_2() throws Exception {
        // Read buffer is always halved by connection when creating new session for the stream
        doTestStreamReceiverSessionCapacity(100_000, 400_000, 2);
    }

    @Test
    public void testStreamReceiverConfiguresSessionCapacity_3() throws Exception {
        // Read buffer is always halved by connection when creating new session for the stream
        doTestStreamReceiverSessionCapacity(100_000, 600_000, 3);
    }

    @Test
    public void testStreamReceiverConfiguresSessionCapacityIdenticalToMaxFrameSize() throws Exception {
        // Read buffer is always halved by connection when creating new session for the stream
        // unless it falls at the max frame size value which means only one is possible, in this
        // case the user configured session window the same as than max frame size so only one
        // frame is possible.
        doTestStreamReceiverSessionCapacity(100_000, 100_000, 1);
    }

    @Test
    public void testStreamReceiverConfiguresSessionCapacityLowerThanMaxFrameSize() throws Exception {
        // Read buffer is always halved by connection when creating new session for the stream
        // unless it falls at the max frame size value which means only one is possible, in this
        // case the user configured session window lower than max frame size and the client auto
        // adjusts that to one frame.
        doTestStreamReceiverSessionCapacity(100_000, 50_000, 1);
    }

    private void doTestStreamReceiverSessionCapacity(int maxFrameSize, int readBufferSize, int expectedSessionWindow) throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().withMaxFrameSize(maxFrameSize).respond();
            peer.expectBegin().withIncomingWindow(expectedSessionWindow).respond();
            peer.expectAttach().ofReceiver().respond();
            peer.expectFlow().withIncomingWindow(expectedSessionWindow);
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            ConnectionOptions connectionOptions = new ConnectionOptions().maxFrameSize(maxFrameSize);
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
            StreamReceiverOptions streamOptions = new StreamReceiverOptions().readBufferSize(readBufferSize);
            StreamReceiver receiver = connection.openStreamReceiver("test-queue", streamOptions);

            receiver.openFuture().get();
            receiver.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testOpenStreamReceiverWithLinCapabilities() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue())
                               .withSource().withCapabilities("queue")
                               .withDistributionMode(nullValue())
                               .and().respond();
            peer.expectFlow();
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("StreamReceiver test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamReceiverOptions receiverOptions = new StreamReceiverOptions();
            receiverOptions.sourceOptions().capabilities("queue");
            StreamReceiver receiver = connection.openStreamReceiver("test-queue", receiverOptions);

            receiver.openFuture().get();

            assertSame(container, receiver.client());
            assertSame(connection, receiver.connection());

            receiver.close();

            connection.closeAsync().get(10, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testCreateStreamDeliveryWithoutAnyIncomingDeliveryPresent() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamReceiver receiver = connection.openStreamReceiver("test-queue");
            StreamDelivery delivery = receiver.receive(5, TimeUnit.MILLISECONDS);

            assertNull(delivery);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.close();
            connection.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamReceiverAwaitTimedCanBePerformedMultipleTimes() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamReceiver receiver = connection.openStreamReceiver("test-queue");

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

            assertNull(receiver.receive(3, TimeUnit.MILLISECONDS));
            assertNull(receiver.receive(3, TimeUnit.MILLISECONDS));
            assertNull(receiver.receive(3, TimeUnit.MILLISECONDS));

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.close();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testReceiveFailsWhenLinkRemotelyClosed() throws Exception {
        doTestReceiveFailsWhenLinkRemotelyClose(false);
    }

    @Test
    public void testTimedReceiveFailsWhenLinkRemotelyClosed() throws Exception {
        doTestReceiveFailsWhenLinkRemotelyClose(true);
    }

    private void doTestReceiveFailsWhenLinkRemotelyClose(boolean timed) throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamReceiver receiver = connection.openStreamReceiver("test-queue");

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDetach();
            peer.expectEnd().respond();
            peer.expectClose().respond();
            peer.remoteDetach().later(50);

            if (timed) {
                assertThrows(ClientLinkRemotelyClosedException.class, () -> receiver.receive(1, TimeUnit.MINUTES));
            } else {
                assertThrows(ClientLinkRemotelyClosedException.class, () -> receiver.receive());
            }

            receiver.closeAsync();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamDeliveryUsesUnsettledDeliveryOnOpen() throws Exception {
        final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.remoteDisposition().withRole(Role.SENDER.getValue())
                                    .withFirst(0)
                                    .withSettled(true)
                                    .withState(Accepted.getInstance()).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiverOptions options = new StreamReceiverOptions().autoAccept(false);
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue", options);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

            final StreamDelivery delivery = receiver.receive();

            Wait.assertTrue("Should eventually be remotely settled", delivery::remoteSettled);
            Wait.assertTrue(() -> { return delivery.remoteState() == DeliveryState.accepted(); });

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.close();
            connection.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamDeliveryReceiveWithTransferAlreadyComplete() throws Exception {
        doTestStreamDeliveryReceiveWithTransferAlreadyComplete(false);
    }

    @Test
    public void testStreamDeliveryTryReceiveWithTransferAlreadyComplete() throws Exception {
        doTestStreamDeliveryReceiveWithTransferAlreadyComplete(true);
    }

    private void doTestStreamDeliveryReceiveWithTransferAlreadyComplete(boolean tryReceive) throws Exception {
        final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofReceiver().respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectBegin().respond();
            peer.expectAttach().ofSender().respond();
            peer.expectDisposition().withState().accepted().withSettled(true);

            // Ensures that stream receiver has the delivery in its queue.
            connection.openSender("test-sender").openFuture().get();

            final StreamDelivery delivery;

            if (tryReceive) {
                delivery = receiver.tryReceive();
            } else {
                delivery = receiver.receive();
            }

            assertNotNull(delivery);
            assertTrue(delivery.completed());
            assertFalse(delivery.aborted());

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.close();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamDeliveryReceivedWhileTransferIsIncomplete() throws Exception {
        final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(true)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
            final StreamDelivery delivery = receiver.receive();

            assertNotNull(delivery);
            assertFalse(delivery.completed());

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDisposition().withSettled(true);

            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withNullDeliveryTag()
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).now();

            Wait.assertTrue("Should eventually be marked as completed", delivery::completed);

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamDeliveryRawInputStreamWithCompleteDeliveryReadByte() throws Exception {
        final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.expectDisposition().withState().accepted().withSettled(true);
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");

            final StreamDelivery delivery = receiver.receive();

            assertNotNull(delivery);
            assertTrue(delivery.completed());
            assertFalse(delivery.aborted());

            final InputStream stream = delivery.rawInputStream();
            assertNotNull(stream);

            assertEquals(payload.length, stream.available());
            final byte[] deliveryBytes = new byte[payload.length];
            for (int i = 0; i < payload.length; ++i) {
                deliveryBytes[i] = (byte) stream.read();
            }

            assertArrayEquals(payload, deliveryBytes);
            assertEquals(0, stream.available());
            assertEquals(-1, stream.read());

            stream.close();

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.close();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamDeliveryRawInputStreamBehaviorAfterStreamClosed() throws Exception {
        final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.expectDisposition().withState().accepted().withSettled(true);
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");

            final StreamDelivery delivery = receiver.receive();

            assertNotNull(delivery);
            assertTrue(delivery.completed());
            assertFalse(delivery.aborted());

            final InputStream stream = delivery.rawInputStream();
            assertNotNull(stream);

            stream.close();

            final byte[] scratch = new byte[10];

            assertThrows(IOException.class, () -> stream.available());
            assertThrows(IOException.class, () -> stream.skip(1));
            assertThrows(IOException.class, () -> stream.read());
            assertThrows(IOException.class, () -> stream.read(scratch));
            assertThrows(IOException.class, () -> stream.read(scratch, 0, scratch.length));

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.close();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamDeliveryRawInputStreamWithCompleteDeliveryReadBytes() throws Exception {
        final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.expectDisposition().withState().accepted().withSettled(true);
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");

            final StreamDelivery delivery = receiver.receive();

            assertNotNull(delivery);
            assertTrue(delivery.completed());
            assertFalse(delivery.aborted());

            final InputStream stream = delivery.rawInputStream();
            assertNotNull(stream);

            assertEquals(payload.length, stream.available());
            final byte[] deliveryBytes = new byte[payload.length];
            stream.read(deliveryBytes);

            assertArrayEquals(payload, deliveryBytes);
            assertEquals(0, stream.available());
            assertEquals(-1, stream.read());

            stream.close();

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.close();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamDeliveryRawInputStreamWithInCompleteDeliveryReadBytes() throws Exception {
        final byte[] payload1 = createEncodedMessage(new Data(new byte[] { 0, 1, 2, 3, 4, 5 }));
        final byte[] payload2 = createEncodedMessage(new Data(new byte[] { 6, 7, 8, 9, 0 ,1 }));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(true)
                                 .withMessageFormat(0)
                                 .withPayload(payload1).queue();
            peer.expectDisposition().withState().accepted().withSettled(true);
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");

            final StreamDelivery delivery = receiver.receive();

            assertNotNull(delivery);
            assertFalse(delivery.completed());
            assertFalse(delivery.aborted());

            final InputStream stream = delivery.rawInputStream();
            assertNotNull(stream);

            assertEquals(payload1.length, stream.available());
            final byte[] deliveryBytes1 = new byte[payload1.length];
            stream.read(deliveryBytes1);

            assertArrayEquals(payload1, deliveryBytes1);
            assertEquals(0, stream.available());

            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withPayload(payload2).later(50);

            // Should block until more data arrives.
            final byte[] deliveryBytes2 = new byte[payload2.length];
            stream.read(deliveryBytes2);

            assertArrayEquals(payload2, deliveryBytes2);
            assertEquals(0, stream.available());

            assertTrue(delivery.completed());
            assertFalse(delivery.aborted());

            stream.close();

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.close();
            connection.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamDeliveryRawInputStreamReadBytesSignalsEOFOnEmptyCompleteTransfer() throws Exception {
        final byte[] payload1 = createEncodedMessage(new Data(new byte[] { 0, 1, 2, 3, 4, 5 }));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(true)
                                 .withMessageFormat(0)
                                 .withPayload(payload1).queue();
            peer.expectDisposition().withState().accepted().withSettled(true);
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");

            final StreamDelivery delivery = receiver.receive();

            assertNotNull(delivery);
            assertFalse(delivery.completed());
            assertFalse(delivery.aborted());

            final InputStream stream = delivery.rawInputStream();
            assertNotNull(stream);

            assertEquals(payload1.length, stream.available());
            final byte[] deliveryBytes1 = new byte[payload1.length];
            stream.read(deliveryBytes1);

            assertArrayEquals(payload1, deliveryBytes1);
            assertEquals(0, stream.available());

            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .later(50);

            // Should block until more data arrives.
            final byte[] deliveryBytes2 = new byte[payload1.length];
            assertEquals(-1, stream.read(deliveryBytes2));
            assertEquals(0, stream.available());

            assertTrue(delivery.completed());
            assertFalse(delivery.aborted());

            stream.close();

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.close();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamDeliveryRawInputStreamWithInCompleteDeliverySkipBytes() throws Exception {
        final byte[] payload1 = createEncodedMessage(new Data(new byte[] { 0, 1, 2, 3, 4, 5 }));
        final byte[] payload2 = createEncodedMessage(new Data(new byte[] { 6, 7, 8, 9, 0 ,1 }));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(true)
                                 .withMessageFormat(0)
                                 .withPayload(payload1).queue();
            peer.expectDisposition().withState().accepted().withSettled(true);
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");

            final StreamDelivery delivery = receiver.receive();

            assertNotNull(delivery);
            assertFalse(delivery.completed());
            assertFalse(delivery.aborted());

            final InputStream stream = delivery.rawInputStream();
            assertNotNull(stream);

            assertEquals(payload1.length, stream.available());
            stream.skip(payload1.length);
            assertEquals(0, stream.available());

            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withPayload(payload2).later(50);

            // Should block until more data arrives.
            stream.skip(payload2.length);
            assertEquals(0, stream.available());

            assertTrue(delivery.completed());
            assertFalse(delivery.aborted());

            stream.close();

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.close();
            connection.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamDeliveryRawInputStreamReadOpensSessionWindowForAdditionalInput() throws Exception {
        final byte[] body1 = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        final byte[] body2 = new byte[] { 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 };
        final byte[] payload1 = createEncodedMessage(new Data(body1));
        final byte[] payload2 = createEncodedMessage(new Data(body2));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().withMaxFrameSize(1000).respond();
            peer.expectBegin().withIncomingWindow(1).respond();
            peer.expectAttach().ofReceiver().respond();
            peer.expectFlow().withIncomingWindow(1).withLinkCredit(10);
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(true)
                                 .withMessageFormat(0)
                                 .withPayload(payload1).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            ConnectionOptions connectionOptions = new ConnectionOptions().maxFrameSize(1000);
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
            StreamReceiverOptions streamOptions = new StreamReceiverOptions().readBufferSize(2000);
            StreamReceiver receiver = connection.openStreamReceiver("test-queue", streamOptions);
            StreamDelivery delivery = receiver.receive();
            assertNotNull(delivery);
            InputStream rawStream = delivery.rawInputStream();
            assertNotNull(rawStream);

            // An initial frame has arrived but more than that is requested so the first chuck is pulled
            // from the incoming delivery and the session window opens which allows the second chunk to
            // arrive and again the session window will be opened as that chunk is moved to the reader's
            // buffer for return from the read request.
            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(10);
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload2).queue();
            peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(9);
            peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);

            byte[] combinedPayloads = new byte[payload1.length + payload2.length];
            rawStream.read(combinedPayloads);

            assertTrue(Arrays.equals(payload1, 0, payload1.length, combinedPayloads, 0, payload1.length));
            assertTrue(Arrays.equals(payload2, 0, payload2.length, combinedPayloads, payload1.length, payload1.length + payload2.length));

            rawStream.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.openFuture().get();
            receiver.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamDeliveryRawInputStreamBlockedReadBytesAborted() throws Exception {
        final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(true)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");

            final StreamDelivery delivery = receiver.receive();

            assertNotNull(delivery);
            assertFalse(delivery.completed());
            assertFalse(delivery.aborted());

            final InputStream stream = delivery.rawInputStream();
            assertNotNull(stream);

            assertEquals(payload.length, stream.available());
            final byte[] deliveryBytes = new byte[payload.length * 2];

            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withAborted(true)
                                 .withMessageFormat(0).later(50);

            try {
                stream.read(deliveryBytes);
                fail("Delivery should have been aborted while waiting for more data.");
            } catch (IOException ioe) {
                assertTrue(ioe.getCause() instanceof ClientDeliveryAbortedException);
            }

            stream.close();

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamDeliveryRawInputStreamClosedWithoutReadsConsumesTransfers() throws Exception {
        final byte[] body1 = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        final byte[] body2 = new byte[] { 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 };
        final byte[] payload1 = createEncodedMessage(new Data(body1));
        final byte[] payload2 = createEncodedMessage(new Data(body2));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().withMaxFrameSize(1000).respond();
            peer.expectBegin().withIncomingWindow(1).respond();
            peer.expectAttach().ofReceiver().respond();
            peer.expectFlow().withIncomingWindow(1).withLinkCredit(10);
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(true)
                                 .withMessageFormat(0)
                                 .withPayload(payload1).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            ConnectionOptions connectionOptions = new ConnectionOptions().maxFrameSize(1000);
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
            StreamReceiverOptions streamOptions = new StreamReceiverOptions().readBufferSize(2000);
            StreamReceiver receiver = connection.openStreamReceiver("test-queue", streamOptions);
            StreamDelivery delivery = receiver.receive();
            assertNotNull(delivery);
            InputStream rawStream = delivery.rawInputStream();
            assertNotNull(rawStream);

            // An initial frame has arrived but no reads have been performed and then if closed
            // the delivery will be consumed to allow the session window to be opened and prevent
            // a stall due to an un-consumed delivery.
            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(10);
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload2).queue();
            peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(9);
            peer.expectDisposition().withSettled(true).withState().accepted();

            rawStream.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.openFuture().get();
            receiver.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamDeliveryRawInputStreamClosedWithoutReadsAllowsUserDisposition() throws Exception {
        final byte[] body1 = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        final byte[] body2 = new byte[] { 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 };
        final byte[] payload1 = createEncodedMessage(new Data(body1));
        final byte[] payload2 = createEncodedMessage(new Data(body2));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().withMaxFrameSize(1000).respond();
            peer.expectBegin().withIncomingWindow(1).respond();
            peer.expectAttach().ofReceiver().respond();
            peer.expectFlow().withIncomingWindow(1).withLinkCredit(10);
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(true)
                                 .withMessageFormat(0)
                                 .withPayload(payload1).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            ConnectionOptions connectionOptions = new ConnectionOptions().maxFrameSize(1000);
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
            StreamReceiverOptions streamOptions = new StreamReceiverOptions().readBufferSize(2000).autoAccept(false);
            StreamReceiver receiver = connection.openStreamReceiver("test-queue", streamOptions);
            StreamDelivery delivery = receiver.receive();
            assertNotNull(delivery);
            InputStream rawStream = delivery.rawInputStream();
            assertNotNull(rawStream);

            // An initial frame has arrived but no reads have been performed and then if closed
            // the delivery will be consumed to allow the session window to be opened and prevent
            // a stall due to an un-consumed delivery.  The stream delivery will not auto accept
            // or auto settle the delivery as the user closed early which should indicate they
            // are rejecting the message otherwise it is a programming error on their part.
            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(10);
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload2).queue();
            peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(9);

            rawStream.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDisposition().withState().rejected("invalid-format", "decode error").withSettled(true);
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            delivery.disposition(new ClientDeliveryState.ClientRejected("invalid-format", "decode error"), true);

            receiver.openFuture().get();
            receiver.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamDeliveryUserAppliedDispositionBeforeStreamRead() throws Exception {
        final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.expectDisposition().withState().accepted().withSettled(true);
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
            final StreamDelivery delivery = receiver.receive();

            assertNotNull(delivery);
            assertTrue(delivery.completed());
            assertFalse(delivery.aborted());

            delivery.disposition(ClientDeliveryState.ClientAccepted.getInstance(), true);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

            final InputStream stream = delivery.rawInputStream();
            assertNotNull(stream);

            assertEquals(payload.length, stream.available());
            final byte[] deliveryBytes = new byte[payload.length];
            for (int i = 0; i < payload.length; ++i) {
                deliveryBytes[i] = (byte) stream.read();
            }

            assertArrayEquals(payload, deliveryBytes);
            assertEquals(0, stream.available());
            assertEquals(-1, stream.read());

            stream.close();

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.close();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamSupportsMark() throws Exception {
        final byte[] payload = createEncodedMessage(new Data(new byte[] { 0, 1, 2, 3, 4, 5 }));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.expectDisposition().withState().accepted().withSettled(true);
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");

            final StreamDelivery delivery = receiver.receive();

            assertNotNull(delivery);
            assertTrue(delivery.completed());
            assertFalse(delivery.aborted());

            final InputStream stream = delivery.rawInputStream();
            assertNotNull(stream);
            assertTrue(stream.markSupported());

            assertEquals(payload.length, stream.available());
            stream.mark(payload.length);

            final byte[] deliveryBytes1 = new byte[payload.length];
            final byte[] deliveryBytes2 = new byte[payload.length];
            stream.read(deliveryBytes1);
            stream.reset();
            stream.read(deliveryBytes2);

            assertNotSame(deliveryBytes1, deliveryBytes2);
            assertArrayEquals(payload, deliveryBytes1);
            assertArrayEquals(payload, deliveryBytes2);
            assertEquals(0, stream.available());

            assertTrue(delivery.completed());
            assertFalse(delivery.aborted());

            stream.close();

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.close();
            connection.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamMessageWithHeaderOnly() throws Exception {
        final byte[] payload = createEncodedMessage(new Header().setDurable(true));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.expectDisposition().withState().accepted().withSettled(true);
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
            final StreamDelivery delivery = receiver.receive();

            assertNotNull(delivery);
            assertTrue(delivery.completed());
            assertFalse(delivery.aborted());

            StreamReceiverMessage message = delivery.message();
            assertNotNull(message);
            Header header = message.header();
            assertNotNull(header);

            assertSame(receiver, message.receiver());
            assertSame(delivery, message.delivery());

            assertNull(message.properties());
            assertNull(message.annotations());
            assertNull(message.applicationProperties());
            assertNull(message.footer());
            assertTrue(message.completed());

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.close();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testReadHeaderFromStreamMessageWithoutHeaderSection() throws Exception {
        Map<Symbol, Object> annotationsMap = new HashMap<>();
        annotationsMap.put(Symbol.valueOf("test-1"), UUID.randomUUID());
        annotationsMap.put(Symbol.valueOf("test-2"), UUID.randomUUID());
        annotationsMap.put(Symbol.valueOf("test-3"), UUID.randomUUID());

        final byte[] payload = createEncodedMessage(new MessageAnnotations(annotationsMap));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.expectDisposition().withSettled(true).withState().accepted();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
            final StreamDelivery delivery = receiver.receive();

            assertNotNull(delivery);
            assertTrue(delivery.completed());
            assertFalse(delivery.aborted());

            StreamReceiverMessage message = delivery.message();
            assertNotNull(message);
            Header header = message.header();
            assertNull(header);
            MessageAnnotations annotations = message.annotations();
            assertNotNull(annotations);
            assertEquals(annotationsMap, annotations.getValue());

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testTryReadSectionBeyondWhatIsEncodedIntoMessage() throws Exception {
        Map<Symbol, Object> annotationsMap = new HashMap<>();
        annotationsMap.put(Symbol.valueOf("test-1"), UUID.randomUUID());
        annotationsMap.put(Symbol.valueOf("test-2"), UUID.randomUUID());
        annotationsMap.put(Symbol.valueOf("test-3"), UUID.randomUUID());

        final byte[] payload = createEncodedMessage(new Header(), new MessageAnnotations(annotationsMap));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
            final StreamDelivery delivery = receiver.receive();

            assertNotNull(delivery);
            assertTrue(delivery.completed());
            assertFalse(delivery.aborted());

            StreamReceiverMessage message = delivery.message();
            assertNotNull(message);

            Properties properties = message.properties();
            assertNull(properties);
            Header header = message.header();
            assertNotNull(header);
            MessageAnnotations annotations = message.annotations();
            assertNotNull(annotations);
            assertEquals(annotationsMap, annotations.getValue());

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testReadBytesFromBodyInputStreamUsingReadByteAPI() throws Exception {
        final byte[] body = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        final byte[] payload = createEncodedMessage(new Data(body));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
            final StreamDelivery delivery = receiver.receive();

            assertNotNull(delivery);
            assertTrue(delivery.completed());
            assertFalse(delivery.aborted());

            StreamReceiverMessage message = delivery.message();
            assertNotNull(message);

            InputStream bodyStream = message.body();
            assertNotNull(bodyStream);

            assertNull(message.header());
            assertNull(message.annotations());
            assertNull(message.properties());
            assertNull(delivery.annotations());

            final byte[] receivedBody = new byte[body.length];
            for (int i = 0; i < body.length; ++i) {
                receivedBody[i] = (byte) bodyStream.read();
            }
            assertArrayEquals(body, receivedBody);
            assertEquals(-1, bodyStream.read());
            assertNull(message.footer());

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testReadBytesFromInputStreamUsingReadByteWithSingleByteSplitTransfers() throws Exception {
        testReadBytesFromBodyInputStreamWithSplitSingleByteTransfers(1);
    }

    @Test
    public void testReadBytesFromInputStreamUsingSingleReadBytesWithSingleByteSplitTransfers() throws Exception {
        testReadBytesFromBodyInputStreamWithSplitSingleByteTransfers(2);
    }

    @Test
    public void testSkipBytesFromInputStreamWithSingleByteSplitTransfers() throws Exception {
        testReadBytesFromBodyInputStreamWithSplitSingleByteTransfers(3);
    }

    private void testReadBytesFromBodyInputStreamWithSplitSingleByteTransfers(int option) throws Exception {
        final byte[] body = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        final byte[] payload = createEncodedMessage(new Data(body));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            for (int i = 0; i < payload.length; ++i) {
                peer.remoteTransfer().withHandle(0)
                                     .withDeliveryId(0)
                                     .withDeliveryTag(new byte[] { 1 })
                                     .withMore(true)
                                     .withMessageFormat(0)
                                     .withPayload(new byte[] { payload[i] }).afterDelay(3).queue();
            }
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0).afterDelay(5).queue();
            peer.expectDisposition().withFirst(0).withSettled(true);
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
            final StreamDelivery delivery = receiver.receive();
            final StreamReceiverMessage message = delivery.message();
            final InputStream bodyStream = message.body();

            final byte[] receivedBody = new byte[body.length];

            if (option == 1) {
                for (int i = 0; i < body.length; ++i) {
                    receivedBody[i] = (byte) bodyStream.read();
                }
                assertArrayEquals(body, receivedBody);
            } else if (option == 2) {
                assertEquals(body.length, bodyStream.read(receivedBody));
                assertArrayEquals(body, receivedBody);
            } else if (option == 3) {
                assertEquals(body.length, bodyStream.skip(body.length));
            } else {
                fail("Unknown test option");
            }

            bodyStream.close();

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.close();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamReceiverSessionCannotCreateNewResources() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamReceiver receiver = connection.openStreamReceiver("test-queue");

            assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openReceiver("test"));
            assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openReceiver("test", new ReceiverOptions()));
            assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openDurableReceiver("test", "test"));
            assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openDurableReceiver("test", "test", new ReceiverOptions()));
            assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openDynamicReceiver());
            assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openDynamicReceiver(new HashMap<>()));
            assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openDynamicReceiver(new ReceiverOptions()));
            assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openDynamicReceiver(new HashMap<>(), new ReceiverOptions()));
            assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openSender("test"));
            assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openSender("test", new SenderOptions()));
            assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openAnonymousSender());
            assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openAnonymousSender(new SenderOptions()));

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.close();
            connection.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testReadByteArrayPayloadInChunksFromSingleTransferMessage() throws Exception {
        testReadPayloadInChunksFromLargerMessage(false);
    }

    @Test
    public void testReadBytesWithArgsPayloadInChunksFromSingleTransferMessage() throws Exception {
        testReadPayloadInChunksFromLargerMessage(true);
    }

    private void testReadPayloadInChunksFromLargerMessage(boolean readWithArgs) throws Exception {
        final byte[] body = new byte[100];
        final Random random = new Random();
        random.setSeed(System.currentTimeMillis());
        random.nextBytes(body);
        final byte[] payload = createEncodedMessage(new Data(body));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
            final StreamDelivery delivery = receiver.receive();

            assertNotNull(delivery);
            assertTrue(delivery.completed());
            assertFalse(delivery.aborted());
            assertEquals(0, delivery.messageFormat());

            StreamReceiverMessage message = delivery.message();
            assertNotNull(message);

            InputStream bodyStream = message.body();
            assertNotNull(bodyStream);

            assertThrows(ClientUnsupportedOperationException.class, () -> message.messageFormat(1));
            assertNull(message.header());
            assertNull(message.annotations());
            assertNull(message.properties());
            assertNull(delivery.annotations());

            final byte[] aggregateBody = new byte[body.length];
            final byte[] receivedBody = new byte[10];

            for (int i = 0; i < body.length; i += 10) {
                if (readWithArgs) {
                    bodyStream.read(receivedBody, 0, receivedBody.length);
                } else {
                    bodyStream.read(receivedBody);
                }

                System.arraycopy(receivedBody, 0, aggregateBody, i, receivedBody.length);
            }

            assertArrayEquals(body, aggregateBody);
            assertEquals(-1, bodyStream.read(receivedBody, 0, receivedBody.length));
            assertEquals(-1, bodyStream.read(receivedBody));
            assertEquals(-1, bodyStream.read());
            assertNull(message.footer());

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamReceiverMessageThrowsOnAnyMessageModificationAPI() throws Exception {
        final byte[] body = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        final byte[] payload = createEncodedMessage(new Data(body));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
            final StreamDelivery delivery = receiver.receive();
            final StreamReceiverMessage message = delivery.message();

            assertThrows(ClientUnsupportedOperationException.class, () -> message.header(new Header()));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.properties(new Properties()));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.applicationProperties(new ApplicationProperties(null)));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.annotations(new MessageAnnotations(null)));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.footer(new Footer(null)));

            assertThrows(ClientUnsupportedOperationException.class, () -> message.messageFormat(1));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.durable(true));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.priority((byte) 4));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.timeToLive(128));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.firstAcquirer(false));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.deliveryCount(10));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.messageId(10));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.correlationId(10));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.userId(new byte[] {1}));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.to("test"));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.subject("test"));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.replyTo("test"));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.contentType("test"));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.contentEncoding("test"));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.absoluteExpiryTime(10));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.creationTime(10));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.groupId("test"));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.groupSequence(10));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.replyToGroupId("test"));

            assertThrows(ClientUnsupportedOperationException.class, () -> message.annotation("test", 1));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.removeAnnotation("test"));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.property("test", 1));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.removeProperty("test"));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.footer("test", 1));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.removeFooter("test"));

            assertThrows(ClientUnsupportedOperationException.class, () -> message.body(InputStream.nullInputStream()));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.addBodySection(new AmqpValue<>("test")));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.bodySections(Collections.emptyList()));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.bodySections());
            assertThrows(ClientUnsupportedOperationException.class, () -> message.clearBodySections());
            assertThrows(ClientUnsupportedOperationException.class, () -> message.forEachBodySection((section) -> {}));
            assertThrows(ClientUnsupportedOperationException.class, () -> message.encode(Collections.emptyMap()));

            InputStream bodyStream = message.body();

            assertNotNull(bodyStream.readAllBytes());
            bodyStream.close();

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testSkipPayloadInChunksFromSingleTransferMessage() throws Exception {
        final byte[] body = new byte[100];
        final Random random = new Random();
        random.setSeed(System.currentTimeMillis());
        random.nextBytes(body);
        final byte[] payload = createEncodedMessage(new Data(body));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
            final StreamDelivery delivery = receiver.receive();

            assertNotNull(delivery);
            assertTrue(delivery.completed());
            assertFalse(delivery.aborted());

            StreamReceiverMessage message = delivery.message();
            assertNotNull(message);

            InputStream bodyStream = message.body();
            assertNotNull(bodyStream);

            assertNull(message.header());
            assertNull(message.annotations());
            assertNull(message.properties());
            assertNull(delivery.annotations());

            final int skipSize = 10;

            for (int i = 0; i < body.length; i += skipSize) {
                bodyStream.skip(10);
            }

            final byte[] scratchBuffer = new byte[10];

            assertEquals(-1, bodyStream.read(scratchBuffer, 0, scratchBuffer.length));
            assertEquals(-1, bodyStream.read(scratchBuffer));
            assertEquals(-1, bodyStream.read());
            assertNull(message.footer());

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testReadByteArrayPayloadInChunksFromMultipleTransfersMessage() throws Exception {
        testReadPayloadInChunksFromLargerMultiTransferMessage(false);
    }

    @Test
    public void testReadBytesWithArgsPayloadInChunksFromMultipleTransferMessage() throws Exception {
        testReadPayloadInChunksFromLargerMultiTransferMessage(true);
    }

    private void testReadPayloadInChunksFromLargerMultiTransferMessage(boolean readWithArgs) throws Exception {
        final Random random = new Random();
        final long seed = System.currentTimeMillis();
        final int numChunks = 4;
        final int chunkSize = 30;

        random.setSeed(seed);

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            for (int i = 0; i < numChunks; ++i) {
                final byte[] chunk = new byte[chunkSize];
                random.nextBytes(chunk);
                peer.remoteTransfer().withHandle(0)
                                     .withDeliveryId(0)
                                     .withDeliveryTag(new byte[] { 1 })
                                     .withMore(true)
                                     .withMessageFormat(0)
                                     .withPayload(createEncodedMessage(new Data(chunk))).queue();
            }
            peer.remoteTransfer().withHandle(0).withMore(false).queue();
            peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
            final StreamDelivery delivery = receiver.receive();

            assertNotNull(delivery);

            StreamReceiverMessage message = delivery.message();
            assertNotNull(message);

            InputStream bodyStream = message.body();
            assertNotNull(bodyStream);

            assertNull(message.header());
            assertNull(message.annotations());
            assertNull(message.properties());
            assertNull(delivery.annotations());

            final byte[] readChunk = new byte[chunkSize];
            final byte[] receivedBody = new byte[3];

            random.setSeed(seed);

            int totalBytesRead = 0;

            for (int i = 0; i < numChunks; ++i) {
                for (int j = 0; j < readChunk.length; j += receivedBody.length) {
                    int bytesRead = 0;
                    if (readWithArgs) {
                        bytesRead = bodyStream.read(receivedBody, 0, receivedBody.length);
                    } else {
                        bytesRead = bodyStream.read(receivedBody);
                    }

                    totalBytesRead += bytesRead;

                    System.arraycopy(receivedBody, 0, readChunk, j, bytesRead);
                }

                final byte[] chunk = new byte[chunkSize];
                random.nextBytes(chunk);
                assertArrayEquals(chunk, readChunk);
            }

            assertEquals(chunkSize * numChunks, totalBytesRead);
            assertEquals(-1, bodyStream.read(receivedBody, 0, receivedBody.length));
            assertEquals(-1, bodyStream.read(receivedBody));
            assertEquals(-1, bodyStream.read());
            assertNull(message.footer());

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testReadPayloadFromSplitFrameTransferWithBufferLargerThanTotalPayload() throws Exception {
        final Random random = new Random();
        final long seed = System.currentTimeMillis();
        final int numChunks = 4;
        final int chunkSize = 30;

        random.setSeed(seed);

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            for (int i = 0; i < numChunks; ++i) {
                final byte[] chunk = new byte[chunkSize];
                random.nextBytes(chunk);
                peer.remoteTransfer().withHandle(0)
                                     .withDeliveryId(0)
                                     .withDeliveryTag(new byte[] { 1 })
                                     .withMore(true)
                                     .withMessageFormat(0)
                                     .withPayload(createEncodedMessage(new Data(chunk))).queue();
            }
            peer.remoteTransfer().withHandle(0).withMore(false).queue();
            peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
            final StreamDelivery delivery = receiver.receive();

            assertNotNull(delivery);

            StreamReceiverMessage message = delivery.message();
            assertNotNull(message);

            InputStream bodyStream = message.body();
            assertNotNull(bodyStream);

            assertNull(message.header());
            assertNull(message.annotations());
            assertNull(message.properties());
            assertNull(delivery.annotations());

            final byte[] receivedBody = new byte[(chunkSize * numChunks) + 100];
            Arrays.fill(receivedBody, (byte) 0);
            final int totalBytesRead = bodyStream.read(receivedBody);

            assertEquals(chunkSize * numChunks, totalBytesRead);
            assertEquals(-1, bodyStream.read(receivedBody, 0, receivedBody.length));
            assertEquals(-1, bodyStream.read(receivedBody));
            assertEquals(-1, bodyStream.read());
            assertNull(message.footer());

            // Regenerate what should have been sent plus empty trailing section to
            // check that the read doesn't write anything into the area we gave beyond
            // what was expected payload size.
            random.setSeed(seed);
            final byte[] regeneratedPayload = new byte[numChunks * chunkSize + 100];
            Arrays.fill(regeneratedPayload, (byte) 0);
            for (int i = 0; i < numChunks; ++i) {
                final byte[] chunk = new byte[chunkSize];
                random.nextBytes(chunk);
                System.arraycopy(chunk, 0, regeneratedPayload, chunkSize * i, chunkSize);
            }

            assertArrayEquals(regeneratedPayload, receivedBody);

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamReadOpensSessionWindowForAdditionalInput() throws Exception {
        final byte[] body1 = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        final byte[] body2 = new byte[] { 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 };
        final byte[] payload1 = createEncodedMessage(new Data(body1));
        final byte[] payload2 = createEncodedMessage(new Data(body2));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().withMaxFrameSize(1000).respond();
            peer.expectBegin().withIncomingWindow(1).respond();
            peer.expectAttach().ofReceiver().respond();
            peer.expectFlow().withIncomingWindow(1).withLinkCredit(10);
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(true)
                                 .withMessageFormat(0)
                                 .withPayload(payload1).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            ConnectionOptions connectionOptions = new ConnectionOptions().maxFrameSize(1000);
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
            StreamReceiverOptions streamOptions = new StreamReceiverOptions().readBufferSize(2000);
            StreamReceiver receiver = connection.openStreamReceiver("test-queue", streamOptions);
            StreamDelivery delivery = receiver.receive();
            assertNotNull(delivery);
            StreamReceiverMessage message = delivery.message();
            assertNotNull(message);

            // Creating the input stream instance should read the first chunk of data from the incoming
            // delivery which should result in a new credit being available to expand the session window.
            // An additional transfer should be placed into the delivery buffer but not yet read since
            // the user hasn't read anything.
            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(10);
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload2).queue();
            peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);

            InputStream bodyStream = message.body();
            assertNotNull(bodyStream);

            // Once the read of all data completes the session window should be opened and the
            // stream should mark the delivery as accepted and settled since we are in auto settle
            // mode and there is nothing more to read.
            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(9);

            byte[] combinedPayloads = new byte[body1.length + body2.length];
            bodyStream.read(combinedPayloads);

            assertTrue(Arrays.equals(body1, 0, body1.length, combinedPayloads, 0, body1.length));
            assertTrue(Arrays.equals(body2, 0, body2.length, combinedPayloads, body1.length, body1.length + body2.length));

            bodyStream.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.openFuture().get();
            receiver.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamReadOpensSessionWindowForAdditionalInputAndGrantsCreditOnClose() throws Exception {
        final byte[] body1 = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        final byte[] body2 = new byte[] { 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 };
        final byte[] payload1 = createEncodedMessage(new Data(body1));
        final byte[] payload2 = createEncodedMessage(new Data(body2));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().withMaxFrameSize(1000).respond();
            peer.expectBegin().withIncomingWindow(1).respond();
            peer.expectAttach().ofReceiver().respond();
            peer.expectFlow().withIncomingWindow(1).withLinkCredit(1);
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(true)
                                 .withMessageFormat(0)
                                 .withPayload(payload1).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            ConnectionOptions connectionOptions = new ConnectionOptions().maxFrameSize(1000);
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
            StreamReceiverOptions streamOptions = new StreamReceiverOptions().readBufferSize(2000).creditWindow(1);
            StreamReceiver receiver = connection.openStreamReceiver("test-queue", streamOptions);
            StreamDelivery delivery = receiver.receive();
            assertNotNull(delivery);
            StreamReceiverMessage message = delivery.message();
            assertNotNull(message);

            // Creating the input stream instance should read the first chunk of data from the incoming
            // delivery which should result in a new credit being available to expand the session window.
            // An additional transfer should be placed into the delivery buffer but not yet read since
            // the user hasn't read anything. Since we are in auto settle the completed transfer should
            // trigger settlement and also open the credit window but the session window should not be
            // expanded since we haven't read the data yet.
            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(1);
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload2).queue();
            peer.expectDisposition().withSettled(true).withState().accepted();
            peer.expectFlow().withDeliveryCount(1).withIncomingWindow(0).withLinkCredit(1);

            InputStream bodyStream = message.body();
            assertNotNull(bodyStream);

            // Once the read of all data completes the session window should be opened
            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(1);

            byte[] combinedPayloads = new byte[body1.length + body2.length];
            bodyStream.read(combinedPayloads);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

            // No frames should be triggered by closing the stream since we already auto settled
            // and updated the session window on the remote.

            assertTrue(Arrays.equals(body1, 0, body1.length, combinedPayloads, 0, body1.length));
            assertTrue(Arrays.equals(body2, 0, body2.length, combinedPayloads, body1.length, body1.length + body2.length));

            bodyStream.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.openFuture().get();
            receiver.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamReadOfAllPayloadConsumesTrailingFooterOnClose() throws Exception {
        final byte[] body1 = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        final byte[] body2 = new byte[] { 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 };
        final byte[] payload1 = createEncodedMessage(new Data(body1));
        final byte[] payload2 = createEncodedMessage(new Data(body2));
        final Footer footers = new Footer(new HashMap<>());
        footers.getValue().put(Symbol.valueOf("footer-key"), "test");
        final byte[] payload3 = createEncodedMessage(footers);

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().withMaxFrameSize(1000).respond();
            peer.expectBegin().withIncomingWindow(1).respond();
            peer.expectAttach().ofReceiver().respond();
            peer.expectFlow().withIncomingWindow(1).withLinkCredit(10);
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(true)
                                 .withMessageFormat(0)
                                 .withPayload(payload1).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            ConnectionOptions connectionOptions = new ConnectionOptions().maxFrameSize(1000);
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
            StreamReceiverOptions streamOptions = new StreamReceiverOptions().readBufferSize(2000).autoAccept(false);
            StreamReceiver receiver = connection.openStreamReceiver("test-queue", streamOptions);
            StreamDelivery delivery = receiver.receive();
            assertNotNull(delivery);
            StreamReceiverMessage message = delivery.message();
            assertNotNull(message);

            // Creating the input stream instance should read the first chunk of data from the incoming
            // delivery which should result in a new credit being available to expand the session window.
            // An additional transfer should be placed into the delivery buffer but not yet read since
            // the user hasn't read anything.
            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(10);
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withMore(true)
                                 .withMessageFormat(0)
                                 .withPayload(payload2).queue();

            InputStream bodyStream = message.body();
            assertNotNull(bodyStream);

            // Once the read of all data completes the session window should be opened and the
            // stream should mark the delivery as accepted and settled since we are in auto settle
            // mode and there is nothing more to read.
            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(10);
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload3).queue();
            peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(9);
            peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);

            byte[] combinedPayloads = new byte[body1.length + body2.length];
            bodyStream.read(combinedPayloads);

            assertTrue(Arrays.equals(body1, 0, body1.length, combinedPayloads, 0, body1.length));
            assertTrue(Arrays.equals(body2, 0, body2.length, combinedPayloads, body1.length, body1.length + body2.length));

            bodyStream.close();

            Footer footer = message.footer();
            assertNotNull(footer);
            assertFalse(footer.getValue().isEmpty());
            assertTrue(footer.getValue().containsKey(Symbol.valueOf("footer-key")));

            assertTrue(message.hasFooters());
            assertTrue(message.hasFooter("footer-key"));
            message.forEachFooter((key, value) -> {
                assertEquals(key, "footer-key");
                assertEquals(value, "test");
            });

            delivery.accept();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.openFuture().get();
            receiver.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testReadBytesFromBodyInputStreamWithinTransactedSession() throws Exception {
        final byte[] body = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        final byte[] payload = createEncodedMessage(new Data(body));
        final byte[] txnId = new byte[] { 0, 1, 2, 3 };

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectCoordinatorAttach().respond();
            peer.remoteFlow().withLinkCredit(2).queue();
            peer.expectDeclare().accept(txnId);
            peer.expectDisposition().withSettled(true).withState().transactional().withTxnId(txnId).withAccepted();
            peer.expectDischarge().withFail(false).withTxnId(txnId).accept();

            receiver.session().beginTransaction();

            final StreamDelivery delivery = receiver.receive();

            assertNotNull(delivery);
            assertTrue(delivery.completed());
            assertFalse(delivery.aborted());

            StreamReceiverMessage message = delivery.message();
            assertNotNull(message);

            InputStream bodyStream = message.body();
            assertNotNull(bodyStream);

            assertNull(message.header());
            assertNull(message.annotations());
            assertNull(message.properties());
            assertNull(delivery.annotations());

            final byte[] receivedBody = new byte[body.length];
            for (int i = 0; i < body.length; ++i) {
                receivedBody[i] = (byte) bodyStream.read();
            }
            assertArrayEquals(body, receivedBody);
            assertEquals(-1, bodyStream.read());

            receiver.session().commitTransaction();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamDeliveryHandlesInvalidHeaderEncoding() throws Exception {
        final byte[] payload = createInvalidHeaderEncoding();

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiverOptions options = new StreamReceiverOptions().autoAccept(false);
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue", options);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDisposition().withState().rejected("decode-error", "failed reading message header");

            final StreamDelivery delivery = receiver.receive();
            final StreamReceiverMessage message = delivery.message();

            assertThrows(ClientException.class, () -> message.header());
            assertThrows(ClientException.class, () -> message.body());

            delivery.reject("decode-error", "failed reading message header");

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.close();
            connection.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamDeliveryHandlesInvalidDeliveryAnnotationsEncoding() throws Exception {
        final byte[] payload = createInvalidDeliveryAnnotationsEncoding();

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiverOptions options = new StreamReceiverOptions().autoAccept(false);
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue", options);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDisposition().withState().rejected("decode-error", "failed reading message header");

            final StreamDelivery delivery = receiver.receive();
            final StreamReceiverMessage message = delivery.message();

            assertThrows(ClientException.class, () -> delivery.annotations());
            assertThrows(ClientException.class, () -> message.body());

            delivery.reject("decode-error", "failed reading message header");

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.close();
            connection.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamDeliveryHandlesInvalidMessageAnnotationsEncoding() throws Exception {
        final byte[] payload = createInvalidMessageAnnotationsEncoding();

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiverOptions options = new StreamReceiverOptions().autoAccept(false);
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue", options);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDisposition().withState().rejected("decode-error", "failed reading message header");

            final StreamDelivery delivery = receiver.receive();
            final StreamReceiverMessage message = delivery.message();

            assertThrows(ClientException.class, () -> message.annotations());
            assertThrows(ClientException.class, () -> message.body());

            delivery.reject("decode-error", "failed reading message header");

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.close();
            connection.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamDeliveryHandlesInvalidPropertiesEncoding() throws Exception {
        final byte[] payload = createInvalidPropertiesEncoding();

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiverOptions options = new StreamReceiverOptions().autoAccept(false);
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue", options);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDisposition().withState().rejected("decode-error", "failed reading message header");

            final StreamDelivery delivery = receiver.receive();
            final StreamReceiverMessage message = delivery.message();

            assertThrows(ClientException.class, () -> message.properties());
            assertThrows(ClientException.class, () -> message.body());

            delivery.reject("decode-error", "failed reading message header");

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.close();
            connection.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamDeliveryHandlesInvalidApplicationPropertiesEncoding() throws Exception {
        final byte[] payload = createInvalidApplicationPropertiesEncoding();

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiverOptions options = new StreamReceiverOptions().autoAccept(false);
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue", options);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDisposition().withState().rejected("decode-error", "failed reading message header");

            final StreamDelivery delivery = receiver.receive();
            final StreamReceiverMessage message = delivery.message();

            assertThrows(ClientException.class, () -> message.applicationProperties());
            assertThrows(ClientException.class, () -> message.body());

            delivery.reject("decode-error", "failed reading message header");

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.close();
            connection.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamDeliveryHandlesInvalidHeaderEncodingDuringBodyStreamOpen() throws Exception {
        final byte[] payload = createInvalidHeaderEncoding();

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiverOptions options = new StreamReceiverOptions().autoAccept(false);
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue", options);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectDisposition().withState().rejected("decode-error", "failed reading message header");

            final StreamDelivery delivery = receiver.receive();

            StreamReceiverMessage message = delivery.message();

            assertThrows(ClientException.class, () -> message.body());

            delivery.reject("decode-error", "failed reading message header");

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.close();
            connection.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testConnectionDropsDuringStreamedBodyRead() throws Exception {
        final byte[] body1 = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        final byte[] body2 = new byte[] { 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 };
        final byte[] payload1 = createEncodedMessage(new Data(body1));
        final byte[] payload2 = createEncodedMessage(new Data(body2));

        final CountDownLatch disconnected = new CountDownLatch(1);

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().withMaxFrameSize(1000).respond();
            peer.expectBegin().withIncomingWindow(1).respond();
            peer.expectAttach().ofReceiver().respond();
            peer.expectFlow().withIncomingWindow(1).withLinkCredit(1);
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(true)
                                 .withMessageFormat(0)
                                 .withPayload(payload1).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            ConnectionOptions connectionOptions = new ConnectionOptions().maxFrameSize(1000);
            connectionOptions.disconnectedHandler((conn, event) -> disconnected.countDown());
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
            StreamReceiverOptions streamOptions = new StreamReceiverOptions().readBufferSize(2000).creditWindow(1);
            StreamReceiver receiver = connection.openStreamReceiver("test-queue", streamOptions);
            StreamDelivery delivery = receiver.receive();
            StreamReceiverMessage message = delivery.message();

            // Creating the input stream instance should read the first chunk of data from the incoming
            // delivery which should result in a new credit being available to expand the session window.
            // An additional transfer should be placed into the delivery buffer but not yet read since
            // the user hasn't read anything.
            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
            peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(1);
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withMore(true)
                                 .withMessageFormat(0)
                                 .withPayload(payload2).queue();
            peer.dropAfterLastHandler();

            InputStream bodyStream = message.body();
            assertNotNull(bodyStream);

            assertTrue(disconnected.await(5, TimeUnit.SECONDS));

            byte[] readPayload = new byte[body1.length + body2.length];

            try {
                bodyStream.read(readPayload);
                fail("Should not be able to read from closed connection stream");
            } catch (IOException ioe) {
                // Connection should be down now.
            }

            bodyStream.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testFrameSizeViolationWhileWaitingForIncomingStreamReceiverContent() throws Exception {
        byte[] overFrameSizeLimitFrameHeader = new byte[] { 0x00, (byte) 0xA0, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00 };

        final byte[] body = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        final byte[] payload = createEncodedMessage(new Data(body));

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().withMaxFrameSize(65535).respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofReceiver().respond();
            peer.expectFlow().withLinkCredit(1);
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(true)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            ConnectionOptions connectionOptions = new ConnectionOptions().maxFrameSize(65535);
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
            StreamReceiverOptions streamOptions = new StreamReceiverOptions().creditWindow(1);
            StreamReceiver receiver = connection.openStreamReceiver("test-queue", streamOptions);
            StreamDelivery delivery = receiver.receive();
            StreamReceiverMessage message = delivery.message();
            InputStream stream = message.body();

            peer.waitForScriptToComplete();
            peer.expectClose().respond();
            peer.remoteBytes().withBytes(overFrameSizeLimitFrameHeader).later(10);

            byte[] bytesToRead = new byte[body.length * 2];

            try {
                stream.read(bytesToRead);
                fail("Should throw an error indicating issue with read of payload");
            } catch (IOException ioe) {
                // Expected
            }

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testStreamReceiverTryReadAmqpSequenceBytes() throws Exception {
        final List<String> stringList = new ArrayList<>();
        stringList.add("Hello World");
        final byte[] payload = createEncodedMessage(new AmqpSequence<>(stringList));

        doTestStreamReceiverReadsNonDataSectionBody(payload);
    }

    @Test
    public void testStreamReceiverTryReadAmqpValueBytes() throws Exception {
        final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));

        doTestStreamReceiverReadsNonDataSectionBody(payload);
    }

    private void doTestStreamReceiverReadsNonDataSectionBody(byte[] payload) throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withSettled(true)
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

            final StreamDelivery delivery = receiver.receive();
            final StreamReceiverMessage message = delivery.message();
            try {
                message.body();
                fail("Should not return a stream since we cannot read this type");
            } catch (ClientException cliEx) {
                // Expected
            }

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.close();
            connection.close();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testReadMessageHeaderFromStreamReceiverMessage() throws Exception {
        final Header header = new Header();

        header.setDeliveryCount(UnsignedInteger.MAX_VALUE.longValue());
        header.setDurable(true);
        header.setFirstAcquirer(false);
        header.setPriority((byte) 255);
        header.setTimeToLive(Integer.MAX_VALUE);

        final byte[] payload = createEncodedMessage(header);

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
            final StreamDelivery delivery = receiver.receive();

            assertNotNull(delivery);
            assertTrue(delivery.completed());
            assertFalse(delivery.aborted());

            StreamReceiverMessage message = delivery.message();
            assertNotNull(message);

            Header readHeader = message.header();
            assertNotNull(readHeader);
            assertNull(message.body());

            assertEquals(Integer.toUnsignedLong(Integer.MAX_VALUE), message.timeToLive());
            assertEquals(true, message.durable());
            assertEquals(false, message.firstAcquirer());
            assertEquals((byte) 255, message.priority());
            assertEquals(Integer.toUnsignedLong(Integer.MAX_VALUE), message.timeToLive());

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testReadMessagePropertiesFromStreamReceiverMessage() throws Exception {
        final Properties properties = new Properties();

        properties.setAbsoluteExpiryTime(Integer.MAX_VALUE);
        properties.setContentEncoding("utf8");
        properties.setContentType("text/plain");
        properties.setCorrelationId(new Binary(new byte[] { 1, 2, 3 }));
        properties.setCreationTime(Short.MAX_VALUE);
        properties.setGroupId("Group");
        properties.setGroupSequence(UnsignedInteger.MAX_VALUE.longValue());
        properties.setMessageId(UUID.randomUUID());
        properties.setReplyTo("replyTo");
        properties.setReplyToGroupId("group-1");
        properties.setSubject("test");
        properties.setTo("queue");
        properties.setUserId(new byte[] { 0, 1, 5, 6, 9 });

        final byte[] payload = createEncodedMessage(new Header(), properties);

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
            final StreamDelivery delivery = receiver.receive();

            assertNotNull(delivery);
            assertTrue(delivery.completed());
            assertFalse(delivery.aborted());

            StreamReceiverMessage message = delivery.message();
            assertNotNull(message);

            assertFalse(message.hasProperties());
            assertFalse(message.hasFooters());
            assertFalse(message.hasAnnotations());

            Properties readProperties = message.properties();
            assertNotNull(readProperties);
            Header header = message.header();
            assertNotNull(header);
            assertNull(message.body());

            assertEquals(Integer.MAX_VALUE, message.absoluteExpiryTime());
            assertEquals("utf8", message.contentEncoding());
            assertEquals("text/plain", message.contentType());
            assertEquals(new Binary(new byte[] { 1, 2, 3 }), message.correlationId());
            assertEquals("utf8", message.contentEncoding());
            assertEquals("utf8", message.contentEncoding());
            assertEquals("utf8", message.contentEncoding());
            assertEquals(Short.MAX_VALUE, message.creationTime());
            assertEquals(UnsignedInteger.MAX_VALUE.intValue(), message.groupSequence());
            assertEquals(properties.getMessageId(), message.messageId());
            assertEquals("replyTo", message.replyTo());
            assertEquals("group-1", message.replyToGroupId());
            assertEquals("test", message.subject());
            assertEquals("queue", message.to());
            assertArrayEquals(new byte[] { 0, 1, 5, 6, 9 }, message.userId());

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testReadApplicationPropertiesStreamReceiverMessage() throws Exception {
        final Map<String, Object> propertiesMap = new HashMap<>();
        final ApplicationProperties appProperties = new ApplicationProperties(propertiesMap);

        propertiesMap.put("property1", UnsignedInteger.MAX_VALUE);
        propertiesMap.put("property2", UnsignedInteger.ONE);
        propertiesMap.put("property3", UnsignedInteger.ZERO);

        final byte[] payload = createEncodedMessage(appProperties);

        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.expectFlow();
            peer.remoteTransfer().withHandle(0)
                                 .withDeliveryId(0)
                                 .withDeliveryTag(new byte[] { 1 })
                                 .withMore(false)
                                 .withMessageFormat(0)
                                 .withPayload(payload).queue();
            peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            final Client container = Client.create();
            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
            final StreamDelivery delivery = receiver.receive();

            assertNotNull(delivery);
            assertTrue(delivery.completed());
            assertFalse(delivery.aborted());

            StreamReceiverMessage message = delivery.message();
            assertNotNull(message);

            assertTrue(message.hasProperties());
            assertFalse(message.hasFooters());
            assertFalse(message.hasAnnotations());

            assertFalse(message.hasProperty("property"));
            assertEquals(UnsignedInteger.MAX_VALUE, message.property("property1"));
            assertEquals(UnsignedInteger.ONE, message.property("property2"));
            assertEquals(UnsignedInteger.ZERO, message.property("property3"));

            message.forEachProperty((key, value) -> {
                assertTrue(propertiesMap.containsKey(key));
                assertEquals(value, propertiesMap.get(key));
            });

            peer.expectDetach().respond();
            peer.expectEnd().respond();
            peer.expectClose().respond();

            receiver.closeAsync().get();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testDrainFutureSignalsFailureWhenDrainTimeoutExceeded() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofReceiver().respond();
            peer.expectFlow();
            peer.expectFlow().withDrain(true);
            peer.expectClose().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamReceiverOptions receiverOptions = new StreamReceiverOptions().drainTimeout(15);
            Receiver receiver = connection.openStreamReceiver("test-queue", receiverOptions).openFuture().get();

            try {
                receiver.drain().get();
                fail("Drain call should fail timeout exceeded.");
            } catch (ExecutionException cliEx) {
                LOG.debug("Receiver threw error on drain call", cliEx);
                assertTrue(cliEx.getCause() instanceof ClientOperationTimedOutException);
            }

            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testDrainFutureSignalsFailureWhenConnectionDrainTimeoutExceeded() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofReceiver().respond();
            peer.expectFlow();
            peer.expectFlow().withDrain(true);
            peer.expectClose().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            ConnectionOptions connectionOptions = new ConnectionOptions().drainTimeout(20);
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
            Receiver receiver = connection.openStreamReceiver("test-queue").openFuture().get();

            try {
                receiver.drain().get();
                fail("Drain call should fail timeout exceeded.");
            } catch (ExecutionException cliEx) {
                LOG.debug("Receiver threw error on drain call", cliEx);
                assertTrue(cliEx.getCause() instanceof ClientOperationTimedOutException);
            }

            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testDrainCompletesWhenReceiverHasNoCredit() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            Receiver receiver = connection.openStreamReceiver("test-queue", new StreamReceiverOptions().creditWindow(0));
            receiver.openFuture().get(5, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

            Future<? extends Receiver> draining = receiver.drain();
            draining.get(5, TimeUnit.SECONDS);

            // Close things down
            peer.expectClose().respond();
            connection.closeAsync().get(5, TimeUnit.SECONDS);

            peer.waitForScriptToComplete(1, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testDrainAdditionalDrainCallThrowsWhenReceiverStillDraining() throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofReceiver().respond();
            peer.expectFlow();
            peer.expectFlow().withDrain(true);
            peer.expectClose().respond();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamReceiverOptions receiverOptions = new StreamReceiverOptions();
            Receiver receiver = connection.openStreamReceiver("test-queue", receiverOptions).openFuture().get();

            receiver.drain();

            try {
                receiver.drain().get();
                fail("Drain call should fail timeout exceeded.");
            } catch (ExecutionException cliEx) {
                LOG.debug("Receiver threw error on drain call", cliEx);
                assertTrue(cliEx.getCause() instanceof ClientIllegalStateException);
            }

            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testReceiverGetRemotePropertiesWaitsForRemoteAttach() throws Exception {
        tryReadReceiverRemoteProperties(true);
    }

    @Test
    public void testReceiverGetRemotePropertiesFailsAfterOpenTimeout() throws Exception {
        tryReadReceiverRemoteProperties(false);
    }

    private void tryReadReceiverRemoteProperties(boolean attachResponse) throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue());
            peer.expectFlow();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamReceiverOptions options = new StreamReceiverOptions().openTimeout(150, TimeUnit.MILLISECONDS);
            StreamReceiver receiver = connection.openStreamReceiver("test-receiver", options);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

            Map<String, Object> expectedProperties = new HashMap<>();
            expectedProperties.put("TEST", "test-property");

            if (attachResponse) {
                peer.expectDetach().respond();
                peer.expectEnd().respond();
                peer.respondToLastAttach().withPropertiesMap(expectedProperties).later(10);
            } else {
                peer.expectDetach();
                peer.expectEnd();
            }

            if (attachResponse) {
                assertNotNull(receiver.properties(), "Remote should have responded with a remote properties value");
                assertEquals(expectedProperties, receiver.properties());
            } else {
                try {
                    receiver.properties();
                    fail("Should failed to get remote state due to no attach response");
                } catch (ClientException ex) {
                    LOG.debug("Caught expected exception from blocking call", ex);
                }
            }

            try {
                receiver.closeAsync().get();
            } catch (ExecutionException ex) {
                LOG.debug("Caught unexpected exception from close call", ex);
                fail("Should not fail to close when connection not closed and detach sent");
            }

            peer.expectClose().respond();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testReceiverGetRemoteOfferedCapabilitiesWaitsForRemoteAttach() throws Exception {
        tryReadReceiverRemoteOfferedCapabilities(true);
    }

    @Test
    public void testReceiverGetRemoteOfferedCapabilitiesFailsAfterOpenTimeout() throws Exception {
        tryReadReceiverRemoteOfferedCapabilities(false);
    }

    private void tryReadReceiverRemoteOfferedCapabilities(boolean attachResponse) throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue());
            peer.expectFlow();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamReceiverOptions options = new StreamReceiverOptions().openTimeout(150, TimeUnit.MILLISECONDS);
            StreamReceiver receiver = connection.openStreamReceiver("test-receiver", options);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

            if (attachResponse) {
                peer.expectDetach().respond();
                peer.expectEnd().respond();
                peer.respondToLastAttach().withOfferedCapabilities("QUEUE").later(10);
            } else {
                peer.expectDetach();
                peer.expectEnd();
            }

            if (attachResponse) {
                assertNotNull(receiver.offeredCapabilities(), "Remote should have responded with a remote offered Capabilities value");
                assertEquals(1, receiver.offeredCapabilities().length);
                assertEquals("QUEUE", receiver.offeredCapabilities()[0]);
            } else {
                try {
                    receiver.offeredCapabilities();
                    fail("Should failed to get remote state due to no attach response");
                } catch (ClientException ex) {
                    LOG.debug("Caught expected exception from blocking call", ex);
                }
            }

            try {
                receiver.closeAsync().get();
            } catch (ExecutionException ex) {
                LOG.debug("Caught unexpected exception from close call", ex);
                fail("Should not fail to close when connection not closed and detach sent");
            }

            peer.expectClose().respond();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testReceiverGetRemoteDesiredCapabilitiesWaitsForRemoteAttach() throws Exception {
        tryReadReceiverRemoteDesiredCapabilities(true);
    }

    @Test
    public void testReceiverGetRemoteDesiredCapabilitiesFailsAfterOpenTimeout() throws Exception {
        tryReadReceiverRemoteDesiredCapabilities(false);
    }

    private void tryReadReceiverRemoteDesiredCapabilities(boolean attachResponse) throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue());
            peer.expectFlow();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamReceiverOptions options = new StreamReceiverOptions().openTimeout(150, TimeUnit.MILLISECONDS);
            StreamReceiver receiver = connection.openStreamReceiver("test-receiver", options);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

            if (attachResponse) {
                peer.expectDetach().respond();
                peer.expectEnd().respond();
                peer.respondToLastAttach().withDesiredCapabilities("Error-Free").later(10);
            } else {
                peer.expectDetach();
                peer.expectEnd();
            }

            if (attachResponse) {
                assertNotNull(receiver.desiredCapabilities(), "Remote should have responded with a remote desired Capabilities value");
                assertEquals(1, receiver.desiredCapabilities().length);
                assertEquals("Error-Free", receiver.desiredCapabilities()[0]);
            } else {
                try {
                    receiver.desiredCapabilities();
                    fail("Should failed to get remote state due to no attach response");
                } catch (ClientException ex) {
                    LOG.debug("Caught expected exception from blocking call", ex);
                }
            }

            try {
                receiver.closeAsync().get();
            } catch (ExecutionException ex) {
                LOG.debug("Caught unexpected exception from close call", ex);
                fail("Should not fail to close when connection not closed and detach sent");
            }

            peer.expectClose().respond();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testReceiverGetTargetWaitsForRemoteAttach() throws Exception {
        tryReadReceiverTarget(true);
    }

    @Test
    public void testReceiverGetTargetFailsAfterOpenTimeout() throws Exception {
        tryReadReceiverTarget(false);
    }

    private void tryReadReceiverTarget(boolean attachResponse) throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue());
            peer.expectFlow();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamReceiverOptions options = new StreamReceiverOptions().openTimeout(150, TimeUnit.MILLISECONDS);
            StreamReceiver receiver = connection.openStreamReceiver("test-receiver", options);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

            if (attachResponse) {
                peer.expectDetach().respond();
                peer.expectEnd().respond();
                peer.respondToLastAttach().later(10);
            } else {
                peer.expectDetach();
                peer.expectEnd();
            }

            if (attachResponse) {
                assertNotNull(receiver.target(), "Remote should have responded with a Target value");
            } else {
                try {
                    receiver.target();
                    fail("Should failed to get remote source due to no attach response");
                } catch (ClientException ex) {
                    LOG.debug("Caught expected exception from blocking call", ex);
                }
            }

            try {
                receiver.closeAsync().get();
            } catch (ExecutionException ex) {
                LOG.debug("Caught unexpected exception from close call", ex);
                fail("Should not fail to close when connection not closed and detach sent");
            }

            peer.expectClose().respond();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testReceiverGetSourceWaitsForRemoteAttach() throws Exception {
        tryReadReceiverSource(true);
    }

    @Test
    public void testReceiverGetSourceFailsAfterOpenTimeout() throws Exception {
        tryReadReceiverSource(false);
    }

    private void tryReadReceiverSource(boolean attachResponse) throws Exception {
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().withRole(Role.RECEIVER.getValue());
            peer.expectFlow();
            peer.start();

            URI remoteURI = peer.getServerURI();

            LOG.info("Test started, peer listening on: {}", remoteURI);

            Client container = Client.create();
            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
            StreamReceiverOptions options = new StreamReceiverOptions().openTimeout(150, TimeUnit.MILLISECONDS);
            StreamReceiver receiver = connection.openStreamReceiver("test-receiver", options);

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);

            if (attachResponse) {
                peer.expectDetach().respond();
                peer.expectEnd().respond();
                peer.respondToLastAttach().later(10);
            } else {
                peer.expectDetach();
                peer.expectEnd();
            }

            if (attachResponse) {
                assertNotNull(receiver.source(), "Remote should have responded with a Source value");
                assertEquals("test-receiver", receiver.source().address());
            } else {
                try {
                    receiver.source();
                    fail("Should failed to get remote source due to no attach response");
                } catch (ClientException ex) {
                    LOG.debug("Caught expected exception from blocking call", ex);
                }
            }

            try {
                receiver.closeAsync().get();
            } catch (ExecutionException ex) {
                LOG.debug("Caught unexpected exception from close call", ex);
                fail("Should not fail to close when connection not closed and detach sent");
            }

            peer.expectClose().respond();
            connection.closeAsync().get();

            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
        }
    }

    @Test
    public void testReceiverCreditReplenishedAfterSyncReceiveAutoAccept() throws Exception {
       doTestReceiverCreditReplenishedAfterSyncReceive(true);
    }

    @Test
    public void testReceiverCreditReplenishedAfterSyncReceiveManualAccept() throws Exception {
       doTestReceiverCreditReplenishedAfterSyncReceive(false);
    }

    public void doTestReceiverCreditReplenishedAfterSyncReceive(boolean autoAccept) throws Exception {
       byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));

       try (ProtonTestServer peer = new ProtonTestServer()) {
          peer.expectSASLAnonymousConnect();
          peer.expectOpen().respond();
          peer.expectBegin().respond();
          peer.expectAttach().ofReceiver().respond();
          peer.expectFlow().withLinkCredit(10);
          for (int i = 0; i < 10; ++i) {
             peer.remoteTransfer().withDeliveryId(i)
                                  .withMore(false)
                                  .withMessageFormat(0)
                                  .withPayload(payload).queue();
          }
          peer.start();

          URI remoteURI = peer.getServerURI();

          LOG.info("Test started, peer listening on: {}", remoteURI);

          Client container = Client.create();
          Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());

          StreamReceiverOptions options = new StreamReceiverOptions();
          options.autoAccept(autoAccept);
          options.creditWindow(10);

          StreamReceiver receiver = connection.openStreamReceiver("test-receiver", options);

          Wait.waitFor(() -> receiver.queuedDeliveries() == 10);

          peer.waitForScriptToComplete();
          if (autoAccept)
          {
              peer.expectDisposition();
              peer.expectDisposition();
          }

          // Consume messages 1 and 2 which should not provoke credit replenishment
          // as there are still 8 outstanding which is above the 70% mark
          assertNotNull(receiver.receive()); // #1
          assertNotNull(receiver.receive()); // #2

          peer.waitForScriptToComplete();
          if (autoAccept)
          {
              peer.expectDisposition();
          }
          peer.expectFlow().withLinkCredit(3);

          // Now consume message 3 which will trip the replenish barrier and the
          // credit should be updated to reflect that we still have 7 queued
          assertNotNull(receiver.receive());  // #3

          peer.waitForScriptToComplete();
          if (autoAccept)
          {
              peer.expectDisposition();
              peer.expectDisposition();
          }

          // Consume messages 4 and 5 which should not provoke credit replenishment
          // as there are still 5 outstanding plus the credit we sent last time
          // which is above the 70% mark
          assertNotNull(receiver.receive()); // #4
          assertNotNull(receiver.receive()); // #5

          peer.waitForScriptToComplete();
          if (autoAccept)
          {
              peer.expectDisposition();
          }
          peer.expectFlow().withLinkCredit(6);

          // Consume number 6 which means we only have 4 outstanding plus the three
          // that we sent last time we flowed which is 70% of possible prefetch so
          // we should flow to top off credit which would be 6 since we have four
          // still pending
          assertNotNull(receiver.receive()); // #6

          peer.waitForScriptToComplete();
          if (autoAccept)
          {
              peer.expectDisposition();
              peer.expectDisposition();
          }

          // Consume deliveries 7 and 8 which should not flow as we should be
          // above the threshold of 70% since we would now have 2 outstanding
          // and 6 credits on the link
          assertNotNull(receiver.receive()); // #7
          assertNotNull(receiver.receive()); // #8

          peer.waitForScriptToComplete();
          if (autoAccept)
          {
              peer.expectDisposition();
              peer.expectDisposition();
          }

          // Now consume 9 and 10 but we still shouldn't flow more credit because
          // the link credit is above the 50% mark for overall credit windowing.
          assertNotNull(receiver.receive()); // #9
          assertNotNull(receiver.receive()); // #10

          peer.waitForScriptToComplete();

          peer.expectClose().respond();
          connection.close();

          peer.waitForScriptToComplete();
       }
    }

    private byte[] createInvalidHeaderEncoding() {
        final byte[] buffer = new byte[12];

        buffer[0] = 0; // Described Type Indicator
        buffer[1] = EncodingCodes.SMALLULONG;
        buffer[2] = Header.DESCRIPTOR_CODE.byteValue();
        buffer[3] = EncodingCodes.MAP32; // Should be list based

        return buffer;
    }

    private byte[] createInvalidDeliveryAnnotationsEncoding() {
        final byte[] buffer = new byte[12];

        buffer[0] = 0; // Described Type Indicator
        buffer[1] = EncodingCodes.SMALLULONG;
        buffer[2] = DeliveryAnnotations.DESCRIPTOR_CODE.byteValue();
        buffer[3] = EncodingCodes.LIST32; // Should be Map based

        return buffer;
    }

    private byte[] createInvalidMessageAnnotationsEncoding() {
        final byte[] buffer = new byte[12];

        buffer[0] = 0; // Described Type Indicator
        buffer[1] = EncodingCodes.SMALLULONG;
        buffer[2] = MessageAnnotations.DESCRIPTOR_CODE.byteValue();
        buffer[3] = EncodingCodes.LIST32; // Should be Map based

        return buffer;
    }

    private byte[] createInvalidPropertiesEncoding() {
        final byte[] buffer = new byte[12];

        buffer[0] = 0; // Described Type Indicator
        buffer[1] = EncodingCodes.SMALLULONG;
        buffer[2] = Properties.DESCRIPTOR_CODE.byteValue();
        buffer[3] = EncodingCodes.MAP32; // Should be list based

        return buffer;
    }

    private byte[] createInvalidApplicationPropertiesEncoding() {
        final byte[] buffer = new byte[12];

        buffer[0] = 0; // Described Type Indicator
        buffer[1] = EncodingCodes.SMALLULONG;
        buffer[2] = ApplicationProperties.DESCRIPTOR_CODE.byteValue();
        buffer[3] = EncodingCodes.LIST32; // Should be map based

        return buffer;
    }
}
