blob: bc06e1dd5687d2d8bca11e22886bea05e640599b [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.jms.integration;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.Wait;
import org.apache.qpid.jms.test.testpeer.AmqpPeerRunnable;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Integration tests covering idle-timeout behaviour of the JMS client against a scripted
 * {@link TestAmqpPeer}: what the client puts in its Open frame, whether it emits empty frames
 * when the peer advertises an idle timeout, and whether the connection is marked failed when
 * the peer stays silent.
 */
public class IdleTimeoutIntegrationTest extends QpidJmsTestCase {

    public static final Logger LOGGER = LoggerFactory.getLogger(IdleTimeoutIntegrationTest.class);

    /**
     * With no idle timeout configured on the URI, the peer expects the Open frame to satisfy a
     * "greater than zero" matcher, i.e. some non-zero idle timeout is advertised.
     */
    @Test(timeout = 20000)
    public void testIdleTimeoutIsAdvertisedByDefault() throws Exception {
        try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
            testPeer.expectSaslAnonymous();
            testPeer.expectOpen(null, greaterThan(UnsignedInteger.valueOf(0)), null, false);
            // Each connection creates a session for managing temporary destinations etc
            testPeer.expectBegin();

            ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort());
            Connection connection = factory.createConnection();
            // Set a clientID to provoke the actual AMQP connection process to occur.
            connection.setClientID("clientName");

            testPeer.waitForAllHandlersToComplete(1000);
            assertNull(testPeer.getThrowable());

            testPeer.expectClose();
            connection.close();
        }
    }

    /**
     * With {@code amqp.idleTimeout} set on the URI, the peer expects the Open frame to carry
     * exactly half of the configured value.
     */
    @Test(timeout = 20000)
    public void testAdvertisedIdleTimeoutIsHalfOfActualTimeoutValue() throws Exception {
        try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
            int configuredTimeout = 54320;
            int advertisedValue = configuredTimeout / 2;

            testPeer.expectSaslAnonymous();
            // Must be an exact match: a "greater than" check would reject a client advertising
            // precisely half, and would accept any larger value the test is not meant to allow.
            testPeer.expectOpen(null, Matchers.equalTo(UnsignedInteger.valueOf(advertisedValue)), null, false);
            // Each connection creates a session for managing temporary destinations etc
            testPeer.expectBegin();

            ConnectionFactory factory = new JmsConnectionFactory(
                "amqp://localhost:" + testPeer.getServerPort() + "?amqp.idleTimeout=" + configuredTimeout);
            Connection connection = factory.createConnection();
            // Set a clientID to provoke the actual AMQP connection process to occur.
            connection.setClientID("clientName");

            testPeer.waitForAllHandlersToComplete(1000);
            assertNull(testPeer.getThrowable());

            testPeer.expectClose();
            connection.close();
        }
    }

    /**
     * When the peer advertises a short idle timeout, the client should send at least one empty
     * frame while the connection sits idle. Only the presence of empty frames is checked here.
     */
    @Test(timeout = 20000)
    public void testClientSendsEmptyFramesWhenPeerAdvertisesIdleTimeout() throws Exception {
        try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
            int period = 20;
            int idleSleep = 100;
            int advertisedTimeout = period * 2;

            testPeer.setAdvertisedIdleTimeout(advertisedTimeout);
            testPeer.expectSaslAnonymous();
            testPeer.expectOpen();
            // Each connection creates a session for managing temporary destinations etc
            testPeer.expectBegin();

            ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort());
            Connection connection = factory.createConnection();
            // Set a clientID to provoke the actual AMQP connection process to occur.
            connection.setClientID("clientName");

            testPeer.waitForAllHandlersToComplete(1000);
            assertNull(testPeer.getThrowable());

            // Sleep for a bit, let the idle handling work
            Thread.sleep(idleSleep);

            testPeer.expectClose();
            connection.close();

            // Verify that *any* empty frames were received by the peer.
            // We will verify additional behaviours with slower tests.
            assertThat(testPeer.getEmptyFrameCount(), Matchers.greaterThan(0));
        }
    }

    /**
     * Lets the connection idle for {@code cycles * period} milliseconds and checks that the
     * number of empty frames the peer received is within {@code offset} of {@code cycles}.
     */
    //TODO: Could use JUnit categories to make this slowish test skippable?
    //      If so, make it slower still and more granular.
    @Test(timeout = 20000)
    public void testClientSendsEmptyFramesWithExpectedFrequency() throws Exception {
        try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
            int period = 250;
            int advertisedTimeout = period * 2;
            int cycles = 10;
            int idleSleep = cycles * period;
            // Tolerance (in frames) allowed either side of the expected count.
            int offset = 2;

            testPeer.setAdvertisedIdleTimeout(advertisedTimeout);
            testPeer.expectSaslAnonymous();
            testPeer.expectOpen();
            // Each connection creates a session for managing temporary destinations etc
            testPeer.expectBegin();

            ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort());
            Connection connection = factory.createConnection();
            // Set a clientID to provoke the actual AMQP connection process to occur.
            connection.setClientID("clientName");

            testPeer.waitForAllHandlersToComplete(1000);
            assertNull(testPeer.getThrowable());

            // Sleep for a bit, let the idle handling work
            Thread.sleep(idleSleep);

            testPeer.expectClose();
            connection.close();

            assertThat(testPeer.getEmptyFrameCount(), Matchers.greaterThanOrEqualTo(cycles - offset));
            assertThat(testPeer.getEmptyFrameCount(), Matchers.lessThanOrEqualTo(cycles + offset));
        }
    }

    /**
     * Configures a short client idle timeout and has the peer send nothing further once the
     * connection is established; the connection is expected to report itself failed within
     * ten seconds, and the peer expects a Close frame whose error matches "not null".
     */
    @Test(timeout = 20000)
    public void testConnectionSetFailedWhenPeerNeglectsToSendEmptyFrames() throws Exception {
        try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
            int configuredTimeout = 200;

            testPeer.expectSaslAnonymous();
            testPeer.expectOpen();
            // Each connection creates a session for managing temporary destinations etc
            testPeer.expectBegin();

            JmsConnectionFactory factory = new JmsConnectionFactory(
                "amqp://localhost:" + testPeer.getServerPort() + "?amqp.idleTimeout=" + configuredTimeout);
            final JmsConnection connection = (JmsConnection) factory.createConnection();
            // Set a clientID to provoke the actual AMQP connection process to occur.
            connection.setClientID("clientName");

            testPeer.waitForAllHandlersToComplete(1000);
            // The peer is still connected, so it will get the close frame with error
            testPeer.expectClose(Matchers.notNullValue(), false);
            assertNull(testPeer.getThrowable());
            testPeer.setSuppressReadExceptionOnClose(true);

            // Poll every 10ms, for up to 10s, until the connection reports failure.
            boolean failed = Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisfied() throws Exception {
                    return connection.isFailed();
                }
            }, 10000, 10);

            assertTrue("connection didnt fail in expected timeframe", failed);
            testPeer.waitForAllHandlersToComplete(1000);

            connection.close();
        }
    }

    /**
     * Configures a client idle timeout and has the peer send an empty frame every
     * {@code period} milliseconds for {@code cycles} rounds; the connection must not be
     * marked failed once all of those frames have been sent.
     */
    @Test(timeout = 20000)
    public void testConnectionNotMarkedFailedWhenPeerSendsEmptyFrames() throws Exception {
        try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
            int configuredTimeout = 2000;
            int period = 500;
            int cycles = 6;

            // Counted down once per empty frame sent by the peer.
            final CountDownLatch latch = new CountDownLatch(cycles);

            testPeer.expectSaslAnonymous();
            testPeer.expectOpen();
            // Each connection creates a session for managing temporary destinations etc
            testPeer.expectBegin();
            // Start to emit idle frames when the connection is set up, this should stop it timing out
            testPeer.runAfterLastHandler(new EmptyFrameSender(latch, period, cycles, testPeer));

            JmsConnectionFactory factory = new JmsConnectionFactory(
                "amqp://localhost:" + testPeer.getServerPort() + "?amqp.idleTimeout=" + configuredTimeout);
            final JmsConnection connection = (JmsConnection) factory.createConnection();
            // Set a clientID to provoke the actual AMQP connection process to occur.
            connection.setClientID("clientName");

            // Allow twice the nominal duration for all frames to be sent.
            boolean framesSent = latch.await(cycles * period * 2, TimeUnit.MILLISECONDS);
            assertTrue("idle frames were not sent as expected", framesSent);

            assertFalse("connection shouldnt fail", connection.isFailed());

            testPeer.expectClose();
            connection.close();

            testPeer.waitForAllHandlersToComplete(1000);
            assertNull(testPeer.getThrowable());
        }
    }

    /**
     * Peer-side task that sleeps for a fixed delay and then sends an empty frame, repeating
     * for a fixed number of cycles and counting the supplied latch down after each frame.
     */
    private static class EmptyFrameSender implements AmqpPeerRunnable {

        private final int delay;
        private final int cycles;
        private final CountDownLatch latch;
        private final TestAmqpPeer testPeer;

        /**
         * @param latch    counted down once after every empty frame sent
         * @param delay    milliseconds to sleep before each frame
         * @param cycles   number of empty frames to send
         * @param testPeer peer used to send the frames
         */
        public EmptyFrameSender(CountDownLatch latch, int delay, int cycles, TestAmqpPeer testPeer) {
            this.cycles = cycles;
            this.delay = delay;
            this.latch = latch;
            this.testPeer = testPeer;
        }

        /**
         * Sends the configured number of empty frames, pausing before each one. If the thread
         * is interrupted while pausing, the interrupt flag is restored and sending stops; the
         * latch is then left short so the waiting test fails rather than masking the problem.
         */
        @Override
        public void run() {
            for (int i = 0; i < cycles; i++) {
                LOGGER.info("Delaying before empty frame: {}", i + 1);
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for whoever owns this thread and stop sending.
                    Thread.currentThread().interrupt();
                    LOGGER.warn("Interrupted while delaying before empty frame: {}", i + 1);
                    return;
                }

                LOGGER.info("Sending empty frame: {}", i + 1);
                testPeer.sendEmptyFrame(false);
                latch.countDown();
            }
        }
    }
}