AMQ-8039 - support system property configuration of the inactivity monitor thread pool, follow example of nio. new test to verify
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java b/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
index bef8d7f..6182be7 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
@@ -18,7 +18,10 @@
 
 import java.io.IOException;
 import java.util.Timer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -217,8 +220,7 @@
                 });
             } catch (RejectedExecutionException ex) {
                 if (!ASYNC_TASKS.isShutdown()) {
-                    LOG.error("Async write check was rejected from the executor: ", ex);
-                    throw ex;
+                    LOG.warn("Async write check was rejected from the executor: ", ex);
                 }
             }
         } else {
@@ -253,8 +255,7 @@
                 });
             } catch (RejectedExecutionException ex) {
                 if (!ASYNC_TASKS.isShutdown()) {
-                    LOG.error("Async read check was rejected from the executor: ", ex);
-                    throw ex;
+                    LOG.warn("Async read check was rejected from the executor: ", ex);
                 }
             }
         } else {
@@ -501,7 +502,7 @@
                     WRITE_CHECK_TIMER = null;
                     READ_CHECK_TIMER = null;
                     try {
-                        ThreadPoolUtils.shutdownGraceful(ASYNC_TASKS, TimeUnit.SECONDS.toMillis(10));
+                        ThreadPoolUtils.shutdownGraceful(ASYNC_TASKS, 0);
                     } finally {
                         ASYNC_TASKS = null;
                     }
@@ -511,16 +512,18 @@
     }
 
     private final ThreadFactory factory = new ThreadFactory() {
+        private long i = 0;
         @Override
         public Thread newThread(Runnable runnable) {
-            Thread thread = new Thread(runnable, "ActiveMQ InactivityMonitor Worker");
+            Thread thread = new Thread(runnable, "ActiveMQ InactivityMonitor Worker " + (i++));
             thread.setDaemon(true);
             return thread;
         }
     };
 
     private ThreadPoolExecutor createExecutor() {
-        ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
+        ThreadPoolExecutor exec = new ThreadPoolExecutor(getDefaultCorePoolSize(), getDefaultMaximumPoolSize(), getDefaultKeepAliveTime(),
+                TimeUnit.SECONDS, newWorkQueue(), factory, newRejectionHandler());
         exec.allowCoreThreadTimeOut(true);
         return exec;
     }
@@ -528,4 +531,29 @@
     private static int getDefaultKeepAliveTime() {
         return Integer.getInteger("org.apache.activemq.transport.AbstractInactivityMonitor.keepAliveTime", 30);
     }
+
+    private static int getDefaultCorePoolSize() {
+        return Integer.getInteger("org.apache.activemq.transport.AbstractInactivityMonitor.corePoolSize", 0);
+    }
+
+    private static int getDefaultMaximumPoolSize() {
+        return Integer.getInteger("org.apache.activemq.transport.AbstractInactivityMonitor.maximumPoolSize", Integer.MAX_VALUE);
+    }
+
+    private static int getDefaultWorkQueueCapacity() {
+        return Integer.getInteger("org.apache.activemq.transport.AbstractInactivityMonitor.workQueueCapacity", 0);
+    }
+
+    private static boolean canRejectWork() {
+        return Boolean.getBoolean("org.apache.activemq.transport.AbstractInactivityMonitor.rejectWork");
+    }
+
+    private BlockingQueue<Runnable> newWorkQueue() {
+        final int workQueueCapacity = getDefaultWorkQueueCapacity();
+        return workQueueCapacity > 0 ? new LinkedBlockingQueue<Runnable>(workQueueCapacity) : new SynchronousQueue<Runnable>();
+    }
+
+    private RejectedExecutionHandler newRejectionHandler() {
+        return canRejectWork() ? new ThreadPoolExecutor.AbortPolicy() : new ThreadPoolExecutor.CallerRunsPolicy();
+    }
 }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java
new file mode 100644
index 0000000..2d7d7a0
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.ManagementContext;
+import org.apache.activemq.util.SocketProxy;
+import org.apache.activemq.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.management.JMException;
+import javax.management.ObjectName;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class RestrictedThreadPoolInactivityTimeoutTest extends JmsTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(RestrictedThreadPoolInactivityTimeoutTest.class);
+
+    public String brokerTransportScheme = "tcp";
+    public Boolean rejectWork = Boolean.FALSE;
+
+    final int poolSize = 2;
+    final int numConnections = 10;
+    final CountDownLatch doneOneConnectionAddress = new CountDownLatch(1);
+    final CountDownLatch doneConsumers = new CountDownLatch(numConnections);
+
+    protected BrokerService createBroker() throws Exception {
+
+        if (rejectWork) {
+            System.setProperty("org.apache.activemq.transport.AbstractInactivityMonitor.workQueueCapacity", Integer.toString(poolSize));
+            System.setProperty("org.apache.activemq.transport.AbstractInactivityMonitor.rejectWork", "true");
+        }
+        System.setProperty("org.apache.activemq.transport.AbstractInactivityMonitor.maximumPoolSize", Integer.toString(poolSize));
+
+        BrokerService broker = super.createBroker();
+        broker.setPersistent(false);
+        broker.setUseJmx(true);
+        broker.setManagementContext(new ManagementContext() {
+            @Override
+            public void unregisterMBean(ObjectName name) throws JMException {
+                if (name.getKeyPropertyListString().contains("remoteAddress")) {
+                    // a client connection mbean, removed by inactivity monitor task
+                    // simulate a slow mbean unregister
+                    LOG.info("SLEEP : " + Thread.currentThread() + ": on remoteAddress unregister: " + name);
+                    try {
+                        TimeUnit.SECONDS.sleep(2);
+                    } catch (InterruptedException ok) {
+                    }
+                    doneOneConnectionAddress.countDown();
+                } else if (name.getKeyPropertyListString().contains("Consumer")) {
+                    // consumer removal from asyncStop task, this is blocked on service lock
+                    // during inactivity monitor onException
+                    LOG.info(Thread.currentThread() + ": on consumer unregister: " + name);
+                    doneConsumers.countDown();
+                }
+                super.unregisterMBean(name);
+            }
+        });
+
+        broker.addConnector(brokerTransportScheme + "://localhost:0?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
+        return broker;
+    }
+
+    public void initCombosForTestThreadsInvolvedInXInactivityTimeouts() {
+        addCombinationValues("brokerTransportScheme", new Object[]{"tcp", "nio"});
+        addCombinationValues("rejectWork", new Object[] {Boolean.TRUE, Boolean.FALSE});
+    }
+
+    public void testThreadsInvolvedInXInactivityTimeouts() throws Exception {
+
+        URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri());
+
+        SocketProxy proxy = new SocketProxy();
+        proxy.setTarget(tcpBrokerUri);
+        proxy.open();
+
+        // leave the server to do the only inactivity monitoring
+        URI clientUri =  URISupport.createURIWithQuery(proxy.getUrl(), "useInactivityMonitor=false");
+        LOG.info("using server uri: " + tcpBrokerUri + ", client uri: " + clientUri);
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUri);
+
+        for (int i=0; i<numConnections;i++) {
+            Connection c = factory.createConnection();
+            c.start();
+        }
+
+        proxy.pause();
+
+        int before = Thread.currentThread().getThreadGroup().activeCount();
+        LOG.info("threads before: " + before);
+
+        // expect inactivity monitor to kick in after 2*timeout
+
+        Thread.yield();
+
+        // after one sleep, unbounded pools will have filled with threads
+        doneOneConnectionAddress.await(10, TimeUnit.SECONDS);
+
+        int after = Thread.currentThread().getThreadGroup().activeCount();
+
+        int diff = Math.abs(before - after);
+        LOG.info("threads after: " + after + ", diff: " + diff);
+
+        assertTrue("Should be at most inactivity monitor pool size * 2. Diff = " + diff, diff <= 2*poolSize);
+
+        assertTrue("all work complete", doneConsumers.await(10, TimeUnit.SECONDS));
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        System.clearProperty("org.apache.activemq.transport.AbstractInactivityMonitor.workQueueCapacity");
+        System.clearProperty("org.apache.activemq.transport.AbstractInactivityMonitor.maximumPoolSize");
+        System.clearProperty("org.apache.activemq.transport.AbstractInactivityMonitor.rejectWork");
+    }
+
+    public static Test suite() {
+        return suite(RestrictedThreadPoolInactivityTimeoutTest.class);
+    }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
index a25b094..5c1f484 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
@@ -24,6 +24,7 @@
 
 import javax.net.SocketFactory;
 
+import junit.framework.Test;
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.openwire.OpenWireFormat;
@@ -55,6 +56,11 @@
     private final AtomicBoolean ignoreClientError = new AtomicBoolean(false);
     private final AtomicBoolean ignoreServerError = new AtomicBoolean(false);
 
+
+    public static Test suite() {
+         return suite(InactivityMonitorTest.class);
+    }
+
     @Override
     protected void setUp() throws Exception {
         super.setUp();
@@ -146,7 +152,7 @@
 
                 @Override
                 public void onException(IOException error) {
-                    if (!ignoreClientError.get()) {
+                    if (!ignoreServerError.get()) {
                         LOG.info("Server transport error:", error);
                         serverErrorCount.incrementAndGet();
                     }
@@ -239,7 +245,6 @@
      * @throws URISyntaxException
      */
     public void initCombosForTestNoClientHangWithServerBlock() throws Exception {
-        startClient();
 
         addCombinationValues("clientInactivityLimit", new Object[] {Long.valueOf(1000)});
         addCombinationValues("serverInactivityLimit", new Object[] {Long.valueOf(1000)});