blob: 196a6249647ad9f14e53cdc77380dc58003f7b02 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.client.impl;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.protonj2.client.Client;
import org.apache.qpid.protonj2.client.Connection;
import org.apache.qpid.protonj2.client.ConnectionOptions;
import org.apache.qpid.protonj2.client.DeliveryState;
import org.apache.qpid.protonj2.client.ErrorCondition;
import org.apache.qpid.protonj2.client.Receiver;
import org.apache.qpid.protonj2.client.ReceiverOptions;
import org.apache.qpid.protonj2.client.SenderOptions;
import org.apache.qpid.protonj2.client.StreamDelivery;
import org.apache.qpid.protonj2.client.StreamReceiver;
import org.apache.qpid.protonj2.client.StreamReceiverMessage;
import org.apache.qpid.protonj2.client.StreamReceiverOptions;
import org.apache.qpid.protonj2.client.exceptions.ClientDeliveryAbortedException;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import org.apache.qpid.protonj2.client.exceptions.ClientLinkRemotelyClosedException;
import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException;
import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
import org.apache.qpid.protonj2.client.test.Wait;
import org.apache.qpid.protonj2.codec.EncodingCodes;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.apache.qpid.protonj2.test.driver.codec.messaging.Accepted;
import org.apache.qpid.protonj2.test.driver.codec.messaging.DeliveryAnnotations;
import org.apache.qpid.protonj2.types.Binary;
import org.apache.qpid.protonj2.types.Symbol;
import org.apache.qpid.protonj2.types.UnsignedInteger;
import org.apache.qpid.protonj2.types.messaging.AmqpSequence;
import org.apache.qpid.protonj2.types.messaging.AmqpValue;
import org.apache.qpid.protonj2.types.messaging.ApplicationProperties;
import org.apache.qpid.protonj2.types.messaging.Data;
import org.apache.qpid.protonj2.types.messaging.Footer;
import org.apache.qpid.protonj2.types.messaging.Header;
import org.apache.qpid.protonj2.types.messaging.MessageAnnotations;
import org.apache.qpid.protonj2.types.messaging.Properties;
import org.apache.qpid.protonj2.types.transport.Role;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests the {@link StreamReceiver} implementation
*/
@Timeout(20)
class StreamReceiverTest extends ImperativeClientTestCase {
private static final Logger LOG = LoggerFactory.getLogger(StreamReceiverTest.class);
@Test
public void testCreateReceiverAndClose() throws Exception {
doTestCreateReceiverAndCloseOrDetachLink(true);
}
@Test
public void testCreateReceiverAndDetach() throws Exception {
doTestCreateReceiverAndCloseOrDetachLink(false);
}
private void doTestCreateReceiverAndCloseOrDetachLink(boolean close) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow().withLinkCredit(10);
peer.expectDetach().withClosed(close).respond();
peer.expectEnd().respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamReceiver receiver = connection.openStreamReceiver("test-queue");
receiver.openFuture().get(10, TimeUnit.SECONDS);
assertSame(container, receiver.client());
assertSame(connection, receiver.connection());
if (close) {
receiver.closeAsync().get(10, TimeUnit.SECONDS);
} else {
receiver.detachAsync().get(10, TimeUnit.SECONDS);
}
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testCreateReceiverAndCloseSync() throws Exception {
doTestCreateReceiverAndCloseOrDetachSyncLink(true);
}
@Test
public void testCreateReceiverAndDetachSync() throws Exception {
doTestCreateReceiverAndCloseOrDetachSyncLink(false);
}
private void doTestCreateReceiverAndCloseOrDetachSyncLink(boolean close) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow().withLinkCredit(10);
peer.expectDetach().withClosed(close).respond();
peer.expectEnd().respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamReceiver receiver = connection.openStreamReceiver("test-queue");
receiver.openFuture().get(10, TimeUnit.SECONDS);
if (close) {
receiver.close();
} else {
receiver.detach();
}
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testCreateReceiverAndCloseWithErrorSync() throws Exception {
doTestCreateReceiverAndCloseOrDetachWithErrorSync(true);
}
@Test
public void testCreateReceiverAndDetachWithErrorSync() throws Exception {
doTestCreateReceiverAndCloseOrDetachWithErrorSync(false);
}
private void doTestCreateReceiverAndCloseOrDetachWithErrorSync(boolean close) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow();
peer.expectDetach().withError("amqp-resource-deleted", "an error message").withClosed(close).respond();
peer.expectEnd().respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Sender test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamReceiver receiver = connection.openStreamReceiver("test-queue");
receiver.openFuture().get(10, TimeUnit.SECONDS);
if (close) {
receiver.close(ErrorCondition.create("amqp-resource-deleted", "an error message", null));
} else {
receiver.detach(ErrorCondition.create("amqp-resource-deleted", "an error message", null));
}
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testCreateReceiverAndCloseWithErrorAsync() throws Exception {
doTestCreateReceiverAndCloseOrDetachWithErrorAsync(true);
}
@Test
public void testCreateReceiverAndDetachWithErrorAsync() throws Exception {
doTestCreateReceiverAndCloseOrDetachWithErrorAsync(false);
}
private void doTestCreateReceiverAndCloseOrDetachWithErrorAsync(boolean close) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow();
peer.expectDetach().withError("amqp-resource-deleted", "an error message").withClosed(close).respond();
peer.expectEnd().respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Sender test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamReceiver receiver = connection.openStreamReceiver("test-queue");
receiver.openFuture().get(10, TimeUnit.SECONDS);
if (close) {
receiver.closeAsync(ErrorCondition.create("amqp-resource-deleted", "an error message", null)).get();
} else {
receiver.detachAsync(ErrorCondition.create("amqp-resource-deleted", "an error message", null)).get();
}
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamReceiverConfiguresSessionCapacity_1() throws Exception {
// Read buffer is always halved by connection when creating new session for the stream
doTestStreamReceiverSessionCapacity(100_000, 200_000, 1);
}
@Test
public void testStreamReceiverConfiguresSessionCapacity_2() throws Exception {
// Read buffer is always halved by connection when creating new session for the stream
doTestStreamReceiverSessionCapacity(100_000, 400_000, 2);
}
@Test
public void testStreamReceiverConfiguresSessionCapacity_3() throws Exception {
// Read buffer is always halved by connection when creating new session for the stream
doTestStreamReceiverSessionCapacity(100_000, 600_000, 3);
}
@Test
public void testStreamReceiverConfiguresSessionCapacityIdenticalToMaxFrameSize() throws Exception {
// Read buffer is always halved by connection when creating new session for the stream
// unless it falls at the max frame size value which means only one is possible, in this
// case the user configured session window the same as than max frame size so only one
// frame is possible.
doTestStreamReceiverSessionCapacity(100_000, 100_000, 1);
}
@Test
public void testStreamReceiverConfiguresSessionCapacityLowerThanMaxFrameSize() throws Exception {
// Read buffer is always halved by connection when creating new session for the stream
// unless it falls at the max frame size value which means only one is possible, in this
// case the user configured session window lower than max frame size and the client auto
// adjusts that to one frame.
doTestStreamReceiverSessionCapacity(100_000, 50_000, 1);
}
private void doTestStreamReceiverSessionCapacity(int maxFrameSize, int readBufferSize, int expectedSessionWindow) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().withMaxFrameSize(maxFrameSize).respond();
peer.expectBegin().withIncomingWindow(expectedSessionWindow).respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow().withIncomingWindow(expectedSessionWindow);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
ConnectionOptions connectionOptions = new ConnectionOptions().maxFrameSize(maxFrameSize);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
StreamReceiverOptions streamOptions = new StreamReceiverOptions().readBufferSize(readBufferSize);
StreamReceiver receiver = connection.openStreamReceiver("test-queue", streamOptions);
receiver.openFuture().get();
receiver.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testOpenStreamReceiverWithLinCapabilities() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue())
.withSource().withCapabilities("queue")
.withDistributionMode(nullValue())
.and().respond();
peer.expectFlow();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("StreamReceiver test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamReceiverOptions receiverOptions = new StreamReceiverOptions();
receiverOptions.sourceOptions().capabilities("queue");
StreamReceiver receiver = connection.openStreamReceiver("test-queue", receiverOptions);
receiver.openFuture().get();
assertSame(container, receiver.client());
assertSame(connection, receiver.connection());
receiver.close();
connection.closeAsync().get(10, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testCreateStreamDeliveryWithoutAnyIncomingDeliveryPresent() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamReceiver receiver = connection.openStreamReceiver("test-queue");
StreamDelivery delivery = receiver.receive(5, TimeUnit.MILLISECONDS);
assertNull(delivery);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamReceiverAwaitTimedCanBePerformedMultipleTimes() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamReceiver receiver = connection.openStreamReceiver("test-queue");
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
assertNull(receiver.receive(3, TimeUnit.MILLISECONDS));
assertNull(receiver.receive(3, TimeUnit.MILLISECONDS));
assertNull(receiver.receive(3, TimeUnit.MILLISECONDS));
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.close();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiveFailsWhenLinkRemotelyClosed() throws Exception {
doTestReceiveFailsWhenLinkRemotelyClose(false);
}
@Test
public void testTimedReceiveFailsWhenLinkRemotelyClosed() throws Exception {
doTestReceiveFailsWhenLinkRemotelyClose(true);
}
private void doTestReceiveFailsWhenLinkRemotelyClose(boolean timed) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamReceiver receiver = connection.openStreamReceiver("test-queue");
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach();
peer.expectEnd().respond();
peer.expectClose().respond();
peer.remoteDetach().later(50);
if (timed) {
assertThrows(ClientLinkRemotelyClosedException.class, () -> receiver.receive(1, TimeUnit.MINUTES));
} else {
assertThrows(ClientLinkRemotelyClosedException.class, () -> receiver.receive());
}
receiver.closeAsync();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamDeliveryUsesUnsettledDeliveryOnOpen() throws Exception {
final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.remoteDisposition().withRole(Role.SENDER.getValue())
.withFirst(0)
.withSettled(true)
.withState(Accepted.getInstance()).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiverOptions options = new StreamReceiverOptions().autoAccept(false);
final StreamReceiver receiver = connection.openStreamReceiver("test-queue", options);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
final StreamDelivery delivery = receiver.receive();
Wait.assertTrue("Should eventually be remotely settled", delivery::remoteSettled);
Wait.assertTrue(() -> { return delivery.remoteState() == DeliveryState.accepted(); });
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamDeliveryReceiveWithTransferAlreadyComplete() throws Exception {
doTestStreamDeliveryReceiveWithTransferAlreadyComplete(false);
}
@Test
public void testStreamDeliveryTryReceiveWithTransferAlreadyComplete() throws Exception {
doTestStreamDeliveryReceiveWithTransferAlreadyComplete(true);
}
private void doTestStreamDeliveryReceiveWithTransferAlreadyComplete(boolean tryReceive) throws Exception {
final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.expectDisposition().withState().accepted().withSettled(true);
// Ensures that stream receiver has the delivery in its queue.
connection.openSender("test-sender").openFuture().get();
final StreamDelivery delivery;
if (tryReceive) {
delivery = receiver.tryReceive();
} else {
delivery = receiver.receive();
}
assertNotNull(delivery);
assertTrue(delivery.completed());
assertFalse(delivery.aborted());
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.close();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamDeliveryReceivedWhileTransferIsIncomplete() throws Exception {
final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(true)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
assertFalse(delivery.completed());
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDisposition().withSettled(true);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withNullDeliveryTag()
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).now();
Wait.assertTrue("Should eventually be marked as completed", delivery::completed);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamDeliveryRawInputStreamWithCompleteDeliveryReadByte() throws Exception {
final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.expectDisposition().withState().accepted().withSettled(true);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
assertTrue(delivery.completed());
assertFalse(delivery.aborted());
final InputStream stream = delivery.rawInputStream();
assertNotNull(stream);
assertEquals(payload.length, stream.available());
final byte[] deliveryBytes = new byte[payload.length];
for (int i = 0; i < payload.length; ++i) {
deliveryBytes[i] = (byte) stream.read();
}
assertArrayEquals(payload, deliveryBytes);
assertEquals(0, stream.available());
assertEquals(-1, stream.read());
stream.close();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.close();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamDeliveryRawInputStreamBehaviorAfterStreamClosed() throws Exception {
final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.expectDisposition().withState().accepted().withSettled(true);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
assertTrue(delivery.completed());
assertFalse(delivery.aborted());
final InputStream stream = delivery.rawInputStream();
assertNotNull(stream);
stream.close();
final byte[] scratch = new byte[10];
assertThrows(IOException.class, () -> stream.available());
assertThrows(IOException.class, () -> stream.skip(1));
assertThrows(IOException.class, () -> stream.read());
assertThrows(IOException.class, () -> stream.read(scratch));
assertThrows(IOException.class, () -> stream.read(scratch, 0, scratch.length));
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.close();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamDeliveryRawInputStreamWithCompleteDeliveryReadBytes() throws Exception {
final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.expectDisposition().withState().accepted().withSettled(true);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
assertTrue(delivery.completed());
assertFalse(delivery.aborted());
final InputStream stream = delivery.rawInputStream();
assertNotNull(stream);
assertEquals(payload.length, stream.available());
final byte[] deliveryBytes = new byte[payload.length];
stream.read(deliveryBytes);
assertArrayEquals(payload, deliveryBytes);
assertEquals(0, stream.available());
assertEquals(-1, stream.read());
stream.close();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.close();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamDeliveryRawInputStreamWithInCompleteDeliveryReadBytes() throws Exception {
final byte[] payload1 = createEncodedMessage(new Data(new byte[] { 0, 1, 2, 3, 4, 5 }));
final byte[] payload2 = createEncodedMessage(new Data(new byte[] { 6, 7, 8, 9, 0 ,1 }));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(true)
.withMessageFormat(0)
.withPayload(payload1).queue();
peer.expectDisposition().withState().accepted().withSettled(true);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
assertFalse(delivery.completed());
assertFalse(delivery.aborted());
final InputStream stream = delivery.rawInputStream();
assertNotNull(stream);
assertEquals(payload1.length, stream.available());
final byte[] deliveryBytes1 = new byte[payload1.length];
stream.read(deliveryBytes1);
assertArrayEquals(payload1, deliveryBytes1);
assertEquals(0, stream.available());
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withPayload(payload2).later(50);
// Should block until more data arrives.
final byte[] deliveryBytes2 = new byte[payload2.length];
stream.read(deliveryBytes2);
assertArrayEquals(payload2, deliveryBytes2);
assertEquals(0, stream.available());
assertTrue(delivery.completed());
assertFalse(delivery.aborted());
stream.close();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamDeliveryRawInputStreamReadBytesSignalsEOFOnEmptyCompleteTransfer() throws Exception {
final byte[] payload1 = createEncodedMessage(new Data(new byte[] { 0, 1, 2, 3, 4, 5 }));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(true)
.withMessageFormat(0)
.withPayload(payload1).queue();
peer.expectDisposition().withState().accepted().withSettled(true);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
assertFalse(delivery.completed());
assertFalse(delivery.aborted());
final InputStream stream = delivery.rawInputStream();
assertNotNull(stream);
assertEquals(payload1.length, stream.available());
final byte[] deliveryBytes1 = new byte[payload1.length];
stream.read(deliveryBytes1);
assertArrayEquals(payload1, deliveryBytes1);
assertEquals(0, stream.available());
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.later(50);
// Should block until more data arrives.
final byte[] deliveryBytes2 = new byte[payload1.length];
assertEquals(-1, stream.read(deliveryBytes2));
assertEquals(0, stream.available());
assertTrue(delivery.completed());
assertFalse(delivery.aborted());
stream.close();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.close();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamDeliveryRawInputStreamWithInCompleteDeliverySkipBytes() throws Exception {
final byte[] payload1 = createEncodedMessage(new Data(new byte[] { 0, 1, 2, 3, 4, 5 }));
final byte[] payload2 = createEncodedMessage(new Data(new byte[] { 6, 7, 8, 9, 0 ,1 }));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(true)
.withMessageFormat(0)
.withPayload(payload1).queue();
peer.expectDisposition().withState().accepted().withSettled(true);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
assertFalse(delivery.completed());
assertFalse(delivery.aborted());
final InputStream stream = delivery.rawInputStream();
assertNotNull(stream);
assertEquals(payload1.length, stream.available());
stream.skip(payload1.length);
assertEquals(0, stream.available());
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withPayload(payload2).later(50);
// Should block until more data arrives.
stream.skip(payload2.length);
assertEquals(0, stream.available());
assertTrue(delivery.completed());
assertFalse(delivery.aborted());
stream.close();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamDeliveryRawInputStreamReadOpensSessionWindowForAdditionalInput() throws Exception {
final byte[] body1 = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
final byte[] body2 = new byte[] { 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 };
final byte[] payload1 = createEncodedMessage(new Data(body1));
final byte[] payload2 = createEncodedMessage(new Data(body2));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().withMaxFrameSize(1000).respond();
peer.expectBegin().withIncomingWindow(1).respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow().withIncomingWindow(1).withLinkCredit(10);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(true)
.withMessageFormat(0)
.withPayload(payload1).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
ConnectionOptions connectionOptions = new ConnectionOptions().maxFrameSize(1000);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
StreamReceiverOptions streamOptions = new StreamReceiverOptions().readBufferSize(2000);
StreamReceiver receiver = connection.openStreamReceiver("test-queue", streamOptions);
StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
InputStream rawStream = delivery.rawInputStream();
assertNotNull(rawStream);
// An initial frame has arrived but more than that is requested so the first chuck is pulled
// from the incoming delivery and the session window opens which allows the second chunk to
// arrive and again the session window will be opened as that chunk is moved to the reader's
// buffer for return from the read request.
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(10);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withMore(false)
.withMessageFormat(0)
.withPayload(payload2).queue();
peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(9);
peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
byte[] combinedPayloads = new byte[payload1.length + payload2.length];
rawStream.read(combinedPayloads);
assertTrue(Arrays.equals(payload1, 0, payload1.length, combinedPayloads, 0, payload1.length));
assertTrue(Arrays.equals(payload2, 0, payload2.length, combinedPayloads, payload1.length, payload1.length + payload2.length));
rawStream.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.openFuture().get();
receiver.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamDeliveryRawInputStreamBlockedReadBytesAborted() throws Exception {
final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(true)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
assertFalse(delivery.completed());
assertFalse(delivery.aborted());
final InputStream stream = delivery.rawInputStream();
assertNotNull(stream);
assertEquals(payload.length, stream.available());
final byte[] deliveryBytes = new byte[payload.length * 2];
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withAborted(true)
.withMessageFormat(0).later(50);
try {
stream.read(deliveryBytes);
fail("Delivery should have been aborted while waiting for more data.");
} catch (IOException ioe) {
assertTrue(ioe.getCause() instanceof ClientDeliveryAbortedException);
}
stream.close();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamDeliveryRawInputStreamClosedWithoutReadsConsumesTransfers() throws Exception {
final byte[] body1 = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
final byte[] body2 = new byte[] { 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 };
final byte[] payload1 = createEncodedMessage(new Data(body1));
final byte[] payload2 = createEncodedMessage(new Data(body2));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().withMaxFrameSize(1000).respond();
peer.expectBegin().withIncomingWindow(1).respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow().withIncomingWindow(1).withLinkCredit(10);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(true)
.withMessageFormat(0)
.withPayload(payload1).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
ConnectionOptions connectionOptions = new ConnectionOptions().maxFrameSize(1000);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
StreamReceiverOptions streamOptions = new StreamReceiverOptions().readBufferSize(2000);
StreamReceiver receiver = connection.openStreamReceiver("test-queue", streamOptions);
StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
InputStream rawStream = delivery.rawInputStream();
assertNotNull(rawStream);
// An initial frame has arrived but no reads have been performed and then if closed
// the delivery will be consumed to allow the session window to be opened and prevent
// a stall due to an un-consumed delivery.
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(10);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withMore(false)
.withMessageFormat(0)
.withPayload(payload2).queue();
peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(9);
peer.expectDisposition().withSettled(true).withState().accepted();
rawStream.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.openFuture().get();
receiver.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamDeliveryRawInputStreamClosedWithoutReadsAllowsUserDisposition() throws Exception {
final byte[] body1 = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
final byte[] body2 = new byte[] { 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 };
final byte[] payload1 = createEncodedMessage(new Data(body1));
final byte[] payload2 = createEncodedMessage(new Data(body2));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().withMaxFrameSize(1000).respond();
peer.expectBegin().withIncomingWindow(1).respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow().withIncomingWindow(1).withLinkCredit(10);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(true)
.withMessageFormat(0)
.withPayload(payload1).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
ConnectionOptions connectionOptions = new ConnectionOptions().maxFrameSize(1000);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
StreamReceiverOptions streamOptions = new StreamReceiverOptions().readBufferSize(2000).autoAccept(false);
StreamReceiver receiver = connection.openStreamReceiver("test-queue", streamOptions);
StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
InputStream rawStream = delivery.rawInputStream();
assertNotNull(rawStream);
// An initial frame has arrived but no reads have been performed and then if closed
// the delivery will be consumed to allow the session window to be opened and prevent
// a stall due to an un-consumed delivery. The stream delivery will not auto accept
// or auto settle the delivery as the user closed early which should indicate they
// are rejecting the message otherwise it is a programming error on their part.
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(10);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withMore(false)
.withMessageFormat(0)
.withPayload(payload2).queue();
peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(9);
rawStream.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDisposition().withState().rejected("invalid-format", "decode error").withSettled(true);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
delivery.disposition(new ClientDeliveryState.ClientRejected("invalid-format", "decode error"), true);
receiver.openFuture().get();
receiver.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamDeliveryUserAppliedDispositionBeforeStreamRead() throws Exception {
final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.expectDisposition().withState().accepted().withSettled(true);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
assertTrue(delivery.completed());
assertFalse(delivery.aborted());
delivery.disposition(ClientDeliveryState.ClientAccepted.getInstance(), true);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
final InputStream stream = delivery.rawInputStream();
assertNotNull(stream);
assertEquals(payload.length, stream.available());
final byte[] deliveryBytes = new byte[payload.length];
for (int i = 0; i < payload.length; ++i) {
deliveryBytes[i] = (byte) stream.read();
}
assertArrayEquals(payload, deliveryBytes);
assertEquals(0, stream.available());
assertEquals(-1, stream.read());
stream.close();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.close();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamSupportsMark() throws Exception {
final byte[] payload = createEncodedMessage(new Data(new byte[] { 0, 1, 2, 3, 4, 5 }));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.expectDisposition().withState().accepted().withSettled(true);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
assertTrue(delivery.completed());
assertFalse(delivery.aborted());
final InputStream stream = delivery.rawInputStream();
assertNotNull(stream);
assertTrue(stream.markSupported());
assertEquals(payload.length, stream.available());
stream.mark(payload.length);
final byte[] deliveryBytes1 = new byte[payload.length];
final byte[] deliveryBytes2 = new byte[payload.length];
stream.read(deliveryBytes1);
stream.reset();
stream.read(deliveryBytes2);
assertNotSame(deliveryBytes1, deliveryBytes2);
assertArrayEquals(payload, deliveryBytes1);
assertArrayEquals(payload, deliveryBytes2);
assertEquals(0, stream.available());
assertTrue(delivery.completed());
assertFalse(delivery.aborted());
stream.close();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamMessageWithHeaderOnly() throws Exception {
final byte[] payload = createEncodedMessage(new Header().setDurable(true));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.expectDisposition().withState().accepted().withSettled(true);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
assertTrue(delivery.completed());
assertFalse(delivery.aborted());
StreamReceiverMessage message = delivery.message();
assertNotNull(message);
Header header = message.header();
assertNotNull(header);
assertSame(receiver, message.receiver());
assertSame(delivery, message.delivery());
assertNull(message.properties());
assertNull(message.annotations());
assertNull(message.applicationProperties());
assertNull(message.footer());
assertTrue(message.completed());
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.close();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReadHeaderFromStreamMessageWithoutHeaderSection() throws Exception {
Map<Symbol, Object> annotationsMap = new HashMap<>();
annotationsMap.put(Symbol.valueOf("test-1"), UUID.randomUUID());
annotationsMap.put(Symbol.valueOf("test-2"), UUID.randomUUID());
annotationsMap.put(Symbol.valueOf("test-3"), UUID.randomUUID());
final byte[] payload = createEncodedMessage(new MessageAnnotations(annotationsMap));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.expectDisposition().withSettled(true).withState().accepted();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
assertTrue(delivery.completed());
assertFalse(delivery.aborted());
StreamReceiverMessage message = delivery.message();
assertNotNull(message);
Header header = message.header();
assertNull(header);
MessageAnnotations annotations = message.annotations();
assertNotNull(annotations);
assertEquals(annotationsMap, annotations.getValue());
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testTryReadSectionBeyondWhatIsEncodedIntoMessage() throws Exception {
Map<Symbol, Object> annotationsMap = new HashMap<>();
annotationsMap.put(Symbol.valueOf("test-1"), UUID.randomUUID());
annotationsMap.put(Symbol.valueOf("test-2"), UUID.randomUUID());
annotationsMap.put(Symbol.valueOf("test-3"), UUID.randomUUID());
final byte[] payload = createEncodedMessage(new Header(), new MessageAnnotations(annotationsMap));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
assertTrue(delivery.completed());
assertFalse(delivery.aborted());
StreamReceiverMessage message = delivery.message();
assertNotNull(message);
Properties properties = message.properties();
assertNull(properties);
Header header = message.header();
assertNotNull(header);
MessageAnnotations annotations = message.annotations();
assertNotNull(annotations);
assertEquals(annotationsMap, annotations.getValue());
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReadBytesFromBodyInputStreamUsingReadByteAPI() throws Exception {
final byte[] body = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
final byte[] payload = createEncodedMessage(new Data(body));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
assertTrue(delivery.completed());
assertFalse(delivery.aborted());
StreamReceiverMessage message = delivery.message();
assertNotNull(message);
InputStream bodyStream = message.body();
assertNotNull(bodyStream);
assertNull(message.header());
assertNull(message.annotations());
assertNull(message.properties());
assertNull(delivery.annotations());
final byte[] receivedBody = new byte[body.length];
for (int i = 0; i < body.length; ++i) {
receivedBody[i] = (byte) bodyStream.read();
}
assertArrayEquals(body, receivedBody);
assertEquals(-1, bodyStream.read());
assertNull(message.footer());
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReadBytesFromInputStreamUsingReadByteWithSingleByteSplitTransfers() throws Exception {
testReadBytesFromBodyInputStreamWithSplitSingleByteTransfers(1);
}
@Test
public void testReadBytesFromInputStreamUsingSingleReadBytesWithSingleByteSplitTransfers() throws Exception {
testReadBytesFromBodyInputStreamWithSplitSingleByteTransfers(2);
}
@Test
public void testSkipBytesFromInputStreamWithSingleByteSplitTransfers() throws Exception {
testReadBytesFromBodyInputStreamWithSplitSingleByteTransfers(3);
}
private void testReadBytesFromBodyInputStreamWithSplitSingleByteTransfers(int option) throws Exception {
final byte[] body = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
final byte[] payload = createEncodedMessage(new Data(body));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
for (int i = 0; i < payload.length; ++i) {
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(true)
.withMessageFormat(0)
.withPayload(new byte[] { payload[i] }).afterDelay(3).queue();
}
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0).afterDelay(5).queue();
peer.expectDisposition().withFirst(0).withSettled(true);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
final StreamReceiverMessage message = delivery.message();
final InputStream bodyStream = message.body();
final byte[] receivedBody = new byte[body.length];
if (option == 1) {
for (int i = 0; i < body.length; ++i) {
receivedBody[i] = (byte) bodyStream.read();
}
assertArrayEquals(body, receivedBody);
} else if (option == 2) {
assertEquals(body.length, bodyStream.read(receivedBody));
assertArrayEquals(body, receivedBody);
} else if (option == 3) {
assertEquals(body.length, bodyStream.skip(body.length));
} else {
fail("Unknown test option");
}
bodyStream.close();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.close();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamReceiverSessionCannotCreateNewResources() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamReceiver receiver = connection.openStreamReceiver("test-queue");
assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openReceiver("test"));
assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openReceiver("test", new ReceiverOptions()));
assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openDurableReceiver("test", "test"));
assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openDurableReceiver("test", "test", new ReceiverOptions()));
assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openDynamicReceiver());
assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openDynamicReceiver(new HashMap<>()));
assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openDynamicReceiver(new ReceiverOptions()));
assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openDynamicReceiver(new HashMap<>(), new ReceiverOptions()));
assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openSender("test"));
assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openSender("test", new SenderOptions()));
assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openAnonymousSender());
assertThrows(ClientUnsupportedOperationException.class, () -> receiver.session().openAnonymousSender(new SenderOptions()));
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReadByteArrayPayloadInChunksFromSingleTransferMessage() throws Exception {
testReadPayloadInChunksFromLargerMessage(false);
}
@Test
public void testReadBytesWithArgsPayloadInChunksFromSingleTransferMessage() throws Exception {
testReadPayloadInChunksFromLargerMessage(true);
}
private void testReadPayloadInChunksFromLargerMessage(boolean readWithArgs) throws Exception {
final byte[] body = new byte[100];
final Random random = new Random();
random.setSeed(System.currentTimeMillis());
random.nextBytes(body);
final byte[] payload = createEncodedMessage(new Data(body));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
assertTrue(delivery.completed());
assertFalse(delivery.aborted());
assertEquals(0, delivery.messageFormat());
StreamReceiverMessage message = delivery.message();
assertNotNull(message);
InputStream bodyStream = message.body();
assertNotNull(bodyStream);
assertThrows(ClientUnsupportedOperationException.class, () -> message.messageFormat(1));
assertNull(message.header());
assertNull(message.annotations());
assertNull(message.properties());
assertNull(delivery.annotations());
final byte[] aggregateBody = new byte[body.length];
final byte[] receivedBody = new byte[10];
for (int i = 0; i < body.length; i += 10) {
if (readWithArgs) {
bodyStream.read(receivedBody, 0, receivedBody.length);
} else {
bodyStream.read(receivedBody);
}
System.arraycopy(receivedBody, 0, aggregateBody, i, receivedBody.length);
}
assertArrayEquals(body, aggregateBody);
assertEquals(-1, bodyStream.read(receivedBody, 0, receivedBody.length));
assertEquals(-1, bodyStream.read(receivedBody));
assertEquals(-1, bodyStream.read());
assertNull(message.footer());
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamReceiverMessageThrowsOnAnyMessageModificationAPI() throws Exception {
final byte[] body = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
final byte[] payload = createEncodedMessage(new Data(body));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
final StreamReceiverMessage message = delivery.message();
assertThrows(ClientUnsupportedOperationException.class, () -> message.header(new Header()));
assertThrows(ClientUnsupportedOperationException.class, () -> message.properties(new Properties()));
assertThrows(ClientUnsupportedOperationException.class, () -> message.applicationProperties(new ApplicationProperties(null)));
assertThrows(ClientUnsupportedOperationException.class, () -> message.annotations(new MessageAnnotations(null)));
assertThrows(ClientUnsupportedOperationException.class, () -> message.footer(new Footer(null)));
assertThrows(ClientUnsupportedOperationException.class, () -> message.messageFormat(1));
assertThrows(ClientUnsupportedOperationException.class, () -> message.durable(true));
assertThrows(ClientUnsupportedOperationException.class, () -> message.priority((byte) 4));
assertThrows(ClientUnsupportedOperationException.class, () -> message.timeToLive(128));
assertThrows(ClientUnsupportedOperationException.class, () -> message.firstAcquirer(false));
assertThrows(ClientUnsupportedOperationException.class, () -> message.deliveryCount(10));
assertThrows(ClientUnsupportedOperationException.class, () -> message.messageId(10));
assertThrows(ClientUnsupportedOperationException.class, () -> message.correlationId(10));
assertThrows(ClientUnsupportedOperationException.class, () -> message.userId(new byte[] {1}));
assertThrows(ClientUnsupportedOperationException.class, () -> message.to("test"));
assertThrows(ClientUnsupportedOperationException.class, () -> message.subject("test"));
assertThrows(ClientUnsupportedOperationException.class, () -> message.replyTo("test"));
assertThrows(ClientUnsupportedOperationException.class, () -> message.contentType("test"));
assertThrows(ClientUnsupportedOperationException.class, () -> message.contentEncoding("test"));
assertThrows(ClientUnsupportedOperationException.class, () -> message.absoluteExpiryTime(10));
assertThrows(ClientUnsupportedOperationException.class, () -> message.creationTime(10));
assertThrows(ClientUnsupportedOperationException.class, () -> message.groupId("test"));
assertThrows(ClientUnsupportedOperationException.class, () -> message.groupSequence(10));
assertThrows(ClientUnsupportedOperationException.class, () -> message.replyToGroupId("test"));
assertThrows(ClientUnsupportedOperationException.class, () -> message.annotation("test", 1));
assertThrows(ClientUnsupportedOperationException.class, () -> message.removeAnnotation("test"));
assertThrows(ClientUnsupportedOperationException.class, () -> message.property("test", 1));
assertThrows(ClientUnsupportedOperationException.class, () -> message.removeProperty("test"));
assertThrows(ClientUnsupportedOperationException.class, () -> message.footer("test", 1));
assertThrows(ClientUnsupportedOperationException.class, () -> message.removeFooter("test"));
assertThrows(ClientUnsupportedOperationException.class, () -> message.body(InputStream.nullInputStream()));
assertThrows(ClientUnsupportedOperationException.class, () -> message.addBodySection(new AmqpValue<>("test")));
assertThrows(ClientUnsupportedOperationException.class, () -> message.bodySections(Collections.emptyList()));
assertThrows(ClientUnsupportedOperationException.class, () -> message.bodySections());
assertThrows(ClientUnsupportedOperationException.class, () -> message.clearBodySections());
assertThrows(ClientUnsupportedOperationException.class, () -> message.forEachBodySection((section) -> {}));
assertThrows(ClientUnsupportedOperationException.class, () -> message.encode(Collections.emptyMap()));
InputStream bodyStream = message.body();
assertNotNull(bodyStream.readAllBytes());
bodyStream.close();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testSkipPayloadInChunksFromSingleTransferMessage() throws Exception {
final byte[] body = new byte[100];
final Random random = new Random();
random.setSeed(System.currentTimeMillis());
random.nextBytes(body);
final byte[] payload = createEncodedMessage(new Data(body));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
assertTrue(delivery.completed());
assertFalse(delivery.aborted());
StreamReceiverMessage message = delivery.message();
assertNotNull(message);
InputStream bodyStream = message.body();
assertNotNull(bodyStream);
assertNull(message.header());
assertNull(message.annotations());
assertNull(message.properties());
assertNull(delivery.annotations());
final int skipSize = 10;
for (int i = 0; i < body.length; i += skipSize) {
bodyStream.skip(10);
}
final byte[] scratchBuffer = new byte[10];
assertEquals(-1, bodyStream.read(scratchBuffer, 0, scratchBuffer.length));
assertEquals(-1, bodyStream.read(scratchBuffer));
assertEquals(-1, bodyStream.read());
assertNull(message.footer());
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReadByteArrayPayloadInChunksFromMultipleTransfersMessage() throws Exception {
testReadPayloadInChunksFromLargerMultiTransferMessage(false);
}
@Test
public void testReadBytesWithArgsPayloadInChunksFromMultipleTransferMessage() throws Exception {
testReadPayloadInChunksFromLargerMultiTransferMessage(true);
}
private void testReadPayloadInChunksFromLargerMultiTransferMessage(boolean readWithArgs) throws Exception {
final Random random = new Random();
final long seed = System.currentTimeMillis();
final int numChunks = 4;
final int chunkSize = 30;
random.setSeed(seed);
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
for (int i = 0; i < numChunks; ++i) {
final byte[] chunk = new byte[chunkSize];
random.nextBytes(chunk);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(true)
.withMessageFormat(0)
.withPayload(createEncodedMessage(new Data(chunk))).queue();
}
peer.remoteTransfer().withHandle(0).withMore(false).queue();
peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
StreamReceiverMessage message = delivery.message();
assertNotNull(message);
InputStream bodyStream = message.body();
assertNotNull(bodyStream);
assertNull(message.header());
assertNull(message.annotations());
assertNull(message.properties());
assertNull(delivery.annotations());
final byte[] readChunk = new byte[chunkSize];
final byte[] receivedBody = new byte[3];
random.setSeed(seed);
int totalBytesRead = 0;
for (int i = 0; i < numChunks; ++i) {
for (int j = 0; j < readChunk.length; j += receivedBody.length) {
int bytesRead = 0;
if (readWithArgs) {
bytesRead = bodyStream.read(receivedBody, 0, receivedBody.length);
} else {
bytesRead = bodyStream.read(receivedBody);
}
totalBytesRead += bytesRead;
System.arraycopy(receivedBody, 0, readChunk, j, bytesRead);
}
final byte[] chunk = new byte[chunkSize];
random.nextBytes(chunk);
assertArrayEquals(chunk, readChunk);
}
assertEquals(chunkSize * numChunks, totalBytesRead);
assertEquals(-1, bodyStream.read(receivedBody, 0, receivedBody.length));
assertEquals(-1, bodyStream.read(receivedBody));
assertEquals(-1, bodyStream.read());
assertNull(message.footer());
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReadPayloadFromSplitFrameTransferWithBufferLargerThanTotalPayload() throws Exception {
final Random random = new Random();
final long seed = System.currentTimeMillis();
final int numChunks = 4;
final int chunkSize = 30;
random.setSeed(seed);
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
for (int i = 0; i < numChunks; ++i) {
final byte[] chunk = new byte[chunkSize];
random.nextBytes(chunk);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(true)
.withMessageFormat(0)
.withPayload(createEncodedMessage(new Data(chunk))).queue();
}
peer.remoteTransfer().withHandle(0).withMore(false).queue();
peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
StreamReceiverMessage message = delivery.message();
assertNotNull(message);
InputStream bodyStream = message.body();
assertNotNull(bodyStream);
assertNull(message.header());
assertNull(message.annotations());
assertNull(message.properties());
assertNull(delivery.annotations());
final byte[] receivedBody = new byte[(chunkSize * numChunks) + 100];
Arrays.fill(receivedBody, (byte) 0);
final int totalBytesRead = bodyStream.read(receivedBody);
assertEquals(chunkSize * numChunks, totalBytesRead);
assertEquals(-1, bodyStream.read(receivedBody, 0, receivedBody.length));
assertEquals(-1, bodyStream.read(receivedBody));
assertEquals(-1, bodyStream.read());
assertNull(message.footer());
// Regenerate what should have been sent plus empty trailing section to
// check that the read doesn't write anything into the area we gave beyond
// what was expected payload size.
random.setSeed(seed);
final byte[] regeneratedPayload = new byte[numChunks * chunkSize + 100];
Arrays.fill(regeneratedPayload, (byte) 0);
for (int i = 0; i < numChunks; ++i) {
final byte[] chunk = new byte[chunkSize];
random.nextBytes(chunk);
System.arraycopy(chunk, 0, regeneratedPayload, chunkSize * i, chunkSize);
}
assertArrayEquals(regeneratedPayload, receivedBody);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamReadOpensSessionWindowForAdditionalInput() throws Exception {
final byte[] body1 = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
final byte[] body2 = new byte[] { 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 };
final byte[] payload1 = createEncodedMessage(new Data(body1));
final byte[] payload2 = createEncodedMessage(new Data(body2));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().withMaxFrameSize(1000).respond();
peer.expectBegin().withIncomingWindow(1).respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow().withIncomingWindow(1).withLinkCredit(10);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(true)
.withMessageFormat(0)
.withPayload(payload1).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
ConnectionOptions connectionOptions = new ConnectionOptions().maxFrameSize(1000);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
StreamReceiverOptions streamOptions = new StreamReceiverOptions().readBufferSize(2000);
StreamReceiver receiver = connection.openStreamReceiver("test-queue", streamOptions);
StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
StreamReceiverMessage message = delivery.message();
assertNotNull(message);
// Creating the input stream instance should read the first chunk of data from the incoming
// delivery which should result in a new credit being available to expand the session window.
// An additional transfer should be placed into the delivery buffer but not yet read since
// the user hasn't read anything.
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(10);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withMore(false)
.withMessageFormat(0)
.withPayload(payload2).queue();
peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
InputStream bodyStream = message.body();
assertNotNull(bodyStream);
// Once the read of all data completes the session window should be opened and the
// stream should mark the delivery as accepted and settled since we are in auto settle
// mode and there is nothing more to read.
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(9);
byte[] combinedPayloads = new byte[body1.length + body2.length];
bodyStream.read(combinedPayloads);
assertTrue(Arrays.equals(body1, 0, body1.length, combinedPayloads, 0, body1.length));
assertTrue(Arrays.equals(body2, 0, body2.length, combinedPayloads, body1.length, body1.length + body2.length));
bodyStream.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.openFuture().get();
receiver.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamReadOpensSessionWindowForAdditionalInputAndGrantsCreditOnClose() throws Exception {
final byte[] body1 = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
final byte[] body2 = new byte[] { 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 };
final byte[] payload1 = createEncodedMessage(new Data(body1));
final byte[] payload2 = createEncodedMessage(new Data(body2));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().withMaxFrameSize(1000).respond();
peer.expectBegin().withIncomingWindow(1).respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow().withIncomingWindow(1).withLinkCredit(1);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(true)
.withMessageFormat(0)
.withPayload(payload1).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
ConnectionOptions connectionOptions = new ConnectionOptions().maxFrameSize(1000);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
StreamReceiverOptions streamOptions = new StreamReceiverOptions().readBufferSize(2000).creditWindow(1);
StreamReceiver receiver = connection.openStreamReceiver("test-queue", streamOptions);
StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
StreamReceiverMessage message = delivery.message();
assertNotNull(message);
// Creating the input stream instance should read the first chunk of data from the incoming
// delivery which should result in a new credit being available to expand the session window.
// An additional transfer should be placed into the delivery buffer but not yet read since
// the user hasn't read anything. Since we are in auto settle the completed transfer should
// trigger settlement and also open the credit window but the session window should not be
// expanded since we haven't read the data yet.
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(1);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withMore(false)
.withMessageFormat(0)
.withPayload(payload2).queue();
peer.expectDisposition().withSettled(true).withState().accepted();
peer.expectFlow().withDeliveryCount(1).withIncomingWindow(0).withLinkCredit(1);
InputStream bodyStream = message.body();
assertNotNull(bodyStream);
// Once the read of all data completes the session window should be opened
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(1);
byte[] combinedPayloads = new byte[body1.length + body2.length];
bodyStream.read(combinedPayloads);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// No frames should be triggered by closing the stream since we already auto settled
// and updated the session window on the remote.
assertTrue(Arrays.equals(body1, 0, body1.length, combinedPayloads, 0, body1.length));
assertTrue(Arrays.equals(body2, 0, body2.length, combinedPayloads, body1.length, body1.length + body2.length));
bodyStream.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.openFuture().get();
receiver.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamReadOfAllPayloadConsumesTrailingFooterOnClose() throws Exception {
final byte[] body1 = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
final byte[] body2 = new byte[] { 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 };
final byte[] payload1 = createEncodedMessage(new Data(body1));
final byte[] payload2 = createEncodedMessage(new Data(body2));
final Footer footers = new Footer(new HashMap<>());
footers.getValue().put(Symbol.valueOf("footer-key"), "test");
final byte[] payload3 = createEncodedMessage(footers);
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().withMaxFrameSize(1000).respond();
peer.expectBegin().withIncomingWindow(1).respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow().withIncomingWindow(1).withLinkCredit(10);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(true)
.withMessageFormat(0)
.withPayload(payload1).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
ConnectionOptions connectionOptions = new ConnectionOptions().maxFrameSize(1000);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
StreamReceiverOptions streamOptions = new StreamReceiverOptions().readBufferSize(2000).autoAccept(false);
StreamReceiver receiver = connection.openStreamReceiver("test-queue", streamOptions);
StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
StreamReceiverMessage message = delivery.message();
assertNotNull(message);
// Creating the input stream instance should read the first chunk of data from the incoming
// delivery which should result in a new credit being available to expand the session window.
// An additional transfer should be placed into the delivery buffer but not yet read since
// the user hasn't read anything.
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(10);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withMore(true)
.withMessageFormat(0)
.withPayload(payload2).queue();
InputStream bodyStream = message.body();
assertNotNull(bodyStream);
// Once the read of all data completes the session window should be opened and the
// stream should mark the delivery as accepted and settled since we are in auto settle
// mode and there is nothing more to read.
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(10);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withMore(false)
.withMessageFormat(0)
.withPayload(payload3).queue();
peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(9);
peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
byte[] combinedPayloads = new byte[body1.length + body2.length];
bodyStream.read(combinedPayloads);
assertTrue(Arrays.equals(body1, 0, body1.length, combinedPayloads, 0, body1.length));
assertTrue(Arrays.equals(body2, 0, body2.length, combinedPayloads, body1.length, body1.length + body2.length));
bodyStream.close();
Footer footer = message.footer();
assertNotNull(footer);
assertFalse(footer.getValue().isEmpty());
assertTrue(footer.getValue().containsKey(Symbol.valueOf("footer-key")));
assertTrue(message.hasFooters());
assertTrue(message.hasFooter("footer-key"));
message.forEachFooter((key, value) -> {
assertEquals(key, "footer-key");
assertEquals(value, "test");
});
delivery.accept();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.openFuture().get();
receiver.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReadBytesFromBodyInputStreamWithinTransactedSession() throws Exception {
final byte[] body = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
final byte[] payload = createEncodedMessage(new Data(body));
final byte[] txnId = new byte[] { 0, 1, 2, 3 };
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectCoordinatorAttach().respond();
peer.remoteFlow().withLinkCredit(2).queue();
peer.expectDeclare().accept(txnId);
peer.expectDisposition().withSettled(true).withState().transactional().withTxnId(txnId).withAccepted();
peer.expectDischarge().withFail(false).withTxnId(txnId).accept();
receiver.session().beginTransaction();
final StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
assertTrue(delivery.completed());
assertFalse(delivery.aborted());
StreamReceiverMessage message = delivery.message();
assertNotNull(message);
InputStream bodyStream = message.body();
assertNotNull(bodyStream);
assertNull(message.header());
assertNull(message.annotations());
assertNull(message.properties());
assertNull(delivery.annotations());
final byte[] receivedBody = new byte[body.length];
for (int i = 0; i < body.length; ++i) {
receivedBody[i] = (byte) bodyStream.read();
}
assertArrayEquals(body, receivedBody);
assertEquals(-1, bodyStream.read());
receiver.session().commitTransaction();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamDeliveryHandlesInvalidHeaderEncoding() throws Exception {
final byte[] payload = createInvalidHeaderEncoding();
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiverOptions options = new StreamReceiverOptions().autoAccept(false);
final StreamReceiver receiver = connection.openStreamReceiver("test-queue", options);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDisposition().withState().rejected("decode-error", "failed reading message header");
final StreamDelivery delivery = receiver.receive();
final StreamReceiverMessage message = delivery.message();
assertThrows(ClientException.class, () -> message.header());
assertThrows(ClientException.class, () -> message.body());
delivery.reject("decode-error", "failed reading message header");
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamDeliveryHandlesInvalidDeliveryAnnotationsEncoding() throws Exception {
final byte[] payload = createInvalidDeliveryAnnotationsEncoding();
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiverOptions options = new StreamReceiverOptions().autoAccept(false);
final StreamReceiver receiver = connection.openStreamReceiver("test-queue", options);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDisposition().withState().rejected("decode-error", "failed reading message header");
final StreamDelivery delivery = receiver.receive();
final StreamReceiverMessage message = delivery.message();
assertThrows(ClientException.class, () -> delivery.annotations());
assertThrows(ClientException.class, () -> message.body());
delivery.reject("decode-error", "failed reading message header");
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamDeliveryHandlesInvalidMessageAnnotationsEncoding() throws Exception {
final byte[] payload = createInvalidMessageAnnotationsEncoding();
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiverOptions options = new StreamReceiverOptions().autoAccept(false);
final StreamReceiver receiver = connection.openStreamReceiver("test-queue", options);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDisposition().withState().rejected("decode-error", "failed reading message header");
final StreamDelivery delivery = receiver.receive();
final StreamReceiverMessage message = delivery.message();
assertThrows(ClientException.class, () -> message.annotations());
assertThrows(ClientException.class, () -> message.body());
delivery.reject("decode-error", "failed reading message header");
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamDeliveryHandlesInvalidPropertiesEncoding() throws Exception {
final byte[] payload = createInvalidPropertiesEncoding();
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiverOptions options = new StreamReceiverOptions().autoAccept(false);
final StreamReceiver receiver = connection.openStreamReceiver("test-queue", options);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDisposition().withState().rejected("decode-error", "failed reading message header");
final StreamDelivery delivery = receiver.receive();
final StreamReceiverMessage message = delivery.message();
assertThrows(ClientException.class, () -> message.properties());
assertThrows(ClientException.class, () -> message.body());
delivery.reject("decode-error", "failed reading message header");
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamDeliveryHandlesInvalidApplicationPropertiesEncoding() throws Exception {
final byte[] payload = createInvalidApplicationPropertiesEncoding();
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiverOptions options = new StreamReceiverOptions().autoAccept(false);
final StreamReceiver receiver = connection.openStreamReceiver("test-queue", options);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDisposition().withState().rejected("decode-error", "failed reading message header");
final StreamDelivery delivery = receiver.receive();
final StreamReceiverMessage message = delivery.message();
assertThrows(ClientException.class, () -> message.applicationProperties());
assertThrows(ClientException.class, () -> message.body());
delivery.reject("decode-error", "failed reading message header");
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamDeliveryHandlesInvalidHeaderEncodingDuringBodyStreamOpen() throws Exception {
final byte[] payload = createInvalidHeaderEncoding();
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiverOptions options = new StreamReceiverOptions().autoAccept(false);
final StreamReceiver receiver = connection.openStreamReceiver("test-queue", options);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDisposition().withState().rejected("decode-error", "failed reading message header");
final StreamDelivery delivery = receiver.receive();
StreamReceiverMessage message = delivery.message();
assertThrows(ClientException.class, () -> message.body());
delivery.reject("decode-error", "failed reading message header");
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testConnectionDropsDuringStreamedBodyRead() throws Exception {
final byte[] body1 = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
final byte[] body2 = new byte[] { 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 };
final byte[] payload1 = createEncodedMessage(new Data(body1));
final byte[] payload2 = createEncodedMessage(new Data(body2));
final CountDownLatch disconnected = new CountDownLatch(1);
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().withMaxFrameSize(1000).respond();
peer.expectBegin().withIncomingWindow(1).respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow().withIncomingWindow(1).withLinkCredit(1);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(true)
.withMessageFormat(0)
.withPayload(payload1).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
ConnectionOptions connectionOptions = new ConnectionOptions().maxFrameSize(1000);
connectionOptions.disconnectedHandler((conn, event) -> disconnected.countDown());
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
StreamReceiverOptions streamOptions = new StreamReceiverOptions().readBufferSize(2000).creditWindow(1);
StreamReceiver receiver = connection.openStreamReceiver("test-queue", streamOptions);
StreamDelivery delivery = receiver.receive();
StreamReceiverMessage message = delivery.message();
// Creating the input stream instance should read the first chunk of data from the incoming
// delivery which should result in a new credit being available to expand the session window.
// An additional transfer should be placed into the delivery buffer but not yet read since
// the user hasn't read anything.
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(1);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withMore(true)
.withMessageFormat(0)
.withPayload(payload2).queue();
peer.dropAfterLastHandler();
InputStream bodyStream = message.body();
assertNotNull(bodyStream);
assertTrue(disconnected.await(5, TimeUnit.SECONDS));
byte[] readPayload = new byte[body1.length + body2.length];
try {
bodyStream.read(readPayload);
fail("Should not be able to read from closed connection stream");
} catch (IOException ioe) {
// Connection should be down now.
}
bodyStream.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testFrameSizeViolationWhileWaitingForIncomingStreamReceiverContent() throws Exception {
byte[] overFrameSizeLimitFrameHeader = new byte[] { 0x00, (byte) 0xA0, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00 };
final byte[] body = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
final byte[] payload = createEncodedMessage(new Data(body));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().withMaxFrameSize(65535).respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow().withLinkCredit(1);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(true)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
ConnectionOptions connectionOptions = new ConnectionOptions().maxFrameSize(65535);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
StreamReceiverOptions streamOptions = new StreamReceiverOptions().creditWindow(1);
StreamReceiver receiver = connection.openStreamReceiver("test-queue", streamOptions);
StreamDelivery delivery = receiver.receive();
StreamReceiverMessage message = delivery.message();
InputStream stream = message.body();
peer.waitForScriptToComplete();
peer.expectClose().respond();
peer.remoteBytes().withBytes(overFrameSizeLimitFrameHeader).later(10);
byte[] bytesToRead = new byte[body.length * 2];
try {
stream.read(bytesToRead);
fail("Should throw an error indicating issue with read of payload");
} catch (IOException ioe) {
// Expected
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testStreamReceiverTryReadAmqpSequenceBytes() throws Exception {
final List<String> stringList = new ArrayList<>();
stringList.add("Hello World");
final byte[] payload = createEncodedMessage(new AmqpSequence<>(stringList));
doTestStreamReceiverReadsNonDataSectionBody(payload);
}
@Test
public void testStreamReceiverTryReadAmqpValueBytes() throws Exception {
final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));
doTestStreamReceiverReadsNonDataSectionBody(payload);
}
private void doTestStreamReceiverReadsNonDataSectionBody(byte[] payload) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withSettled(true)
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
final StreamDelivery delivery = receiver.receive();
final StreamReceiverMessage message = delivery.message();
try {
message.body();
fail("Should not return a stream since we cannot read this type");
} catch (ClientException cliEx) {
// Expected
}
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReadMessageHeaderFromStreamReceiverMessage() throws Exception {
final Header header = new Header();
header.setDeliveryCount(UnsignedInteger.MAX_VALUE.longValue());
header.setDurable(true);
header.setFirstAcquirer(false);
header.setPriority((byte) 255);
header.setTimeToLive(Integer.MAX_VALUE);
final byte[] payload = createEncodedMessage(header);
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
assertTrue(delivery.completed());
assertFalse(delivery.aborted());
StreamReceiverMessage message = delivery.message();
assertNotNull(message);
Header readHeader = message.header();
assertNotNull(readHeader);
assertNull(message.body());
assertEquals(Integer.toUnsignedLong(Integer.MAX_VALUE), message.timeToLive());
assertEquals(true, message.durable());
assertEquals(false, message.firstAcquirer());
assertEquals((byte) 255, message.priority());
assertEquals(Integer.toUnsignedLong(Integer.MAX_VALUE), message.timeToLive());
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReadMessagePropertiesFromStreamReceiverMessage() throws Exception {
final Properties properties = new Properties();
properties.setAbsoluteExpiryTime(Integer.MAX_VALUE);
properties.setContentEncoding("utf8");
properties.setContentType("text/plain");
properties.setCorrelationId(new Binary(new byte[] { 1, 2, 3 }));
properties.setCreationTime(Short.MAX_VALUE);
properties.setGroupId("Group");
properties.setGroupSequence(UnsignedInteger.MAX_VALUE.longValue());
properties.setMessageId(UUID.randomUUID());
properties.setReplyTo("replyTo");
properties.setReplyToGroupId("group-1");
properties.setSubject("test");
properties.setTo("queue");
properties.setUserId(new byte[] { 0, 1, 5, 6, 9 });
final byte[] payload = createEncodedMessage(new Header(), properties);
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
assertTrue(delivery.completed());
assertFalse(delivery.aborted());
StreamReceiverMessage message = delivery.message();
assertNotNull(message);
assertFalse(message.hasProperties());
assertFalse(message.hasFooters());
assertFalse(message.hasAnnotations());
Properties readProperties = message.properties();
assertNotNull(readProperties);
Header header = message.header();
assertNotNull(header);
assertNull(message.body());
assertEquals(Integer.MAX_VALUE, message.absoluteExpiryTime());
assertEquals("utf8", message.contentEncoding());
assertEquals("text/plain", message.contentType());
assertEquals(new Binary(new byte[] { 1, 2, 3 }), message.correlationId());
assertEquals("utf8", message.contentEncoding());
assertEquals("utf8", message.contentEncoding());
assertEquals("utf8", message.contentEncoding());
assertEquals(Short.MAX_VALUE, message.creationTime());
assertEquals(UnsignedInteger.MAX_VALUE.intValue(), message.groupSequence());
assertEquals(properties.getMessageId(), message.messageId());
assertEquals("replyTo", message.replyTo());
assertEquals("group-1", message.replyToGroupId());
assertEquals("test", message.subject());
assertEquals("queue", message.to());
assertArrayEquals(new byte[] { 0, 1, 5, 6, 9 }, message.userId());
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReadApplicationPropertiesStreamReceiverMessage() throws Exception {
final Map<String, Object> propertiesMap = new HashMap<>();
final ApplicationProperties appProperties = new ApplicationProperties(propertiesMap);
propertiesMap.put("property1", UnsignedInteger.MAX_VALUE);
propertiesMap.put("property2", UnsignedInteger.ONE);
propertiesMap.put("property3", UnsignedInteger.ZERO);
final byte[] payload = createEncodedMessage(appProperties);
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.expectFlow();
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
final Client container = Client.create();
final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
final StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
assertTrue(delivery.completed());
assertFalse(delivery.aborted());
StreamReceiverMessage message = delivery.message();
assertNotNull(message);
assertTrue(message.hasProperties());
assertFalse(message.hasFooters());
assertFalse(message.hasAnnotations());
assertFalse(message.hasProperty("property"));
assertEquals(UnsignedInteger.MAX_VALUE, message.property("property1"));
assertEquals(UnsignedInteger.ONE, message.property("property2"));
assertEquals(UnsignedInteger.ZERO, message.property("property3"));
message.forEachProperty((key, value) -> {
assertTrue(propertiesMap.containsKey(key));
assertEquals(value, propertiesMap.get(key));
});
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
receiver.closeAsync().get();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testDrainFutureSignalsFailureWhenDrainTimeoutExceeded() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow();
peer.expectFlow().withDrain(true);
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamReceiverOptions receiverOptions = new StreamReceiverOptions().drainTimeout(15);
Receiver receiver = connection.openStreamReceiver("test-queue", receiverOptions).openFuture().get();
try {
receiver.drain().get();
fail("Drain call should fail timeout exceeded.");
} catch (ExecutionException cliEx) {
LOG.debug("Receiver threw error on drain call", cliEx);
assertTrue(cliEx.getCause() instanceof ClientOperationTimedOutException);
}
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testDrainFutureSignalsFailureWhenConnectionDrainTimeoutExceeded() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow();
peer.expectFlow().withDrain(true);
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
ConnectionOptions connectionOptions = new ConnectionOptions().drainTimeout(20);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
Receiver receiver = connection.openStreamReceiver("test-queue").openFuture().get();
try {
receiver.drain().get();
fail("Drain call should fail timeout exceeded.");
} catch (ExecutionException cliEx) {
LOG.debug("Receiver threw error on drain call", cliEx);
assertTrue(cliEx.getCause() instanceof ClientOperationTimedOutException);
}
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testDrainCompletesWhenReceiverHasNoCredit() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Receiver receiver = connection.openStreamReceiver("test-queue", new StreamReceiverOptions().creditWindow(0));
receiver.openFuture().get(5, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
Future<? extends Receiver> draining = receiver.drain();
draining.get(5, TimeUnit.SECONDS);
// Close things down
peer.expectClose().respond();
connection.closeAsync().get(5, TimeUnit.SECONDS);
peer.waitForScriptToComplete(1, TimeUnit.SECONDS);
}
}
@Test
public void testDrainAdditionalDrainCallThrowsWhenReceiverStillDraining() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow();
peer.expectFlow().withDrain(true);
peer.expectClose().respond();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamReceiverOptions receiverOptions = new StreamReceiverOptions();
Receiver receiver = connection.openStreamReceiver("test-queue", receiverOptions).openFuture().get();
receiver.drain();
try {
receiver.drain().get();
fail("Drain call should fail timeout exceeded.");
} catch (ExecutionException cliEx) {
LOG.debug("Receiver threw error on drain call", cliEx);
assertTrue(cliEx.getCause() instanceof ClientIllegalStateException);
}
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiverGetRemotePropertiesWaitsForRemoteAttach() throws Exception {
tryReadReceiverRemoteProperties(true);
}
@Test
public void testReceiverGetRemotePropertiesFailsAfterOpenTimeout() throws Exception {
tryReadReceiverRemoteProperties(false);
}
private void tryReadReceiverRemoteProperties(boolean attachResponse) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue());
peer.expectFlow();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamReceiverOptions options = new StreamReceiverOptions().openTimeout(150, TimeUnit.MILLISECONDS);
StreamReceiver receiver = connection.openStreamReceiver("test-receiver", options);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
Map<String, Object> expectedProperties = new HashMap<>();
expectedProperties.put("TEST", "test-property");
if (attachResponse) {
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.respondToLastAttach().withPropertiesMap(expectedProperties).later(10);
} else {
peer.expectDetach();
peer.expectEnd();
}
if (attachResponse) {
assertNotNull(receiver.properties(), "Remote should have responded with a remote properties value");
assertEquals(expectedProperties, receiver.properties());
} else {
try {
receiver.properties();
fail("Should failed to get remote state due to no attach response");
} catch (ClientException ex) {
LOG.debug("Caught expected exception from blocking call", ex);
}
}
try {
receiver.closeAsync().get();
} catch (ExecutionException ex) {
LOG.debug("Caught unexpected exception from close call", ex);
fail("Should not fail to close when connection not closed and detach sent");
}
peer.expectClose().respond();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiverGetRemoteOfferedCapabilitiesWaitsForRemoteAttach() throws Exception {
tryReadReceiverRemoteOfferedCapabilities(true);
}
@Test
public void testReceiverGetRemoteOfferedCapabilitiesFailsAfterOpenTimeout() throws Exception {
tryReadReceiverRemoteOfferedCapabilities(false);
}
private void tryReadReceiverRemoteOfferedCapabilities(boolean attachResponse) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue());
peer.expectFlow();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamReceiverOptions options = new StreamReceiverOptions().openTimeout(150, TimeUnit.MILLISECONDS);
StreamReceiver receiver = connection.openStreamReceiver("test-receiver", options);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
if (attachResponse) {
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.respondToLastAttach().withOfferedCapabilities("QUEUE").later(10);
} else {
peer.expectDetach();
peer.expectEnd();
}
if (attachResponse) {
assertNotNull(receiver.offeredCapabilities(), "Remote should have responded with a remote offered Capabilities value");
assertEquals(1, receiver.offeredCapabilities().length);
assertEquals("QUEUE", receiver.offeredCapabilities()[0]);
} else {
try {
receiver.offeredCapabilities();
fail("Should failed to get remote state due to no attach response");
} catch (ClientException ex) {
LOG.debug("Caught expected exception from blocking call", ex);
}
}
try {
receiver.closeAsync().get();
} catch (ExecutionException ex) {
LOG.debug("Caught unexpected exception from close call", ex);
fail("Should not fail to close when connection not closed and detach sent");
}
peer.expectClose().respond();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiverGetRemoteDesiredCapabilitiesWaitsForRemoteAttach() throws Exception {
tryReadReceiverRemoteDesiredCapabilities(true);
}
@Test
public void testReceiverGetRemoteDesiredCapabilitiesFailsAfterOpenTimeout() throws Exception {
tryReadReceiverRemoteDesiredCapabilities(false);
}
private void tryReadReceiverRemoteDesiredCapabilities(boolean attachResponse) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue());
peer.expectFlow();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamReceiverOptions options = new StreamReceiverOptions().openTimeout(150, TimeUnit.MILLISECONDS);
StreamReceiver receiver = connection.openStreamReceiver("test-receiver", options);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
if (attachResponse) {
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.respondToLastAttach().withDesiredCapabilities("Error-Free").later(10);
} else {
peer.expectDetach();
peer.expectEnd();
}
if (attachResponse) {
assertNotNull(receiver.desiredCapabilities(), "Remote should have responded with a remote desired Capabilities value");
assertEquals(1, receiver.desiredCapabilities().length);
assertEquals("Error-Free", receiver.desiredCapabilities()[0]);
} else {
try {
receiver.desiredCapabilities();
fail("Should failed to get remote state due to no attach response");
} catch (ClientException ex) {
LOG.debug("Caught expected exception from blocking call", ex);
}
}
try {
receiver.closeAsync().get();
} catch (ExecutionException ex) {
LOG.debug("Caught unexpected exception from close call", ex);
fail("Should not fail to close when connection not closed and detach sent");
}
peer.expectClose().respond();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiverGetTargetWaitsForRemoteAttach() throws Exception {
tryReadReceiverTarget(true);
}
@Test
public void testReceiverGetTargetFailsAfterOpenTimeout() throws Exception {
tryReadReceiverTarget(false);
}
private void tryReadReceiverTarget(boolean attachResponse) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue());
peer.expectFlow();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamReceiverOptions options = new StreamReceiverOptions().openTimeout(150, TimeUnit.MILLISECONDS);
StreamReceiver receiver = connection.openStreamReceiver("test-receiver", options);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
if (attachResponse) {
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.respondToLastAttach().later(10);
} else {
peer.expectDetach();
peer.expectEnd();
}
if (attachResponse) {
assertNotNull(receiver.target(), "Remote should have responded with a Target value");
} else {
try {
receiver.target();
fail("Should failed to get remote source due to no attach response");
} catch (ClientException ex) {
LOG.debug("Caught expected exception from blocking call", ex);
}
}
try {
receiver.closeAsync().get();
} catch (ExecutionException ex) {
LOG.debug("Caught unexpected exception from close call", ex);
fail("Should not fail to close when connection not closed and detach sent");
}
peer.expectClose().respond();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiverGetSourceWaitsForRemoteAttach() throws Exception {
tryReadReceiverSource(true);
}
@Test
public void testReceiverGetSourceFailsAfterOpenTimeout() throws Exception {
tryReadReceiverSource(false);
}
private void tryReadReceiverSource(boolean attachResponse) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.RECEIVER.getValue());
peer.expectFlow();
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamReceiverOptions options = new StreamReceiverOptions().openTimeout(150, TimeUnit.MILLISECONDS);
StreamReceiver receiver = connection.openStreamReceiver("test-receiver", options);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
if (attachResponse) {
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.respondToLastAttach().later(10);
} else {
peer.expectDetach();
peer.expectEnd();
}
if (attachResponse) {
assertNotNull(receiver.source(), "Remote should have responded with a Source value");
assertEquals("test-receiver", receiver.source().address());
} else {
try {
receiver.source();
fail("Should failed to get remote source due to no attach response");
} catch (ClientException ex) {
LOG.debug("Caught expected exception from blocking call", ex);
}
}
try {
receiver.closeAsync().get();
} catch (ExecutionException ex) {
LOG.debug("Caught unexpected exception from close call", ex);
fail("Should not fail to close when connection not closed and detach sent");
}
peer.expectClose().respond();
connection.closeAsync().get();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test
public void testReceiverCreditReplenishedAfterSyncReceiveAutoAccept() throws Exception {
doTestReceiverCreditReplenishedAfterSyncReceive(true);
}
@Test
public void testReceiverCreditReplenishedAfterSyncReceiveManualAccept() throws Exception {
doTestReceiverCreditReplenishedAfterSyncReceive(false);
}
public void doTestReceiverCreditReplenishedAfterSyncReceive(boolean autoAccept) throws Exception {
byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow().withLinkCredit(10);
for (int i = 0; i < 10; ++i) {
peer.remoteTransfer().withDeliveryId(i)
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
}
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
StreamReceiverOptions options = new StreamReceiverOptions();
options.autoAccept(autoAccept);
options.creditWindow(10);
StreamReceiver receiver = connection.openStreamReceiver("test-receiver", options);
Wait.waitFor(() -> receiver.queuedDeliveries() == 10);
peer.waitForScriptToComplete();
if (autoAccept)
{
peer.expectDisposition();
peer.expectDisposition();
}
// Consume messages 1 and 2 which should not provoke credit replenishment
// as there are still 8 outstanding which is above the 70% mark
assertNotNull(receiver.receive()); // #1
assertNotNull(receiver.receive()); // #2
peer.waitForScriptToComplete();
if (autoAccept)
{
peer.expectDisposition();
}
peer.expectFlow().withLinkCredit(3);
// Now consume message 3 which will trip the replenish barrier and the
// credit should be updated to reflect that we still have 7 queued
assertNotNull(receiver.receive()); // #3
peer.waitForScriptToComplete();
if (autoAccept)
{
peer.expectDisposition();
peer.expectDisposition();
}
// Consume messages 4 and 5 which should not provoke credit replenishment
// as there are still 5 outstanding plus the credit we sent last time
// which is above the 70% mark
assertNotNull(receiver.receive()); // #4
assertNotNull(receiver.receive()); // #5
peer.waitForScriptToComplete();
if (autoAccept)
{
peer.expectDisposition();
}
peer.expectFlow().withLinkCredit(6);
// Consume number 6 which means we only have 4 outstanding plus the three
// that we sent last time we flowed which is 70% of possible prefetch so
// we should flow to top off credit which would be 6 since we have four
// still pending
assertNotNull(receiver.receive()); // #6
peer.waitForScriptToComplete();
if (autoAccept)
{
peer.expectDisposition();
peer.expectDisposition();
}
// Consume deliveries 7 and 8 which should not flow as we should be
// above the threshold of 70% since we would now have 2 outstanding
// and 6 credits on the link
assertNotNull(receiver.receive()); // #7
assertNotNull(receiver.receive()); // #8
peer.waitForScriptToComplete();
if (autoAccept)
{
peer.expectDisposition();
peer.expectDisposition();
}
// Now consume 9 and 10 but we still shouldn't flow more credit because
// the link credit is above the 50% mark for overall credit windowing.
assertNotNull(receiver.receive()); // #9
assertNotNull(receiver.receive()); // #10
peer.waitForScriptToComplete();
peer.expectClose().respond();
connection.close();
peer.waitForScriptToComplete();
}
}
private byte[] createInvalidHeaderEncoding() {
final byte[] buffer = new byte[12];
buffer[0] = 0; // Described Type Indicator
buffer[1] = EncodingCodes.SMALLULONG;
buffer[2] = Header.DESCRIPTOR_CODE.byteValue();
buffer[3] = EncodingCodes.MAP32; // Should be list based
return buffer;
}
private byte[] createInvalidDeliveryAnnotationsEncoding() {
final byte[] buffer = new byte[12];
buffer[0] = 0; // Described Type Indicator
buffer[1] = EncodingCodes.SMALLULONG;
buffer[2] = DeliveryAnnotations.DESCRIPTOR_CODE.byteValue();
buffer[3] = EncodingCodes.LIST32; // Should be Map based
return buffer;
}
private byte[] createInvalidMessageAnnotationsEncoding() {
final byte[] buffer = new byte[12];
buffer[0] = 0; // Described Type Indicator
buffer[1] = EncodingCodes.SMALLULONG;
buffer[2] = MessageAnnotations.DESCRIPTOR_CODE.byteValue();
buffer[3] = EncodingCodes.LIST32; // Should be Map based
return buffer;
}
private byte[] createInvalidPropertiesEncoding() {
final byte[] buffer = new byte[12];
buffer[0] = 0; // Described Type Indicator
buffer[1] = EncodingCodes.SMALLULONG;
buffer[2] = Properties.DESCRIPTOR_CODE.byteValue();
buffer[3] = EncodingCodes.MAP32; // Should be list based
return buffer;
}
private byte[] createInvalidApplicationPropertiesEncoding() {
final byte[] buffer = new byte[12];
buffer[0] = 0; // Described Type Indicator
buffer[1] = EncodingCodes.SMALLULONG;
buffer[2] = ApplicationProperties.DESCRIPTOR_CODE.byteValue();
buffer[3] = EncodingCodes.LIST32; // Should be map based
return buffer;
}
}