blob: 0df9a856761b4b7e3496b1476f6b65f30236966f [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts
* the references taken, instantiating the service on the first reference, and shutting it down when the last
* reference is released.
*
* <p/>It is important to ensure that an executor service is correctly shut down as failing to do so prevents the JVM
* from terminating due to the existence of non-daemon threads.
*
* <p/><table id="crc><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Provide a shared executor service. <td> {@link Executors}
* <tr><td> Shutdown the executor service when not needed. <td> {@link ExecutorService}
* <tr><td> Track references to the executor service.
* <tr><td> Provide configuration of the executor service.
* </table>
*
* @todo Might be more elegant to make this actually implement ExecutorService, providing better hiding of the
* implementation details. Also this class introduces a pattern (albeit specific to this usage) that could be
* generalized to reference count anything. That is, on first instance call a create method, on release of last
* instance call a destroy method. This could definitely be abstracted out as a re-usable piece of code; a
* reference counting factory. It could then be re-used to do reference counting in other places (such as
* messages). Countable objects have a simple create/destroy life cycle, capturable by an interface that the
* ref counting factory can call to manage the lifecycle.
*
* @todo {@link #_poolSize} should be static?
*
* @todo The {@link #getPool()} method breaks the encapsulation of the reference counter. Generally when getPool is used
* further checks are applied to ensure that the executor service has not been shutdown. This passes responsibility
* for managing the lifecycle of the reference counted object onto the caller rather than neatly encapsulating it
* here. Could think about adding more state to the lifecycle, to mark ref counted objects as invalid, and have an
* isValid method, or could make calling code deal with RejectedExecutionException raised by shutdown executors.
*/
public class ReferenceCountingExecutorService
{
/** Defines the smallest thread pool that will be allocated, irrespective of the number of processors. */
private static final int MINIMUM_POOL_SIZE = 4;
/** Holds the number of processors on the machine. */
private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors();
/** Defines the thread pool size to use, which is the larger of the number of CPUs or the minimum size. */
private static final int DEFAULT_POOL_SIZE = Math.max(NUM_CPUS, MINIMUM_POOL_SIZE);
/**
* Holds the singleton instance of this reference counter. This is only created once, statically, so the
* {@link #getInstance()} method does not need to be synchronized.
*/
private static final ReferenceCountingExecutorService _instance = new ReferenceCountingExecutorService();
/** This lock is used to ensure that reference counts are updated atomically with create/destroy operations. */
private final Object _lock = new Object();
/** The shared executor service that is reference counted. */
private ExecutorService _pool;
/** Holds the number of references given out to the executor service. */
private int _refCount = 0;
/** Holds the number of executor threads to create. */
private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE);
/** Thread Factory used to create thread of the pool. Uses the default implementation provided by
* {@link java.util.concurrent.Executors#defaultThreadFactory()} unless reset by the caller.
*/
private ThreadFactory _threadFactory = Executors.defaultThreadFactory();
/**
* Retrieves the singleton instance of this reference counter.
*
* @return The singleton instance of this reference counter.
*/
public static ReferenceCountingExecutorService getInstance()
{
return _instance;
}
/**
* Private constructor to ensure that only a singleton instance can be created.
*/
private ReferenceCountingExecutorService()
{ }
/**
* Provides a reference to a shared executor service, incrementing the reference count.
*
* @return An executor service.
*/
public ExecutorService acquireExecutorService()
{
synchronized (_lock)
{
if (_refCount++ == 0)
{
_pool = new ThreadPoolExecutor(_poolSize, _poolSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
_threadFactory);
}
return _pool;
}
}
/**
* Releases a reference to a shared executor service, decrementing the reference count. If the reference count falls
* to zero, the executor service is shut down.
*/
public void releaseExecutorService()
{
synchronized (_lock)
{
if (--_refCount == 0)
{
_pool.shutdownNow();
}
}
}
/**
* Provides access to the executor service, without touching the reference count.
*
* @return The shared executor service, or <tt>null</tt> if none has been instantiated yet.
*/
public ExecutorService getPool()
{
return _pool;
}
/**
* Return the ReferenceCount to this ExecutorService
* @return reference count
*/
public int getReferenceCount()
{
return _refCount;
}
/**
*
* Return the thread factory used by the {@link ThreadPoolExecutor} to create new threads.
*
* @return thread factory
*/
public ThreadFactory getThreadFactory()
{
return _threadFactory;
}
/**
* Sets the thread factory used by the {@link ThreadPoolExecutor} to create new threads.
* <p>
* If the pool has been already created, the change will have no effect until
* {@link #getReferenceCount()} reaches zero and the pool recreated. For this reason,
* callers must invoke this method <i>before</i> calling {@link #acquireExecutorService()}.
*
* @param threadFactory thread factory
*/
public void setThreadFactory(final ThreadFactory threadFactory)
{
if (threadFactory == null)
{
throw new NullPointerException("threadFactory cannot be null");
}
_threadFactory = threadFactory;
}
}