blob: 635644bd68241b8da0a3a6d62c1d6cf08fd6de8a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.engine.impl;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.security.sasl.SaslException;
import org.apache.qpid.protonj2.buffer.ProtonByteBuffer;
import org.apache.qpid.protonj2.engine.AMQPPerformativeEnvelopePool;
import org.apache.qpid.protonj2.engine.Connection;
import org.apache.qpid.protonj2.engine.ConnectionState;
import org.apache.qpid.protonj2.engine.Engine;
import org.apache.qpid.protonj2.engine.EngineFactory;
import org.apache.qpid.protonj2.engine.EngineState;
import org.apache.qpid.protonj2.engine.HeaderEnvelope;
import org.apache.qpid.protonj2.engine.SASLEnvelope;
import org.apache.qpid.protonj2.engine.Session;
import org.apache.qpid.protonj2.engine.exceptions.EngineFailedException;
import org.apache.qpid.protonj2.engine.exceptions.EngineNotStartedException;
import org.apache.qpid.protonj2.engine.exceptions.EngineShutdownException;
import org.apache.qpid.protonj2.engine.exceptions.EngineStateException;
import org.apache.qpid.protonj2.engine.exceptions.MalformedAMQPHeaderException;
import org.apache.qpid.protonj2.test.driver.ProtonTestConnector;
import org.apache.qpid.protonj2.types.UnsignedInteger;
import org.apache.qpid.protonj2.types.security.SaslInit;
import org.apache.qpid.protonj2.types.transport.Open;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
/**
* Test for basic functionality of the ProtonEngine implementation.
*/
@Timeout(20)
public class ProtonEngineTest extends ProtonEngineTestSupport {
/**
 * Verifies that every kind of outbound write pushed through the engine pipeline is
 * rejected with an {@link EngineNotStartedException} while the engine has not been
 * started, and that none of those rejections reaches the engine error handler.
 */
@Test
public void testEnginePipelineWriteFailsBeforeStart() {
    Engine engine = EngineFactory.PROTON.createNonSaslEngine();
    engine.errorHandler(result -> failure = result.failureCause());

    // An engine that has not been started reports itself as not writable.
    assertFalse(engine.isWritable());

    // Raw bytes.
    assertThrows(EngineNotStartedException.class,
        () -> engine.pipeline().fireWrite(new ProtonByteBuffer(0), null),
        "Should not be able to write until engine has been started");

    // AMQP header envelope.
    assertThrows(EngineNotStartedException.class,
        () -> engine.pipeline().fireWrite(HeaderEnvelope.AMQP_HEADER_ENVELOPE),
        "Should not be able to write until engine has been started");

    // SASL envelope wrapping a SaslInit.
    assertThrows(EngineNotStartedException.class,
        () -> engine.pipeline().fireWrite(new SASLEnvelope(new SaslInit())),
        "Should not be able to write until engine has been started");

    // Pooled outgoing performative envelope wrapping an Open.
    assertThrows(EngineNotStartedException.class,
        () -> engine.pipeline().fireWrite(AMQPPerformativeEnvelopePool.outgoingEnvelopePool().take(new Open(), 0, null)),
        "Should not be able to write until engine has been started");

    // The rejected writes must not have been routed to the error handler.
    assertNull(failure);
}
/**
 * Verifies that every kind of inbound read pushed through the engine pipeline is
 * rejected with an {@link EngineNotStartedException} while the engine has not been
 * started, and that none of those rejections reaches the engine error handler.
 */
@Test
public void testEnginePipelineReadFailsBeforeStart() {
    Engine engine = EngineFactory.PROTON.createNonSaslEngine();
    engine.errorHandler(result -> failure = result.failureCause());

    // An engine that has not been started reports itself as not writable.
    assertFalse(engine.isWritable());

    // AMQP header envelope.
    assertThrows(EngineNotStartedException.class,
        () -> engine.pipeline().fireRead(HeaderEnvelope.AMQP_HEADER_ENVELOPE),
        "Should not be able to read data until engine has been started");

    // SASL envelope wrapping a SaslInit.
    assertThrows(EngineNotStartedException.class,
        () -> engine.pipeline().fireRead(new SASLEnvelope(new SaslInit())),
        "Should not be able to read data until engine has been started");

    // Pooled incoming performative envelope wrapping an Open.
    assertThrows(EngineNotStartedException.class,
        () -> engine.pipeline().fireRead(AMQPPerformativeEnvelopePool.incomingEnvelopePool().take(new Open(), 0, new ProtonByteBuffer(0))),
        "Should not be able to read data until engine has been started");

    // Raw bytes. The failure message previously said "write" although this is a read.
    assertThrows(EngineNotStartedException.class,
        () -> engine.pipeline().fireRead(new ProtonByteBuffer(0)),
        "Should not be able to read data until engine has been started");

    // The rejected reads must not have been routed to the error handler.
    assertNull(failure);
}
/**
 * Starts a non-SASL engine and checks that it becomes writable, is neither shut down
 * nor failed, and that a second start() returns the same Connection instance.
 */
@Test
public void testEngineStart() {
    final Engine protonEngine = EngineFactory.PROTON.createNonSaslEngine();
    protonEngine.errorHandler(result -> failure = result.failureCause());

    // Before start() the engine reports that it is not writable.
    assertFalse(protonEngine.isWritable());

    final Connection firstStart = protonEngine.start();
    assertNotNull(firstStart);
    assertFalse(protonEngine.isShutdown());
    assertFalse(protonEngine.isFailed());
    assertNull(protonEngine.failureCause());

    // A repeated start() must return the Connection produced by the first call.
    final Connection secondStart = protonEngine.start();
    assertSame(firstStart, secondStart);

    // After start the engine is writable and nothing was reported to the error handler.
    assertTrue(protonEngine.isWritable());
    assertNotNull(firstStart);
    assertNull(failure);
}
/**
 * Starts an engine, shuts it down and checks the state transition from STARTED to
 * SHUTDOWN, including that the registered shutdown handler is invoked and that no
 * failure is recorded.
 */
@Test
public void testEngineShutdown() {
    final Engine protonEngine = EngineFactory.PROTON.createNonSaslEngine();
    protonEngine.errorHandler(result -> failure = result.failureCause());

    // Before start() the engine reports that it is not writable.
    assertFalse(protonEngine.isWritable());

    final Connection conn = protonEngine.start();
    assertNotNull(conn);

    // State immediately after start.
    assertTrue(protonEngine.isWritable());
    assertFalse(protonEngine.isShutdown());
    assertFalse(protonEngine.isFailed());
    assertNull(protonEngine.failureCause());
    assertEquals(EngineState.STARTED, protonEngine.state());

    // Record whether the shutdown handler gets called.
    final AtomicBoolean shutdownSignalled = new AtomicBoolean();
    protonEngine.shutdownHandler(ignored -> shutdownSignalled.set(true));

    protonEngine.shutdown();

    // State after shutdown: not writable, shut down, but not failed.
    assertFalse(protonEngine.isWritable());
    assertTrue(protonEngine.isShutdown());
    assertFalse(protonEngine.isFailed());
    assertNull(protonEngine.failureCause());
    assertEquals(EngineState.SHUTDOWN, protonEngine.state());
    assertTrue(shutdownSignalled.get());

    assertNotNull(conn);
    assertNull(failure);
}
/**
 * Forces a started engine into the failed state with a {@link SaslException} and checks
 * the reported state both after the failure and after a subsequent shutdown, as well as
 * that the error handler received the SaslException.
 */
@Test
public void testEngineFailure() {
    final ProtonEngine protonEngine = (ProtonEngine) EngineFactory.PROTON.createNonSaslEngine();
    protonEngine.errorHandler(result -> failure = result.failureCause());

    // Before start() the engine reports that it is not writable.
    assertFalse(protonEngine.isWritable());

    final Connection conn = protonEngine.start();
    assertNotNull(conn);

    // State immediately after start.
    assertTrue(protonEngine.isWritable());
    assertFalse(protonEngine.isShutdown());
    assertFalse(protonEngine.isFailed());
    assertNull(protonEngine.failureCause());
    assertEquals(EngineState.STARTED, protonEngine.state());

    protonEngine.engineFailed(new SaslException());

    // State after the failure: failed, not writable, not yet shut down.
    assertFalse(protonEngine.isWritable());
    assertFalse(protonEngine.isShutdown());
    assertTrue(protonEngine.isFailed());
    assertNotNull(protonEngine.failureCause());
    assertEquals(EngineState.FAILED, protonEngine.state());

    protonEngine.shutdown();

    // State after shutdown: the failed flag and the failure cause are still reported.
    assertFalse(protonEngine.isWritable());
    assertTrue(protonEngine.isShutdown());
    assertTrue(protonEngine.isFailed());
    assertNotNull(protonEngine.failureCause());
    assertEquals(EngineState.SHUTDOWN, protonEngine.state());

    assertNotNull(conn);

    // The error handler must have been given the SaslException passed to engineFailed().
    assertNotNull(failure);
    assertTrue(failure instanceof SaslException);
}
/**
 * Opens the Connection obtained from {@code engine.connection()} before the engine has
 * been started, then starts the engine and checks that start() returns that same
 * Connection and that the peer sees the AMQP header and the Open.
 */
@Test
public void testEngineStartAfterConnectionOpen() {
    final Engine protonEngine = EngineFactory.PROTON.createNonSaslEngine();
    protonEngine.errorHandler(result -> failure = result.failureCause());
    final ProtonTestConnector testPeer = createTestPeer(protonEngine);

    // Before start() the engine reports that it is not writable.
    assertFalse(protonEngine.isWritable());

    final Connection conn = protonEngine.connection();
    assertNotNull(conn);
    assertFalse(protonEngine.isShutdown());
    assertFalse(protonEngine.isFailed());
    assertNull(protonEngine.failureCause());

    // Open while the engine is still unstarted; nothing has been scripted on the peer yet.
    conn.open();
    testPeer.waitForScriptToComplete();

    // Script what the peer should observe, then start the engine.
    testPeer.expectAMQPHeader().respondWithAMQPHeader();
    testPeer.expectOpen();

    // start() must return the Connection already obtained from connection().
    final Connection started = protonEngine.start();
    assertSame(conn, started);

    assertTrue(protonEngine.isWritable());
    assertNotNull(conn);
    assertNull(failure);

    testPeer.waitForScriptToComplete();
}
/**
 * Opens a connection on a started engine and checks that the peer receives the AMQP
 * header followed by the Open, after which both the local and remote connection state
 * are ACTIVE.
 */
@Test
public void testEngineEmitsAMQPHeaderOnConnectionOpen() {
    final Engine protonEngine = EngineFactory.PROTON.createNonSaslEngine();
    protonEngine.errorHandler(result -> failure = result.failureCause());
    final ProtonTestConnector testPeer = createTestPeer(protonEngine);

    // start() must return a Connection straight away.
    final Connection conn = protonEngine.start();
    assertNotNull(conn);
    assertNotNull(conn);

    // The peer answers the header and replies to the Open with its own container id.
    testPeer.expectAMQPHeader().respondWithAMQPHeader();
    testPeer.expectOpen().respond().withContainerId("driver");

    conn.setContainerId("test");
    conn.open();

    testPeer.waitForScriptToComplete();

    assertEquals(ConnectionState.ACTIVE, conn.getState());
    assertEquals(ConnectionState.ACTIVE, conn.getRemoteState());
    assertNull(failure);
}
/** tick() must be rejected when the connection was never opened and no local idle timeout is set. */
@Test
public void testTickFailsWhenConnectionNotOpenedNoLocalIdleSet() throws EngineStateException {
    final boolean setLocalTimeout = false;
    final boolean open = false;
    final boolean close = false;
    final boolean shutdown = false;

    doTestTickFailsBasedOnState(setLocalTimeout, open, close, shutdown);
}
/** tick() must be rejected when the connection was never opened even though a local idle timeout is set. */
@Test
public void testTickFailsWhenConnectionNotOpenedLocalIdleSet() throws EngineStateException {
    final boolean setLocalTimeout = true;
    final boolean open = false;
    final boolean close = false;
    final boolean shutdown = false;

    doTestTickFailsBasedOnState(setLocalTimeout, open, close, shutdown);
}
/** tick() must be rejected after open, close and engine shutdown, with no local idle timeout set. */
@Test
public void testTickFailsWhenEngineIsShutdownNoLocalIdleSet() throws EngineStateException {
    final boolean setLocalTimeout = false;
    final boolean open = true;
    final boolean close = true;
    final boolean shutdown = true;

    doTestTickFailsBasedOnState(setLocalTimeout, open, close, shutdown);
}
/** tick() must be rejected after open, close and engine shutdown, with a local idle timeout set. */
@Test
public void testTickFailsWhenEngineIsShutdownLocalIdleSet() throws EngineStateException {
    final boolean setLocalTimeout = true;
    final boolean open = true;
    final boolean close = true;
    final boolean shutdown = true;

    doTestTickFailsBasedOnState(setLocalTimeout, open, close, shutdown);
}
/** tick() must be rejected when the engine is shut down with the connection still open and no local idle timeout set. */
@Test
public void testTickFailsWhenEngineIsShutdownButCloseNotCalledNoLocalIdleSet() throws EngineStateException {
    final boolean setLocalTimeout = false;
    final boolean open = true;
    final boolean close = false;
    final boolean shutdown = true;

    doTestTickFailsBasedOnState(setLocalTimeout, open, close, shutdown);
}
/** tick() must be rejected when the engine is shut down with the connection still open and a local idle timeout set. */
@Test
public void testTickFailsWhenEngineIsShutdownButCloseNotCalledLocalIdleSet() throws EngineStateException {
    final boolean setLocalTimeout = true;
    final boolean open = true;
    final boolean close = false;
    final boolean shutdown = true;

    doTestTickFailsBasedOnState(setLocalTimeout, open, close, shutdown);
}
/**
 * Shared body of the manual tick() rejection tests: drives the connection and engine
 * into the requested state and then expects {@code engine.tick(5000)} to throw either an
 * {@link IllegalStateException} or an {@link EngineShutdownException}.
 *
 * @param setLocalTimeout when true a local idle timeout of 1000 is set on the connection
 * @param open when true the connection is opened against the scripted peer
 * @param close when true the connection is closed against the scripted peer
 * @param shutdown when true the engine is shut down before tick() is attempted
 */
private void doTestTickFailsBasedOnState(boolean setLocalTimeout, boolean open, boolean close, boolean shutdown) throws EngineStateException {
    final Engine protonEngine = EngineFactory.PROTON.createNonSaslEngine();
    protonEngine.errorHandler(result -> failure = result.failureCause());
    final ProtonTestConnector testPeer = createTestPeer(protonEngine);

    final Connection conn = protonEngine.start();
    assertNotNull(conn);

    if (setLocalTimeout) {
        conn.setIdleTimeout(1000);
    }

    if (open) {
        testPeer.expectAMQPHeader().respondWithAMQPHeader();
        testPeer.expectOpen().respond();
        conn.open();
    }

    if (close) {
        testPeer.expectClose().respond();
        conn.close();
    }

    // Nothing in the setup above may have been reported as an engine failure.
    testPeer.waitForScriptToComplete();
    assertNull(failure);

    if (shutdown) {
        protonEngine.shutdown();
    }

    try {
        protonEngine.tick(5000);
        fail("Should not be able to tick an unopened connection");
    } catch (IllegalStateException | EngineShutdownException expected) {
        // Either exception type counts as the expected rejection.
    }
}
/** tickAuto() must be rejected when the connection was never opened and no local idle timeout is set. */
@Test
public void testAutoTickFailsWhenConnectionNotOpenedNoLocalIdleSet() throws EngineStateException {
    final boolean setLocalTimeout = false;
    final boolean open = false;
    final boolean close = false;
    final boolean shutdown = false;

    doTestAutoTickFailsBasedOnState(setLocalTimeout, open, close, shutdown);
}
/** tickAuto() must be rejected when the connection was never opened even though a local idle timeout is set. */
@Test
public void testAutoTickFailsWhenConnectionNotOpenedLocalIdleSet() throws EngineStateException {
    final boolean setLocalTimeout = true;
    final boolean open = false;
    final boolean close = false;
    final boolean shutdown = false;

    doTestAutoTickFailsBasedOnState(setLocalTimeout, open, close, shutdown);
}
/** tickAuto() must be rejected after open, close and engine shutdown, with no local idle timeout set. */
@Test
public void testAutoTickFailsWhenEngineShutdownNoLocalIdleSet() throws EngineStateException {
    final boolean setLocalTimeout = false;
    final boolean open = true;
    final boolean close = true;
    final boolean shutdown = true;

    doTestAutoTickFailsBasedOnState(setLocalTimeout, open, close, shutdown);
}
/** tickAuto() must be rejected after open, close and engine shutdown, with a local idle timeout set. */
@Test
public void testAutoTickFailsWhenEngineShutdownLocalIdleSet() throws EngineStateException {
    final boolean setLocalTimeout = true;
    final boolean open = true;
    final boolean close = true;
    final boolean shutdown = true;

    doTestAutoTickFailsBasedOnState(setLocalTimeout, open, close, shutdown);
}
/**
 * Shared body of the tickAuto() rejection tests: drives the connection and engine into
 * the requested state and then expects {@code engine.tickAuto(...)} with a mocked
 * scheduler to throw either an {@link IllegalStateException} or an
 * {@link EngineShutdownException}.
 *
 * @param setLocalTimeout when true a local idle timeout of 1000 is set on the connection
 * @param open when true the connection is opened against the scripted peer
 * @param close when true the connection is closed against the scripted peer
 * @param shutdown when true the engine is shut down before tickAuto() is attempted
 */
private void doTestAutoTickFailsBasedOnState(boolean setLocalTimeout, boolean open, boolean close, boolean shutdown) throws EngineStateException {
    final Engine protonEngine = EngineFactory.PROTON.createNonSaslEngine();
    protonEngine.errorHandler(result -> failure = result.failureCause());
    final ProtonTestConnector testPeer = createTestPeer(protonEngine);

    final Connection conn = protonEngine.start();
    assertNotNull(conn);

    if (setLocalTimeout) {
        conn.setIdleTimeout(1000);
    }

    if (open) {
        testPeer.expectAMQPHeader().respondWithAMQPHeader();
        testPeer.expectOpen().respond();
        conn.open();
    }

    if (close) {
        testPeer.expectClose().respond();
        conn.close();
    }

    // Nothing in the setup above may have been reported as an engine failure.
    testPeer.waitForScriptToComplete();
    assertNull(failure);

    if (shutdown) {
        protonEngine.shutdown();
    }

    try {
        protonEngine.tickAuto(Mockito.mock(ScheduledExecutorService.class));
        fail("Should not be able to tick an unopened connection");
    } catch (IllegalStateException | EngineShutdownException expected) {
        // Either exception type counts as the expected rejection.
    }
}
/**
 * Verifies that once tickAuto() has been enabled on an open connection, a second
 * tickAuto() call is rejected with an {@link IllegalStateException} and that the
 * rejection is not reported as an engine failure.
 */
@Test
public void testTickAutoPreventsDoubleInvocation() {
    Engine engine = EngineFactory.PROTON.createNonSaslEngine();
    engine.errorHandler(result -> failure = result.failureCause());
    ProtonTestConnector peer = createTestPeer(engine);

    Connection connection = engine.start();
    assertNotNull(connection);

    peer.expectAMQPHeader().respondWithAMQPHeader();
    peer.expectOpen().respond();
    peer.expectClose().respond();

    connection.open();

    // The first call enables auto tick against a mocked scheduler.
    engine.tickAuto(Mockito.mock(ScheduledExecutorService.class));

    // The second call must be refused.
    assertThrows(IllegalStateException.class,
        () -> engine.tickAuto(Mockito.mock(ScheduledExecutorService.class)),
        "Should not be able call tickAuto more than once.");

    connection.close();

    peer.waitForScriptToComplete();
    assertNull(failure);
}
/**
 * Verifies that once tickAuto() has been enabled on an open connection, a manual
 * tick() call is rejected with an {@link IllegalStateException} and that the rejection
 * is not reported as an engine failure.
 */
@Test
public void testCannotCallTickAfterTickAutoCalled() {
    Engine engine = EngineFactory.PROTON.createNonSaslEngine();
    engine.errorHandler(result -> failure = result.failureCause());
    ProtonTestConnector peer = createTestPeer(engine);

    Connection connection = engine.start();
    assertNotNull(connection);

    peer.expectAMQPHeader().respondWithAMQPHeader();
    peer.expectOpen().respond();
    peer.expectClose().respond();

    connection.open();

    // Enable auto tick against a mocked scheduler.
    engine.tickAuto(Mockito.mock(ScheduledExecutorService.class));

    // Manual ticking must now be refused.
    assertThrows(IllegalStateException.class,
        () -> engine.tick(5000),
        "Should not be able call tick after enabling the auto tick feature.");

    connection.close();

    peer.waitForScriptToComplete();
    assertNull(failure);
}
/**
 * Exercises tick() when only the peer advertises an idle timeout (4000). The test
 * expects deadlines at half that value, an empty frame written once a deadline is
 * reached, and the deadline to be pushed out whenever real frames are written.
 */
@Test
public void testTickRemoteTimeout() throws EngineStateException {
    final Engine protonEngine = EngineFactory.PROTON.createNonSaslEngine();
    protonEngine.errorHandler(result -> failure = result.failureCause());
    final ProtonTestConnector testPeer = createTestPeer(protonEngine);

    final Connection conn = protonEngine.start();
    assertNotNull(conn);

    final int remoteTimeout = 4000;

    testPeer.expectAMQPHeader().respondWithAMQPHeader();
    testPeer.expectOpen().withIdleTimeOut(nullValue()).respond().withIdleTimeOut(remoteTimeout);

    // No local idle timeout is configured here; only the peer's Open carries one.
    conn.open();

    // The first deadline is remoteTimeout / 2.
    final long initialDeadline = protonEngine.tick(0);
    assertEquals(2000, initialDeadline, "Expected to be returned a deadline of 2000");

    // Ticking before the deadline leaves it unchanged and writes nothing.
    final long unchangedDeadline = protonEngine.tick(1000);
    assertEquals(2000, unchangedDeadline, "When the deadline hasn't been reached tick() should return the previous deadline");
    assertEquals(0, testPeer.getEmptyFrameCount(), "When the deadline hasn't been reached tick() shouldn't write data");

    // Ticking at the deadline writes an empty frame and moves the deadline to (4000/2)*2.
    testPeer.expectEmptyFrame();
    final long deadlineAfterEmptyFrame = protonEngine.tick(remoteTimeout / 2);
    assertEquals(4000, deadlineAfterEmptyFrame, "When the deadline has been reached expected a new deadline to be returned 4000");
    assertEquals(1, testPeer.getEmptyFrameCount(), "tick() should have written data");

    // Writing a Begin counts as traffic, so no empty frame is needed at the next tick.
    testPeer.expectBegin();
    final Session session = conn.session().open();

    final long deadlineAfterBegin = protonEngine.tick(3000);
    assertEquals(5000, deadlineAfterBegin, "Writing data resets the deadline");
    assertEquals(1, testPeer.getEmptyFrameCount(), "When the deadline is reset tick() shouldn't write an empty frame");

    // The same holds for an Attach.
    testPeer.expectAttach();
    session.sender("test").open();

    final long deadlineAfterAttach = protonEngine.tick(4000);
    assertEquals(6000, deadlineAfterAttach, "Writing data resets the deadline");
    assertEquals(1, testPeer.getEmptyFrameCount(), "When the deadline is reset tick() shouldn't write an empty frame");

    testPeer.waitForScriptToComplete();
    assertNull(failure);
}
/**
 * Exercises tick() when only a local idle timeout (4000) is configured. Incoming empty
 * frames are expected to push the deadline out, and ticking past the deadline without
 * traffic is expected to close the connection and record a failure.
 */
@Test
public void testTickLocalTimeout() throws EngineStateException {
    final Engine protonEngine = EngineFactory.PROTON.createNonSaslEngine();
    protonEngine.errorHandler(result -> failure = result.failureCause());
    final ProtonTestConnector testPeer = createTestPeer(protonEngine);

    final Connection conn = protonEngine.start();
    assertNotNull(conn);

    final int localTimeout = 4000;

    testPeer.expectAMQPHeader().respondWithAMQPHeader();
    testPeer.expectOpen().withIdleTimeOut(localTimeout).respond();

    // Configure the local idle timeout before opening so it is carried in the Open.
    conn.setIdleTimeout(localTimeout);
    conn.open();

    final long initialDeadline = protonEngine.tick(0);
    assertEquals(4000, initialDeadline, "Expected to be returned a deadline of 4000");

    // Ticking before the deadline leaves it unchanged and writes nothing.
    final long unchangedDeadline = protonEngine.tick(1000);
    assertEquals(4000, unchangedDeadline, "When the deadline hasn't been reached tick() should return the previous deadline");
    assertEquals(0, testPeer.getEmptyFrameCount(), "Reading data should never result in a frame being written");

    // The peer sends an empty frame, which counts as incoming traffic.
    testPeer.remoteEmptyFrame().now();

    final long deadlineAfterRead = protonEngine.tick(2000);
    assertEquals(6000, deadlineAfterRead, "Reading data resets the deadline");
    assertEquals(0, testPeer.getEmptyFrameCount(), "Reading data should never result in a frame being written");
    assertEquals(ConnectionState.ACTIVE, conn.getState(), "Reading data before the deadline should keep the connection open");

    // Ticking past the deadline with no further traffic closes the connection.
    testPeer.expectClose().respond();

    protonEngine.tick(7000);
    assertEquals(ConnectionState.CLOSED, conn.getState(), "Calling tick() after the deadline should result in the connection being closed");

    testPeer.waitForScriptToComplete();
    assertNotNull(failure);
}
/** tick() must return a zero deadline when the peer replies with an idle timeout of 0. */
@Test
public void testTickWithZeroIdleTimeoutsGivesZeroDeadline() throws EngineStateException {
    final boolean useZero = true;

    doTickWithNoIdleTimeoutGivesZeroDeadlineTestImpl(useZero);
}
/** tick() must return a zero deadline when the peer replies without setting an idle timeout. */
@Test
public void testTickWithNullIdleTimeoutsGivesZeroDeadline() throws EngineStateException {
    final boolean useZero = false;

    doTickWithNoIdleTimeoutGivesZeroDeadlineTestImpl(useZero);
}
/**
 * Shared body for the "no idle timeout" tick tests: opens a connection with no local
 * idle timeout against a peer that either answers with an explicit idle timeout of 0 or
 * leaves it unset, then checks that tick() returns a deadline of 0.
 *
 * @param useZero when true the peer's Open response carries an idle timeout of 0;
 *                when false the response does not set one
 */
private void doTickWithNoIdleTimeoutGivesZeroDeadlineTestImpl(boolean useZero) throws EngineStateException {
    final Engine protonEngine = EngineFactory.PROTON.createNonSaslEngine();
    protonEngine.errorHandler(result -> failure = result.failureCause());
    final ProtonTestConnector testPeer = createTestPeer(protonEngine);

    final Connection conn = protonEngine.start();
    assertNotNull(conn);

    testPeer.expectAMQPHeader().respondWithAMQPHeader();
    if (!useZero) {
        testPeer.expectOpen().withIdleTimeOut(nullValue()).respond();
    } else {
        testPeer.expectOpen().withIdleTimeOut(nullValue()).respond().withIdleTimeOut(0);
    }

    conn.open();

    testPeer.waitForScriptToComplete();
    assertNull(failure);

    // In both variants the connection reports a remote idle timeout of 0.
    assertEquals(0, conn.getRemoteIdleTimeout());

    final long firstDeadline = protonEngine.tick(0);
    assertEquals(0, firstDeadline, "Unexpected deadline returned");

    final long secondDeadline = protonEngine.tick(10);
    assertEquals(0, secondDeadline, "Unexpected deadline returned");

    testPeer.waitForScriptToComplete();
    assertNull(failure);
}
/**
 * Runs the local-idle-timeout tick scenario over several starting tick values, covering
 * positive, negative and sign-crossing deadline sequences.
 */
@Test
public void testTickWithLocalTimeout() throws EngineStateException {
    // Columns: localTimeout, tick1, expectedDeadline1, expectedDeadline2, expectedDeadline3
    final long[][] scenarios = {
        // all-positive
        { 4000, 10000, 14000, 18000, 22000 },
        // all-negative
        { 2000, -100000, -98000, -96000, -94000 },
        // negative to positive missing 0
        { 500, -950, -450, 50, 550 },
        // negative to positive striking 0
        { 3000, -6000, -3000, 1, 3001 },
    };

    for (long[] scenario : scenarios) {
        doTickWithLocalTimeoutTestImpl((int) scenario[0], scenario[1], scenario[2], scenario[3], scenario[4]);
    }
}
/**
 * Drives one local-idle-timeout scenario: ticks from {@code tick1} through the three
 * expected deadlines while the peer keeps sending empty frames, then ticks at the last
 * deadline without incoming traffic and expects the connection to be closed with an
 * error and a failure to be recorded.
 *
 * @param localTimeout the local idle timeout set on the connection
 * @param tick1 the first value passed to tick()
 * @param expectedDeadline1 the deadline expected from the first tick
 * @param expectedDeadline2 the deadline expected after ticking at expectedDeadline1
 * @param expectedDeadline3 the deadline expected after ticking at expectedDeadline2
 */
private void doTickWithLocalTimeoutTestImpl(int localTimeout, long tick1, long expectedDeadline1, long expectedDeadline2, long expectedDeadline3) throws EngineStateException {
    // Several scenarios run on the same test instance, so clear any earlier failure.
    this.failure = null;

    final Engine protonEngine = EngineFactory.PROTON.createNonSaslEngine();
    protonEngine.errorHandler(result -> failure = result.failureCause());
    final ProtonTestConnector testPeer = createTestPeer(protonEngine);

    final Connection conn = protonEngine.start();
    assertNotNull(conn);

    testPeer.expectAMQPHeader().respondWithAMQPHeader();
    testPeer.expectOpen().withIdleTimeOut(localTimeout).respond();

    // Configure the local idle timeout before opening so it is carried in the Open.
    conn.setIdleTimeout(localTimeout);
    conn.open();

    testPeer.waitForScriptToComplete();
    assertNull(failure);

    final long firstDeadline = protonEngine.tick(tick1);
    assertEquals(expectedDeadline1, firstDeadline, "Unexpected deadline returned");

    // Tick again slightly later, still before the deadline: same deadline, nothing written.
    final long interimTick = tick1 + 10;
    assertTrue(interimTick < expectedDeadline1);
    assertEquals(expectedDeadline1, protonEngine.tick(interimTick), "When the deadline hasn't been reached tick() should return the previous deadline");
    assertEquals(1, testPeer.getPerformativeCount(), "When the deadline hasn't been reached tick() shouldn't write data");
    assertNull(failure);

    // An incoming empty frame before ticking at the first deadline yields the second one.
    testPeer.remoteEmptyFrame().now();

    final long secondDeadline = protonEngine.tick(expectedDeadline1);
    assertEquals(expectedDeadline2, secondDeadline, "When the deadline has been reached expected a new local deadline to be returned");
    assertEquals(1, testPeer.getPerformativeCount(), "When the deadline hasn't been reached tick() shouldn't write data");
    assertNull(failure);

    // Same again for the third deadline.
    testPeer.remoteEmptyFrame().now();

    final long thirdDeadline = protonEngine.tick(expectedDeadline2);
    assertEquals(expectedDeadline3, thirdDeadline, "When the deadline has been reached expected a new local deadline to be returned");
    assertEquals(1, testPeer.getPerformativeCount(), "When the deadline hasn't been reached tick() shouldn't write data");
    assertNull(failure);

    testPeer.expectClose().withError(notNullValue()).respond();

    assertEquals(ConnectionState.ACTIVE, conn.getState(), "Connection should be active");

    // No traffic arrives this time, so ticking at the deadline lets the local timeout expire.
    protonEngine.tick(expectedDeadline3);
    assertEquals(2, testPeer.getPerformativeCount(), "tick() should have written data");
    assertEquals(ConnectionState.CLOSED, conn.getState(), "Calling tick() after the deadline should result in the connection being closed");

    testPeer.waitForScriptToComplete();
    assertNotNull(failure);
}
/**
 * Runs the remote-idle-timeout tick scenario over several starting tick values, covering
 * positive, negative and sign-crossing deadline sequences.
 */
@Test
public void testTickWithRemoteTimeout() throws EngineStateException {
    // Columns: remoteTimeoutHalf, tick1, expectedDeadline1, expectedDeadline2, expectedDeadline3
    final long[][] scenarios = {
        // all-positive
        { 4000, 10000, 14000, 18000, 22000 },
        // all-negative
        { 2000, -100000, -98000, -96000, -94000 },
        // negative to positive missing 0
        { 500, -950, -450, 50, 550 },
        // negative to positive striking 0
        { 3000, -6000, -3000, 1, 3001 },
    };

    for (long[] scenario : scenarios) {
        doTickWithRemoteTimeoutTestImpl((int) scenario[0], scenario[1], scenario[2], scenario[3], scenario[4]);
    }
}
/**
 * Drives one remote-idle-timeout scenario: the peer advertises an idle timeout of
 * {@code remoteTimeoutHalf * 2}, and the test ticks from {@code tick1} through the three
 * expected deadlines, checking when an empty frame is written and when real traffic
 * makes one unnecessary.
 *
 * @param remoteTimeoutHalf half of the idle timeout the peer advertises in its Open
 * @param tick1 the first value passed to tick()
 * @param expectedDeadline1 the deadline expected from the first tick
 * @param expectedDeadline2 the deadline expected after ticking at expectedDeadline1
 * @param expectedDeadline3 the deadline expected after ticking at expectedDeadline2
 */
private void doTickWithRemoteTimeoutTestImpl(int remoteTimeoutHalf, long tick1, long expectedDeadline1, long expectedDeadline2, long expectedDeadline3) throws EngineStateException {
    final Engine protonEngine = EngineFactory.PROTON.createNonSaslEngine();
    protonEngine.errorHandler(result -> failure = result.failureCause());
    final ProtonTestConnector testPeer = createTestPeer(protonEngine);

    final Connection conn = protonEngine.start();
    assertNotNull(conn);

    testPeer.expectAMQPHeader().respondWithAMQPHeader();
    // The peer advertises twice remoteTimeoutHalf. The value is halved on receipt to avoid
    // spurious timeouts in case the peer did not already advertise half of its real
    // timeout, which the AMQP spec only says it SHOULD do.
    testPeer.expectOpen().respond().withIdleTimeOut(remoteTimeoutHalf * 2);

    conn.open();

    testPeer.waitForScriptToComplete();
    assertNull(failure);

    final long firstDeadline = protonEngine.tick(tick1);
    assertEquals(expectedDeadline1, firstDeadline, "Unexpected deadline returned");

    // Tick again slightly later, still before the deadline: same deadline, nothing written.
    final long interimTick = tick1 + 10;
    assertTrue(interimTick < expectedDeadline1);
    assertEquals(expectedDeadline1, protonEngine.tick(interimTick), "When the deadline hasn't been reached tick() should return the previous deadline");
    assertEquals(1, testPeer.getPerformativeCount(), "When the deadline hasn't been reached tick() shouldn't write data");
    assertEquals(0, testPeer.getEmptyFrameCount(), "When the deadline hasn't been reached tick() shouldn't write data");

    // Ticking at the first deadline writes an empty frame.
    testPeer.expectEmptyFrame();

    final long secondDeadline = protonEngine.tick(expectedDeadline1);
    assertEquals(expectedDeadline2, secondDeadline, "When the deadline has been reached expected a new remote deadline to be returned");
    assertEquals(1, testPeer.getEmptyFrameCount(), "tick() should have written data");

    testPeer.expectBegin();

    // Real traffic (a Begin) removes the need for an empty frame at the next deadline.
    conn.session().open();
    assertEquals(2, testPeer.getPerformativeCount(), "session open should have written data");

    final long thirdDeadline = protonEngine.tick(expectedDeadline2);
    assertEquals(expectedDeadline3, thirdDeadline, "When the deadline has been reached expected a new remote deadline to be returned");
    assertEquals(2, testPeer.getPerformativeCount(), "tick() should not have written data as there was actual activity");
    assertEquals(1, testPeer.getEmptyFrameCount(), "tick() should not have written data as there was actual activity");

    // With no further traffic, the next deadline needs an empty frame again.
    testPeer.expectEmptyFrame();

    protonEngine.tick(expectedDeadline3);
    assertEquals(2, testPeer.getEmptyFrameCount(), "tick() should have written data");

    testPeer.waitForScriptToComplete();
    assertNull(failure);
}
/**
 * Runs the combined local-and-remote idle timeout scenario over several starting tick
 * values. Each scenario is run first letting the local timeout expire and then with the
 * peer sending traffic in time.
 */
@Test
public void testTickWithBothTimeouts() throws EngineStateException {
    // Columns: localTimeout, remoteTimeoutHalf, tick1,
    //          expectedDeadline1, expectedDeadline2, expectedDeadline3
    final long[][] scenarios = {
        // all-positive
        { 5000, 2000, 10000, 12000, 14000, 15000 },
        // all-negative
        { 10000, 4000, -100000, -96000, -92000, -90000 },
        // negative to positive missing 0
        { 500, 200, -450, -250, -50, 50 },
        // negative to positive striking 0 with local deadline
        { 500, 200, -500, -300, -100, 1 },
        // negative to positive striking 0 with remote deadline
        { 500, 200, -200, 1, 201, 300 },
    };

    for (long[] scenario : scenarios) {
        for (boolean allowLocalTimeout : new boolean[] { true, false }) {
            doTickWithBothTimeoutsTestImpl(allowLocalTimeout, (int) scenario[0], (int) scenario[1], scenario[2],
                                           scenario[3], scenario[4], scenario[5]);
        }
    }
}
/**
 * Drives one scenario where both a local idle timeout and a remote idle timeout apply.
 * The test ticks from {@code tick1} through the three expected deadlines, expecting an
 * empty frame to be written at each of the first two, and then either lets the local
 * timeout expire or has the peer send an empty frame in time.
 *
 * @param allowLocalTimeout when true no traffic arrives before the final tick and the
 *                          connection is expected to close; when false the peer sends an
 *                          empty frame first and the connection is expected to stay active
 * @param localTimeout the local idle timeout set on the connection
 * @param remoteTimeoutHalf half of the idle timeout the peer advertises in its Open
 * @param tick1 the first value passed to tick()
 * @param expectedDeadline1 the deadline expected from the first tick
 * @param expectedDeadline2 the deadline expected after ticking at expectedDeadline1
 * @param expectedDeadline3 the deadline expected after ticking at expectedDeadline2
 */
private void doTickWithBothTimeoutsTestImpl(boolean allowLocalTimeout, int localTimeout, int remoteTimeoutHalf, long tick1,
long expectedDeadline1, long expectedDeadline2, long expectedDeadline3) throws EngineStateException {
    // Several scenarios run on the same test instance, so clear any earlier failure.
    this.failure = null;

    final Engine protonEngine = EngineFactory.PROTON.createNonSaslEngine();
    protonEngine.errorHandler(result -> failure = result.failureCause());
    final ProtonTestConnector testPeer = createTestPeer(protonEngine);

    final Connection conn = protonEngine.start();
    assertNotNull(conn);

    testPeer.expectAMQPHeader().respondWithAMQPHeader();
    // The peer advertises twice remoteTimeoutHalf. The value is halved on receipt to avoid
    // spurious timeouts in case the peer did not already advertise half of its real
    // timeout, which the AMQP spec only says it SHOULD do.
    testPeer.expectOpen().respond().withIdleTimeOut(remoteTimeoutHalf * 2);

    conn.setIdleTimeout(localTimeout);
    conn.open();

    final long firstDeadline = protonEngine.tick(tick1);
    assertEquals(expectedDeadline1, firstDeadline, "Unexpected deadline returned");

    // Tick again slightly later, still before the deadline: same deadline, nothing written.
    final long interimTick = tick1 + 10;
    assertTrue(interimTick < expectedDeadline1);
    assertEquals(expectedDeadline1, protonEngine.tick(interimTick), "When the deadline hasn't been reached tick() should return the previous deadline");
    assertEquals(0, testPeer.getEmptyFrameCount(), "When the deadline hasn't been reached tick() shouldn't write data");

    testPeer.expectEmptyFrame();

    final long secondDeadline = protonEngine.tick(expectedDeadline1);
    assertEquals(expectedDeadline2, secondDeadline, "When the deadline has been reached expected a new remote deadline to be returned");
    assertEquals(1, testPeer.getEmptyFrameCount(), "tick() should have written data");

    testPeer.expectEmptyFrame();

    final long thirdDeadline = protonEngine.tick(expectedDeadline2);
    assertEquals(expectedDeadline3, thirdDeadline, "When the deadline has been reached expected a new local deadline to be returned");
    assertEquals(2, testPeer.getEmptyFrameCount(), "tick() should have written data");

    testPeer.waitForScriptToComplete();

    if (!allowLocalTimeout) {
        // The peer sends an empty frame in time, so the connection stays active.
        testPeer.remoteEmptyFrame().now();

        final long deadlineAfterRead = protonEngine.tick(expectedDeadline3);
        assertEquals(expectedDeadline2 + (remoteTimeoutHalf), deadlineAfterRead, "Receiving data should have reset the deadline (to the next remote one)");
        assertEquals(2, testPeer.getEmptyFrameCount(), "tick() shouldn't have written data");
        assertEquals(ConnectionState.ACTIVE, conn.getState(), "Connection should be active");

        testPeer.waitForScriptToComplete();
        assertNull(failure);
        return;
    }

    // No traffic arrives, so ticking at the deadline lets the local timeout expire.
    testPeer.expectClose().respond();

    assertEquals(ConnectionState.ACTIVE, conn.getState(), "Connection should be active");
    protonEngine.tick(expectedDeadline3);
    assertEquals(ConnectionState.CLOSED, conn.getState(), "Calling tick() after the deadline should result in the connection being closed");
    assertEquals(2, testPeer.getEmptyFrameCount(), "tick() should have written data but not an empty frame");

    testPeer.waitForScriptToComplete();
    assertNotNull(failure);
}
/** Wrapping tick values, local deadline wraps first; the peer sends traffic so the local timeout does not expire. */
@Test
public void testTickWithNanoTimeDerivedValueWhichWrapsLocalThenRemote() throws EngineStateException {
    final boolean allowLocalTimeout = false;

    doTickWithNanoTimeDerivedValueWhichWrapsLocalThenRemoteTestImpl(allowLocalTimeout);
}
/** Wrapping tick values, local deadline wraps first; the local timeout is allowed to expire. */
@Test
public void testTickWithNanoTimeDerivedValueWhichWrapsLocalThenRemoteWithLocalTimeout() throws EngineStateException {
    final boolean allowLocalTimeout = true;

    doTickWithNanoTimeDerivedValueWhichWrapsLocalThenRemoteTestImpl(allowLocalTimeout);
}
/**
 * Ticks the engine with values just below {@code Long.MAX_VALUE} so that the expected
 * deadlines wrap around to values just above {@code Long.MIN_VALUE}. A local idle
 * timeout of 5000 and a remote half-timeout of 2000 are used with a starting offset of
 * 2500 below {@code Long.MAX_VALUE}.
 *
 * @param allowLocalTimeout when true no traffic arrives before the final tick and the
 *                          connection is expected to close; when false the peer sends an
 *                          empty frame first and the connection is expected to stay active
 */
private void doTickWithNanoTimeDerivedValueWhichWrapsLocalThenRemoteTestImpl(boolean allowLocalTimeout) throws EngineStateException {
    final int localTimeout = 5000;
    final int remoteTimeoutHalf = 2000;
    assertTrue(remoteTimeoutHalf < localTimeout);

    final long offset = 2500;
    assertTrue(offset < localTimeout);
    assertTrue(offset > remoteTimeoutHalf);

    // Tick values and deadlines used below, computed once and named.
    final long startTick = Long.MAX_VALUE - offset;
    final long interimTick = Long.MAX_VALUE - (offset - 100);
    final long firstRemoteDeadline = Long.MAX_VALUE - offset + remoteTimeoutHalf;
    final long secondRemoteDeadline = Long.MIN_VALUE + (2 * remoteTimeoutHalf) - offset - 1;
    final long localDeadline = Long.MIN_VALUE + (localTimeout - offset) - 1;

    final Engine protonEngine = EngineFactory.PROTON.createNonSaslEngine();
    protonEngine.errorHandler(result -> failure = result.failureCause());
    final ProtonTestConnector testPeer = createTestPeer(protonEngine);

    final Connection conn = protonEngine.start();
    assertNotNull(conn);

    testPeer.expectAMQPHeader().respondWithAMQPHeader();
    // The peer advertises twice remoteTimeoutHalf. The value is halved on receipt to avoid
    // spurious timeouts in case the peer did not already advertise half of its real
    // timeout, which the AMQP spec only says it SHOULD do.
    testPeer.expectOpen().respond().withIdleTimeOut(remoteTimeoutHalf * 2);

    conn.setIdleTimeout(localTimeout);
    conn.open();

    final long initialDeadline = protonEngine.tick(startTick);
    assertEquals(firstRemoteDeadline, initialDeadline, "Unexpected deadline returned");

    // Ticking before the deadline leaves it unchanged and writes nothing.
    final long unchangedDeadline = protonEngine.tick(interimTick);
    assertEquals(firstRemoteDeadline, unchangedDeadline, "When the deadline hasn't been reached tick() should return the previous deadline");
    assertEquals(0, testPeer.getEmptyFrameCount(), "When the deadline hasn't been reached tick() shouldn't write data");

    testPeer.expectEmptyFrame();

    // Tick at the first deadline; the next one is a further remoteTimeoutHalf on and has wrapped.
    final long deadlineAfterFirstEmptyFrame = protonEngine.tick(firstRemoteDeadline);
    assertEquals(secondRemoteDeadline, deadlineAfterFirstEmptyFrame, "When the deadline has been reached expected a new remote deadline to be returned");
    assertEquals(1, testPeer.getEmptyFrameCount(), "tick() should have written data");

    testPeer.expectEmptyFrame();

    // Tick at the second deadline; the next one is the start tick plus localTimeout.
    final long deadlineAfterSecondEmptyFrame = protonEngine.tick(secondRemoteDeadline);
    assertEquals(localDeadline, deadlineAfterSecondEmptyFrame, "When the deadline has been reached expected a new local deadline to be returned");
    assertEquals(2, testPeer.getEmptyFrameCount(), "tick() should have written data");

    testPeer.waitForScriptToComplete();

    if (!allowLocalTimeout) {
        // The peer sends an empty frame in time, so the next deadline is a remote one.
        testPeer.remoteEmptyFrame().now();

        final long thirdRemoteDeadline = Long.MIN_VALUE + (3 * remoteTimeoutHalf) - offset - 1;
        final long deadlineAfterRead = protonEngine.tick(localDeadline);
        assertEquals(thirdRemoteDeadline, deadlineAfterRead, "Receiving data should have reset the deadline (to the remote one)");
        assertEquals(2, testPeer.getEmptyFrameCount(), "tick() shouldn't have written data");
        assertEquals(ConnectionState.ACTIVE, conn.getState(), "Connection should be active");

        testPeer.waitForScriptToComplete();
        assertNull(failure);
        return;
    }

    // No traffic arrives, so ticking at the local deadline lets the local timeout expire.
    testPeer.expectClose().respond();

    assertEquals(ConnectionState.ACTIVE, conn.getState(), "Connection should be active");
    protonEngine.tick(localDeadline);
    assertEquals(ConnectionState.CLOSED, conn.getState(), "Calling tick() after the deadline should result in the connection being closed");
    assertEquals(2, testPeer.getEmptyFrameCount(), "tick() should have written data but not an empty frame");

    testPeer.waitForScriptToComplete();
    assertNotNull(failure);
}
/**
 * Runs the "wraps remote then local" tick scenario with the local idle timeout not allowed to
 * expire, i.e. the shared implementation is invoked with {@code allowLocalTimeout == false}.
 */
@Test
public void testTickWithNanoTimeDerivedValueWhichWrapsRemoteThenLocal() throws EngineStateException {
    final boolean allowLocalTimeout = false;

    doTickWithNanoTimeDerivedValueWhichWrapsRemoteThenLocalTestImpl(allowLocalTimeout);
}
/**
 * Runs the "wraps remote then local" tick scenario with the local idle timeout allowed to
 * expire, i.e. the shared implementation is invoked with {@code allowLocalTimeout == true}.
 */
@Test
public void testTickWithNanoTimeDerivedValueWhichWrapsRemoteThenLocalWithLocalTimeout() throws EngineStateException {
    final boolean allowLocalTimeout = true;

    doTickWithNanoTimeDerivedValueWhichWrapsRemoteThenLocalTestImpl(allowLocalTimeout);
}
/**
 * Drives {@code engine.tick()} with time values that begin just below {@link Long#MAX_VALUE} and
 * continue on from {@link Long#MIN_VALUE}, using a local idle timeout (2000) that is smaller than
 * the halved remote idle timeout (5000) and a starting offset (2500) that lies between the two.
 *
 * @param allowLocalTimeout
 *      when {@code true} no inbound frame is supplied before the final tick and the connection is
 *      expected to be CLOSED with a failure recorded; when {@code false} an empty frame is received
 *      first and the connection is expected to remain ACTIVE with no failure.
 *
 * @throws EngineStateException if propagated from the engine operations invoked here.
 */
private void doTickWithNanoTimeDerivedValueWhichWrapsRemoteThenLocalTestImpl(boolean allowLocalTimeout) throws EngineStateException {
    int localTimeout = 2000;
    int remoteTimeoutHalf = 5000;
    long offset = 2500;

    // Sanity check the relationships this scenario relies upon.
    assertTrue(localTimeout < remoteTimeoutHalf);
    assertTrue(offset > localTimeout);
    assertTrue(offset < remoteTimeoutHalf);

    // Points in time fed to tick() and the deadlines it is expected to hand back.
    final long startTime = Long.MAX_VALUE - offset;
    final long beforeFirstDeadline = Long.MAX_VALUE - (offset - 100);
    final long firstLocalDeadline = Long.MAX_VALUE - offset + localTimeout;
    final long secondLocalDeadline = Long.MIN_VALUE + (localTimeout - offset) - 1 + localTimeout;
    final long firstRemoteDeadline = Long.MIN_VALUE + remoteTimeoutHalf - offset - 1;
    final long thirdLocalDeadline = Long.MIN_VALUE + (3 * localTimeout) - offset - 1;

    Engine engine = EngineFactory.PROTON.createNonSaslEngine();
    engine.errorHandler(error -> failure = error.failureCause());
    ProtonTestConnector peer = createTestPeer(engine);

    Connection connection = engine.start();
    assertNotNull(connection);

    peer.expectAMQPHeader().respondWithAMQPHeader();
    // The peer advertises twice remoteTimeoutHalf. The value is halved on receipt to avoid spurious
    // timeouts in case the peer did not already advertise half of its actual timeout, which the
    // AMQP specification only says it SHOULD do.
    peer.expectOpen().respond().withIdleTimeOut(remoteTimeoutHalf * 2);

    connection.setIdleTimeout(localTimeout);
    connection.open();

    long deadline = engine.tick(startTime);
    assertEquals(firstLocalDeadline, deadline, "Unexpected deadline returned");

    // Ticking ahead of the deadline without any traffic must hand back the same deadline.
    deadline = engine.tick(beforeFirstDeadline);
    assertEquals(firstLocalDeadline, deadline, "When the deadline hasn't been reached tick() should return the previous deadline");
    assertEquals(0, peer.getEmptyFrameCount(), "tick() shouldn't have written data");

    // An inbound empty frame satisfies the local deadline.
    peer.remoteEmptyFrame().now();

    // Tick exactly at the deadline, the next one is expected to be a further localTimeout on.
    deadline = engine.tick(firstLocalDeadline);
    assertEquals(secondLocalDeadline, deadline, "When the deadline has been reached expected a new local deadline to be returned");
    assertEquals(0, peer.getEmptyFrameCount(), "tick() should not have written data");

    peer.waitForScriptToComplete();

    if (allowLocalTimeout) {
        peer.expectClose().respond();

        assertEquals(ConnectionState.ACTIVE, connection.getState(), "Connection should be active");
        // Reach the deadline without having seen any traffic so that the local timeout expires.
        engine.tick(secondLocalDeadline);
        assertEquals(ConnectionState.CLOSED, connection.getState(), "Calling tick() after the deadline should result in the connection being closed");
        assertEquals(0, peer.getEmptyFrameCount(), "tick() should have written data but not an empty frame");

        peer.waitForScriptToComplete();
        assertNotNull(failure);
        return;
    }

    // An inbound empty frame satisfies the local deadline once more.
    peer.remoteEmptyFrame().now();

    // With traffic seen the next deadline to come due is the remote one.
    deadline = engine.tick(secondLocalDeadline);
    assertEquals(firstRemoteDeadline, deadline, "Receiving data should have reset the deadline (to the remote one)");
    assertEquals(0, peer.getEmptyFrameCount(), "tick() shouldn't have written data");

    peer.expectEmptyFrame();

    // Reaching the remote deadline should emit an empty frame and return the next local deadline.
    deadline = engine.tick(firstRemoteDeadline);
    assertEquals(thirdLocalDeadline, deadline, "When the deadline has been reached expected a new local deadline to be returned");
    assertEquals(1, peer.getEmptyFrameCount(), "tick() should have written an empty frame");
    assertEquals(ConnectionState.ACTIVE, connection.getState(), "Connection should be active");

    peer.waitForScriptToComplete();
    assertNull(failure);
}
/**
 * Runs the "wraps both, remote first" tick scenario with the local idle timeout not allowed to
 * expire, i.e. the shared implementation is invoked with {@code allowLocalTimeout == false}.
 */
@Test
public void testTickWithNanoTimeDerivedValueWhichWrapsBothRemoteFirst() throws EngineStateException {
    final boolean allowLocalTimeout = false;

    doTickWithNanoTimeDerivedValueWhichWrapsBothRemoteFirstTestImpl(allowLocalTimeout);
}
/**
 * Runs the "wraps both, remote first" tick scenario with the local idle timeout allowed to
 * expire, i.e. the shared implementation is invoked with {@code allowLocalTimeout == true}.
 */
@Test
public void testTickWithNanoTimeDerivedValueWhichWrapsBothRemoteFirstWithLocalTimeout() throws EngineStateException {
    final boolean allowLocalTimeout = true;

    doTickWithNanoTimeDerivedValueWhichWrapsBothRemoteFirstTestImpl(allowLocalTimeout);
}
/**
 * Drives {@code engine.tick()} with time values that begin just below {@link Long#MAX_VALUE} and
 * continue on from {@link Long#MIN_VALUE}, using a local idle timeout (2000) smaller than the
 * halved remote idle timeout (2500) and a starting offset (500) smaller than both, so every
 * expected deadline lies on the far side of the wrap.
 *
 * @param allowLocalTimeout
 *      when {@code true} no inbound frame is supplied before the final tick and the connection is
 *      expected to be CLOSED with a failure recorded; when {@code false} an empty frame is received
 *      first and the connection is expected to remain ACTIVE with no failure.
 *
 * @throws EngineStateException if propagated from the engine operations invoked here.
 */
private void doTickWithNanoTimeDerivedValueWhichWrapsBothRemoteFirstTestImpl(boolean allowLocalTimeout) throws EngineStateException {
    int localTimeout = 2000;
    int remoteTimeoutHalf = 2500;
    long offset = 500;

    // Sanity check the relationships this scenario relies upon.
    assertTrue(localTimeout < remoteTimeoutHalf);
    assertTrue(offset < localTimeout);

    // Points in time fed to tick() and the deadlines it is expected to hand back.
    final long startTime = Long.MAX_VALUE - offset;
    final long beforeFirstDeadline = Long.MAX_VALUE - (offset - 100);
    final long firstLocalDeadline = Long.MIN_VALUE + (localTimeout - offset) - 1;
    final long firstRemoteDeadline = Long.MIN_VALUE + (remoteTimeoutHalf - offset) - 1;
    final long secondLocalDeadline = Long.MIN_VALUE + (localTimeout - offset) - 1 + localTimeout;
    final long secondRemoteDeadline = Long.MIN_VALUE + (2 * remoteTimeoutHalf) - offset - 1;

    Engine engine = EngineFactory.PROTON.createNonSaslEngine();
    engine.errorHandler(error -> failure = error.failureCause());
    ProtonTestConnector peer = createTestPeer(engine);

    Connection connection = engine.start();
    assertNotNull(connection);

    peer.expectAMQPHeader().respondWithAMQPHeader();
    // The peer advertises twice remoteTimeoutHalf. The value is halved on receipt to avoid spurious
    // timeouts in case the peer did not already advertise half of its actual timeout, which the
    // AMQP specification only says it SHOULD do.
    peer.expectOpen().respond().withIdleTimeOut(remoteTimeoutHalf * 2);

    connection.setIdleTimeout(localTimeout);
    connection.open();

    long deadline = engine.tick(startTime);
    assertEquals(firstLocalDeadline, deadline, "Unexpected deadline returned");

    // Ticking ahead of the deadline without any traffic must hand back the same deadline.
    deadline = engine.tick(beforeFirstDeadline);
    assertEquals(firstLocalDeadline, deadline, "When the deadline hasn't been reached tick() should return the previous deadline");
    assertEquals(0, peer.getEmptyFrameCount(), "tick() shouldn't have written data");

    // An inbound empty frame satisfies the local deadline.
    peer.remoteEmptyFrame().now();

    // Tick exactly at the local deadline, the remote deadline is expected to be the next one due.
    deadline = engine.tick(firstLocalDeadline);
    assertEquals(firstRemoteDeadline, deadline, "When the deadline has been reached expected a new remote deadline to be returned");
    assertEquals(0, peer.getEmptyFrameCount(), "When the deadline hasn't been reached tick() shouldn't write data");

    peer.expectEmptyFrame();

    // Reaching the remote deadline should emit an empty frame and return the next local deadline.
    deadline = engine.tick(firstRemoteDeadline);
    assertEquals(secondLocalDeadline, deadline, "When the deadline has been reached expected a new local deadline to be returned");
    assertEquals(1, peer.getEmptyFrameCount(), "tick() should have written data");

    peer.waitForScriptToComplete();

    if (allowLocalTimeout) {
        peer.expectClose().respond();

        assertEquals(ConnectionState.ACTIVE, connection.getState(), "Connection should be active");
        // Reach the deadline without having seen any traffic so that the local timeout expires.
        engine.tick(secondLocalDeadline);
        assertEquals(ConnectionState.CLOSED, connection.getState(), "Calling tick() after the deadline should result in the connection being closed");
        assertEquals(1, peer.getEmptyFrameCount(), "tick() should have written data but not an empty frame");

        peer.waitForScriptToComplete();
        assertNotNull(failure);
        return;
    }

    // An inbound empty frame satisfies the local deadline once more.
    peer.remoteEmptyFrame().now();

    // With traffic seen the next deadline to come due is the remote one.
    deadline = engine.tick(secondLocalDeadline);
    assertEquals(secondRemoteDeadline, deadline, "Receiving data should have reset the deadline (to the remote one)");
    assertEquals(1, peer.getEmptyFrameCount(), "tick() shouldn't have written data");
    assertEquals(ConnectionState.ACTIVE, connection.getState(), "Connection should be active");

    peer.waitForScriptToComplete();
    assertNull(failure);
}
/**
 * Runs the "wraps both, local first" tick scenario with the local idle timeout not allowed to
 * expire, i.e. the shared implementation is invoked with {@code allowLocalTimeout == false}.
 */
@Test
public void testTickWithNanoTimeDerivedValueWhichWrapsBothLocalFirst() throws EngineStateException {
    final boolean allowLocalTimeout = false;

    doTickWithNanoTimeDerivedValueWhichWrapsBothLocalFirstTestImpl(allowLocalTimeout);
}
/**
 * Runs the "wraps both, local first" tick scenario with the local idle timeout allowed to
 * expire, i.e. the shared implementation is invoked with {@code allowLocalTimeout == true}.
 */
@Test
public void testTickWithNanoTimeDerivedValueWhichWrapsBothLocalFirstWithLocalTimeout() throws EngineStateException {
    final boolean allowLocalTimeout = true;

    doTickWithNanoTimeDerivedValueWhichWrapsBothLocalFirstTestImpl(allowLocalTimeout);
}
/**
 * Drives {@code engine.tick()} with time values that begin just below {@link Long#MAX_VALUE} and
 * continue on from {@link Long#MIN_VALUE}, using a halved remote idle timeout (2000) smaller than
 * the local idle timeout (5000) and a starting offset (500) smaller than both, so every expected
 * deadline lies on the far side of the wrap.
 *
 * @param allowLocalTimeout
 *      when {@code true} no inbound frame is supplied before the final tick and the connection is
 *      expected to be CLOSED with a failure recorded; when {@code false} an empty frame is received
 *      first and the connection is expected to remain ACTIVE with no failure.
 *
 * @throws EngineStateException if propagated from the engine operations invoked here.
 */
private void doTickWithNanoTimeDerivedValueWhichWrapsBothLocalFirstTestImpl(boolean allowLocalTimeout) throws EngineStateException {
    int localTimeout = 5000;
    int remoteTimeoutHalf = 2000;
    long offset = 500;

    // Sanity check the relationships this scenario relies upon.
    assertTrue(remoteTimeoutHalf < localTimeout);
    assertTrue(offset < remoteTimeoutHalf);

    // Points in time fed to tick() and the deadlines it is expected to hand back.
    final long startTime = Long.MAX_VALUE - offset;
    final long beforeFirstDeadline = Long.MAX_VALUE - (offset - 100);
    final long firstRemoteDeadline = Long.MIN_VALUE + (remoteTimeoutHalf - offset) - 1;
    final long secondRemoteDeadline = Long.MIN_VALUE + (remoteTimeoutHalf - offset) - 1 + remoteTimeoutHalf;
    final long firstLocalDeadline = Long.MIN_VALUE + (localTimeout - offset) - 1;
    final long thirdRemoteDeadline = Long.MIN_VALUE + (3 * remoteTimeoutHalf) - offset - 1;

    Engine engine = EngineFactory.PROTON.createNonSaslEngine();
    engine.errorHandler(error -> failure = error.failureCause());
    ProtonTestConnector peer = createTestPeer(engine);

    Connection connection = engine.start();
    assertNotNull(connection);

    peer.expectAMQPHeader().respondWithAMQPHeader();
    // The peer advertises twice remoteTimeoutHalf. The value is halved on receipt to avoid spurious
    // timeouts in case the peer did not already advertise half of its actual timeout, which the
    // AMQP specification only says it SHOULD do.
    peer.expectOpen().respond().withIdleTimeOut(remoteTimeoutHalf * 2);

    connection.setIdleTimeout(localTimeout);
    connection.open();

    long deadline = engine.tick(startTime);
    assertEquals(firstRemoteDeadline, deadline, "Unexpected deadline returned");

    // Ticking ahead of the deadline without any traffic must hand back the same deadline.
    deadline = engine.tick(beforeFirstDeadline);
    assertEquals(firstRemoteDeadline, deadline, "When the deadline hasn't been reached tick() should return the previous deadline");
    assertEquals(0, peer.getEmptyFrameCount(), "When the deadline hasn't been reached tick() shouldn't write data");

    peer.expectEmptyFrame();

    // Reaching the remote deadline should emit an empty frame, next due is another remote deadline.
    deadline = engine.tick(firstRemoteDeadline);
    assertEquals(secondRemoteDeadline, deadline, "When the deadline has been reached expected a new remote deadline to be returned");
    assertEquals(1, peer.getEmptyFrameCount(), "tick() should have written data");

    peer.expectEmptyFrame();

    // The second remote deadline emits another empty frame, the local deadline is then next due.
    deadline = engine.tick(secondRemoteDeadline);
    assertEquals(firstLocalDeadline, deadline, "When the deadline has been reached expected a new local deadline to be returned");
    assertEquals(2, peer.getEmptyFrameCount(), "tick() should have written data");

    peer.waitForScriptToComplete();

    if (allowLocalTimeout) {
        peer.expectClose().respond();

        assertEquals(ConnectionState.ACTIVE, connection.getState(), "Connection should be active");
        // Reach the deadline without having seen any traffic so that the local timeout expires.
        engine.tick(firstLocalDeadline);
        assertEquals(ConnectionState.CLOSED, connection.getState(), "Calling tick() after the deadline should result in the connection being closed");
        assertEquals(2, peer.getEmptyFrameCount(), "tick() should have written data but not an empty frame");

        peer.waitForScriptToComplete();
        assertNotNull(failure);
        return;
    }

    // An inbound empty frame satisfies the local deadline.
    peer.remoteEmptyFrame().now();

    // With traffic seen the next deadline to come due is the remote one.
    deadline = engine.tick(firstLocalDeadline);
    assertEquals(thirdRemoteDeadline, deadline, "Receiving data should have reset the deadline (to the remote one)");
    assertEquals(2, peer.getEmptyFrameCount(), "tick() shouldn't have written data");
    assertEquals(ConnectionState.ACTIVE, connection.getState(), "Connection should be active");

    peer.waitForScriptToComplete();
    assertNull(failure);
}
/**
 * The peer answers with a header whose first byte is a lower case 'a', the engine is expected to
 * report a malformed header failure.
 */
@Test
public void testEngineFailsWithMeaningfulErrorOnNonAMQPHeaderResponseBadByte1() throws EngineStateException {
    final byte[] badHeader = { 'a', 'M', 'Q', 'P', 0, 1, 0, 0 };

    doTestEngineFailsWithMalformedHeaderException(badHeader);
}
/**
 * The peer answers with a header whose second byte is a lower case 'm', the engine is expected to
 * report a malformed header failure.
 */
@Test
public void testEngineFailsWithMeaningfulErrorOnNonAMQPHeaderResponseBadByte2() throws EngineStateException {
    final byte[] badHeader = { 'A', 'm', 'Q', 'P', 0, 1, 0, 0 };

    doTestEngineFailsWithMalformedHeaderException(badHeader);
}
/**
 * The peer answers with a header whose third byte is a lower case 'q', the engine is expected to
 * report a malformed header failure.
 */
@Test
public void testEngineFailsWithMeaningfulErrorOnNonAMQPHeaderResponseBadByte3() throws EngineStateException {
    final byte[] badHeader = { 'A', 'M', 'q', 'P', 0, 1, 0, 0 };

    doTestEngineFailsWithMalformedHeaderException(badHeader);
}
/**
 * The peer answers with a header whose fourth byte is a lower case 'p', the engine is expected to
 * report a malformed header failure.
 */
@Test
public void testEngineFailsWithMeaningfulErrorOnNonAMQPHeaderResponseBadByte4() throws EngineStateException {
    final byte[] badHeader = { 'A', 'M', 'Q', 'p', 0, 1, 0, 0 };

    doTestEngineFailsWithMalformedHeaderException(badHeader);
}
/**
 * The peer answers with a header whose fifth byte is 99 rather than the 0 used by the sibling
 * tests, the engine is expected to report a malformed header failure.
 */
@Test
public void testEngineFailsWithMeaningfulErrorOnNonAMQPHeaderResponseBadByte5() throws EngineStateException {
    final byte[] badHeader = { 'A', 'M', 'Q', 'P', 99, 1, 0, 0 };

    doTestEngineFailsWithMalformedHeaderException(badHeader);
}
/**
 * The peer answers with a header whose sixth byte is 99 rather than the 1 used by the sibling
 * tests, the engine is expected to report a malformed header failure.
 */
@Test
public void testEngineFailsWithMeaningfulErrorOnNonAMQPHeaderResponseBadByte6() throws EngineStateException {
    final byte[] badHeader = { 'A', 'M', 'Q', 'P', 0, 99, 0, 0 };

    doTestEngineFailsWithMalformedHeaderException(badHeader);
}
/**
 * The peer answers with a header whose seventh byte is 99 rather than the 0 used by the sibling
 * tests, the engine is expected to report a malformed header failure.
 */
@Test
public void testEngineFailsWithMeaningfulErrorOnNonAMQPHeaderResponseBadByte7() throws EngineStateException {
    final byte[] badHeader = { 'A', 'M', 'Q', 'P', 0, 1, 99, 0 };

    doTestEngineFailsWithMalformedHeaderException(badHeader);
}
/**
 * The peer answers with a header whose eighth byte is 99 rather than the 0 used by the sibling
 * tests, the engine is expected to report a malformed header failure.
 */
@Test
public void testEngineFailsWithMeaningfulErrorOnNonAMQPHeaderResponseBadByte8() throws EngineStateException {
    final byte[] badHeader = { 'A', 'M', 'Q', 'P', 0, 1, 0, 99 };

    doTestEngineFailsWithMalformedHeaderException(badHeader);
}
/**
 * Scripts the test peer to answer the engine's AMQP header with the given raw bytes and then
 * checks that the engine error handler captured a {@link MalformedAMQPHeaderException}.
 *
 * @param headerBytes
 *      the raw bytes the peer sends back in response to the engine's AMQP header.
 */
private final void doTestEngineFailsWithMalformedHeaderException(byte[] headerBytes) {
    Engine protonEngine = EngineFactory.PROTON.createNonSaslEngine();
    protonEngine.errorHandler(error -> failure = error.failureCause());

    ProtonTestConnector testPeer = createTestPeer(protonEngine);
    testPeer.expectAMQPHeader().respondWithBytes(headerBytes);

    Connection conn = protonEngine.start();
    assertNotNull(conn);

    // Triggers the header exchange that the peer answers with the supplied bytes.
    conn.negotiate();

    // Script errors are ignored here, the captured engine failure is what gets checked.
    testPeer.waitForScriptToCompleteIgnoreErrors();

    assertNotNull(failure);
    final boolean malformedHeaderReported = failure instanceof MalformedAMQPHeaderException;
    assertTrue(malformedHeaderReported);
}
/**
 * Opens a connection without setting a max frame size and checks that the Open sent to the peer
 * and the engine's inbound and outbound limits all use
 * {@code ProtonConstants.DEFAULT_MAX_AMQP_FRAME_SIZE}.
 */
@Test
public void testEngineConfiguresDefaultMaxFrameSizeLimits() {
    Engine protonEngine = EngineFactory.PROTON.createNonSaslEngine();
    protonEngine.errorHandler(error -> failure = error.failureCause());

    Connection conn = protonEngine.start();
    assertNotNull(conn);

    ProtonEngineConfiguration engineConfig = (ProtonEngineConfiguration) protonEngine.configuration();
    ProtonTestConnector testPeer = createTestPeer(protonEngine);

    testPeer.expectAMQPHeader().respondWithAMQPHeader();
    testPeer.expectOpen().withMaxFrameSize(ProtonConstants.DEFAULT_MAX_AMQP_FRAME_SIZE).respond();

    conn.open();

    assertEquals(ProtonConstants.DEFAULT_MAX_AMQP_FRAME_SIZE, engineConfig.getOutboundMaxFrameSize());
    assertEquals(ProtonConstants.DEFAULT_MAX_AMQP_FRAME_SIZE, engineConfig.getInboundMaxFrameSize());

    // The error handler must not have been invoked at any point.
    assertNull(failure);
}
/**
 * Local and remote max frame size are both 512.
 */
@Test
public void testEngineConfiguresSpecifiedMaxFrameSizeLimitsMatchesDefaultMinMax() {
    final int localMaxFrameSize = 512;
    final int remoteMaxFrameSize = 512;

    doTestEngineConfiguresSpecifiedFrameSizeLimits(localMaxFrameSize, remoteMaxFrameSize);
}
/**
 * The remote max frame size (1025) is one larger than the local value (1024).
 */
@Test
public void testEngineConfiguresSpecifiedMaxFrameSizeLimitsRemoteLargerThanLocal() {
    final int localMaxFrameSize = 1024;
    final int remoteMaxFrameSize = 1025;

    doTestEngineConfiguresSpecifiedFrameSizeLimits(localMaxFrameSize, remoteMaxFrameSize);
}
/**
 * The remote max frame size (1023) is one smaller than the local value (1024).
 */
@Test
public void testEngineConfiguresSpecifiedMaxFrameSizeLimitsRemoteSmallerThanLocal() {
    final int localMaxFrameSize = 1024;
    final int remoteMaxFrameSize = 1023;

    doTestEngineConfiguresSpecifiedFrameSizeLimits(localMaxFrameSize, remoteMaxFrameSize);
}
/**
 * Both sides use values above {@code ProtonConstants.DEFAULT_MAX_AMQP_FRAME_SIZE}: the local value
 * exceeds it by 32 and the remote value by 64.
 */
@Test
public void testEngineConfiguresSpecifiedMaxFrameSizeLimitsGreaterThanDefaultValues() {
    final int localMaxFrameSize = ProtonConstants.DEFAULT_MAX_AMQP_FRAME_SIZE + 32;
    final int remoteMaxFrameSize = ProtonConstants.DEFAULT_MAX_AMQP_FRAME_SIZE + 64;

    doTestEngineConfiguresSpecifiedFrameSizeLimits(localMaxFrameSize, remoteMaxFrameSize);
}
/**
 * The local value is {@link Integer#MAX_VALUE} while the remote value is the int bit pattern of
 * {@code UnsignedInteger.MAX_VALUE}.
 */
@Test
public void testEngineConfiguresRemoteMaxFrameSizeSetToMaxUnsignedLong() {
    final int localMaxFrameSize = Integer.MAX_VALUE;
    final int remoteMaxFrameSize = UnsignedInteger.MAX_VALUE.intValue();

    doTestEngineConfiguresSpecifiedFrameSizeLimits(localMaxFrameSize, remoteMaxFrameSize);
}
/**
 * Opens a connection with the given local max frame size against a peer that answers with the
 * given remote max frame size, then checks the engine's inbound and outbound limits together
 * with the values reported by the connection.
 *
 * @param localValue
 *      the max frame size set on the local connection, interpreted as an unsigned 32 bit value.
 * @param remoteResponse
 *      the max frame size the peer returns in its Open, interpreted as an unsigned 32 bit value.
 */
private void doTestEngineConfiguresSpecifiedFrameSizeLimits(int localValue, int remoteResponse) {
    final long localLimit = Integer.toUnsignedLong(localValue);
    final long remoteLimit = Integer.toUnsignedLong(remoteResponse);

    // Expected inbound limit: the local value when positive, otherwise Integer.MAX_VALUE.
    final int expectedInbound = localValue > 0 ? localValue : Integer.MAX_VALUE;

    // Expected outbound limit: decided with signed comparisons of the raw int arguments, a
    // non-positive remote value that is not greater than the local one maps to Integer.MAX_VALUE.
    final int expectedOutbound;
    if (remoteResponse > localValue) {
        expectedOutbound = localValue;
    } else if (remoteResponse > 0) {
        expectedOutbound = remoteResponse;
    } else {
        expectedOutbound = Integer.MAX_VALUE;
    }

    Engine protonEngine = EngineFactory.PROTON.createNonSaslEngine();
    protonEngine.errorHandler(error -> failure = error.failureCause());

    Connection conn = protonEngine.start();
    assertNotNull(conn);

    ProtonEngineConfiguration engineConfig = (ProtonEngineConfiguration) protonEngine.configuration();
    ProtonTestConnector testPeer = createTestPeer(protonEngine);

    testPeer.expectAMQPHeader().respondWithAMQPHeader();
    testPeer.expectOpen().withMaxFrameSize(localLimit)
                         .respond()
                         .withMaxFrameSize(remoteLimit);

    conn.setMaxFrameSize(localLimit);
    conn.open();

    assertEquals(expectedInbound, engineConfig.getInboundMaxFrameSize());
    assertEquals(expectedOutbound, engineConfig.getOutboundMaxFrameSize());

    assertEquals(UnsignedInteger.toUnsignedLong(localValue), conn.getMaxFrameSize());
    assertEquals(UnsignedInteger.toUnsignedLong(remoteResponse), conn.getRemoteMaxFrameSize());

    // The error handler must not have been invoked at any point.
    assertNull(failure);
}
/**
 * Setting the local max frame size to {@code UnsignedInteger.MAX_VALUE} is expected to be
 * rejected with an {@link IllegalArgumentException}.
 */
@Test
public void testEngineErrorsOnLocalMaxFrameSizeLargerThanImposedLimit() {
    final long oversizedFrameSize = UnsignedInteger.MAX_VALUE.longValue();

    Engine protonEngine = EngineFactory.PROTON.createNonSaslEngine();
    protonEngine.errorHandler(error -> failure = error.failureCause());

    Connection conn = protonEngine.start();
    assertNotNull(conn);

    assertThrows(IllegalArgumentException.class, () -> conn.setMaxFrameSize(oversizedFrameSize));
}
/**
 * Registers a shutdown handler that throws and checks that the exception surfaces from
 * {@code shutdown()} while the engine still ends up in the SHUTDOWN state: not writable, not
 * running, not failed and without a failure cause. A second {@code shutdown()} call must then
 * complete without throwing and without a failure having been reported to the error handler.
 */
@Test
public void testEngineShutdownHandlerThrowsIsIgnoredAndShutdownCompletes() {
    ProtonEngine engine = (ProtonEngine) EngineFactory.PROTON.createNonSaslEngine();
    engine.errorHandler(result -> failure = result.failureCause());
    engine.shutdownHandler((theEngine) -> {
        throw new RuntimeException();
    });

    Connection connection = engine.start();
    assertNotNull(connection);

    // State before shutdown: a started and healthy engine.
    assertTrue(engine.isWritable());
    assertTrue(engine.isRunning());
    assertFalse(engine.isShutdown());
    assertFalse(engine.isFailed());
    assertNull(engine.failureCause());
    assertEquals(EngineState.STARTED, engine.state());

    // The exception thrown by the user supplied handler must reach the caller of shutdown().
    assertThrows(RuntimeException.class, () -> engine.shutdown(), "User event handler throw wasn't propagated");

    // Despite the handler throwing the engine must have completed its shutdown.
    assertFalse(engine.isWritable());
    assertFalse(engine.isRunning());
    assertTrue(engine.isShutdown());
    assertFalse(engine.isFailed());
    assertNull(engine.failureCause());
    assertEquals(EngineState.SHUTDOWN, engine.state());

    // A repeated shutdown should not perform any additional work, in particular it must not
    // throw even though the registered handler always throws when invoked.
    engine.shutdown();

    assertNull(failure);
}
/**
 * Checks that the engine pipeline rejects engine life-cycle events fired from outside the engine
 * with {@link IllegalAccessError} and that, once the engine has been shut down, asking the
 * pipeline for its first or last handler (or their contexts) fails with
 * {@code EngineShutdownException}.
 */
@Test
public void testEnginePipelineProtectsFromExternalUserMischief() {
    Engine protonEngine = EngineFactory.PROTON.createNonSaslEngine();
    protonEngine.errorHandler(error -> failure = error.failureCause());
    ProtonTestConnector testPeer = createTestPeer(protonEngine);

    // The connection is opened before the engine has been started.
    Connection conn = protonEngine.connection().open();

    testPeer.waitForScriptToComplete();
    testPeer.expectAMQPHeader().respondWithAMQPHeader();
    testPeer.expectOpen();

    protonEngine.start();

    assertTrue(protonEngine.isWritable());
    assertNotNull(conn);
    assertNull(failure);

    // Life-cycle events may not be fired by code outside of the engine.
    assertThrows(IllegalAccessError.class, () -> protonEngine.pipeline().fireEngineStarting());
    assertThrows(IllegalAccessError.class, () -> protonEngine.pipeline().fireEngineStateChanged());
    assertThrows(IllegalAccessError.class, () -> protonEngine.pipeline().fireFailed(new EngineFailedException(null)));

    protonEngine.shutdown();

    // After shutdown the pipeline accessors are expected to fail.
    assertThrows(EngineShutdownException.class, () -> protonEngine.pipeline().first());
    assertThrows(EngineShutdownException.class, () -> protonEngine.pipeline().last());
    assertThrows(EngineShutdownException.class, () -> protonEngine.pipeline().firstContext());
    assertThrows(EngineShutdownException.class, () -> protonEngine.pipeline().lastContext());

    testPeer.waitForScriptToComplete();
}
}