AMQ-8039 - support system property configuration of the inactivity monitor thread pool, follow example of nio. new test to verify
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java b/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
index bef8d7f..6182be7 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
@@ -18,7 +18,10 @@
import java.io.IOException;
import java.util.Timer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -217,8 +220,7 @@
});
} catch (RejectedExecutionException ex) {
if (!ASYNC_TASKS.isShutdown()) {
- LOG.error("Async write check was rejected from the executor: ", ex);
- throw ex;
+ LOG.warn("Async write check was rejected from the executor: ", ex);
}
}
} else {
@@ -253,8 +255,7 @@
});
} catch (RejectedExecutionException ex) {
if (!ASYNC_TASKS.isShutdown()) {
- LOG.error("Async read check was rejected from the executor: ", ex);
- throw ex;
+ LOG.warn("Async read check was rejected from the executor: ", ex);
}
}
} else {
@@ -501,7 +502,7 @@
WRITE_CHECK_TIMER = null;
READ_CHECK_TIMER = null;
try {
- ThreadPoolUtils.shutdownGraceful(ASYNC_TASKS, TimeUnit.SECONDS.toMillis(10));
+ ThreadPoolUtils.shutdownGraceful(ASYNC_TASKS, 0);
} finally {
ASYNC_TASKS = null;
}
@@ -511,16 +512,18 @@
}
private final ThreadFactory factory = new ThreadFactory() {
+ private long i = 0;
@Override
public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable, "ActiveMQ InactivityMonitor Worker");
+ Thread thread = new Thread(runnable, "ActiveMQ InactivityMonitor Worker " + (i++));
thread.setDaemon(true);
return thread;
}
};
private ThreadPoolExecutor createExecutor() {
- ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
+ ThreadPoolExecutor exec = new ThreadPoolExecutor(getDefaultCorePoolSize(), getDefaultMaximumPoolSize(), getDefaultKeepAliveTime(),
+ TimeUnit.SECONDS, newWorkQueue(), factory, newRejectionHandler());
exec.allowCoreThreadTimeOut(true);
return exec;
}
@@ -528,4 +531,29 @@
private static int getDefaultKeepAliveTime() {
return Integer.getInteger("org.apache.activemq.transport.AbstractInactivityMonitor.keepAliveTime", 30);
}
+
+ private static int getDefaultCorePoolSize() {
+ return Integer.getInteger("org.apache.activemq.transport.AbstractInactivityMonitor.corePoolSize", 0);
+ }
+
+ private static int getDefaultMaximumPoolSize() {
+ return Integer.getInteger("org.apache.activemq.transport.AbstractInactivityMonitor.maximumPoolSize", Integer.MAX_VALUE);
+ }
+
+ private static int getDefaultWorkQueueCapacity() {
+ return Integer.getInteger("org.apache.activemq.transport.AbstractInactivityMonitor.workQueueCapacity", 0);
+ }
+
+ private static boolean canRejectWork() {
+ return Boolean.getBoolean("org.apache.activemq.transport.AbstractInactivityMonitor.rejectWork");
+ }
+
+ private BlockingQueue<Runnable> newWorkQueue() {
+ final int workQueueCapacity = getDefaultWorkQueueCapacity();
+ return workQueueCapacity > 0 ? new LinkedBlockingQueue<Runnable>(workQueueCapacity) : new SynchronousQueue<Runnable>();
+ }
+
+ private RejectedExecutionHandler newRejectionHandler() {
+ return canRejectWork() ? new ThreadPoolExecutor.AbortPolicy() : new ThreadPoolExecutor.CallerRunsPolicy();
+ }
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java
new file mode 100644
index 0000000..2d7d7a0
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.ManagementContext;
+import org.apache.activemq.util.SocketProxy;
+import org.apache.activemq.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.management.JMException;
+import javax.management.ObjectName;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class RestrictedThreadPoolInactivityTimeoutTest extends JmsTestSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(RestrictedThreadPoolInactivityTimeoutTest.class);
+
+ public String brokerTransportScheme = "tcp";
+ public Boolean rejectWork = Boolean.FALSE;
+
+ final int poolSize = 2;
+ final int numConnections = 10;
+ final CountDownLatch doneOneConnectionAddress = new CountDownLatch(1);
+ final CountDownLatch doneConsumers = new CountDownLatch(numConnections);
+
+ protected BrokerService createBroker() throws Exception {
+
+ if (rejectWork) {
+ System.setProperty("org.apache.activemq.transport.AbstractInactivityMonitor.workQueueCapacity", Integer.toString(poolSize));
+ System.setProperty("org.apache.activemq.transport.AbstractInactivityMonitor.rejectWork", "true");
+ }
+ System.setProperty("org.apache.activemq.transport.AbstractInactivityMonitor.maximumPoolSize", Integer.toString(poolSize));
+
+ BrokerService broker = super.createBroker();
+ broker.setPersistent(false);
+ broker.setUseJmx(true);
+ broker.setManagementContext(new ManagementContext() {
+ @Override
+ public void unregisterMBean(ObjectName name) throws JMException {
+ if (name.getKeyPropertyListString().contains("remoteAddress")) {
+ // a client connection mbean, removed by inactivity monitor task
+ // simulate a slow mbean unregister
+ LOG.info("SLEEP : " + Thread.currentThread() + ": on remoteAddress unregister: " + name);
+ try {
+ TimeUnit.SECONDS.sleep(2);
+ } catch (InterruptedException ok) {
+ }
+ doneOneConnectionAddress.countDown();
+ } else if (name.getKeyPropertyListString().contains("Consumer")) {
+ // consumer removal from asyncStop task, this is blocked on service lock
+ // during inactivity monitor onException
+ LOG.info(Thread.currentThread() + ": on consumer unregister: " + name);
+ doneConsumers.countDown();
+ }
+ super.unregisterMBean(name);
+ }
+ });
+
+ broker.addConnector(brokerTransportScheme + "://localhost:0?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
+ return broker;
+ }
+
+ public void initCombosForTestThreadsInvolvedInXInactivityTimeouts() {
+ addCombinationValues("brokerTransportScheme", new Object[]{"tcp", "nio"});
+ addCombinationValues("rejectWork", new Object[] {Boolean.TRUE, Boolean.FALSE});
+ }
+
+ public void testThreadsInvolvedInXInactivityTimeouts() throws Exception {
+
+ URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri());
+
+ SocketProxy proxy = new SocketProxy();
+ proxy.setTarget(tcpBrokerUri);
+ proxy.open();
+
+ // leave the server to do the only inactivity monitoring
+ URI clientUri = URISupport.createURIWithQuery(proxy.getUrl(), "useInactivityMonitor=false");
+ LOG.info("using server uri: " + tcpBrokerUri + ", client uri: " + clientUri);
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUri);
+
+ for (int i=0; i<numConnections;i++) {
+ Connection c = factory.createConnection();
+ c.start();
+ }
+
+ proxy.pause();
+
+ int before = Thread.currentThread().getThreadGroup().activeCount();
+ LOG.info("threads before: " + before);
+
+ // expect inactivity monitor to kick in after 2*timeout
+
+ Thread.yield();
+
+ // after one sleep, unbounded pools will have filled with threads
+ doneOneConnectionAddress.await(10, TimeUnit.SECONDS);
+
+ int after = Thread.currentThread().getThreadGroup().activeCount();
+
+ int diff = Math.abs(before - after);
+ LOG.info("threads after: " + after + ", diff: " + diff);
+
+ assertTrue("Should be at most inactivity monitor pool size * 2. Diff = " + diff, diff <= 2*poolSize);
+
+ assertTrue("all work complete", doneConsumers.await(10, TimeUnit.SECONDS));
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ System.clearProperty("org.apache.activemq.transport.AbstractInactivityMonitor.workQueueCapacity");
+ System.clearProperty("org.apache.activemq.transport.AbstractInactivityMonitor.maximumPoolSize");
+ System.clearProperty("org.apache.activemq.transport.AbstractInactivityMonitor.rejectWork");
+ }
+
+ public static Test suite() {
+ return suite(RestrictedThreadPoolInactivityTimeoutTest.class);
+ }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
index a25b094..5c1f484 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
@@ -24,6 +24,7 @@
import javax.net.SocketFactory;
+import junit.framework.Test;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.openwire.OpenWireFormat;
@@ -55,6 +56,11 @@
private final AtomicBoolean ignoreClientError = new AtomicBoolean(false);
private final AtomicBoolean ignoreServerError = new AtomicBoolean(false);
+
+ public static Test suite() {
+ return suite(InactivityMonitorTest.class);
+ }
+
@Override
protected void setUp() throws Exception {
super.setUp();
@@ -146,7 +152,7 @@
@Override
public void onException(IOException error) {
- if (!ignoreClientError.get()) {
+ if (!ignoreServerError.get()) {
LOG.info("Server transport error:", error);
serverErrorCount.incrementAndGet();
}
@@ -239,7 +245,6 @@
* @throws URISyntaxException
*/
public void initCombosForTestNoClientHangWithServerBlock() throws Exception {
- startClient();
addCombinationValues("clientInactivityLimit", new Object[] {Long.valueOf(1000)});
addCombinationValues("serverInactivityLimit", new Object[] {Long.valueOf(1000)});