blob: 078d076a368040880519d38f52d6d1482c918161 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.engine.impl;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.protonj2.codec.EncodeException;
import org.apache.qpid.protonj2.engine.Connection;
import org.apache.qpid.protonj2.engine.ConnectionState;
import org.apache.qpid.protonj2.engine.Engine;
import org.apache.qpid.protonj2.engine.EngineFactory;
import org.apache.qpid.protonj2.engine.Sender;
import org.apache.qpid.protonj2.engine.Session;
import org.apache.qpid.protonj2.engine.exceptions.EngineFailedException;
import org.apache.qpid.protonj2.engine.exceptions.EngineStateException;
import org.apache.qpid.protonj2.engine.exceptions.FrameEncodingException;
import org.apache.qpid.protonj2.test.driver.ProtonTestConnector;
import org.apache.qpid.protonj2.test.driver.matchers.types.UnsignedIntegerMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.types.UnsignedShortMatcher;
import org.apache.qpid.protonj2.types.Symbol;
import org.apache.qpid.protonj2.types.transport.AMQPHeader;
import org.apache.qpid.protonj2.types.transport.ConnectionError;
import org.apache.qpid.protonj2.types.transport.ErrorCondition;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
/**
* Tests for behaviors of the ProtonConnection class
*/
@Timeout(20)
public class ProtonConnectionTest extends ProtonEngineTestSupport {
@Test
public void testConnectionSyncStateAfterEngineStarted() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
Connection connection = engine.connection();
Session session = connection.session().open();
Sender sender = session.sender("test").open();
connection.open();
assertEquals(connection, connection.getParent());
peer.waitForScriptToComplete();
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
engine.start();
sender.close();
session.close();
connection.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testNegotiateSendsAMQPHeader() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
Connection connection = engine.start();
connection.negotiate();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testNegotiateSendsAMQPHeaderAndFireRemoteHeaderEvent() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
Connection connection = engine.start();
connection.negotiate();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testNegotiateSendsAMQPHeaderEnforcesNotNullEventHandler() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
Connection connection = engine.start();
try {
connection.negotiate(null);
fail("Should not allow null event handler");
} catch (NullPointerException npe) {
// Expected
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testNegotiateDoesNotSendAMQPHeaderAfterOpen() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
final AtomicInteger headerReceivedCallback = new AtomicInteger();
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectClose().respond();
Connection connection = engine.start();
connection.open();
connection.negotiate(amqpHeader -> headerReceivedCallback.incrementAndGet());
assertEquals(1, headerReceivedCallback.get());
connection.negotiate(amqpHeader -> headerReceivedCallback.incrementAndGet());
assertEquals(2, headerReceivedCallback.get());
connection.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testConnectionEmitsOpenAndCloseEvents() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
final AtomicBoolean connectionLocalOpen = new AtomicBoolean();
final AtomicBoolean connectionLocalClose = new AtomicBoolean();
final AtomicBoolean connectionRemoteOpen = new AtomicBoolean();
final AtomicBoolean connectionRemoteClose = new AtomicBoolean();
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectClose().respond();
Connection connection = engine.start();
connection.localOpenHandler(result -> connectionLocalOpen.set(true))
.localCloseHandler(result -> connectionLocalClose.set(true))
.openHandler(result -> connectionRemoteOpen.set(true))
.closeHandler(result -> connectionRemoteClose.set(true));
connection.open();
connection.close();
assertTrue(connectionLocalOpen.get(), "Connection should have reported local open");
assertTrue(connectionLocalClose.get(), "Connection should have reported local close");
assertTrue(connectionRemoteOpen.get(), "Connection should have reported remote open");
assertTrue(connectionRemoteClose.get(), "Connection should have reported remote close");
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testConnectionPopulatesRemoteData() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
Symbol[] offeredCapabilities = new Symbol[] { Symbol.valueOf("one"), Symbol.valueOf("two") };
Symbol[] desiredCapabilities = new Symbol[] { Symbol.valueOf("three"), Symbol.valueOf("four") };
Map<String, Object> expectedProperties = new HashMap<>();
expectedProperties.put("test", "value");
Map<Symbol, Object> properties = new HashMap<>();
properties.put(Symbol.valueOf("test"), "value");
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("test")
.withHostname("localhost")
.withIdleTimeOut(60000)
.withOfferedCapabilities(new String[] { "one", "two" })
.withDesiredCapabilities(new String[] { "three", "four" })
.withProperties(expectedProperties);
peer.expectClose();
Connection connection = engine.start().open();
assertEquals("test", connection.getRemoteContainerId());
assertEquals("localhost", connection.getRemoteHostname());
assertEquals(60000, connection.getRemoteIdleTimeout());
assertArrayEquals(offeredCapabilities, connection.getRemoteOfferedCapabilities());
assertArrayEquals(desiredCapabilities, connection.getRemoteDesiredCapabilities());
assertEquals(properties, connection.getRemoteProperties());
connection.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testOpenAndCloseConnectionWithNullSetsOnConnectionOptions() throws IOException {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectClose();
Connection connection = engine.start();
connection.setProperties(null);
connection.setOfferedCapabilities((Symbol[]) null);
connection.setDesiredCapabilities((Symbol[]) null);
connection.setCondition(null);
connection.open();
assertNotNull(connection.getAttachments());
assertNull(connection.getProperties());
assertNull(connection.getOfferedCapabilities());
assertNull(connection.getDesiredCapabilities());
assertNull(connection.getCondition());
assertNull(connection.getRemoteProperties());
assertNull(connection.getRemoteOfferedCapabilities());
assertNull(connection.getRemoteDesiredCapabilities());
assertNull(connection.getRemoteCondition());
connection.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testEndpointEmitsEngineShutdownEvent() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
final AtomicBoolean engineShutdown = new AtomicBoolean();
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectClose().respond();
Connection connection = engine.start();
connection.engineShutdownHandler(result -> engineShutdown.set(true));
connection.open();
connection.close();
engine.shutdown();
assertTrue(engineShutdown.get(), "Connection should have reported engine shutdown");
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testConnectionOpenAndCloseAreIdempotent() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectClose().respond();
Connection connection = engine.start();
// Default engine should start and return a connection immediately
assertNotNull(connection);
connection.open();
// Should not emit another open frame
connection.open();
connection.close();
// Should not emit another close frame
connection.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testConnectionRemoteOpenTriggeredWhenRemoteOpenArrives() throws EngineStateException {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
final AtomicBoolean remoteOpened = new AtomicBoolean();
Connection connection = engine.start();
// Default engine should start and return a connection immediately
assertNotNull(connection);
connection.openHandler((result) -> {
remoteOpened.set(true);
});
connection.open();
peer.waitForScriptToComplete();
assertTrue(remoteOpened.get());
assertNull(failure);
}
@Test
public void testConnectionRemoteOpenTriggeredWhenRemoteOpenArrivesBeforeLocalOpen() throws EngineStateException {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
final AtomicBoolean remoteOpened = new AtomicBoolean();
Connection connection = engine.start();
// Default engine should start and return a connection immediately
assertNotNull(connection);
connection.openHandler((result) -> {
remoteOpened.set(true);
});
peer.expectAMQPHeader();
// Remote Header will prompt local response and then remote open should trigger
// the connection handler to fire so that user knows remote opened.
peer.remoteHeader(AMQPHeader.getAMQPHeader().toArray()).now();
peer.remoteOpen().now();
peer.waitForScriptToComplete();
assertTrue(remoteOpened.get());
assertNull(failure);
}
@Test
public void testConnectionRemoteCloseTriggeredWhenRemoteCloseArrives() throws EngineStateException {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond().withContainerId("driver");
peer.expectClose().respond();
final AtomicBoolean connectionOpenedSignaled = new AtomicBoolean();
final AtomicBoolean connectionClosedSignaled = new AtomicBoolean();
Connection connection = engine.start();
// Default engine should start and return a connection immediately
assertNotNull(connection);
connection.openHandler(result -> {
connectionOpenedSignaled.set(true);
});
connection.closeHandler(result -> {
connectionClosedSignaled.set(true);
});
connection.open();
assertEquals(ConnectionState.ACTIVE, connection.getState());
assertEquals(ConnectionState.ACTIVE, connection.getRemoteState());
connection.close();
peer.waitForScriptToComplete();
assertEquals(ConnectionState.CLOSED, connection.getState());
assertEquals(ConnectionState.CLOSED, connection.getRemoteState());
assertNull(failure);
}
@Test
public void testConnectionOpenCarriesAllSetValues() throws IOException {
doTestConnectionOpenPopulatesOpenCorrectly(true, true, true, true, true);
}
@Test
public void testConnectionOpenCarriesDefaultMaxFrameSize() throws IOException {
doTestConnectionOpenPopulatesOpenCorrectly(false, false, false, false, false);
}
@Test
public void testConnectionOpenCarriesSetMaxFrameSize() throws IOException {
doTestConnectionOpenPopulatesOpenCorrectly(true, false, false, false, false);
}
@Test
public void testConnectionOpenCarriesDefaultContainerId() throws IOException {
doTestConnectionOpenPopulatesOpenCorrectly(false, false, false, false, false);
}
@Test
public void testConnectionOpenCarriesSetContainerId() throws IOException {
doTestConnectionOpenPopulatesOpenCorrectly(false, true, false, false, false);
}
@Test
public void testConnectionOpenCarriesDefaultChannelMax() throws IOException {
doTestConnectionOpenPopulatesOpenCorrectly(false, false, false, false, false);
}
@Test
public void testConnectionOpenCarriesSetChannelMax() throws IOException {
doTestConnectionOpenPopulatesOpenCorrectly(false, false, true, false, false);
}
@Test
public void testConnectionOpenCarriesNoHostname() throws IOException {
doTestConnectionOpenPopulatesOpenCorrectly(false, false, false, false, false);
}
@Test
public void testConnectionOpenCarriesSetHostname() throws IOException {
doTestConnectionOpenPopulatesOpenCorrectly(false, false, false, true, false);
}
@Test
public void testConnectionOpenCarriesNoidleTimeout() throws IOException {
doTestConnectionOpenPopulatesOpenCorrectly(false, false, false, false, false);
}
@Test
public void testConnectionOpenCarriesSetIdleTimeout() throws IOException {
doTestConnectionOpenPopulatesOpenCorrectly(false, false, false, false, true);
}
private void doTestConnectionOpenPopulatesOpenCorrectly(boolean setMaxFrameSize, boolean setContainerId, boolean setChannelMax,
boolean setHostname, boolean setIdleTimeout) {
final int expectedMaxFrameSize = 32767;
final Matcher<?> expectedMaxFrameSizeMatcher;
if (setMaxFrameSize) {
expectedMaxFrameSizeMatcher = new UnsignedIntegerMatcher(expectedMaxFrameSize);
} else {
expectedMaxFrameSizeMatcher = new UnsignedIntegerMatcher(ProtonConstants.DEFAULT_MAX_AMQP_FRAME_SIZE);
}
String expectedContainerId = "";
if (setContainerId) {
expectedContainerId = "test";
}
final short expectedChannelMax = 512;
final Matcher<?> expectedChannelMaxMatcher;
if (setChannelMax) {
expectedChannelMaxMatcher = new UnsignedShortMatcher(expectedChannelMax);
} else {
expectedChannelMaxMatcher = nullValue();
}
String expectedHostname = null;
if (setHostname) {
expectedHostname = "localhost";
}
final int expectedIdleTimeout = 60000;
final Matcher<?> expectedIdleTimeoutMatcher;
if (setIdleTimeout) {
expectedIdleTimeoutMatcher = new UnsignedIntegerMatcher(expectedIdleTimeout);
} else {
expectedIdleTimeoutMatcher = nullValue();
}
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().withMaxFrameSize(expectedMaxFrameSizeMatcher)
.withChannelMax(expectedChannelMaxMatcher)
.withContainerId(expectedContainerId)
.withHostname(expectedHostname)
.withIdleTimeOut(expectedIdleTimeoutMatcher)
.withIncomingLocales(nullValue())
.withOutgoingLocales(nullValue())
.withDesiredCapabilities(nullValue())
.withOfferedCapabilities(nullValue())
.withProperties(nullValue())
.respond().withContainerId("driver");
peer.expectClose().respond();
Connection connection = engine.start();
if (setMaxFrameSize) {
connection.setMaxFrameSize(expectedMaxFrameSize);
}
if (setContainerId) {
connection.setContainerId(expectedContainerId);
}
if (setChannelMax) {
connection.setChannelMax(expectedChannelMax);
}
if (setHostname) {
connection.setHostname(expectedHostname);
}
if (setIdleTimeout) {
connection.setIdleTimeout(expectedIdleTimeout);
}
assertEquals(expectedContainerId, connection.getContainerId());
assertEquals(expectedHostname, connection.getHostname());
connection.open();
connection.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCapabilitiesArePopulatedAndAccessible() throws Exception {
final Symbol clientOfferedSymbol = Symbol.valueOf("clientOfferedCapability");
final Symbol clientDesiredSymbol = Symbol.valueOf("clientDesiredCapability");
final Symbol serverOfferedSymbol = Symbol.valueOf("serverOfferedCapability");
final Symbol serverDesiredSymbol = Symbol.valueOf("serverDesiredCapability");
Symbol[] clientOfferedCapabilities = new Symbol[] { clientOfferedSymbol };
Symbol[] clientDesiredCapabilities = new Symbol[] { clientDesiredSymbol };
Symbol[] clientExpectedOfferedCapabilities = new Symbol[] { serverOfferedSymbol };
Symbol[] clientExpectedDesiredCapabilities = new Symbol[] { serverDesiredSymbol };
final AtomicBoolean remotelyOpened = new AtomicBoolean();
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().withOfferedCapabilities(new String[] { clientOfferedSymbol.toString() })
.withDesiredCapabilities(new String[] { clientDesiredSymbol.toString() })
.respond()
.withDesiredCapabilities(new String[] { serverDesiredSymbol.toString() })
.withOfferedCapabilities(new String[] { serverOfferedSymbol.toString() });
peer.expectClose().respond();
Connection connection = engine.start();
connection.setDesiredCapabilities(clientDesiredCapabilities);
connection.setOfferedCapabilities(clientOfferedCapabilities);
connection.openHandler(result -> {
remotelyOpened.set(true);
});
connection.open();
assertTrue(remotelyOpened.get(), "Connection remote opened event did not fire");
assertArrayEquals(clientOfferedCapabilities, connection.getOfferedCapabilities());
assertArrayEquals(clientDesiredCapabilities, connection.getDesiredCapabilities());
assertArrayEquals(clientExpectedOfferedCapabilities, connection.getRemoteOfferedCapabilities());
assertArrayEquals(clientExpectedDesiredCapabilities, connection.getRemoteDesiredCapabilities());
connection.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testPropertiesArePopulatedAndAccessible() throws Exception {
final Symbol clientPropertyName = Symbol.valueOf("ClientPropertyName");
final Integer clientPropertyValue = 1234;
final Symbol serverPropertyName = Symbol.valueOf("ServerPropertyName");
final Integer serverPropertyValue = 5678;
Map<String, Object> clientPropertiesMap = new HashMap<>();
clientPropertiesMap.put(clientPropertyName.toString(), clientPropertyValue);
Map<String, Object> serverPropertiesMap = new HashMap<>();
serverPropertiesMap.put(serverPropertyName.toString(), serverPropertyValue);
final AtomicBoolean remotelyOpened = new AtomicBoolean();
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().withProperties(clientPropertiesMap)
.respond()
.withProperties(serverPropertiesMap);
peer.expectClose().respond();
Connection connection = engine.start();
Map<Symbol, Object> clientProperties = new HashMap<>();
clientProperties.put(clientPropertyName, clientPropertyValue);
connection.setProperties(clientProperties);
connection.openHandler(result -> {
remotelyOpened.set(true);
});
connection.open();
assertTrue(remotelyOpened.get(), "Connection remote opened event did not fire");
assertNotNull(connection.getProperties());
assertNotNull(connection.getRemoteProperties());
assertEquals(clientPropertyValue, connection.getProperties().get(clientPropertyName));
assertEquals(serverPropertyValue, connection.getRemoteProperties().get(serverPropertyName));
connection.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testOpenedCarriesRemoteErrorCondition() throws Exception {
Map<String, Object> errorInfoExpectation = new HashMap<>();
errorInfoExpectation.put("error", "value");
errorInfoExpectation.put("error-list", Arrays.asList("entry-1", "entry-2", "entry-3"));
Map<Symbol, Object> errorInfo = new HashMap<>();
errorInfo.put(Symbol.getSymbol("error"), "value");
errorInfo.put(Symbol.getSymbol("error-list"), Arrays.asList("entry-1", "entry-2", "entry-3"));
ErrorCondition remoteCondition = new ErrorCondition(Symbol.getSymbol("myerror"), "mydescription", errorInfo);
final AtomicBoolean remotelyOpened = new AtomicBoolean();
final AtomicBoolean remotelyClosed = new AtomicBoolean();
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.remoteClose().withErrorCondition("myerror", "mydescription", errorInfoExpectation).queue();
peer.expectClose();
Connection connection = engine.start();
connection.openHandler(result -> {
remotelyOpened.set(true);
});
connection.closeHandler(result -> {
remotelyClosed.set(true);
});
connection.open();
assertTrue(connection.isLocallyOpen());
assertFalse(connection.isLocallyClosed());
assertFalse(connection.isRemotelyOpen());
assertTrue(connection.isRemotelyClosed());
assertTrue(remotelyOpened.get(), "Connection remote opened event did not fire");
assertTrue(remotelyClosed.get(), "Connection remote closed event did not fire");
assertNull(connection.getCondition());
assertNotNull(connection.getRemoteCondition());
assertEquals(remoteCondition, connection.getRemoteCondition());
connection.close();
assertFalse(connection.isLocallyOpen());
assertTrue(connection.isLocallyClosed());
assertFalse(connection.isRemotelyOpen());
assertTrue(connection.isRemotelyClosed());
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testEmptyFrameBeforeOpenDoesNotCauseError() throws Exception {
final AtomicBoolean remotelyOpened = new AtomicBoolean();
final AtomicBoolean remotelyClosed = new AtomicBoolean();
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen();
peer.remoteEmptyFrame().queue();
peer.remoteOpen().queue();
peer.expectClose().respond();
Connection connection = engine.start();
connection.openHandler(result -> {
remotelyOpened.set(true);
});
connection.closeHandler(result -> {
remotelyClosed.set(true);
});
connection.open();
assertTrue(connection.isLocallyOpen());
assertFalse(connection.isLocallyClosed());
assertTrue(connection.isRemotelyOpen());
assertFalse(connection.isRemotelyClosed());
assertTrue(remotelyOpened.get(), "Connection remote opened event did not fire");
connection.close();
assertTrue(remotelyClosed.get(), "Connection remote closed event did not fire");
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testChannelMaxDefaultsToMax() throws Exception {
final AtomicBoolean remotelyOpened = new AtomicBoolean();
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().withChannelMax(nullValue()).respond();
peer.expectClose().respond();
Connection connection = engine.start();
connection.openHandler(result -> {
remotelyOpened.set(true);
});
connection.open();
assertTrue(remotelyOpened.get(), "Connection remote opened event did not fire");
assertEquals(65535, connection.getChannelMax());
connection.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testChannelMaxRangeEnforced() throws Exception {
final AtomicBoolean remotelyOpened = new AtomicBoolean();
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
final short eventualChannelMax = 255;
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().withChannelMax(eventualChannelMax).respond();
peer.expectClose().respond();
Connection connection = engine.start();
connection.openHandler(result -> {
remotelyOpened.set(true);
});
try {
connection.setChannelMax(-1);
fail("Should not be able to set an invalid negative channel max");
} catch (IllegalArgumentException iae) {}
try {
connection.setChannelMax(65536);
fail("Should not be able to set an invalid to large channel max");
} catch (IllegalArgumentException iae) {}
connection.setChannelMax(eventualChannelMax);
connection.open();
assertTrue(remotelyOpened.get(), "Connection remote opened event did not fire");
assertEquals(eventualChannelMax, connection.getChannelMax());
connection.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCloseConnectionAfterShutdownDoesNotThrowExceptionOpenWrittenAndResponse() throws Exception {
testCloseConnectionAfterShutdownNoOutputAndNoException(true, true);
}
@Test
public void testCloseConnectionAfterShutdownDoesNotThrowExceptionOpenWrittenButNoResponse() throws Exception {
testCloseConnectionAfterShutdownNoOutputAndNoException(true, false);
}
@Test
public void testCloseConnectionAfterShutdownDoesNotThrowExceptionOpenNotWritten() throws Exception {
testCloseConnectionAfterShutdownNoOutputAndNoException(false, false);
}
private void testCloseConnectionAfterShutdownNoOutputAndNoException(boolean respondToHeader, boolean respondToOpen) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
if (respondToHeader) {
peer.expectAMQPHeader().respondWithAMQPHeader();
if (respondToOpen) {
peer.expectOpen().respond();
} else {
peer.expectOpen();
}
} else {
peer.expectAMQPHeader();
}
Connection connection = engine.start();
connection.open();
engine.shutdown();
// Should clean up and not throw as we knowingly shutdown engine operations.
connection.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCloseConnectionAfterFailureThrowsEngineStateExceptionOpenWrittenAndResponse() throws Exception {
testCloseConnectionAfterEngineFailedThrowsAndNoOutputWritten(true, true);
}
@Test
public void testCloseConnectionAfterFailureThrowsEngineStateExceptionOpenWrittenButNoResponse() throws Exception {
testCloseConnectionAfterEngineFailedThrowsAndNoOutputWritten(true, false);
}
@Test
public void testCloseConnectionAfterFailureThrowsEngineStateExceptionOpenNotWritten() throws Exception {
testCloseConnectionAfterEngineFailedThrowsAndNoOutputWritten(false, false);
}
private void testCloseConnectionAfterEngineFailedThrowsAndNoOutputWritten(boolean respondToHeader, boolean respondToOpen) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
if (respondToHeader) {
peer.expectAMQPHeader().respondWithAMQPHeader();
if (respondToOpen) {
peer.expectOpen().respond();
peer.expectClose();
} else {
peer.expectOpen();
peer.expectClose();
}
} else {
peer.expectAMQPHeader();
}
Connection connection = engine.start();
connection.open();
engine.engineFailed(new IOException());
try {
connection.close();
fail("Should throw exception indicating engine is in a failed state.");
} catch (EngineFailedException efe) {}
engine.shutdown(); // Explicit shutdown now allows local close to complete
// Should clean up and not throw as we knowingly shutdown engine operations.
connection.close();
peer.waitForScriptToComplete();
assertNotNull(failure);
}
@Test
public void testOpenAndCloseWhileWaitingForHeaderResponseDoesNotWriteUntilHeaderArrives() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader();
Connection connection = engine.start();
connection.open(); // Trigger write of AMQP Header, we don't respond here.
connection.close();
peer.waitForScriptToComplete();
// Now respond and Connection should open and close
peer.expectOpen();
peer.expectClose();
peer.remoteHeader(AMQPHeader.getAMQPHeader().toArray()).now();
peer.waitForScriptToComplete();
engine.shutdown();
assertNull(failure);
}
@Test
public void testOpenWhileWaitingForHeaderResponseDoesNotWriteThenWritesFlowAsExpected() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader();
Connection connection = engine.start();
connection.open(); // Trigger write of AMQP Header, we don't respond here.
peer.waitForScriptToComplete();
// Now respond and Connection should open and close
peer.expectOpen();
peer.expectClose().withError(notNullValue());
peer.remoteHeader(AMQPHeader.getAMQPHeader().toArray()).now();
connection.setCondition(new ErrorCondition(ConnectionError.CONNECTION_FORCED, "something about errors")).close();
peer.waitForScriptToComplete();
engine.shutdown();
assertNull(failure);
}
@Test
public void testCloseOrDetachWithErrorCondition() throws Exception {
final String condition = "amqp:connection:forced";
final String description = "something bad happened.";
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectClose().withError(condition, description).respond();
Connection connection = engine.start();
connection.open();
connection.setCondition(new ErrorCondition(Symbol.valueOf(condition), description));
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
assertNull(failure);
}
@Test
public void testCannotCreateSessionFromLocallyClosedConnection() throws Exception {
testCannotCreateSessionFromClosedConnection(true);
}
@Test
public void testCannotCreateSessionFromRemotelyClosedConnection() throws Exception {
testCannotCreateSessionFromClosedConnection(false);
}
private void testCannotCreateSessionFromClosedConnection(boolean localClose) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
if (localClose) {
peer.expectClose().respond();
} else {
peer.remoteClose().queue();
}
Connection connection = engine.start();
connection.open();
if (localClose) {
connection.close();
}
try {
connection.session();
fail("Should not create new Session from closed Connection");
} catch (IllegalStateException error) {
// Expected
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCannotSetContainerIdOnOpenConnection() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
Connection connection = engine.start();
connection.open();
try {
connection.setContainerId("test");
fail("Should not be able to set container ID from open Connection");
} catch (IllegalStateException error) {
// Expected
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCannotSetContainerIdOnLocallyClosedConnection() throws Exception {
testCannotSetContainerIdOnClosedConnection(true);
}
@Test
public void testCannotSetContainerIdOnRemotelyClosedConnection() throws Exception {
testCannotSetContainerIdOnClosedConnection(false);
}
private void testCannotSetContainerIdOnClosedConnection(boolean localClose) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
if (localClose) {
peer.expectClose().respond();
} else {
peer.remoteClose().queue();
}
Connection connection = engine.start();
connection.open();
if (localClose) {
connection.close();
}
try {
connection.setContainerId("test");
fail("Should not be able to set container ID from closed Connection");
} catch (IllegalStateException error) {
// Expected
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCannotSetHostnameOnOpenConnection() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
Connection connection = engine.start();
connection.open();
try {
connection.setHostname("test");
fail("Should not be able to set host name from open Connection");
} catch (IllegalStateException error) {
// Expected
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCannotSetHostnameOnLocallyClosedConnection() throws Exception {
testCannotSetHostnameOnClosedConnection(true);
}
@Test
public void testCannotSetHostnameOnRemotelyClosedConnection() throws Exception {
testCannotSetHostnameOnClosedConnection(false);
}
private void testCannotSetHostnameOnClosedConnection(boolean localClose) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
if (localClose) {
peer.expectClose().respond();
} else {
peer.remoteClose().queue();
}
Connection connection = engine.start();
connection.open();
if (localClose) {
connection.close();
}
try {
connection.setHostname("test");
fail("Should not be able to set host name from closed Connection");
} catch (IllegalStateException error) {
// Expected
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCannotSetChannelMaxOpenConnection() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
Connection connection = engine.start();
connection.open();
try {
connection.setChannelMax(0);
fail("Should not be able to set channel max from open Connection");
} catch (IllegalStateException error) {
// Expected
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCannotSetChannelMaxOnLocallyClosedConnection() throws Exception {
testCannotSetChannelMaxOnClosedConnection(true);
}
@Test
public void testCannotSetChannelMaxOnRemotelyClosedConnection() throws Exception {
testCannotSetChannelMaxOnClosedConnection(false);
}
private void testCannotSetChannelMaxOnClosedConnection(boolean localClose) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
if (localClose) {
peer.expectClose().respond();
} else {
peer.remoteClose().queue();
}
Connection connection = engine.start();
connection.open();
if (localClose) {
connection.close();
}
try {
connection.setChannelMax(0);
fail("Should not be able to set channel max from closed Connection");
} catch (IllegalStateException error) {
// Expected
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCannotSetMaxFrameSizeOpenConnection() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
Connection connection = engine.start();
connection.open();
try {
connection.setMaxFrameSize(65535);
fail("Should not be able to set max frame size from open Connection");
} catch (IllegalStateException error) {
// Expected
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCannotSetMaxFrameSizeOnLocallyClosedConnection() throws Exception {
testCannotSetMaxFrameSizeOnClosedConnection(true);
}
@Test
public void testCannotSetMaxFrameSizeOnRemotelyClosedConnection() throws Exception {
testCannotSetMaxFrameSizeOnClosedConnection(false);
}
private void testCannotSetMaxFrameSizeOnClosedConnection(boolean localClose) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
if (localClose) {
peer.expectClose().respond();
} else {
peer.remoteClose().queue();
}
Connection connection = engine.start();
connection.open();
if (localClose) {
connection.close();
}
try {
connection.setMaxFrameSize(65535);
fail("Should not be able to set max frame size from closed Connection");
} catch (IllegalStateException error) {
// Expected
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCannotSetIdleTimeoutOnOpenConnection() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
Connection connection = engine.start();
try {
connection.setIdleTimeout(-1);
fail("Should not be able to set idle timeout when negative value given");
} catch (IllegalArgumentException error) {
// Expected
}
try {
connection.setIdleTimeout(Long.MAX_VALUE);
fail("Should not be able to set idle timeout greater than unsigned integer value");
} catch (IllegalArgumentException error) {
// Expected
}
connection.open();
assertEquals(0, connection.getIdleTimeout());
try {
connection.setIdleTimeout(65535);
fail("Should not be able to set idle timeout from open Connection");
} catch (IllegalStateException error) {
// Expected
}
assertEquals(0, connection.getIdleTimeout());
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCannotSetIdleTimeoutOnLocallyClosedConnection() throws Exception {
testCannotSetIdleTimeoutOnClosedConnection(true);
}
@Test
public void testCannotSetIdleTimeoutOnRemotelyClosedConnection() throws Exception {
testCannotSetIdleTimeoutOnClosedConnection(false);
}
private void testCannotSetIdleTimeoutOnClosedConnection(boolean localClose) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
if (localClose) {
peer.expectClose().respond();
} else {
peer.remoteClose().queue();
}
Connection connection = engine.start();
connection.open();
if (localClose) {
connection.close();
}
try {
connection.setIdleTimeout(65535);
fail("Should not be able to set idle timeout from closed Connection");
} catch (IllegalStateException error) {
// Expected
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCannotSetOfferedCapabilitiesOnOpenConnection() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
Connection connection = engine.start();
connection.open();
try {
connection.setOfferedCapabilities(Symbol.valueOf("ANONYMOUS_RELAY"));
fail("Should not be able to set offered capabilities from open Connection");
} catch (IllegalStateException error) {
// Expected
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCannotSetOfferedCapabilitiesOnLocallyClosedConnection() throws Exception {
testCannotSetOfferedCapabilitiesOnClosedConnection(true);
}
@Test
public void testCannotSetOfferedCapabilitiesOnRemotelyClosedConnection() throws Exception {
testCannotSetOfferedCapabilitiesOnClosedConnection(false);
}
private void testCannotSetOfferedCapabilitiesOnClosedConnection(boolean localClose) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
if (localClose) {
peer.expectClose().respond();
} else {
peer.remoteClose().queue();
}
Connection connection = engine.start();
connection.open();
if (localClose) {
connection.close();
}
try {
connection.setOfferedCapabilities(Symbol.valueOf("ANONYMOUS_RELAY"));
fail("Should not be able to set offered capabilities from closed Connection");
} catch (IllegalStateException error) {
// Expected
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCannotSetDesiredCapabilitiesOnOpenConnection() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
Connection connection = engine.start();
connection.open();
try {
connection.setDesiredCapabilities(Symbol.valueOf("ANONYMOUS_RELAY"));
fail("Should not be able to set desired capabilities from open Connection");
} catch (IllegalStateException error) {
// Expected
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCannotSetDesiredCapabilitiesOnLocallyClosedConnection() throws Exception {
testCannotSetDesiredCapabilitiesOnClosedConnection(true);
}
@Test
public void testCannotSetDesiredCapabilitiesOnRemotelyClosedConnection() throws Exception {
testCannotSetDesiredCapabilitiesOnClosedConnection(false);
}
private void testCannotSetDesiredCapabilitiesOnClosedConnection(boolean localClose) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
if (localClose) {
peer.expectClose().respond();
} else {
peer.remoteClose().queue();
}
Connection connection = engine.start();
connection.open();
if (localClose) {
connection.close();
}
try {
connection.setDesiredCapabilities(Symbol.valueOf("ANONYMOUS_RELAY"));
fail("Should not be able to set desired capabilities from open Connection");
} catch (IllegalStateException error) {
// Expected
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCannotSetPropertiesOnOpenConnection() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
Connection connection = engine.start();
connection.open();
try {
connection.setProperties(Collections.emptyMap());
fail("Should not be able to set properties from open Connection");
} catch (IllegalStateException error) {
// Expected
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testCannotSetPropertiesOnLocallyClosedConnection() throws Exception {
testCannotSetPropertiesOnClosedConnection(true);
}
@Test
public void testCannotSetPropertiesOnRemotelyClosedConnection() throws Exception {
testCannotSetPropertiesOnClosedConnection(false);
}
private void testCannotSetPropertiesOnClosedConnection(boolean localClose) throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
if (localClose) {
peer.expectClose().respond();
} else {
peer.remoteClose().queue();
}
Connection connection = engine.start();
connection.open();
if (localClose) {
connection.close();
}
try {
connection.setProperties(Collections.emptyMap());
fail("Should not be able to set properties from open Connection");
} catch (IllegalStateException error) {
// Expected
}
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testIterateAndCloseSessionsFromSessionsAPI() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectBegin().respond();
peer.expectBegin().respond();
Connection connection = engine.start().open();
connection.session().open();
connection.session().open();
connection.session().open();
peer.waitForScriptToComplete();
peer.expectEnd().respond();
peer.expectEnd().respond();
peer.expectEnd().respond();
peer.expectClose();
connection.sessions().forEach(session -> session.close());
connection.close();
peer.waitForScriptToComplete();
assertTrue(connection.sessions().isEmpty());
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testConnectionClosedWhenChannelMaxExceeded() {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
final AtomicBoolean closed = new AtomicBoolean();
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().withChannelMax(16).respond();
peer.expectClose().withError(ConnectionError.FRAMING_ERROR.toString(), "Channel Max Exceeded for session Begin").respond();
Connection connection = engine.start();
connection.setChannelMax(16);
connection.localCloseHandler(conn -> {
closed.set(true);
});
connection.open();
peer.remoteBegin().onChannel(32).now();
peer.waitForScriptToComplete();
assertTrue(connection.sessions().isEmpty());
assertTrue(closed.get());
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testConnectionThrowsWhenLocalChannelMaxExceeded() {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().withChannelMax(1).respond();
peer.expectBegin().onChannel(0).respond().onChannel(1);
peer.expectBegin().onChannel(1).respond().onChannel(0);
peer.expectEnd().onChannel(1).respond().onChannel(0);
Connection connection = engine.start();
connection.setChannelMax(1);
connection.open();
Session session1 = connection.session().open();
Session session2 = connection.session().open();
try {
connection.session().open();
fail("Should not be able to exceed local channel max");
} catch (IllegalStateException ise) {
// Expected
}
session2.close();
peer.waitForScriptToComplete();
peer.expectBegin().onChannel(1).respond().onChannel(0);
peer.expectEnd().onChannel(0).respond().onChannel(1);
peer.expectEnd().onChannel(1).respond().onChannel(0);
peer.expectClose().respond();
session2 = connection.session().open();
session1.close();
session2.close();
connection.close();
assertNull(failure);
}
@Test
public void testNoOpenWrittenAfterEncodeErrorFromConnectionProperties() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
Map<Symbol, Object> properties = new HashMap<>();
properties.put(Symbol.valueOf("test"), engine);
Connection connection = engine.start().setProperties(properties);
// Ensures that open is synchronous as header exchange will be complete.
connection.negotiate();
try {
connection.open();
fail("Should not have been able to open with invalid type in properties");
} catch (FrameEncodingException fee) {
assertTrue(fee.getCause() instanceof EncodeException);
engine.shutdown();
}
connection.close();
peer.waitForScriptToComplete();
assertNotNull(failure);
}
@Test
public void testPipelinedResourceOpenAllowsForReturningResponsesAfterCloseOfConnection() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin();
peer.expectEnd();
Connection connection = engine.start().open();
Session session = connection.session().open();
session.close();
peer.waitForScriptToComplete();
peer.expectClose();
peer.remoteBegin().withRemoteChannel(0)
.withNextOutgoingId(0).queue();
peer.remoteClose().queue();
connection.close();
peer.waitForScriptToComplete();
assertNull(failure);
}
@Test
public void testSecondOpenAfterReceiptOfFirstFailsEngine() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
Connection connection = engine.connection();
connection.open();
peer.waitForScriptToComplete();
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectClose().withError(notNullValue());
engine.start();
peer.remoteOpen().onChannel(0).now();
peer.waitForScriptToComplete();
assertNotNull(failure);
}
@Test
public void testUnexpectedEndFrameFailsEngine() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
Connection connection = engine.connection();
connection.open();
peer.waitForScriptToComplete();
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectClose().withError(notNullValue());
engine.start();
peer.remoteEnd().onChannel(10).now();
peer.waitForScriptToComplete();
assertNotNull(failure);
}
@Test
public void testUnexpectedAttachForUnknownChannelFrameFailsEngine() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
Connection connection = engine.connection();
connection.open();
peer.waitForScriptToComplete();
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectClose().withError(notNullValue());
engine.start();
peer.remoteAttach().ofSender().withName("test").withHandle(0).onChannel(10).now();
peer.waitForScriptToComplete();
assertNotNull(failure);
}
@Test
public void testUnexpectedDetachForUnknownChannelFrameFailsEngine() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
Connection connection = engine.connection();
connection.open();
peer.waitForScriptToComplete();
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectClose().withError(notNullValue());
engine.start();
peer.remoteDetach().withHandle(0).onChannel(10).now();
peer.waitForScriptToComplete();
assertNotNull(failure);
}
@Test
public void testSecondBeginForAlreadyBegunSessionTriggerEngineFailure() throws Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
ProtonTestConnector peer = createTestPeer(engine);
Connection connection = engine.connection();
connection.session().open();
connection.open();
peer.waitForScriptToComplete();
peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin().onChannel(0).respond().onChannel(0);
peer.expectClose().withError(notNullValue());
engine.start();
peer.remoteBegin().onChannel(0).now();
peer.waitForScriptToComplete();
assertNotNull(failure);
}
}