- Deprecate getPool()
- Expose createPool()
- Add new getExecutorService() to benefit from ForkJoinPools
- Add management of ScheduledExecutorServices
git-svn-id: https://svn.apache.org/repos/asf/commons/proper/jcs/trunk@1779944 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/remote/AbstractRemoteAuxiliaryCache.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/remote/AbstractRemoteAuxiliaryCache.java
index 1228796..0b3d656 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/remote/AbstractRemoteAuxiliaryCache.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/remote/AbstractRemoteAuxiliaryCache.java
@@ -26,8 +26,8 @@
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -77,7 +77,7 @@
private IRemoteCacheAttributes remoteCacheAttributes;
/** A thread pool for gets if configured. */
- private ThreadPoolExecutor pool = null;
+ private ExecutorService pool = null;
/** Should we get asynchronously using a pool. */
private boolean usePoolForGet = false;
@@ -113,7 +113,7 @@
if ( getRemoteCacheAttributes().getGetTimeoutMillis() > 0 )
{
- pool = ThreadPoolManager.getInstance().getPool( getRemoteCacheAttributes().getThreadPoolName() );
+ pool = ThreadPoolManager.getInstance().getExecutorService( getRemoteCacheAttributes().getThreadPoolName() );
if ( log.isDebugEnabled() )
{
log.debug( "Thread Pool = " + pool );
@@ -570,8 +570,7 @@
if ( pool != null )
{
- elems.add(new StatElement<Integer>( "Pool Size", Integer.valueOf(pool.getPoolSize()) ) );
- elems.add(new StatElement<Integer>( "Maximum Pool Size", Integer.valueOf(pool.getMaximumPoolSize()) ) );
+ elems.add(new StatElement<ExecutorService>( "Pool", pool ) );
}
if ( getRemoteCacheService() instanceof ZombieCacheServiceNonLocal )
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java
index 92b55e3..445e499 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java
@@ -19,12 +19,10 @@
* under the License.
*/
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
import org.apache.commons.jcs.engine.behavior.ICacheListener;
-import org.apache.commons.jcs.utils.threadpool.DaemonThreadFactory;
+import org.apache.commons.jcs.utils.threadpool.PoolConfiguration;
+import org.apache.commons.jcs.utils.threadpool.PoolConfiguration.WhenBlockedPolicy;
+import org.apache.commons.jcs.utils.threadpool.ThreadPoolManager;
/**
* An event queue is used to propagate ordered cache events to one and only one target listener.
@@ -79,15 +77,9 @@
super.initialize(listener, listenerId, cacheName, maxFailure, waitBeforeRetry);
// create a default pool with one worker thread to mimic the SINGLE queue behavior
- LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
-
- pool = new ThreadPoolExecutor(
- 0,
- 1,
- getWaitToDieMillis(),
- TimeUnit.MILLISECONDS,
- queue,
- new DaemonThreadFactory("CacheEventQueue.QProcessor-" + getCacheName()));
+ pool = ThreadPoolManager.getInstance().createPool(
+ new PoolConfiguration(false, 0, 1, 0, getWaitToDieMillis(), WhenBlockedPolicy.BLOCK, 0),
+ "CacheEventQueue.QProcessor-" + getCacheName());
}
/**
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java
index b7edda3..e4b1089 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java
@@ -2,6 +2,7 @@
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
/*
@@ -52,7 +53,10 @@
private static final QueueType queueType = QueueType.POOLED;
/** The Thread Pool to execute events with. */
- protected ThreadPoolExecutor pool = null;
+ protected ExecutorService pool = null;
+
+ /** The Thread Pool queue */
+ protected BlockingQueue<Runnable> queue = null;
/**
* Constructor for the CacheEventQueue object
@@ -86,8 +90,13 @@
super.initialize(listener, listenerId, cacheName, maxFailure, waitBeforeRetry);
// this will share the same pool with other event queues by default.
- pool = ThreadPoolManager.getInstance().getPool(
+ pool = ThreadPoolManager.getInstance().getExecutorService(
(threadPoolName == null) ? "cache_event_queue" : threadPoolName );
+
+ if (pool instanceof ThreadPoolExecutor)
+ {
+ queue = ((ThreadPoolExecutor) pool).getQueue();
+ }
}
/**
@@ -141,16 +150,12 @@
elems.add(new StatElement<Boolean>( "Working", Boolean.valueOf(isWorking()) ) );
elems.add(new StatElement<Boolean>( "Empty", Boolean.valueOf(this.isEmpty()) ) );
- if ( pool.getQueue() != null )
+ if ( queue != null )
{
- BlockingQueue<Runnable> bb = pool.getQueue();
- elems.add(new StatElement<Integer>( "Queue Size", Integer.valueOf(bb.size()) ) );
- elems.add(new StatElement<Integer>( "Queue Capacity", Integer.valueOf(bb.remainingCapacity()) ) );
+ elems.add(new StatElement<Integer>( "Queue Size", Integer.valueOf(queue.size()) ) );
+ elems.add(new StatElement<Integer>( "Queue Capacity", Integer.valueOf(queue.remainingCapacity()) ) );
}
- elems.add(new StatElement<Integer>( "Pool Size", Integer.valueOf(pool.getPoolSize()) ) );
- elems.add(new StatElement<Integer>( "Maximum Pool Size", Integer.valueOf(pool.getMaximumPoolSize()) ) );
-
stats.setStatElements( elems );
return stats;
@@ -165,32 +170,25 @@
@Override
public boolean isEmpty()
{
- if ( pool.getQueue() == null )
- {
- return true;
- }
- else
- {
- return pool.getQueue().size() == 0;
- }
+ return size() == 0;
}
/**
* Returns the number of elements in the queue. If the queue cannot determine the size
- * accurately it will return 1.
+ * accurately it will return 0.
* <p>
* @return number of items in the queue.
*/
@Override
public int size()
{
- if ( pool.getQueue() == null )
+ if ( queue == null )
{
return 0;
}
else
{
- return pool.getQueue().size();
+ return queue.size();
}
}
}
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/control/event/ElementEventQueue.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/control/event/ElementEventQueue.java
index 1a6e9bd..44ab573 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/control/event/ElementEventQueue.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/control/event/ElementEventQueue.java
@@ -20,14 +20,14 @@
*/
import java.io.IOException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
import org.apache.commons.jcs.engine.control.event.behavior.IElementEvent;
import org.apache.commons.jcs.engine.control.event.behavior.IElementEventHandler;
import org.apache.commons.jcs.engine.control.event.behavior.IElementEventQueue;
-import org.apache.commons.jcs.utils.threadpool.DaemonThreadFactory;
+import org.apache.commons.jcs.utils.threadpool.PoolConfiguration;
+import org.apache.commons.jcs.utils.threadpool.PoolConfiguration.WhenBlockedPolicy;
+import org.apache.commons.jcs.utils.threadpool.ThreadPoolManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,20 +45,16 @@
/** shutdown or not */
private boolean destroyed = false;
- /** The event queue */
- private LinkedBlockingQueue<Runnable> queue;
-
/** The worker thread pool. */
- private ThreadPoolExecutor queueProcessor;
+ private ExecutorService queueProcessor;
/**
* Constructor for the ElementEventQueue object
*/
public ElementEventQueue()
{
- queue = new LinkedBlockingQueue<Runnable>();
- queueProcessor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
- queue, new DaemonThreadFactory(THREAD_PREFIX));
+ queueProcessor = ThreadPoolManager.getInstance().createPool(
+ new PoolConfiguration(false, 0, 1, 1, 0, WhenBlockedPolicy.RUN, 1), THREAD_PREFIX);
if ( log.isDebugEnabled() )
{
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryReceiver.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryReceiver.java
index e8f9cba..07a6687 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryReceiver.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryReceiver.java
@@ -25,14 +25,15 @@
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
import org.apache.commons.jcs.engine.CacheInfo;
import org.apache.commons.jcs.engine.behavior.IShutdownObserver;
import org.apache.commons.jcs.io.ObjectInputStreamClassLoaderAware;
import org.apache.commons.jcs.utils.discovery.UDPDiscoveryMessage.BroadcastType;
-import org.apache.commons.jcs.utils.threadpool.DaemonThreadFactory;
+import org.apache.commons.jcs.utils.threadpool.PoolConfiguration;
+import org.apache.commons.jcs.utils.threadpool.PoolConfiguration.WhenBlockedPolicy;
+import org.apache.commons.jcs.utils.threadpool.ThreadPoolManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -56,7 +57,7 @@
private static final int maxPoolSize = 2;
/** The processor */
- private ThreadPoolExecutor pooledExecutor = null;
+ private ExecutorService pooledExecutor = null;
/** number of messages received. For debugging and testing. */
private int cnt = 0;
@@ -91,10 +92,9 @@
this.multicastPort = multicastPort;
// create a small thread pool to handle a barrage
- pooledExecutor = (ThreadPoolExecutor)Executors.newFixedThreadPool(maxPoolSize,
- new DaemonThreadFactory("JCS-UDPDiscoveryReceiver-", Thread.MIN_PRIORITY));
- pooledExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
- //pooledExecutor.setMinimumPoolSize(1);
+ pooledExecutor = ThreadPoolManager.getInstance().createPool(
+ new PoolConfiguration(false, 0, maxPoolSize, maxPoolSize, 0, WhenBlockedPolicy.DISCARDOLDEST, maxPoolSize),
+ "JCS-UDPDiscoveryReceiver-", Thread.MIN_PRIORITY);
if ( log.isInfoEnabled() )
{
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/threadpool/ThreadPoolManager.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/threadpool/ThreadPoolManager.java
index 1d9cf1a..93ca842 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/threadpool/ThreadPoolManager.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/threadpool/ThreadPoolManager.java
@@ -23,7 +23,10 @@
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -39,30 +42,12 @@
* This manager forces you to use a bounded queue. By default it uses the current thread for
* execution when the buffer is full and no free threads can be created.
* <p>
- * You can specify the props file to use or pass in a properties object prior to configuration. By
- * default it looks for configuration information in thread_pool.properties.
+ * You can specify the props file to use or pass in a properties object prior to configuration.
* <p>
* If set, the Properties object will take precedence.
* <p>
- * If a value is not set for a particular pool, the hard coded defaults will be used.
- *
- * <pre>
- * int boundarySize_DEFAULT = 2000;
- *
- * int maximumPoolSize_DEFAULT = 150;
- *
- * int minimumPoolSize_DEFAULT = number of processors as reported by the JVM;
- *
- * int keepAliveTime_DEFAULT = 1000 * 60 * 5;
- *
- * boolean abortWhenBlocked = false;
- *
- * String whenBlockedPolicy_DEFAULT = IPoolConfiguration.POLICY_RUN;
- *
- * int startUpSize_DEFAULT = 4;
- * </pre>
- *
- * You can configure default settings by specifying a default pool in the properties, ie "cache.ccf"
+ * If a value is not set for a particular pool, the hard coded defaults in <code>PoolConfiguration</code> will be used.
+ * You can configure default settings by specifying <code>thread_pool.default</code> in the properties, ie "cache.ccf"
* <p>
* @author Aaron Smuts
*/
@@ -80,7 +65,13 @@
/** default property file name */
private static final String DEFAULT_PROP_NAME_ROOT = "thread_pool.default";
- /**
+ /** the scheduler root property name */
+ private static final String PROP_NAME_SCHEDULER_ROOT = "scheduler_pool";
+
+ /** default scheduler property file name */
+ private static final String DEFAULT_PROP_NAME_SCHEDULER_ROOT = "scheduler_pool.default";
+
+ /**
* You can specify the properties to be used to configure the thread pool. Setting this post
* initialization will have no effect.
*/
@@ -90,24 +81,42 @@
private static ThreadPoolManager INSTANCE = null;
/** Map of names to pools. */
- private ConcurrentHashMap<String, ThreadPoolExecutor> pools;
+ private ConcurrentHashMap<String, ExecutorService> pools;
+
+ /** Map of names to scheduler pools. */
+ private ConcurrentHashMap<String, ScheduledExecutorService> schedulerPools;
/**
* No instances please. This is a singleton.
*/
private ThreadPoolManager()
{
- this.pools = new ConcurrentHashMap<String, ThreadPoolExecutor>();
+ this.pools = new ConcurrentHashMap<String, ExecutorService>();
+ this.schedulerPools = new ConcurrentHashMap<String, ScheduledExecutorService>();
configure();
}
/**
* Creates a pool based on the configuration info.
* <p>
- * @param config
- * @return A ThreadPoll wrapper
+ * @param config the pool configuration
+ * @param threadNamePrefix prefix for the thread names of the pool
+ * @return A ThreadPool wrapper
*/
- private ThreadPoolExecutor createPool( PoolConfiguration config )
+ public ExecutorService createPool( PoolConfiguration config, String threadNamePrefix)
+ {
+ return createPool(config, threadNamePrefix, Thread.NORM_PRIORITY);
+ }
+
+ /**
+ * Creates a pool based on the configuration info.
+ * <p>
+ * @param config the pool configuration
+ * @param threadNamePrefix prefix for the thread names of the pool
+ * @param threadPriority the priority of the created threads
+ * @return A ThreadPool wrapper
+ */
+ public ExecutorService createPool( PoolConfiguration config, String threadNamePrefix, int threadPriority )
{
BlockingQueue<Runnable> queue = null;
if ( config.isUseBoundary() )
@@ -134,7 +143,7 @@
config.getKeepAliveTime(),
TimeUnit.MILLISECONDS,
queue,
- new DaemonThreadFactory("JCS-ThreadPoolManager-"));
+ new DaemonThreadFactory(threadNamePrefix, threadPriority));
// when blocked policy
switch (config.getWhenBlockedPolicy())
@@ -164,6 +173,23 @@
}
/**
+ * Creates a scheduler pool based on the configuration info.
+ * <p>
+ * @param config the pool configuration
+ * @param threadNamePrefix prefix for the thread names of the pool
+ * @param threadPriority the priority of the created threads
+ * @return A ScheduledExecutorService
+ */
+ public ScheduledExecutorService createSchedulerPool( PoolConfiguration config, String threadNamePrefix, int threadPriority )
+ {
+ ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(
+ config.getMaximumPoolSize(),
+ new DaemonThreadFactory(threadNamePrefix, threadPriority));
+
+ return scheduler;
+ }
+
+ /**
* Returns a configured instance of the ThreadPoolManger To specify a configuration file or
* Properties object to use call the appropriate setter prior to calling getInstance.
* <p>
@@ -185,34 +211,46 @@
{
if ( INSTANCE != null )
{
- for ( String poolName : INSTANCE.getPoolNames())
+ for ( ExecutorService pool : INSTANCE.pools.values() )
{
try
{
- INSTANCE.getPool(poolName).shutdownNow();
+ pool.shutdownNow();
}
catch (Throwable t)
{
- log.warn("Failed to close pool " + poolName, t);
+ log.warn("Failed to close pool " + pool, t);
}
}
-
+
+ for ( ScheduledExecutorService pool : INSTANCE.schedulerPools.values() )
+ {
+ try
+ {
+ pool.shutdownNow();
+ }
+ catch (Throwable t)
+ {
+ log.warn("Failed to close pool " + pool, t);
+ }
+ }
+
INSTANCE = null;
}
}
/**
- * Returns a pool by name. If a pool by this name does not exist in the configuration file or
+ * Returns an executor service by name. If a service by this name does not exist in the configuration file or
* properties, one will be created using the default values.
* <p>
- * Pools are lazily created.
+ * Services are lazily created.
* <p>
* @param name
- * @return The thread pool configured for the name.
+ * @return The executor service configured for the name.
*/
- public ThreadPoolExecutor getPool( String name )
+ public ExecutorService getExecutorService( String name )
{
- ThreadPoolExecutor pool = pools.get( name );
+ ExecutorService pool = pools.get( name );
if ( pool == null )
{
@@ -222,12 +260,17 @@
}
PoolConfiguration config = loadConfig( PROP_NAME_ROOT + "." + name );
- pool = createPool( config );
- ThreadPoolExecutor _pool = pools.putIfAbsent( name, pool );
- if (_pool != null)
+ ExecutorService _pool = createPool( config, "JCS-ThreadPoolManager-" + name + "-" );
+ pool = pools.putIfAbsent( name, _pool );
+ if (pool == null)
{
pool = _pool;
}
+ else
+ {
+ // already created in another thread
+ _pool.shutdownNow();
+ }
if ( log.isDebugEnabled() )
{
@@ -237,6 +280,61 @@
return pool;
}
+
+ /**
+ * Returns a pool by name. If a pool by this name does not exist in the configuration file or
+ * properties, one will be created using the default values.
+ * <p>
+ * Pools are lazily created.
+ * <p>
+ * @param name
+ * @return The thread pool configured for the name.
+ *
+ * @deprecated Use getExecutorService() instead
+ */
+ @Deprecated
+ public ThreadPoolExecutor getPool( String name )
+ {
+ return (ThreadPoolExecutor) getExecutorService(name);
+ }
+
+ /**
+ * Returns a scheduler pool by name. If a pool by this name does not exist in the configuration file or
+ * properties, one will be created using the default values.
+ * <p>
+ * Pools are lazily created.
+ * <p>
+ * @param name
+ * @return The scheduler pool configured for the name.
+ */
+ public ScheduledExecutorService getSchedulerPool( String name )
+ {
+ ScheduledExecutorService pool = schedulerPools.get( name );
+
+ if ( pool == null )
+ {
+ if ( log.isDebugEnabled() )
+ {
+ log.debug( "Creating scheduler pool for name [" + name + "]" );
+ }
+
+ PoolConfiguration defaultSchedulerConfig = loadConfig( DEFAULT_PROP_NAME_SCHEDULER_ROOT );
+ PoolConfiguration config = loadConfig( PROP_NAME_SCHEDULER_ROOT + "." + name, defaultSchedulerConfig );
+ ScheduledExecutorService _pool = createSchedulerPool( config, "JCS-ThreadPoolManager-" + name + "-", Thread.NORM_PRIORITY );
+ pool = schedulerPools.putIfAbsent( name, _pool );
+ if (pool == null)
+ {
+ pool = _pool;
+ }
+ else
+ {
+ // already created in another thread
+ _pool.shutdownNow();
+ }
+ }
+
+ return pool;
+ }
/**
* Returns the names of all configured pools.
@@ -282,14 +380,26 @@
}
/**
- * Configures the default PoolConfiguration settings.
+ * Configures the PoolConfiguration settings.
* <p>
- * @param root
+ * @param root the configuration key prefix
* @return PoolConfiguration
*/
private PoolConfiguration loadConfig( String root )
{
- PoolConfiguration config = defaultConfig.clone();
+ return loadConfig(root, defaultConfig);
+ }
+
+ /**
+ * Configures the PoolConfiguration settings.
+ * <p>
+ * @param root the configuration key prefix
+ * @param defaultPoolConfiguration the default configuration
+ * @return PoolConfiguration
+ */
+ private PoolConfiguration loadConfig( String root, PoolConfiguration defaultPoolConfiguration )
+ {
+ PoolConfiguration config = defaultPoolConfiguration.clone();
PropertySetter.setProperties( config, props, root + "." );
if ( log.isDebugEnabled() )
diff --git a/commons-jcs-core/src/test/java/org/apache/commons/jcs/utils/threadpool/ThreadPoolManagerUnitTest.java b/commons-jcs-core/src/test/java/org/apache/commons/jcs/utils/threadpool/ThreadPoolManagerUnitTest.java
index a1bed66..1c89cc5 100644
--- a/commons-jcs-core/src/test/java/org/apache/commons/jcs/utils/threadpool/ThreadPoolManagerUnitTest.java
+++ b/commons-jcs-core/src/test/java/org/apache/commons/jcs/utils/threadpool/ThreadPoolManagerUnitTest.java
@@ -126,10 +126,10 @@
assertNotNull( mgr );
String poolName1 = "testGetPoolNames1";
- mgr.getPool( poolName1 );
+ mgr.getExecutorService( poolName1 );
String poolName2 = "testGetPoolNames2";
- mgr.getPool( poolName2 );
+ mgr.getExecutorService( poolName2 );
ArrayList<String> names = mgr.getPoolNames();
assertTrue( "Should have name in list.", names.contains( poolName1 ) );