blob: 684d10101d08d34a711dabe6b1f335d83a10327c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.ra;
import java.util.Set;
import java.util.Iterator;
import javax.transaction.xa.XAResource;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.ra.ActiveMQResourceAdapter;
import org.junit.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test for AMQ-6700.
* Will fail to connect to embedded broker using JCA and uses
* "ActiveMQ Connection Executor" thread to deal with low
* level exception. This test verifies that this thread gets
* cleaned up correctly after use.
*/
public class ActiveMQConnectionExecutorThreadCleanUpTest {

    protected static final Logger LOG =
        LoggerFactory.getLogger(ActiveMQConnectionExecutorThreadCleanUpTest.class);

    /** Name prefix used to recognise the connection executor threads this test looks for. */
    protected static final String AMQ_CONN_EXECUTOR_THREAD_NAME =
        "ActiveMQ Connection Executor";

    /** Embedded broker; created in {@link #setUp()} and stopped in {@link #shutDown()}. */
    private BrokerService broker;

    /**
     * Starts a non-persistent embedded broker with a single TCP connector
     * on an ephemeral port.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    @Before
    public void setUp() throws Exception {
        LOG.info("Configuring broker programmatically.");
        broker = new BrokerService();
        broker.setPersistent(false);

        // explicitly limiting to 0 connections so that the test is unable
        // to connect
        broker.addConnector("tcp://localhost:0?maximumConnections=0");
        broker.start();
        broker.waitUntilStarted(5000);
    }

    /**
     * Stops the embedded broker if it was created and is still running.
     *
     * @throws Exception if stopping the broker fails
     */
    @After
    public void shutDown() throws Exception {
        if (broker != null && broker.isStarted()) {
            broker.stop();
            broker.waitUntilStopped();
        }
    }

    /**
     * Tries to create connections to the broker through the resource
     * adapter's transaction recovery functionality.
     * If the broker does not accept the connection, the connection's
     * thread pool executor is used to deal with the error.
     * This has led to race conditions where the thread was not shut down
     * but leaked instead, so after every attempt the test asserts that no
     * thread whose name starts with {@link #AMQ_CONN_EXECUTOR_THREAD_NAME}
     * is left behind.
     *
     * @throws Exception if starting or stopping the adapter fails, or if
     *         the test thread is interrupted while sleeping
     */
    @Test
    public void testAMQConnectionExecutorThreadCleanUp() throws Exception {
        LOG.info("testAMQConnectionExecutorThreadCleanUp() started.");

        ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
        ra.setServerUrl(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
        LOG.info("Using brokerUrl {}", ra.getServerUrl());

        // running in a small loop as very occasionally the call to
        // ActiveMQResourceAdapter.$2.makeConnection() raises an exception
        // rather than using the connection's executor task to deal with the
        // connection error.
        for (int i = 0; i < 10; i++) {
            LOG.debug("Iteration {}", i);
            ra.start(null);
            try {
                XAResource[] resources = ra.getXAResources(null);
                resources[0].recover(100);
            } catch (Exception ex) {
                // A failure here is tolerated: the test only cares about
                // whether the executor thread is cleaned up afterwards.
                // Log the full exception so the cause is not lost.
                LOG.error("XA recovery failed in iteration " + i, ex);
            } finally {
                // always stop the adapter, even if recovery ended abnormally
                ra.stop();
            }

            // allow some small time for thread cleanup to happen
            Thread.sleep(300);

            // check if thread exists
            Assert.assertFalse("Thread named \"" +
                AMQ_CONN_EXECUTOR_THREAD_NAME +
                "\" not cleared up with ActiveMQConnection.",
                hasActiveMQConnectionExceutorThread());
        }
    }

    /**
     * Retrieves all live threads from the JVM and checks whether any thread
     * name starts with {@link #AMQ_CONN_EXECUTOR_THREAD_NAME}.
     *
     * @return true if such a thread exists, otherwise false
     */
    public boolean hasActiveMQConnectionExceutorThread() {
        for (Thread thread : Thread.getAllStackTraces().keySet()) {
            if (thread.getName().startsWith(AMQ_CONN_EXECUTOR_THREAD_NAME)) {
                LOG.error("Thread with name {} found.", thread.getName());
                return true;
            }
        }
        LOG.debug("Thread with name {} not found.", AMQ_CONN_EXECUTOR_THREAD_NAME);
        return false;
    }
}