blob: 6f3cbcf9fc562fe2e240b7fd2ff889baf076b557 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.engine.impl;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
import org.apache.qpid.protonj2.buffer.ProtonByteBufferAllocator;
import org.apache.qpid.protonj2.engine.Connection;
import org.apache.qpid.protonj2.engine.DeliveryTagGenerator;
import org.apache.qpid.protonj2.engine.Engine;
import org.apache.qpid.protonj2.engine.EngineFactory;
import org.apache.qpid.protonj2.engine.OutgoingDelivery;
import org.apache.qpid.protonj2.engine.Sender;
import org.apache.qpid.protonj2.engine.Session;
import org.apache.qpid.protonj2.engine.exceptions.EngineFailedException;
import org.apache.qpid.protonj2.logging.ProtonLogger;
import org.apache.qpid.protonj2.logging.ProtonLoggerFactory;
import org.apache.qpid.protonj2.test.driver.ProtonTestConnector;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.AcceptedMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.ModifiedMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.RejectedMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.ReleasedMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.transactions.TransactionalStateMatcher;
import org.apache.qpid.protonj2.types.Binary;
import org.apache.qpid.protonj2.types.DeliveryTag;
import org.apache.qpid.protonj2.types.Symbol;
import org.apache.qpid.protonj2.types.UnsignedLong;
import org.apache.qpid.protonj2.types.messaging.Accepted;
import org.apache.qpid.protonj2.types.messaging.Modified;
import org.apache.qpid.protonj2.types.messaging.Rejected;
import org.apache.qpid.protonj2.types.messaging.Released;
import org.apache.qpid.protonj2.types.messaging.Source;
import org.apache.qpid.protonj2.types.messaging.Target;
import org.apache.qpid.protonj2.types.transactions.TransactionalState;
import org.apache.qpid.protonj2.types.transport.AmqpError;
import org.apache.qpid.protonj2.types.transport.DeliveryState;
import org.apache.qpid.protonj2.types.transport.ErrorCondition;
import org.apache.qpid.protonj2.types.transport.ReceiverSettleMode;
import org.apache.qpid.protonj2.types.transport.Role;
import org.apache.qpid.protonj2.types.transport.SenderSettleMode;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
/**
* Test the {@link ProtonSender}
*/
@Timeout(20)
public class ProtonSenderTest extends ProtonEngineTestSupport {
private static final ProtonLogger LOG = ProtonLoggerFactory.getLogger(ProtonSenderTest.class);
@Test
public void testLocalLinkStateCannotBeChangedAfterOpen() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().respond();
peer.expectDetach().respond();
peer.expectEnd().respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("test");
sender.setProperties(new HashMap<>());
sender.open();
try {
sender.setProperties(new HashMap<>());
fail("Cannot alter local link initial state data after sender opened.");
} catch (IllegalStateException ise) {
// Expected
}
try {
sender.setDesiredCapabilities(new Symbol[] { AmqpError.DECODE_ERROR });
fail("Cannot alter local link initial state data after sender opened.");
} catch (IllegalStateException ise) {
// Expected
}
try {
sender.setOfferedCapabilities(new Symbol[] { AmqpError.DECODE_ERROR });
fail("Cannot alter local link initial state data after sender opened.");
} catch (IllegalStateException ise) {
// Expected
}
try {
sender.setSenderSettleMode(SenderSettleMode.MIXED);
fail("Cannot alter local link initial state data after sender opened.");
} catch (IllegalStateException ise) {
// Expected
}
try {
sender.setSource(new Source());
fail("Cannot alter local link initial state data after sender opened.");
} catch (IllegalStateException ise) {
// Expected
}
try {
sender.setTarget(new Target());
fail("Cannot alter local link initial state data after sender opened.");
} catch (IllegalStateException ise) {
// Expected
}
try {
sender.setMaxMessageSize(UnsignedLong.ZERO);
fail("Cannot alter local link initial state data after sender opened.");
} catch (IllegalStateException ise) {
// Expected
}
sender.detach();
session.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderEmitsOpenAndCloseEvents() throws Exception {
doTestSenderEmitsEvents(false);
}
@Test
public void testSenderEmitsOpenAndDetachEvents() throws Exception {
doTestSenderEmitsEvents(true);
}
private void doTestSenderEmitsEvents(boolean detach) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
final AtomicBoolean senderLocalOpen = new AtomicBoolean();
final AtomicBoolean senderLocalClose = new AtomicBoolean();
final AtomicBoolean senderLocalDetach = new AtomicBoolean();
final AtomicBoolean senderRemoteOpen = new AtomicBoolean();
final AtomicBoolean senderRemoteClose = new AtomicBoolean();
final AtomicBoolean senderRemoteDetach = new AtomicBoolean();
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().respond();
peer.expectDetach().respond();
peer.expectEnd().respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("test");
sender.localOpenHandler(result -> senderLocalOpen.set(true))
.localCloseHandler(result -> senderLocalClose.set(true))
.localDetachHandler(result -> senderLocalDetach.set(true))
.openHandler(result -> senderRemoteOpen.set(true))
.detachHandler(result -> senderRemoteDetach.set(true))
.closeHandler(result -> senderRemoteClose.set(true));
sender.open();
assertNull(sender.getDeliveryTagGenerator());
if (detach) {
sender.detach();
} else {
sender.close();
}
assertTrue(senderLocalOpen.get(), "Sender should have reported local open");
assertTrue(senderRemoteOpen.get(), "Sender should have reported remote open");
if (detach) {
assertFalse(senderLocalClose.get(), "Sender should not have reported local close");
assertTrue(senderLocalDetach.get(), "Sender should have reported local detach");
assertFalse(senderRemoteClose.get(), "Sender should not have reported remote close");
assertTrue(senderRemoteDetach.get(), "Sender should have reported remote close");
} else {
assertTrue(senderLocalClose.get(), "Sender should have reported local close");
assertFalse(senderLocalDetach.get(), "Sender should not have reported local detach");
assertTrue(senderRemoteClose.get(), "Sender should have reported remote close");
assertFalse(senderRemoteDetach.get(), "Sender should not have reported remote close");
}
session.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderRoutesDetachEventToCloseHandlerIfNonSset() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
final AtomicBoolean senderLocalOpen = new AtomicBoolean();
final AtomicBoolean senderLocalClose = new AtomicBoolean();
final AtomicBoolean senderRemoteOpen = new AtomicBoolean();
final AtomicBoolean senderRemoteClose = new AtomicBoolean();
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().respond();
peer.expectDetach().respond();
peer.expectEnd().respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("test");
sender.localOpenHandler(result -> senderLocalOpen.set(true))
.localCloseHandler(result -> senderLocalClose.set(true))
.openHandler(result -> senderRemoteOpen.set(true))
.closeHandler(result -> senderRemoteClose.set(true));
sender.open();
sender.detach();
assertTrue(senderLocalOpen.get(), "Sender should have reported local open");
assertTrue(senderRemoteOpen.get(), "Sender should have reported remote open");
assertTrue(senderLocalClose.get(), "Sender should have reported local detach");
assertTrue(senderRemoteClose.get(), "Sender should have reported remote detach");
session.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderEnforcesOneActiveDeliveryAtNextAPI() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().respond();
Connection connection = engine.start().open();
Session session = connection.session().open();
Sender sender = session.sender("test").open();
assertNotNull(sender.next());
assertThrows(IllegalStateException.class, () -> sender.next());
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderReceivesParentSessionClosedEvent() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
final AtomicBoolean parentClosed = new AtomicBoolean();
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().respond();
peer.expectEnd().respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("test");
sender.parentEndpointClosedHandler(result -> parentClosed.set(true));
sender.open();
session.close();
assertTrue(parentClosed.get(), "Sender should have reported parent session closed");
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderReceivesParentConnectionClosedEvent() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
final AtomicBoolean parentClosed = new AtomicBoolean();
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().respond();
peer.expectClose().respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("test");
sender.parentEndpointClosedHandler(result -> parentClosed.set(true));
sender.open();
connection.close();
assertTrue(parentClosed.get(), "Sender should have reported parent connection closed");
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testEngineShutdownEventNeitherEndClosed() throws Exception {
doTestEngineShutdownEvent(false, false);
}
@Test
public void testEngineShutdownEventLocallyClosed() throws Exception {
doTestEngineShutdownEvent(true, false);
}
@Test
public void testEngineShutdownEventRemotelyClosed() throws Exception {
doTestEngineShutdownEvent(false, true);
}
@Test
public void testEngineShutdownEventBothEndsClosed() throws Exception {
doTestEngineShutdownEvent(true, true);
}
private void doTestEngineShutdownEvent(boolean locallyClosed, boolean remotelyClosed) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
final AtomicBoolean engineShutdown = new AtomicBoolean();
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("test");
sender.open();
sender.engineShutdownHandler(result -> engineShutdown.set(true));
if (locallyClosed) {
if (remotelyClosed) {
peer.expectDetach().respond();
} else {
peer.expectDetach();
}
sender.close();
}
if (remotelyClosed && !locallyClosed) {
peer.remoteDetach();
}
engine.shutdown();
if (locallyClosed && remotelyClosed) {
assertFalse(engineShutdown.get(), "Should not have reported engine shutdown");
} else {
assertTrue(engineShutdown.get(), "Should have reported engine shutdown");
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderOpenWithNoSenderOrReceiverSettleModes() throws Exception {
doTestOpenSenderWithConfiguredSenderAndReceiverSettlementModes(null, null);
}
@Test
public void testSenderOpenWithSettledAndFirst() throws Exception {
doTestOpenSenderWithConfiguredSenderAndReceiverSettlementModes(SenderSettleMode.SETTLED, ReceiverSettleMode.FIRST);
}
@Test
public void testSenderOpenWithUnsettledAndSecond() throws Exception {
doTestOpenSenderWithConfiguredSenderAndReceiverSettlementModes(SenderSettleMode.UNSETTLED, ReceiverSettleMode.SECOND);
}
private void doTestOpenSenderWithConfiguredSenderAndReceiverSettlementModes(SenderSettleMode senderMode, ReceiverSettleMode receiverMode) {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withSndSettleMode(senderMode == null ? null : senderMode.byteValue())
.withRcvSettleMode(receiverMode == null ? null : receiverMode.byteValue())
.respond()
.withSndSettleMode(senderMode == null ? null : senderMode.byteValue())
.withRcvSettleMode(receiverMode == null ? null : receiverMode.byteValue());
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("sender");
sender.setSenderSettleMode(senderMode);
sender.setReceiverSettleMode(receiverMode);
sender.open();
peer.waitForScriptToComplete();
peer.expectDetach().respond();
if (senderMode != null) {
assertEquals(senderMode, sender.getSenderSettleMode());
} else {
assertEquals(SenderSettleMode.MIXED, sender.getSenderSettleMode());
}
if (receiverMode != null) {
assertEquals(receiverMode, sender.getReceiverSettleMode());
} else {
assertEquals(ReceiverSettleMode.FIRST, sender.getReceiverSettleMode());
}
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderOpenAndCloseAreIdempotent() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.expectDetach().respond();
Connection connection = engine.start();
// Default engine should start and return a connection immediately
assertNotNull(connection);
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("test");
sender.open();
// Should not emit another attach frame
sender.open();
sender.close();
// Should not emit another detach frame
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCreateSenderAndClose() throws Exception {
doTestCreateSenderAndCloseOrDetachLink(true);
}
@Test
public void testCreateSenderAndDetach() throws Exception {
doTestCreateSenderAndCloseOrDetachLink(false);
}
private void doTestCreateSenderAndCloseOrDetachLink(boolean close) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.expectDetach().withClosed(close).respond();
peer.expectClose().respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("test");
sender.open();
assertTrue(sender.isSender());
assertFalse(sender.isReceiver());
if (close) {
sender.close();
} else {
sender.detach();
}
connection.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testEngineEmitsAttachAfterLocalSenderOpened() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().respond();
peer.expectDetach().respond();
Connection connection = engine.start();
// Default engine should start and return a connection immediately
assertNotNull(connection);
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("test");
sender.open();
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testOpenBeginAttachBeforeRemoteResponds() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen();
peer.expectBegin();
peer.expectAttach();
Connection connection = engine.start();
// Default engine should start and return a connection immediately
assertNotNull(connection);
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("test");
sender.open();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderFireOpenedEventAfterRemoteAttachArrives() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().respond();
peer.expectDetach().respond();
final AtomicBoolean senderRemotelyOpened = new AtomicBoolean();
Connection connection = engine.start();
// Default engine should start and return a connection immediately
assertNotNull(connection);
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("test");
sender.openHandler(result -> {
senderRemotelyOpened.set(true);
});
sender.open();
assertTrue(senderRemotelyOpened.get(), "Sender remote opened event did not fire");
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderFireOpenedEventAfterRemoteAttachArrivesWithNullTarget() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().respond().withNullTarget();
peer.expectDetach().respond();
final AtomicBoolean senderRemotelyOpened = new AtomicBoolean();
Connection connection = engine.start();
// Default engine should start and return a connection immediately
assertNotNull(connection);
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("test");
sender.setSource(new Source());
sender.setTarget(new Target());
sender.openHandler(result -> {
senderRemotelyOpened.set(true);
});
sender.open();
assertTrue(senderRemotelyOpened.get(), "Sender remote opened event did not fire");
assertNull(sender.getRemoteTarget());
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testOpenAndCloseMultipleSenders() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withHandle(0).respond();
peer.expectAttach().withHandle(1).respond();
peer.expectDetach().withHandle(1).respond();
peer.expectDetach().withHandle(0).respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender1 = session.sender("sender-1");
sender1.open();
Sender sender2 = session.sender("sender-2");
sender2.open();
// Close in reverse order
sender2.close();
sender1.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderFireClosedEventAfterRemoteDetachArrives() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().respond();
peer.expectDetach().respond();
final AtomicBoolean senderRemotelyOpened = new AtomicBoolean();
final AtomicBoolean senderRemotelyClosed = new AtomicBoolean();
Connection connection = engine.start();
// Default engine should start and return a connection immediately
assertNotNull(connection);
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("test");
sender.openHandler(result -> {
senderRemotelyOpened.set(true);
});
sender.closeHandler(result -> {
senderRemotelyClosed.set(true);
});
sender.open();
assertTrue(senderRemotelyOpened.get(), "Sender remote opened event did not fire");
sender.close();
assertTrue(senderRemotelyClosed.get(), "Sender remote closed event did not fire");
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderFireClosedEventAfterRemoteDetachArrivesBeforeLocalClose() throws Exception {
doTestSenderFireEventAfterRemoteDetachArrivesBeforeLocalClose(true);
}
@Test
public void testSenderFireDetachEventAfterRemoteDetachArrivesBeforeLocalClose() throws Exception {
doTestSenderFireEventAfterRemoteDetachArrivesBeforeLocalClose(false);
}
private void doTestSenderFireEventAfterRemoteDetachArrivesBeforeLocalClose(boolean close) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().respond();
peer.remoteDetach().withClosed(close).queue();
final AtomicBoolean senderRemotelyOpened = new AtomicBoolean();
final AtomicBoolean senderRemotelyClosed = new AtomicBoolean();
final AtomicBoolean senderRemotelyDetached = new AtomicBoolean();
Connection connection = engine.start();
// Default engine should start and return a connection immediately
assertNotNull(connection);
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("test");
sender.openHandler(result -> senderRemotelyOpened.set(true));
sender.closeHandler(result -> senderRemotelyClosed.set(true));
sender.detachHandler(result -> senderRemotelyDetached.set(true));
sender.open();
peer.waitForScriptToComplete();
assertTrue(senderRemotelyOpened.get(), "Sender remote opened event did not fire");
if (close) {
assertTrue(senderRemotelyClosed.get(), "Sender remote closed event did not fire");
assertFalse(senderRemotelyDetached.get(), "Sender remote detached event fired");
} else {
assertFalse(senderRemotelyClosed.get(), "Sender remote closed event fired");
assertTrue(senderRemotelyDetached.get(), "Sender remote closed event did not fire");
}
peer.expectDetach().withClosed(close);
if (close) {
sender.close();
} else {
sender.detach();
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testRemotelyCloseSenderAndOpenNewSenderImmediatelyAfterWithNewLinkName() throws Exception {
doTestRemotelyTerminateLinkAndThenCreateNewLink(true, false);
}
@Test
public void testRemotelyDetachSenderAndOpenNewSenderImmediatelyAfterWithNewLinkName() throws Exception {
doTestRemotelyTerminateLinkAndThenCreateNewLink(false, false);
}
@Test
public void testRemotelyCloseSenderAndOpenNewSenderImmediatelyAfterWithSameLinkName() throws Exception {
doTestRemotelyTerminateLinkAndThenCreateNewLink(true, true);
}
@Test
public void testRemotelyDetachSenderAndOpenNewSenderImmediatelyAfterWithSameLinkName() throws Exception {
doTestRemotelyTerminateLinkAndThenCreateNewLink(false, true);
}
private void doTestRemotelyTerminateLinkAndThenCreateNewLink(boolean close, boolean sameLinkName) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
String firstLinkName = "test-link-1";
String secondLinkName = sameLinkName ? firstLinkName : "test-link-2";
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withHandle(0).withRole(Role.SENDER.getValue()).respond();
peer.remoteDetach().withClosed(close).queue();
final AtomicBoolean senderRemotelyOpened = new AtomicBoolean();
final AtomicBoolean senderRemotelyClosed = new AtomicBoolean();
final AtomicBoolean senderRemotelyDetached = new AtomicBoolean();
Connection connection = engine.start();
// Default engine should start and return a connection immediately
assertNotNull(connection);
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender(firstLinkName);
sender.openHandler(result -> senderRemotelyOpened.set(true));
sender.closeHandler(result -> senderRemotelyClosed.set(true));
sender.detachHandler(result -> senderRemotelyDetached.set(true));
sender.open();
peer.waitForScriptToComplete();
assertTrue(senderRemotelyOpened.get(), "Sender remote opened event did not fire");
if (close) {
assertTrue(senderRemotelyClosed.get(), "Sender remote closed event did not fire");
assertFalse(senderRemotelyDetached.get(), "Sender remote detached event fired");
} else {
assertFalse(senderRemotelyClosed.get(), "Sender remote closed event fired");
assertTrue(senderRemotelyDetached.get(), "Sender remote closed event did not fire");
}
peer.expectDetach().withClosed(close);
if (close) {
sender.close();
} else {
sender.detach();
}
peer.waitForScriptToComplete();
peer.expectAttach().withHandle(0).withRole(Role.SENDER.getValue()).respond();
peer.expectDetach().withClosed(close).respond();
// Reset trackers
senderRemotelyOpened.set(false);
senderRemotelyClosed.set(false);
senderRemotelyDetached.set(false);
sender = session.sender(secondLinkName);
sender.openHandler(result -> senderRemotelyOpened.set(true));
sender.closeHandler(result -> senderRemotelyClosed.set(true));
sender.detachHandler(result -> senderRemotelyDetached.set(true));
sender.open();
if (close) {
sender.close();
} else {
sender.detach();
}
peer.waitForScriptToComplete();
assertTrue(senderRemotelyOpened.get(), "Sender remote opened event did not fire");
if (close) {
assertTrue(senderRemotelyClosed.get(), "Sender remote closed event did not fire");
assertFalse(senderRemotelyDetached.get(), "Sender remote detached event fired");
} else {
assertFalse(senderRemotelyClosed.get(), "Sender remote closed event fired");
assertTrue(senderRemotelyDetached.get(), "Sender remote closed event did not fire");
}
assertNull(failure);
}
@Test
public void testConnectionSignalsRemoteSenderOpen() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.remoteAttach().withName("receiver")
.withHandle(0)
.withRole(Role.RECEIVER.getValue())
.withInitialDeliveryCount(0)
.onChannel(0).queue();
peer.expectAttach();
peer.expectDetach().respond();
final AtomicBoolean senderRemotelyOpened = new AtomicBoolean();
final AtomicReference<Sender> sender = new AtomicReference<>();
Connection connection = engine.start();
connection.senderOpenHandler(result -> {
senderRemotelyOpened.set(true);
sender.set(result);
});
// Default engine should start and return a connection immediately
assertNotNull(connection);
connection.open();
Session session = connection.session();
session.open();
assertTrue(senderRemotelyOpened.get(), "Sender remote opened event did not fire");
sender.get().open();
sender.get().close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCannotOpenSenderAfterSessionClosed() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectEnd().respond();
Connection connection = engine.start();
// Default engine should start and return a connection immediately
assertNotNull(connection);
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("test");
session.close();
try {
sender.open();
fail("Should not be able to open a link from a closed session.");
} catch (IllegalStateException ise) {}
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCannotOpenSenderAfterSessionRemotelyClosed() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.remoteEnd().queue();
Connection connection = engine.start();
// Default engine should start and return a connection immediately
assertNotNull(connection);
connection.open();
Session session = connection.session();
Sender sender = session.sender("test");
session.open();
try {
sender.open();
fail("Should not be able to open a link from a remotely closed session.");
} catch (IllegalStateException ise) {}
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testGetCurrentDeliveryFromSender() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withHandle(0).respond();
peer.expectDetach().withHandle(0).respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("sender-1");
sender.open();
OutgoingDelivery delivery = sender.next();
assertNotNull(delivery);
assertFalse(delivery.isAborted());
assertTrue(delivery.isPartial());
assertFalse(delivery.isSettled());
assertFalse(delivery.isRemotelySettled());
// Always return same delivery until completed.
assertSame(delivery, sender.current());
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderGetsCreditOnIncomingFlow() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).queue();
peer.expectDetach().withHandle(0).respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("sender-1");
assertFalse(sender.isSendable());
sender.open();
assertEquals(10, sender.getCredit());
assertTrue(sender.isSendable());
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSendSmallPayloadWhenCreditAvailable() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
final byte [] payloadBuffer = new byte[] {0, 1, 2, 3, 4};
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).queue();
peer.expectTransfer().withHandle(0)
.withSettled(false)
.withState(nullValue())
.withDeliveryId(0)
.withDeliveryTag(new byte[] {0})
.withPayload(payloadBuffer);
peer.expectDetach().withHandle(0).respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
ProtonBuffer payload = ProtonByteBufferAllocator.DEFAULT.wrap(payloadBuffer);
Sender sender = session.sender("sender-1");
assertFalse(sender.isSendable());
sender.creditStateUpdateHandler(handler -> {
if (handler.isSendable()) {
handler.next().setTag(new byte[] {0}).writeBytes(payload);
}
});
sender.open();
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSendTransferWithNonDefaultMessageFormat() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
final byte [] payloadBuffer = new byte[] {0, 1, 2, 3, 4};
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).queue();
peer.expectTransfer().withMessageFormat(17).withPayload(payloadBuffer);
peer.expectDetach().withHandle(0).respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
ProtonBuffer payload = ProtonByteBufferAllocator.DEFAULT.wrap(payloadBuffer);
Sender sender = session.sender("sender-1");
assertFalse(sender.isSendable());
sender.creditStateUpdateHandler(handler -> {
if (handler.isSendable()) {
handler.next().setTag(new byte[] {0}).setMessageFormat(17).writeBytes(payload);
}
});
sender.open();
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderSignalsDeliveryUpdatedOnSettledThenSettleFromLinkAPI() throws Exception {
doTestSenderSignalsDeliveryUpdatedOnSettled(true);
}
@Test
public void testSenderSignalsDeliveryUpdatedOnSettledThenSettleDelivery() throws Exception {
doTestSenderSignalsDeliveryUpdatedOnSettled(false);
}
private void doTestSenderSignalsDeliveryUpdatedOnSettled(boolean settleFromLink) {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
byte[] payload = new byte[] {0, 1, 2, 3, 4};
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).queue();
peer.expectTransfer().withHandle(0)
.withSettled(false)
.withState(nullValue())
.withDeliveryId(0)
.withDeliveryTag(new byte[] {0})
.withPayload(payload);
peer.remoteDisposition().withSettled(true)
.withRole(Role.RECEIVER.getValue())
.withState().accepted()
.withFirst(0).queue();
peer.expectDetach().withHandle(0).respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("sender-1");
final AtomicBoolean deliveryUpdatedAndSettled = new AtomicBoolean();
final AtomicReference<OutgoingDelivery> updatedDelivery = new AtomicReference<>();
sender.deliveryStateUpdatedHandler(delivery -> {
if (delivery.isRemotelySettled()) {
deliveryUpdatedAndSettled.set(true);
}
updatedDelivery.set(delivery);
});
assertFalse(sender.isSendable());
sender.creditStateUpdateHandler(handler -> {
if (handler.isSendable()) {
handler.next().setTag(new byte[] {0}).writeBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload));
}
});
sender.open();
assertTrue(deliveryUpdatedAndSettled.get(), "Delivery should have been updated and state settled");
assertEquals(Accepted.getInstance(), updatedDelivery.get().getRemoteState());
assertTrue(sender.hasUnsettled());
assertFalse(sender.unsettled().isEmpty());
if (settleFromLink) {
sender.settle(delivery -> true);
} else {
updatedDelivery.get().settle();
}
assertFalse(sender.hasUnsettled());
assertTrue(sender.unsettled().isEmpty());
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testOpenSenderBeforeOpenConnection() {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
// Create the connection but don't open, then open a session and a sender and
// the session begin and sender attach shouldn't go out until the connection
// is opened locally.
Connection connection = engine.start();
Session session = connection.session();
session.open();
Sender sender = session.sender("sender");
sender.open();
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withHandle(0).withName("sender").withRole(Role.SENDER.getValue()).respond();
// Now open the connection, expect the Open, Begin, and Attach frames
connection.open();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testOpenSenderBeforeOpenSession() {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
// Create the connection and open it, then create a session and a sender
// and observe that the sender doesn't send its attach until the session
// is opened.
Connection connection = engine.start();
connection.open();
Session session = connection.session();
Sender sender = session.sender("sender");
sender.open();
peer.expectBegin().respond();
peer.expectAttach().withHandle(0).withName("sender").withRole(Role.SENDER.getValue()).respond();
// Now open the session, expect the Begin, and Attach frames
session.open();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderDetachAfterEndSent() {
doTestSenderClosedOrDetachedAfterEndSent(false);
}
@Test
public void testSenderCloseAfterEndSent() {
doTestSenderClosedOrDetachedAfterEndSent(true);
}
public void doTestSenderClosedOrDetachedAfterEndSent(boolean close) {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withHandle(0).withName("sender").withRole(Role.SENDER.getValue()).respond();
peer.expectEnd().respond();
// Create the connection and open it, then create a session and a sender
// and observe that the sender doesn't send its detach if the session has
// already been closed.
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("sender");
sender.open();
// Causes the End frame to be sent
session.close();
// The sender should not emit an end as the session was closed which implicitly
// detached the link.
if (close) {
sender.close();
} else {
sender.detach();
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderDetachAfterCloseSent() {
doTestSenderClosedOrDetachedAfterCloseSent(false);
}
@Test
public void testSenderCloseAfterCloseSent() {
doTestSenderClosedOrDetachedAfterCloseSent(true);
}
public void doTestSenderClosedOrDetachedAfterCloseSent(boolean close) {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withHandle(0).withName("sender").withRole(Role.SENDER.getValue()).respond();
peer.expectClose().respond();
// Create the connection and open it, then create a session and a sender
// and observe that the sender doesn't send its detach if the connection has
// already been closed.
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("sender");
sender.open();
// Cause an Close frame to be sent
connection.close();
// The sender should not emit an detach as the connection was closed which implicitly
// detached the link.
if (close) {
sender.close();
} else {
sender.detach();
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testNoDispositionSentAfterDeliverySettledForSender() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).queue();
peer.expectTransfer().withHandle(0)
.withSettled(false)
.withState(nullValue())
.withDeliveryId(0)
.withDeliveryTag(new byte[] {0});
peer.expectDisposition().withFirst(0)
.withSettled(true)
.withState().accepted();
peer.expectDetach().withHandle(0).respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
ProtonBuffer payload = ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] {0, 1, 2, 3, 4});
Sender sender = session.sender("sender-1");
final AtomicBoolean deliverySentAfterSendable = new AtomicBoolean();
final AtomicReference<OutgoingDelivery> sent = new AtomicReference<>();
sender.creditStateUpdateHandler(handler -> {
if (handler.isSendable()) {
sent.set(handler.next().setTag(new byte[] {0}).writeBytes(payload));
deliverySentAfterSendable.set(true);
}
});
sender.open();
assertTrue(deliverySentAfterSendable.get(), "Delivery should have been sent after credit arrived");
assertNull(sender.current());
sent.get().disposition(Accepted.getInstance(), true);
OutgoingDelivery delivery2 = sender.next();
assertNotSame(delivery2, sent.get());
delivery2.disposition(Released.getInstance(), true);
assertFalse(sender.hasUnsettled());
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderCannotSendAfterConnectionClosed() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).queue();
peer.expectClose().respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("sender-1");
assertFalse(sender.isSendable());
OutgoingDelivery delivery = sender.next();
assertNotNull(delivery);
sender.open();
assertEquals(10, sender.getCredit());
assertTrue(sender.isSendable());
connection.close();
assertFalse(sender.isSendable());
try {
delivery.writeBytes(ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] { 1 }));
fail("Should not be able to write to delivery after connection closed.");
} catch (IllegalStateException ise) {
// Should not allow writes on past delivery instances after connection closed
}
try {
sender.next();
fail("Should not be able get next after connection closed");
} catch (IllegalStateException ise) {
// Should not allow next message after close of connection
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderCannotSendAfterSessionClosed() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).queue();
peer.expectEnd().respond();
peer.expectClose().respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("sender-1");
assertFalse(sender.isSendable());
OutgoingDelivery delivery = sender.next();
assertNotNull(delivery);
sender.open();
assertEquals(10, sender.getCredit());
assertTrue(sender.isSendable());
session.close();
assertFalse(sender.isSendable());
try {
delivery.writeBytes(ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] { 1 }));
fail("Should not be able to write to delivery after session closed.");
} catch (IllegalStateException ise) {
// Should not allow writes on past delivery instances after session closed
}
connection.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderWriteBytesThrowsEngineFailedAfterConnectionDropped() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).queue();
peer.dropAfterLastHandler();
Connection connection = engine.start().open();
Session session = connection.session().open();
Sender sender = session.sender("sender-1").open();
OutgoingDelivery delivery = sender.next();
assertNotNull(delivery);
assertTrue(sender.isSendable());
try {
delivery.writeBytes(ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] { 1 }));
fail("Should not be able to write to delivery afters simulated connection drop.");
} catch (EngineFailedException efe) {
// Should not allow writes on past delivery instances after connection dropped
assertTrue(efe.getCause() instanceof UncheckedIOException);
LOG.debug("Caught expected IO exception from write to broken connection", efe);
}
peer.waitForScriptToComplete();
assertNotNull(failure);
}
@Test
public void testSendMultiFrameDeliveryAndSingleFrameDeliveryOnSingleSessionFromDifferentSenders() {
doMultiplexMultiFrameDeliveryOnSingleSessionOutgoingTestImpl(false);
}
@Test
public void testMultipleMultiFrameDeliveriesOnSingleSessionFromDifferentSenders() {
doMultiplexMultiFrameDeliveryOnSingleSessionOutgoingTestImpl(true);
}
private void doMultiplexMultiFrameDeliveryOnSingleSessionOutgoingTestImpl(boolean bothDeliveriesMultiFrame) {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
int contentLength1 = 6000;
int frameSizeLimit = 4000;
int contentLength2 = 2000;
if (bothDeliveriesMultiFrame) {
contentLength2 = 6000;
}
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().withMaxFrameSize(frameSizeLimit).respond().withContainerId("driver").withMaxFrameSize(frameSizeLimit);
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
Connection connection = engine.start();
connection.setMaxFrameSize(frameSizeLimit);
connection.open();
Session session = connection.session();
session.open();
String linkName1 = "Sender1";
Sender sender1 = session.sender(linkName1);
sender1.open();
String linkName2 = "Sender2";
Sender sender2 = session.sender(linkName2);
sender2.open();
final AtomicBoolean sender1MarkedSendable = new AtomicBoolean();
sender1.creditStateUpdateHandler(handler -> {
sender1MarkedSendable.set(handler.isSendable());
});
final AtomicBoolean sender2MarkedSendable = new AtomicBoolean();
sender2.creditStateUpdateHandler(handler -> {
sender2MarkedSendable.set(handler.isSendable());
});
peer.remoteFlow().withHandle(0)
.withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).now();
peer.remoteFlow().withHandle(1)
.withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).now();
assertTrue(sender1MarkedSendable.get(), "Sender 1 should now be sendable");
assertTrue(sender2MarkedSendable.get(), "Sender 2 should now be sendable");
// Frames are not multiplexed for large deliveries as we write the full
// writable portion out when a write is called.
peer.expectTransfer().withHandle(0)
.withSettled(true)
.withState().accepted()
.withDeliveryId(0)
.withMore(true)
.withDeliveryTag(new byte[] {1});
peer.expectTransfer().withHandle(0)
.withSettled(true)
.withState().accepted()
.withDeliveryId(0)
.withMore(false)
.withDeliveryTag(nullValue());
peer.expectTransfer().withHandle(1)
.withSettled(true)
.withState().accepted()
.withDeliveryId(1)
.withMore(bothDeliveriesMultiFrame)
.withDeliveryTag(new byte[] {2});
if (bothDeliveriesMultiFrame) {
peer.expectTransfer().withHandle(1)
.withSettled(true)
.withState().accepted()
.withDeliveryId(1)
.withMore(false)
.withDeliveryTag(nullValue());
}
ProtonBuffer messageContent1 = createContentBuffer(contentLength1);
OutgoingDelivery delivery1 = sender1.next();
delivery1.setTag(new byte[] { 1 });
delivery1.disposition(Accepted.getInstance(), true);
delivery1.writeBytes(messageContent1);
ProtonBuffer messageContent2 = createContentBuffer(contentLength2);
OutgoingDelivery delivery2 = sender2.next();
delivery2.setTag(new byte[] { 2 });
delivery2.disposition(Accepted.getInstance(), true);
delivery2.writeBytes(messageContent2);
peer.expectClose().respond();
connection.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testMaxFrameSizeOfPeerHasEffect() {
doMaxFrameSizeTestImpl(0, 0, 5700, 1);
doMaxFrameSizeTestImpl(1024, 0, 5700, 6);
}
@Test
public void testMaxFrameSizeOutgoingFrameSizeLimitHasEffect() {
doMaxFrameSizeTestImpl(0, 512, 5700, 12);
doMaxFrameSizeTestImpl(1024, 512, 5700, 12);
doMaxFrameSizeTestImpl(1024, 2048, 5700, 6);
}
void doMaxFrameSizeTestImpl(int remoteMaxFrameSize, int outboundFrameSizeLimit, int contentLength, int expectedNumFrames) {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
if (outboundFrameSizeLimit == 0) {
if (remoteMaxFrameSize == 0) {
peer.expectOpen().respond();
} else {
peer.expectOpen().respond().withMaxFrameSize(remoteMaxFrameSize);
}
} else {
if (remoteMaxFrameSize == 0) {
peer.expectOpen().withMaxFrameSize(outboundFrameSizeLimit).respond();
} else {
peer.expectOpen().withMaxFrameSize(outboundFrameSizeLimit)
.respond()
.withMaxFrameSize(remoteMaxFrameSize);
}
}
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
Connection connection = engine.start();
if (outboundFrameSizeLimit != 0) {
connection.setMaxFrameSize(outboundFrameSizeLimit);
}
connection.open();
Session session = connection.session();
session.open();
String linkName = "mySender";
Sender sender = session.sender(linkName);
sender.open();
final AtomicBoolean senderMarkedSendable = new AtomicBoolean();
sender.creditStateUpdateHandler(handler -> {
senderMarkedSendable.set(handler.isSendable());
});
peer.remoteFlow().withHandle(0)
.withDeliveryCount(0)
.withLinkCredit(50)
.withIncomingWindow(65535)
.withOutgoingWindow(65535)
.withNextIncomingId(0)
.withNextOutgoingId(1).now();
assertTrue(senderMarkedSendable.get(), "Sender should now be sendable");
// This calculation isn't entirely precise, there is some added performative/frame overhead not
// accounted for...but values are chosen to work, and verified here.
final int frameCount;
if (remoteMaxFrameSize == 0 && outboundFrameSizeLimit == 0) {
frameCount = 1;
} else if(remoteMaxFrameSize == 0 && outboundFrameSizeLimit != 0) {
frameCount = (int) Math.ceil((double)contentLength / (double) outboundFrameSizeLimit);
} else {
int effectiveMaxFrameSize;
if (outboundFrameSizeLimit != 0) {
effectiveMaxFrameSize = Math.min(outboundFrameSizeLimit, remoteMaxFrameSize);
} else {
effectiveMaxFrameSize = remoteMaxFrameSize;
}
frameCount = (int) Math.ceil((double)contentLength / (double) effectiveMaxFrameSize);
}
assertEquals(expectedNumFrames, frameCount, "Unexpected number of frames calculated");
for (int i = 1; i <= expectedNumFrames; ++i) {
peer.expectTransfer().withHandle(0)
.withSettled(true)
.withState().accepted()
.withDeliveryId(0)
.withMore(i != expectedNumFrames ? true : false)
.withDeliveryTag(i == 1 ? notNullValue() : nullValue())
.withNonNullPayload();
}
ProtonBuffer messageContent = createContentBuffer(contentLength);
OutgoingDelivery delivery = sender.next();
delivery.setTag(new byte[] { 1 });
delivery.disposition(Accepted.getInstance(), true);
delivery.writeBytes(messageContent);
peer.expectClose().respond();
connection.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCompleteInProgressDeliveryWithFinalEmptyTransfer() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
byte[] payload = new byte[] {0, 1, 2, 3, 4};
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).queue();
peer.expectTransfer().withHandle(0)
.withMore(true)
.withSettled(false)
.withState(nullValue())
.withDeliveryId(0)
.withDeliveryTag(new byte[] {0})
.withPayload(payload);
peer.expectTransfer().withHandle(0)
.withState(nullValue())
.withDeliveryId(0)
.withAborted(anyOf(nullValue(), is(false)))
.withSettled(false)
.withMore(anyOf(nullValue(), is(false)))
.withNullPayload();
peer.expectDetach().withHandle(0).respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("sender-1");
sender.open();
final AtomicBoolean senderMarkedSendable = new AtomicBoolean();
sender.creditStateUpdateHandler(handler -> {
senderMarkedSendable.set(sender.isSendable());
});
OutgoingDelivery delivery = sender.next();
assertNotNull(delivery);
delivery.setTag(new byte[] {0});
delivery.streamBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload), false);
delivery.streamBytes(null, true);
assertFalse(delivery.isAborted());
assertFalse(delivery.isPartial());
assertFalse(delivery.isSettled());
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testAbortInProgressDelivery() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
byte[] payload = new byte[] {0, 1, 2, 3, 4};
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).queue();
peer.expectTransfer().withHandle(0)
.withMore(true)
.withSettled(false)
.withState(nullValue())
.withDeliveryId(0)
.withDeliveryTag(new byte[] {0})
.withPayload(payload);
peer.expectTransfer().withHandle(0)
.withState(nullValue())
.withDeliveryId(0)
.withDeliveryTag(new byte[] {0})
.withAborted(true)
.withSettled(true)
.withMore(anyOf(nullValue(), is(false)))
.withNullPayload();
peer.expectDetach().withHandle(0).respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("sender-1");
sender.open();
final AtomicBoolean senderMarkedSendable = new AtomicBoolean();
sender.creditStateUpdateHandler(handler -> {
senderMarkedSendable.set(sender.isSendable());
});
OutgoingDelivery delivery = sender.next();
assertNotNull(delivery);
delivery.setTag(new byte[] {0});
delivery.streamBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload));
delivery.abort();
assertTrue(delivery.isAborted());
assertFalse(delivery.isPartial());
assertTrue(delivery.isSettled());
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testAbortAlreadyAbortedDelivery() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
byte[] payload = new byte[] {0, 1, 2, 3, 4};
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).queue();
peer.expectTransfer().withHandle(0)
.withMore(true)
.withSettled(false)
.withState(nullValue())
.withDeliveryId(0)
.withDeliveryTag(new byte[] {0})
.withPayload(payload);
peer.expectTransfer().withHandle(0)
.withState(nullValue())
.withDeliveryId(0)
.withDeliveryTag(new byte[] {0})
.withAborted(true)
.withSettled(true)
.withMore(anyOf(nullValue(), is(false)))
.withNullPayload();
peer.expectDetach().withHandle(0).respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("sender-1");
sender.open();
final AtomicBoolean senderMarkedSendable = new AtomicBoolean();
sender.creditStateUpdateHandler(handler -> {
senderMarkedSendable.set(sender.isSendable());
});
OutgoingDelivery delivery = sender.next();
assertNotNull(delivery);
delivery.setTag(new byte[] {0});
delivery.streamBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload));
assertTrue(sender.hasUnsettled());
delivery.abort();
assertTrue(delivery.isAborted());
assertFalse(delivery.isPartial());
assertTrue(delivery.isSettled());
// Second abort attempt should not error out or trigger additional frames
delivery.abort();
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testAbortOnDeliveryThatHasNoWritesIsNoOp() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).queue();
peer.expectDetach().withHandle(0).respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("sender-1");
sender.open();
final AtomicBoolean senderMarkedSendable = new AtomicBoolean();
sender.creditStateUpdateHandler(handler -> {
senderMarkedSendable.set(sender.isSendable());
});
OutgoingDelivery delivery = sender.next();
assertNotNull(delivery);
delivery.setTag(new byte[] {0});
delivery.abort();
assertNull(sender.current());
assertTrue(delivery.isAborted());
assertFalse(delivery.isPartial());
assertTrue(delivery.isSettled());
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testAbortOnDeliveryThatHasNoWritesIsNoOpThenSendUsingCurrent() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).queue();
byte[] payload = new byte[] {0, 1, 2, 3, 4};
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("sender-1");
sender.open();
final AtomicBoolean senderMarkedSendable = new AtomicBoolean();
sender.creditStateUpdateHandler(handler -> {
senderMarkedSendable.set(sender.isSendable());
});
OutgoingDelivery delivery = sender.next();
assertNotNull(delivery);
delivery.setTag(new byte[] {0});
delivery.abort();
assertNull(sender.current());
assertTrue(delivery.isAborted());
assertFalse(delivery.isPartial());
assertTrue(delivery.isSettled());
try {
sender.next();
} catch (IllegalStateException ise) {
fail("Should not be able to next as current was not aborted since nothing was ever written.");
}
peer.expectTransfer().withHandle(0)
.withSettled(false)
.withState(nullValue())
.withDeliveryId(0)
.withDeliveryTag(new byte[] {1})
.withPayload(payload);
peer.expectDisposition().withFirst(0).withSettled(true).withState().accepted();
peer.expectDetach().withHandle(0).respond();
delivery = sender.current();
delivery.setTag(new byte[] {1}).writeBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload));
delivery.disposition(Accepted.getInstance(), true);
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSettleTransferWithNullDisposition() throws Exception {
doTestSettleTransferWithSpecifiedOutcome(null, nullValue(), true);
}
@Test
public void testSettleTransferWithAcceptedDisposition() throws Exception {
DeliveryState state = Accepted.getInstance();
AcceptedMatcher matcher = new AcceptedMatcher();
doTestSettleTransferWithSpecifiedOutcome(state, matcher, true);
}
@Test
public void testUnsettledDispositionOfTransferWithAcceptedOutcome() throws Exception {
DeliveryState state = Accepted.getInstance();
AcceptedMatcher matcher = new AcceptedMatcher();
doTestSettleTransferWithSpecifiedOutcome(state, matcher, false);
}
@Test
public void testSettleTransferWithReleasedDisposition() throws Exception {
DeliveryState state = Released.getInstance();
ReleasedMatcher matcher = new ReleasedMatcher();
doTestSettleTransferWithSpecifiedOutcome(state, matcher, true);
}
@Test
public void testSettleTransferWithRejectedDisposition() throws Exception {
DeliveryState state = new Rejected();
RejectedMatcher matcher = new RejectedMatcher();
doTestSettleTransferWithSpecifiedOutcome(state, matcher, true);
}
@Test
public void testSettleTransferWithRejectedWithErrorDisposition() throws Exception {
DeliveryState state = new Rejected().setError(new ErrorCondition(AmqpError.DECODE_ERROR, "test"));
RejectedMatcher matcher = new RejectedMatcher().withError(AmqpError.DECODE_ERROR.toString(), "test");
doTestSettleTransferWithSpecifiedOutcome(state, matcher, true);
}
@Test
public void testSettleTransferWithModifiedDisposition() throws Exception {
DeliveryState state = new Modified().setDeliveryFailed(true).setUndeliverableHere(true);
ModifiedMatcher matcher = new ModifiedMatcher().withDeliveryFailed(true).withUndeliverableHere(true);
doTestSettleTransferWithSpecifiedOutcome(state, matcher, true);
}
@Test
public void testSettleTransferWithTransactionalDisposition() throws Exception {
DeliveryState state = new TransactionalState().setTxnId(new Binary(new byte[] {1})).setOutcome(Accepted.getInstance());
TransactionalStateMatcher matcher =
new TransactionalStateMatcher().withTxnId(new byte[] {1}).withOutcome(new AcceptedMatcher());
doTestSettleTransferWithSpecifiedOutcome(state, matcher, true);
}
private void doTestSettleTransferWithSpecifiedOutcome(DeliveryState state, Matcher<?> stateMatcher, boolean settled) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).queue();
peer.expectTransfer().withHandle(0)
.withSettled(false)
.withState(nullValue())
.withDeliveryId(0)
.withDeliveryTag(new byte[] {0});
peer.expectDisposition().withFirst(0)
.withSettled(settled)
.withState(stateMatcher);
peer.expectDetach().withHandle(0).respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
ProtonBuffer payload = ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] {0, 1, 2, 3, 4});
Sender sender = session.sender("sender-1");
final AtomicBoolean deliverySentAfterSendable = new AtomicBoolean();
final AtomicReference<OutgoingDelivery> sentDelivery = new AtomicReference<>();
sender.creditStateUpdateHandler(handler -> {
sentDelivery.set(handler.next().setTag(new byte[] {0}).writeBytes(payload));
deliverySentAfterSendable.set(sender.isSendable());
});
sender.open();
assertTrue(deliverySentAfterSendable.get(), "Delivery should have been sent after credit arrived");
OutgoingDelivery delivery = sender.current();
assertNull(delivery);
sentDelivery.get().disposition(state, settled);
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testAttemptedSecondDispositionOnAlreadySettledDeliveryNull() throws Exception {
doTestAttemptedSecondDispositionOnAlreadySettledDelivery(Accepted.getInstance(), null);
}
@Test
public void testAttemptedSecondDispositionOnAlreadySettledDeliveryReleased() throws Exception {
doTestAttemptedSecondDispositionOnAlreadySettledDelivery(Accepted.getInstance(), Released.getInstance());
}
@Test
public void testAttemptedSecondDispositionOnAlreadySettledDeliveryModified() throws Exception {
doTestAttemptedSecondDispositionOnAlreadySettledDelivery(Released.getInstance(), new Modified().setDeliveryFailed(true));
}
@Test
public void testAttemptedSecondDispositionOnAlreadySettledDeliveryRejected() throws Exception {
doTestAttemptedSecondDispositionOnAlreadySettledDelivery(Released.getInstance(), new Rejected());
}
@Test
public void testAttemptedSecondDispositionOnAlreadySettledDeliveryTransactional() throws Exception {
doTestAttemptedSecondDispositionOnAlreadySettledDelivery(Released.getInstance(), new TransactionalState().setOutcome(Accepted.getInstance()));
}
private void doTestAttemptedSecondDispositionOnAlreadySettledDelivery(DeliveryState first, DeliveryState second) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).queue();
peer.expectTransfer().withHandle(0)
.withSettled(false)
.withState(nullValue())
.withDeliveryId(0)
.withDeliveryTag(new byte[] {0});
peer.expectDisposition().withFirst(0)
.withSettled(true)
.withState(notNullValue());
peer.expectDetach().withHandle(0).respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
ProtonBuffer payload = ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] {0, 1, 2, 3, 4});
Sender sender = session.sender("sender-1");
final AtomicReference<OutgoingDelivery> sentDelivery = new AtomicReference<>();
final AtomicBoolean deliverySentAfterSendable = new AtomicBoolean();
sender.creditStateUpdateHandler(handler -> {
sentDelivery.set(handler.next().setTag(new byte[] {0}).writeBytes(payload));
deliverySentAfterSendable.set(sender.isSendable());
});
sender.open();
assertTrue(deliverySentAfterSendable.get(), "Delivery should have been sent after credit arrived");
OutgoingDelivery delivery = sender.current();
assertNull(delivery);
sentDelivery.get().disposition(first, true);
// A second attempt at the same outcome should result in no action.
sentDelivery.get().disposition(first, true);
try {
sentDelivery.get().disposition(second, true);
fail("Should not be able to update outcome on already settled delivery");
} catch (IllegalStateException ise) {
// Expected
}
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSettleSentDeliveryAfterRemoteSettles() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).queue();
peer.expectTransfer().withHandle(0)
.withSettled(false)
.withState(nullValue())
.withDeliveryId(0)
.withDeliveryTag(new byte[] {0})
.accept();
peer.expectDetach().withHandle(0).respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
ProtonBuffer payload = ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] {0, 1, 2, 3, 4});
Sender sender = session.sender("sender-1");
final AtomicBoolean deliverySentAfterSendable = new AtomicBoolean();
final AtomicReference<OutgoingDelivery> sentDelivery = new AtomicReference<>();
sender.creditStateUpdateHandler(handler -> {
sentDelivery.set(handler.next().setTag(new byte[] {0}).writeBytes(payload));
deliverySentAfterSendable.set(sender.isSendable());
});
sender.deliveryStateUpdatedHandler((delivery) -> {
if (delivery.isRemotelySettled()) {
delivery.settle();
}
});
sender.open();
assertTrue(deliverySentAfterSendable.get(), "Delivery should have been sent after credit arrived");
assertNull(sender.current());
assertTrue(sentDelivery.get().isRemotelySettled());
assertSame(Accepted.getInstance(), sentDelivery.get().getRemoteState());
assertNull(sentDelivery.get().getState());
assertTrue(sentDelivery.get().isSettled());
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderHandlesDeferredOpenAndBeginAttachResponses() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
final AtomicBoolean senderRemotelyOpened = new AtomicBoolean();
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen();
peer.expectBegin();
peer.expectAttach().withRole(Role.SENDER.getValue())
.withTarget().withDynamic(true).withAddress((String) null);
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("sender-1");
sender.setTarget(new Target().setDynamic(true).setAddress(null));
sender.openHandler(result -> senderRemotelyOpened.set(true)).open();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// This should happen after we inject the held open and attach
peer.expectClose().respond();
// Inject held responses to get the ball rolling again
peer.remoteOpen().withOfferedCapabilities("ANONYMOUS_RELAY").now();
peer.respondToLastBegin().now();
peer.respondToLastAttach().now();
assertTrue(senderRemotelyOpened.get(), "Sender remote opened event did not fire");
assertNotNull(sender.<Target>getRemoteTarget().getAddress());
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
@Test
public void testCloseAfterShutdownDoesNotThrowExceptionOpenAndBeginWrittenAndResponseAttachWrittenAndResponse() throws Exception {
testCloseAfterShutdownNoOutputAndNoException(true, true, true, true);
}
@Test
public void testCloseAfterShutdownDoesNotThrowExceptionOpenAndBeginWrittenAndResponseAttachWrittenAndNoResponse() throws Exception {
testCloseAfterShutdownNoOutputAndNoException(true, true, true, false);
}
@Test
public void testCloseAfterShutdownDoesNotThrowExceptionOpenWrittenAndResponseBeginWrittenAndNoResponse() throws Exception {
testCloseAfterShutdownNoOutputAndNoException(true, true, false, false);
}
@Test
public void testCloseAfterShutdownDoesNotThrowExceptionOpenWrittenButNoResponse() throws Exception {
testCloseAfterShutdownNoOutputAndNoException(true, false, false, false);
}
@Test
public void testCloseAfterShutdownDoesNotThrowExceptionOpenNotWritten() throws Exception {
testCloseAfterShutdownNoOutputAndNoException(false, false, false, false);
}
private void testCloseAfterShutdownNoOutputAndNoException(boolean respondToHeader, boolean respondToOpen, boolean respondToBegin, boolean respondToAttach) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
if (respondToHeader) {
peer.expectAMQPHeader().respondWithAMQPHeader();
if (respondToOpen) {
peer.expectOpen().respond();
if (respondToBegin) {
peer.expectBegin().respond();
if (respondToAttach) {
peer.expectAttach().respond();
} else {
peer.expectAttach();
}
} else {
peer.expectBegin();
peer.expectAttach();
}
} else {
peer.expectOpen();
peer.expectBegin();
peer.expectAttach();
}
} else {
peer.expectAMQPHeader();
}
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("test");
sender.open();
engine.shutdown();
// Should clean up and not throw as we knowingly shutdown engine operations.
sender.close();
session.close();
connection.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCloseAfterFailureThrowsEngineStateExceptionOpenAndBeginWrittenAndResponseAttachWrittenAndResponse() throws Exception {
testCloseAfterEngineFailedThrowsAndNoOutputWritten(true, true, true, true);
}
@Test
public void testCloseAfterFailureThrowsEngineStateExceptionOpenAndBeginWrittenAndResponseAttachWrittenAndNoResponse() throws Exception {
testCloseAfterEngineFailedThrowsAndNoOutputWritten(true, true, true, false);
}
@Test
public void testCloseAfterFailureThrowsEngineStateExceptionOpenWrittenAndResponseBeginWrittenAndNoResponse() throws Exception {
testCloseAfterEngineFailedThrowsAndNoOutputWritten(true, true, true, false);
}
@Test
public void testCloseAfterFailureThrowsEngineStateExceptionOpenWrittenButNoResponse() throws Exception {
testCloseAfterEngineFailedThrowsAndNoOutputWritten(true, false, false, false);
}
@Test
public void testCloseAfterFailureThrowsEngineStateExceptionOpenNotWritten() throws Exception {
testCloseAfterEngineFailedThrowsAndNoOutputWritten(false, false, false, false);
}
private void testCloseAfterEngineFailedThrowsAndNoOutputWritten(boolean respondToHeader, boolean respondToOpen, boolean respondToBegin, boolean respondToAttach) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
if (respondToHeader) {
peer.expectAMQPHeader().respondWithAMQPHeader();
if (respondToOpen) {
peer.expectOpen().respond();
if (respondToBegin) {
peer.expectBegin().respond();
if (respondToAttach) {
peer.expectAttach().respond();
} else {
peer.expectAttach();
}
} else {
peer.expectBegin();
peer.expectAttach();
}
peer.expectClose();
} else {
peer.expectOpen();
peer.expectBegin();
peer.expectAttach();
peer.expectClose();
}
} else {
peer.expectAMQPHeader();
}
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("test");
sender.open();
engine.engineFailed(new IOException());
try {
sender.close();
fail("Should throw exception indicating engine is in a failed state.");
} catch (EngineFailedException efe) {}
try {
session.close();
fail("Should throw exception indicating engine is in a failed state.");
} catch (EngineFailedException efe) {}
try {
connection.close();
fail("Should throw exception indicating engine is in a failed state.");
} catch (EngineFailedException efe) {}
engine.shutdown(); // Explicit shutdown now allows local close to complete
// Should clean up and not throw as we knowingly shutdown engine operations.
sender.close();
session.close();
connection.close();
peer.waitForScriptToComplete();
assertNotNull(failure);
}
@Test
public void testCloseReceiverWithErrorCondition() throws Exception {
doTestCloseOrDetachWithErrorCondition(true);
}
@Test
public void testDetachReceiverWithErrorCondition() throws Exception {
doTestCloseOrDetachWithErrorCondition(false);
}
private void doTestCloseOrDetachWithErrorCondition(boolean close) throws Exception {
final String condition = "amqp:link:detach-forced";
final String description = "something bad happened.";
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().respond();
peer.expectDetach().withClosed(close)
.withError(condition, description)
.respond();
peer.expectClose();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("sender-1");
sender.open();
sender.setCondition(new ErrorCondition(Symbol.valueOf(condition), description));
if (close) {
sender.close();
} else {
sender.detach();
}
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
@Test
public void testSenderDrainedWhenNotDraining() {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().respond();
peer.remoteFlow().withDeliveryCount(0).withLinkCredit(10).withDrain(false).queue();
peer.expectDetach().respond();
peer.expectClose().respond();
Connection connection = engine.start().open();
Session session = connection.session().open();
Sender sender = session.sender("sender-1");
sender.creditStateUpdateHandler(link -> link.drained());
sender.open();
assertEquals(10, sender.getCredit());
sender.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
@Test
public void testSenderDrainedWhenDrainSignaledButNoCreditGiven() {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().respond();
peer.remoteFlow().withDeliveryCount(0).withLinkCredit(0).withDrain(false).queue();
peer.expectDetach().respond();
peer.expectClose().respond();
Connection connection = engine.start().open();
Session session = connection.session().open();
Sender sender = session.sender("sender-1");
sender.creditStateUpdateHandler(link -> link.drained());
sender.open();
assertEquals(0, sender.getCredit());
sender.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
@Test
public void testSenderSignalsDrainedWhenCreditOutstanding() {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().respond();
peer.remoteFlow().withDeliveryCount(0).withLinkCredit(10).withDrain(true).queue();
peer.expectFlow().withDeliveryCount(10).withLinkCredit(0).withDrain(true);
peer.expectDetach().respond();
peer.expectClose().respond();
Connection connection = engine.start().open();
Session session = connection.session().open();
Sender sender = session.sender("sender-1");
sender.creditStateUpdateHandler(link -> link.drained());
sender.open();
sender.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
@Test
public void testSenderOmitsFlowWhenDrainedCreditIsSatisfied() {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
ProtonBuffer payload = ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] {0, 1, 2, 3, 4});
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().respond();
peer.remoteFlow().withDeliveryCount(0).withLinkCredit(1).withDrain(true).queue();
peer.expectTransfer().withHandle(0)
.withSettled(false)
.withState(nullValue())
.withDeliveryId(0)
.withDeliveryTag(new byte[] {0})
.accept();
Connection connection = engine.start().open();
Session session = connection.session().open();
Sender sender = session.sender("sender-1");
final AtomicBoolean deliverySentAfterSendable = new AtomicBoolean();
final AtomicReference<OutgoingDelivery> sentDelivery = new AtomicReference<>();
sender.creditStateUpdateHandler(link -> {
if (link.isSendable()) {
sentDelivery.set(link.next().setTag(new byte[] {0}).writeBytes(payload));
deliverySentAfterSendable.set(true);
}
});
sender.deliveryStateUpdatedHandler((delivery) -> {
if (delivery.isRemotelySettled()) {
delivery.settle();
}
});
sender.open();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectClose().respond();
// Should not send a flow as the send fulfilled the requested drain amount.
sender.drained();
sender.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
@Test
public void testSenderAppliesDeliveryTagGeneratorToNextDelivery() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
ProtonBuffer payload = ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] {0, 1, 2, 3, 4});
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withIncomingWindow(10).withLinkCredit(10).queue();
Connection connection = engine.start().open();
Session session = connection.session().open();
Sender sender = session.sender("sender-1");
sender.setDeliveryTagGenerator(ProtonDeliveryTagGenerator.BUILTIN.SEQUENTIAL.createGenerator());
sender.deliveryStateUpdatedHandler((delivery) -> {
delivery.settle();
});
sender.open();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withNonNullPayload()
.withDeliveryTag(new byte[] {0}).accept();
peer.expectTransfer().withNonNullPayload()
.withDeliveryTag(new byte[] {1}).accept();
peer.expectTransfer().withNonNullPayload()
.withDeliveryTag(new byte[] {2}).accept();
OutgoingDelivery delivery1 = sender.next();
delivery1.writeBytes(payload.duplicate());
OutgoingDelivery delivery2 = sender.next();
delivery2.writeBytes(payload.duplicate());
OutgoingDelivery delivery3 = sender.next();
delivery3.writeBytes(payload.duplicate());
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
assertNotNull(delivery1);
assertTrue(delivery1.isSettled());
assertTrue(delivery1.isRemotelySettled());
assertNotNull(delivery2);
assertTrue(delivery2.isSettled());
assertTrue(delivery2.isRemotelySettled());
assertNotNull(delivery3);
assertTrue(delivery3.isSettled());
assertTrue(delivery3.isRemotelySettled());
peer.expectDetach().respond();
peer.expectClose().respond();
sender.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
@Test
public void testSenderAppliedGeneratedDeliveryTagCanBeOverridden() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
final byte [] payloadBuffer = new byte[] {0, 1, 2, 3, 4};
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).queue();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
ProtonBuffer payload = ProtonByteBufferAllocator.DEFAULT.wrap(payloadBuffer);
Sender sender = session.sender("sender-1");
assertFalse(sender.isSendable());
final DeliveryTagGenerator generator = ProtonDeliveryTagGenerator.BUILTIN.POOLED.createGenerator();
sender.setDeliveryTagGenerator(generator);
sender.open();
peer.waitForScriptToComplete();
peer.expectTransfer().withHandle(0)
.withSettled(false)
.withState(nullValue())
.withDeliveryId(0)
.withDeliveryTag(new byte[] {127})
.withPayload(payloadBuffer);
peer.expectDetach().withHandle(0).respond();
OutgoingDelivery delivery = sender.next();
DeliveryTag oldTag = delivery.getTag();
delivery.setTag(new byte[] {127});
// Pooled tag should be reused.
assertSame(oldTag, generator.nextTag());
delivery.writeBytes(payload);
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderReleasesPooledDeliveryTagsAfterSettledByBoth() throws Exception {
doTestSenderReleasesPooledDeliveryTags(false, true);
}
@Test
public void testSenderReleasesPooledDeliveryTagsAfterSettledAfterDispositionUpdate() throws Exception {
doTestSenderReleasesPooledDeliveryTags(false, false);
}
@Test
public void testSenderReleasesPooledDeliveryTagsSenderSettlesFirst() throws Exception {
doTestSenderReleasesPooledDeliveryTags(true, false);
}
private void doTestSenderReleasesPooledDeliveryTags(boolean sendSettled, boolean receiverSettles) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
ProtonBuffer payload = ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] {0, 1, 2, 3, 4});
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().respond();
peer.remoteFlow().withDeliveryCount(0).withLinkCredit(10).queue();
Connection connection = engine.start().open();
Session session = connection.session().open();
Sender sender = session.sender("sender-1");
sender.setDeliveryTagGenerator(ProtonDeliveryTagGenerator.BUILTIN.POOLED.createGenerator());
sender.open();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
final int toSend = sender.getCredit();
final byte[] expectedTag = new byte[] {0};
List<OutgoingDelivery> sent = new ArrayList<>(toSend);
for (int i = 0; i < toSend; ++i) {
peer.expectTransfer().withHandle(0)
.withSettled(sendSettled)
.withState(nullValue())
.withDeliveryId(i)
.withMore(false)
.withDeliveryTag(expectedTag)
.respond()
.withSettled(receiverSettles)
.withState().accepted();
if (!sendSettled && !receiverSettles) {
peer.expectDisposition().withFirst(i)
.withSettled(true)
.withState(nullValue());
}
}
for (int i = 0; i < toSend; ++i) {
OutgoingDelivery delivery = sender.next();
if (sendSettled) {
delivery.settle();
}
delivery.writeBytes(payload.duplicate());
if (!sendSettled) {
delivery.settle();
}
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
sent.forEach(delivery -> assertEquals(delivery.getTag().tagBytes() , expectedTag));
peer.expectDetach().respond();
peer.expectClose().respond();
// Should not send a flow as the send fulfilled the requested drain amount.
sender.drained();
sender.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
@Test
public void testSenderHandlesDelayedDispositionsForSentTransfers() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
ProtonBuffer payload = ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] {0, 1, 2, 3, 4});
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withLinkCredit(10).queue();
Connection connection = engine.start().open();
Session session = connection.session().open();
Sender sender = session.sender("sender-1");
sender.setDeliveryTagGenerator(ProtonDeliveryTagGenerator.BUILTIN.SEQUENTIAL.createGenerator());
sender.deliveryStateUpdatedHandler((delivery) -> {
delivery.settle();
});
sender.open();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withNonNullPayload()
.withDeliveryTag(new byte[] {0});
peer.expectTransfer().withNonNullPayload()
.withDeliveryTag(new byte[] {1});
peer.expectTransfer().withNonNullPayload()
.withDeliveryTag(new byte[] {2});
OutgoingDelivery delivery1 = sender.next();
delivery1.writeBytes(payload.duplicate());
OutgoingDelivery delivery2 = sender.next();
delivery2.writeBytes(payload.duplicate());
OutgoingDelivery delivery3 = sender.next();
delivery3.writeBytes(payload.duplicate());
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
assertNotNull(delivery1);
assertNotNull(delivery2);
assertNotNull(delivery3);
peer.remoteDisposition().withRole(Role.RECEIVER.getValue())
.withFirst(0)
.withSettled(true)
.withState().accepted().now();
assertTrue(delivery1.isSettled());
assertTrue(delivery1.isRemotelySettled());
assertFalse(delivery2.isSettled());
assertFalse(delivery2.isRemotelySettled());
assertFalse(delivery3.isSettled());
assertFalse(delivery3.isRemotelySettled());
peer.remoteDisposition().withRole(Role.RECEIVER.getValue())
.withFirst(1)
.withSettled(true)
.withState().accepted().now();
assertTrue(delivery1.isSettled());
assertTrue(delivery1.isRemotelySettled());
assertTrue(delivery2.isSettled());
assertTrue(delivery2.isRemotelySettled());
assertFalse(delivery3.isSettled());
assertFalse(delivery3.isRemotelySettled());
peer.remoteDisposition().withRole(Role.RECEIVER.getValue())
.withFirst(2)
.withSettled(true)
.withState().accepted().now();
assertTrue(delivery1.isSettled());
assertTrue(delivery1.isRemotelySettled());
assertTrue(delivery2.isSettled());
assertTrue(delivery2.isRemotelySettled());
assertTrue(delivery3.isSettled());
assertTrue(delivery3.isRemotelySettled());
peer.expectDetach().respond();
peer.expectClose().respond();
sender.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
@Test
public void testNoDispositionSentWhenNoStateOrSettlementRequested() {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
Connection connection = engine.start().open();
Session session = connection.session().open();
Sender sender = session.sender("sender").open();
final AtomicBoolean sender1MarkedSendable = new AtomicBoolean();
sender.creditStateUpdateHandler(handler -> {
sender1MarkedSendable.set(handler.isSendable());
});
peer.remoteFlow().withHandle(0)
.withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(1)
.withNextOutgoingId(1).now();
assertTrue(sender1MarkedSendable.get(), "Sender 1 should now be sendable");
// Frames are not multiplexed for large deliveries as we write the full
// writable portion out when a write is called.
peer.expectTransfer().withHandle(0)
.withSettled(false)
.withNullState()
.withDeliveryId(0)
.withMore(false)
.withDeliveryTag(new byte[] {1});
ProtonBuffer messageContent1 = createContentBuffer(32);
OutgoingDelivery delivery1 = sender.next();
delivery1.setTag(new byte[] { 1 });
delivery1.writeBytes(messageContent1);
// No action requested so no frame should be emitted.
delivery1.disposition(null, false);
peer.waitForScriptToComplete();
peer.expectDisposition().withState().accepted();
delivery1.disposition(Accepted.getInstance(), true);
peer.expectClose().respond();
connection.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCannotAlterMessageFormatAfterInitialBytesWritten() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
byte[] payload = new byte[] {0, 1, 2, 3, 4};
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).queue();
peer.expectTransfer().withHandle(0)
.withMore(true)
.withSettled(false)
.withState(nullValue())
.withMessageFormat(42)
.withDeliveryId(0)
.withDeliveryTag(new byte[] {0})
.withPayload(payload);
peer.expectTransfer().withHandle(0)
.withState(nullValue())
.withDeliveryId(0)
.withMessageFormat(42)
.withAborted(anyOf(nullValue(), is(false)))
.withSettled(false)
.withMore(anyOf(nullValue(), is(false)))
.withPayload(payload);
peer.expectDetach().withHandle(0).respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("sender-1");
sender.open();
final AtomicBoolean senderMarkedSendable = new AtomicBoolean();
sender.creditStateUpdateHandler(handler -> {
senderMarkedSendable.set(sender.isSendable());
});
final OutgoingDelivery delivery = sender.next();
assertNotNull(delivery);
delivery.setTag(new byte[] {0});
delivery.setMessageFormat(42);
delivery.streamBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload), false);
assertThrows(IllegalStateException.class, () -> delivery.setMessageFormat(43));
assertDoesNotThrow(() -> delivery.setMessageFormat(42));
delivery.streamBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload), true);
assertFalse(delivery.isAborted());
assertFalse(delivery.isPartial());
assertFalse(delivery.isSettled());
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCanUpdateAcceptedStateAfterInitialTransferAndSettle() throws Exception {
doTestCanUpdateStateAndOrSettleAfterInitialTransfer(true);
}
@Test
public void testCanUpdateAcceptedStateAfterInitialTransferDoNotSettle() throws Exception {
doTestCanUpdateStateAndOrSettleAfterInitialTransfer(false);
}
private void doTestCanUpdateStateAndOrSettleAfterInitialTransfer(boolean settle) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
byte[] payload = new byte[] {0, 1, 2, 3, 4};
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1024)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).queue();
peer.expectTransfer().withHandle(0)
.withMore(true)
.withSettled(false)
.withState(nullValue())
.withMessageFormat(42)
.withDeliveryId(0)
.withDeliveryTag(new byte[] {0})
.withPayload(payload);
peer.expectTransfer().withHandle(0)
.withState().accepted()
.withDeliveryId(0)
.withMessageFormat(42)
.withAborted(anyOf(nullValue(), is(false)))
.withSettled(settle)
.withMore(anyOf(nullValue(), is(false)))
.withPayload(payload);
peer.expectDetach().withHandle(0).respond();
Connection connection = engine.start();
connection.open();
Session session = connection.session();
session.open();
Sender sender = session.sender("sender-1");
sender.open();
final AtomicBoolean senderMarkedSendable = new AtomicBoolean();
sender.creditStateUpdateHandler(handler -> {
senderMarkedSendable.set(sender.isSendable());
});
final OutgoingDelivery delivery = sender.next();
assertNotNull(delivery);
delivery.setTag(new byte[] {0});
delivery.setMessageFormat(42);
delivery.streamBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload), false);
assertThrows(IllegalStateException.class, () -> delivery.setMessageFormat(43));
assertDoesNotThrow(() -> delivery.setMessageFormat(42));
delivery.disposition(Accepted.getInstance(), settle);
delivery.streamBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload), true);
assertFalse(delivery.isAborted());
assertFalse(delivery.isPartial());
if (settle) {
assertTrue(delivery.isSettled());
} else {
assertFalse(delivery.isSettled());
}
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderNotSendableWhenRemoteIncomingWindowIsZero() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
byte[] payload = new byte[] {0, 1, 2, 3, 4};
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(0)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).queue();
peer.expectDetach().withHandle(0).respond();
Connection connection = engine.start().open();
Session session = connection.session().open();
Sender sender = session.sender("sender-1").open();
final OutgoingDelivery delivery = sender.next();
assertNotNull(delivery);
delivery.setTag(new byte[] {0});
delivery.streamBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload), false);
assertFalse(sender.isSendable());
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderBecomesSendableAfterRemoteIncomingWindowExpanded() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
byte[] payload = new byte[] {0, 1, 2, 3, 4};
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(0)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(1).queue();
Connection connection = engine.start().open();
Session session = connection.session().open();
Sender sender = session.sender("sender-1");
{
// Not expecting an update as we weren't yet able to send and still aren't
final CountDownLatch senderCreditUpdated = new CountDownLatch(1);
sender.creditStateUpdateHandler(handler -> {
senderCreditUpdated.countDown();
});
sender.open();
assertTrue(senderCreditUpdated.await(5, TimeUnit.MILLISECONDS));
assertFalse(sender.isSendable());
}
final OutgoingDelivery delivery = sender.next();
assertNotNull(delivery);
delivery.setTag(new byte[] {0});
// Shouldn't generate any frames as there's no session capacity
delivery.streamBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload), false);
{
final CountDownLatch senderCreditUpdated = new CountDownLatch(1);
sender.creditStateUpdateHandler(handler -> {
senderCreditUpdated.countDown();
});
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(0).now();
assertTrue(senderCreditUpdated.await(10, TimeUnit.SECONDS));
assertTrue(sender.isSendable());
}
peer.expectTransfer().withHandle(0)
.withMore(false)
.withSettled(false)
.withState(nullValue())
.withDeliveryId(0)
.withDeliveryTag(new byte[] {0})
.withPayload(payload);
peer.expectDetach().withHandle(0).respond();
delivery.writeBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload));
assertFalse(sender.isSendable());
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderBecomesSendableAfterRemoteIncomingWindowExpandedSessionFlowSentBeforeAttach() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
byte[] payload = new byte[] {0, 1, 2, 3, 4};
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.remoteFlow().withNullHandle()
.withIncomingWindow(0)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(0).queue();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
Connection connection = engine.start().open();
Session session = connection.session().open();
Sender sender = session.sender("sender-1").open();
assertFalse(sender.isSendable());
final OutgoingDelivery delivery = sender.next();
assertNotNull(delivery);
delivery.setTag(new byte[] {0});
// Shouldn't generate any frames as there's no session capacity
delivery.streamBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload), false);
{
final CountDownLatch senderCreditUpdated = new CountDownLatch(1);
sender.creditStateUpdateHandler(handler -> {
senderCreditUpdated.countDown();
});
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(10)
.withIncomingWindow(1)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(0).now();
assertTrue(senderCreditUpdated.await(10, TimeUnit.SECONDS));
assertTrue(sender.isSendable());
}
peer.expectTransfer().withHandle(0)
.withMore(false)
.withSettled(false)
.withState(nullValue())
.withDeliveryId(0)
.withDeliveryTag(new byte[] {0})
.withPayload(payload);
peer.expectDetach().withHandle(0).respond();
delivery.writeBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload));
assertFalse(sender.isSendable());
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSessionRevokesIncomingWindowSetsSenderStateToNotSendableViaDirectLinkFlow() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
byte[] payload = new byte[] {0, 1, 2, 3, 4};
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.remoteFlow().withIncomingWindow(1)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(0).queue();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
Connection connection = engine.start().open();
Session session = connection.session().open();
Sender sender = session.sender("sender-1").open();
assertFalse(sender.isSendable());
{
final CountDownLatch senderCreditUpdated = new CountDownLatch(1);
sender.creditStateUpdateHandler(handler -> {
senderCreditUpdated.countDown();
});
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(1)
.withIncomingWindow(1)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(0).now();
assertTrue(senderCreditUpdated.await(10, TimeUnit.SECONDS));
assertTrue(sender.isSendable());
}
{
final CountDownLatch senderCreditUpdated = new CountDownLatch(1);
sender.creditStateUpdateHandler(handler -> {
senderCreditUpdated.countDown();
});
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(1)
.withIncomingWindow(0)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(0).now();
assertTrue(senderCreditUpdated.await(10, TimeUnit.SECONDS));
assertFalse(sender.isSendable());
}
peer.expectDetach().withHandle(0).respond();
// Should not generate any outgoing transfers as the delivery is not sendable
final OutgoingDelivery delivery = sender.next();
delivery.setTag(new byte[] {0});
delivery.writeBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload));
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSessionRevokesIncomingWindowSetsSenderStateToNotSendableViaSessionFlow() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
byte[] payload = new byte[] {0, 1, 2, 3, 4};
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.remoteFlow().withIncomingWindow(1)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(0).queue();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
Connection connection = engine.start().open();
Session session = connection.session().open();
Sender sender = session.sender("sender-1").open();
assertFalse(sender.isSendable());
{
final CountDownLatch senderCreditUpdated = new CountDownLatch(1);
sender.creditStateUpdateHandler(handler -> {
senderCreditUpdated.countDown();
});
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(1)
.withIncomingWindow(1)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(0).now();
assertTrue(senderCreditUpdated.await(10, TimeUnit.SECONDS));
assertTrue(sender.isSendable());
}
{
final CountDownLatch senderCreditUpdated = new CountDownLatch(1);
sender.creditStateUpdateHandler(handler -> {
senderCreditUpdated.countDown();
});
// Arrives at session level but impacts the links in the session.
peer.remoteFlow().withNullHandle()
.withIncomingWindow(0)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(0).now();
assertTrue(senderCreditUpdated.await(10, TimeUnit.SECONDS));
assertFalse(sender.isSendable());
}
peer.expectDetach().withHandle(0).respond();
// Should not generate any outgoing transfers as the delivery is not sendable
final OutgoingDelivery delivery = sender.next();
delivery.setTag(new byte[] {0});
delivery.writeBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload));
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderOnlyWritesToSessionRemoteIncomingLimitWriteBytes() throws Exception {
doTestSenderOnlyWritesToSessionRemoteIncomingLimit(false);
}
@Test
public void testSenderOnlyWritesToSessionRemoteIncomingLimitStreamBytes() throws Exception {
doTestSenderOnlyWritesToSessionRemoteIncomingLimit(true);
}
private void doTestSenderOnlyWritesToSessionRemoteIncomingLimit(boolean streamBytes) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
byte[] payload = new byte[1536];
Arrays.fill(payload, (byte) 64);
ProtonBuffer payloadBuffer = ProtonByteBufferAllocator.DEFAULT.wrap(payload);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver").withMaxFrameSize(1024);
peer.expectBegin().respond().withIncomingWindow(1);
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0)
.withLinkCredit(1)
.withIncomingWindow(1)
.withOutgoingWindow(10)
.withNextIncomingId(0)
.withNextOutgoingId(0).queue();
peer.expectTransfer().withHandle(0)
.withMore(true)
.withSettled(false)
.withState(nullValue())
.withDeliveryId(0)
.withDeliveryTag(new byte[] {0})
.withNonNullPayload();
Connection connection = engine.start().open();
Session session = connection.session().open();
Sender sender = session.sender("sender-1").open();
final OutgoingDelivery delivery = sender.next();
delivery.setTag(new byte[] {0});
if (streamBytes) {
delivery.streamBytes(payloadBuffer, true);
} else {
delivery.writeBytes(payloadBuffer);
}
assertTrue(delivery.isPartial());
assertTrue(payloadBuffer.isReadable());
assertNotEquals(payload.length, payloadBuffer.getReadableBytes());
assertFalse(sender.isSendable());
peer.remoteFlow().withIncomingWindow(1)
.withNextIncomingId(1)
.withLinkCredit(10).now();
assertTrue(sender.isSendable());
peer.expectTransfer().withHandle(0)
.withMore(false)
.withDeliveryId(0)
.withNonNullPayload();
peer.expectDetach().withHandle(0).respond();
if (streamBytes) {
delivery.streamBytes(payloadBuffer, true);
} else {
delivery.writeBytes(payloadBuffer);
}
assertFalse(delivery.isPartial());
assertFalse(payloadBuffer.isReadable());
assertFalse(sender.isSendable());
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderUpdateDeliveryUpdatedEventHandlerInDelivery() throws InterruptedException {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
ProtonBuffer payload = ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] {0, 1, 2, 3, 4});
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withDeliveryCount(0).withLinkCredit(1).queue();
peer.expectTransfer().withHandle(0)
.withSettled(false)
.withState(nullValue())
.withDeliveryId(0)
.withDeliveryTag(new byte[] {0})
.respond()
.withSettled(true)
.withState().accepted();
Connection connection = engine.start().open();
Session session = connection.session().open();
Sender sender = session.sender("sender-1");
final CountDownLatch stateUpdated = new CountDownLatch(1);
sender.creditStateUpdateHandler(link -> {
if (link.isSendable()) {
OutgoingDelivery delivery = sender.next();
delivery.deliveryStateUpdatedHandler((outgoing) -> {
stateUpdated.countDown();
});
delivery.setTag(new byte[] {0});
delivery.writeBytes(payload);
}
});
sender.open();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectClose().respond();
assertTrue(stateUpdated.await(5, TimeUnit.SECONDS));
sender.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
@Test
public void testTransferCountTracksOutgoingDeliveryLifecycle() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
byte[] payload = new byte[] {0, 1, 2, 3, 4};
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
peer.remoteFlow().withDeliveryCount(0).withLinkCredit(10).queue();
peer.expectTransfer().withHandle(0)
.withMore(true)
.withDeliveryId(0)
.withDeliveryTag(new byte[] {0})
.withPayload(payload);
peer.expectTransfer().withHandle(0)
.withMore(true)
.withDeliveryId(0)
.withDeliveryTag(anyOf(nullValue(), is(new byte[] {0})))
.withPayload(payload);
peer.expectTransfer().withHandle(0)
.withState(nullValue())
.withDeliveryId(0)
.withDeliveryTag(new byte[] {0})
.withAborted(true)
.withMore(anyOf(nullValue(), is(false)))
.withNullPayload();
peer.expectDetach().withHandle(0).respond();
Connection connection = engine.start().open();
Session session = connection.session().open();
Sender sender = session.sender("sender-1").open();
OutgoingDelivery delivery = sender.next();
assertNotNull(delivery);
assertEquals(0, delivery.getTransferCount());
delivery.setTag(new byte[] {0});
delivery.streamBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload));
assertEquals(1, delivery.getTransferCount());
delivery.streamBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload));
assertEquals(2, delivery.getTransferCount());
delivery.abort();
assertEquals(2, delivery.getTransferCount());
assertTrue(delivery.isAborted());
assertFalse(delivery.isPartial());
assertTrue(delivery.isSettled());
sender.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testDispositionFilterAppliesToOnlySubsetOfUnsettledMapSettledAndAccepted() {
testDispositionFilterAppliesToOnlySubsetOfUnsettledMap(true, true);
}
@Test
public void testDispositionFilterAppliesToOnlySubsetOfUnsettledMapSettledOnly() {
testDispositionFilterAppliesToOnlySubsetOfUnsettledMap(true, false);
}
@Test
public void testDispositionFilterAppliesToOnlySubsetOfUnsettledMapAcceptedOnly() {
testDispositionFilterAppliesToOnlySubsetOfUnsettledMap(false, true);
}
private void testDispositionFilterAppliesToOnlySubsetOfUnsettledMap(boolean settled, boolean accepted) {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
ProtonBuffer payload = ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] {0, 1, 2, 3, 4});
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().respond();
peer.remoteFlow().withLinkCredit(10).queue();
peer.expectTransfer().withHandle(0)
.withMore(false)
.withDeliveryId(0)
.withDeliveryTag(new byte[] {0})
.withNonNullPayload();
peer.expectTransfer().withHandle(0)
.withMore(false)
.withDeliveryId(1)
.withDeliveryTag(new byte[] {1})
.withNonNullPayload();
peer.expectTransfer().withHandle(0)
.withMore(false)
.withDeliveryId(2)
.withDeliveryTag(new byte[] {2})
.withNonNullPayload();
if (!accepted) {
peer.expectDisposition().withFirst(1).withSettled(settled).withState(nullValue());
} else {
peer.expectDisposition().withFirst(1).withSettled(settled).withState().accepted();
}
peer.expectDetach().respond();
peer.expectClose().respond();
Connection connection = engine.start().open();
Session session = connection.session().open();
Sender sender = session.sender("sender-1");
sender.creditStateUpdateHandler(link -> link.drained());
sender.open();
OutgoingDelivery delivery1 = sender.next();
delivery1.setTag(new byte[] { 0 });
delivery1.writeBytes(payload.duplicate());
OutgoingDelivery delivery2 = sender.next();
delivery2.setTag(new byte[] { 1 });
delivery2.writeBytes(payload.duplicate());
OutgoingDelivery delivery3 = sender.next();
delivery3.setTag(new byte[] { 2 });
delivery3.writeBytes(payload.duplicate());
sender.disposition((delivery) -> {
if (delivery.getTag().tagBuffer().equals(ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] {1}))) {
return true;
} else {
return false;
}
}, accepted ? Accepted.getInstance() : null, settled);
assertEquals(7, sender.getCredit());
sender.close();
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
@Test
public void testSenderReportsDeliveryUpdatedOnDispositionForMultipleTransfers() throws Exception {
final Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
final ProtonTestConnector peer = createTestPeer(engine);
final byte[] payload = new byte[] {0, 1, 2, 3, 4};
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().respond();
peer.remoteFlow().withLinkCredit(2).queue();
peer.expectTransfer().withDeliveryId(0)
.withDeliveryTag(new byte[] {0})
.withMore(false)
.withPayload(payload);
peer.expectTransfer().withDeliveryId(1)
.withDeliveryTag(new byte[] {1})
.withMore(false)
.withPayload(payload);
peer.remoteDisposition().withSettled(true)
.withRole(Role.RECEIVER.getValue())
.withState().accepted()
.withFirst(0)
.withLast(1).queue();
Connection connection = engine.start().open();
Session session = connection.session().open();
Sender sender = session.sender("test");
final AtomicInteger dispositionCounter = new AtomicInteger();
final ArrayList<OutgoingDelivery> deliveries = new ArrayList<>();
sender.deliveryStateUpdatedHandler(delivery -> {
if (delivery.isRemotelySettled()) {
dispositionCounter.incrementAndGet();
deliveries.add(delivery);
}
});
sender.open();
OutgoingDelivery delivery1 = sender.next();
delivery1.setTag(new byte[] { 0 });
delivery1.writeBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload));
OutgoingDelivery delivery2 = sender.next();
delivery2.setTag(new byte[] { 1 });
delivery2.writeBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload));
peer.waitForScriptToComplete();
peer.expectDetach().respond();
sender.close();
assertEquals(2, deliveries.size(), "Not all deliveries received dispositions");
byte deliveryTag = 0;
for (OutgoingDelivery delivery : deliveries) {
assertEquals(deliveryTag++, delivery.getTag().tagBuffer().getByte(0), "Delivery not updated in correct order");
assertTrue(delivery.isRemotelySettled(), "Delivery should be marked as remotely settled");
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderReportsDeliveryUpdatedOnDispositionForMultipleTransfersInsideTheRange() throws Exception {
final Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
final ProtonTestConnector peer = createTestPeer(engine);
final byte[] payload = new byte[] {0, 1, 2, 3, 4};
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.expectAttach().respond();
peer.remoteFlow().withLinkCredit(10).queue();
for (int i = 0; i < 10; ++i) {
peer.expectTransfer().withDeliveryId(i)
.withDeliveryTag(new byte[] {(byte) i})
.withMore(false)
.withPayload(payload);
}
peer.remoteDisposition().withSettled(true)
.withRole(Role.RECEIVER.getValue())
.withState().accepted()
.withFirst(1)
.withLast(3).queue();
Connection connection = engine.start().open();
Session session = connection.session().open();
Sender sender = session.sender("test");
final AtomicInteger dispositionCounter = new AtomicInteger();
final ArrayList<OutgoingDelivery> deliveries = new ArrayList<>();
sender.deliveryStateUpdatedHandler(delivery -> {
if (delivery.isRemotelySettled()) {
dispositionCounter.incrementAndGet();
deliveries.add(delivery);
delivery.settle();
}
});
sender.open();
for (int i = 0; i < 10; ++i) {
OutgoingDelivery delivery = sender.next();
delivery.setTag(new byte[] { (byte) i });
delivery.writeBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload));
}
peer.waitForScriptToComplete();
peer.expectDetach().respond();
assertEquals(7, sender.unsettled().size());
sender.close();
assertEquals(3, deliveries.size(), "Not all deliveries received dispositions");
byte deliveryTag = 1;
for (OutgoingDelivery delivery : deliveries) {
assertEquals(deliveryTag++, delivery.getTag().tagBuffer().getByte(0), "Delivery not updated in correct order");
assertTrue(delivery.isRemotelySettled(), "Delivery should be marked as remotely settled");
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSenderReportsIsSendableAfterOpenedIfRemoteSendsFlowBeforeLocallyOpened() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectBegin().respond();
peer.remoteAttach().withName("receiver")
.withHandle(0)
.withRole(Role.RECEIVER.getValue())
.withInitialDeliveryCount(0)
.onChannel(0).queue();
peer.remoteFlow().withLinkCredit(1).queue();
peer.expectAttach();
peer.expectDetach().respond();
final AtomicBoolean senderRemotelyOpened = new AtomicBoolean();
final AtomicReference<Sender> sender = new AtomicReference<>();
Connection connection = engine.start();
connection.senderOpenHandler(result -> {
senderRemotelyOpened.set(true);
sender.set(result);
});
// Default engine should start and return a connection immediately
assertNotNull(connection);
connection.open();
connection.session().open();
assertTrue(senderRemotelyOpened.get(), "Sender remote opened event did not fire");
assertFalse(sender.get().isSendable());
sender.get().open();
assertTrue(sender.get().isSendable());
sender.get().close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testWriteThatExceedConfiguredSessionIncomingCreditLimitOnTransfer() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin().withNextOutgoingId(0).respond();
peer.expectAttach().ofSender().respond();
Connection connection = engine.start().setMaxFrameSize(1024).open();
Session session = connection.session().open();
Sender sender = session.sender("test").open();
int payloadOutstanding = 4800;
final byte[] bytes = new byte[payloadOutstanding];
Arrays.fill(bytes, (byte) 1);
ProtonBuffer payload = ProtonByteBufferAllocator.DEFAULT.wrap(bytes);
OutgoingDelivery delivery = sender.next().setTag(new byte[] { 0 });
assertEquals(payload.getReadableBytes(), payloadOutstanding);
delivery.writeBytes(payload);
assertEquals(payload.getReadableBytes(), payloadOutstanding);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(0).withLinkCredit(10).now();
peer.expectTransfer().withNonNullPayload().withMore(true);
delivery.writeBytes(payload);
assertTrue(payload.getReadableBytes() < payloadOutstanding); // Leave space for Transfer
payloadOutstanding = payload.getReadableBytes();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(1).withLinkCredit(10).now();
peer.expectTransfer().withNonNullPayload().withMore(true);
delivery.writeBytes(payload);
assertTrue(payload.getReadableBytes() < payloadOutstanding, "Expected < " + payloadOutstanding + " but was: " + payload.getReadableBytes());
payloadOutstanding = payload.getReadableBytes();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(2).withLinkCredit(10).now();
peer.expectTransfer().withNonNullPayload().withMore(true);
delivery.writeBytes(payload);
assertTrue(payload.getReadableBytes() < payloadOutstanding, "Expected < " + payloadOutstanding + " but was: " + payload.getReadableBytes());
payloadOutstanding = payload.getReadableBytes();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(3).withLinkCredit(10).now();
peer.expectTransfer().withNonNullPayload().withMore(true);
delivery.writeBytes(payload);
assertTrue(payload.getReadableBytes() < payloadOutstanding, "Expected < " + payloadOutstanding + " but was: " + payload.getReadableBytes());
payloadOutstanding = payload.getReadableBytes();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(4).withLinkCredit(10).now();
peer.expectTransfer().withNonNullPayload().withMore(false).accept();
delivery.writeBytes(payload);
assertEquals(0 , payload.getReadableBytes());
peer.waitForScriptToComplete(500, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectClose().respond();
sender.close();
connection.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testWriteThatExceedsConfiguredSessionIncomingCreditLimitOnTransferFromCreditUpdatedhandler() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin().withNextOutgoingId(0).respond();
peer.expectAttach().ofSender().respond();
Connection connection = engine.start().setMaxFrameSize(1024).open();
Session session = connection.session().open();
Sender sender = session.sender("test").open();
int payloadOutstanding = 4800;
final byte[] bytes = new byte[payloadOutstanding];
Arrays.fill(bytes, (byte) 1);
ProtonBuffer payload = ProtonByteBufferAllocator.DEFAULT.wrap(bytes);
final OutgoingDelivery delivery = sender.next().setTag(new byte[] { 0 });
assertEquals(payload.getReadableBytes(), payloadOutstanding);
delivery.writeBytes(payload);
assertEquals(payload.getReadableBytes(), payloadOutstanding);
sender.creditStateUpdateHandler((theSender) -> {
delivery.writeBytes(payload);
});
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withNonNullPayload().withMore(true);
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(0).withLinkCredit(10).now();
assertTrue(payload.getReadableBytes() < payloadOutstanding); // Leave space for Transfer
payloadOutstanding = payload.getReadableBytes();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withNonNullPayload().withMore(true);
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(1).withLinkCredit(10).now();
assertTrue(payload.getReadableBytes() < payloadOutstanding, "Expected < " + payloadOutstanding + " but was: " + payload.getReadableBytes());
payloadOutstanding = payload.getReadableBytes();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withNonNullPayload().withMore(true);
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(2).withLinkCredit(10).now();
assertTrue(payload.getReadableBytes() < payloadOutstanding, "Expected < " + payloadOutstanding + " but was: " + payload.getReadableBytes());
payloadOutstanding = payload.getReadableBytes();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withNonNullPayload().withMore(true);
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(3).withLinkCredit(10).now();
assertTrue(payload.getReadableBytes() < payloadOutstanding, "Expected < " + payloadOutstanding + " but was: " + payload.getReadableBytes());
payloadOutstanding = payload.getReadableBytes();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withNonNullPayload().withMore(false).accept();
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(4).withLinkCredit(10).now();
assertEquals(0 , payload.getReadableBytes());
peer.waitForScriptToComplete(500, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectClose().respond();
sender.close();
connection.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
}