// Source blob: 604092addd856c8da1cad2ea8490de9cb70d5d98
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sling.commons.threads.impl;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.sling.commons.threads.ModifiableThreadPoolConfig;
import org.apache.sling.commons.threads.ThreadPool;
import org.apache.sling.commons.threads.ThreadPoolConfig;
import org.apache.sling.commons.threads.ThreadPoolManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The DefaultThreadPool class implements the {@link ThreadPool} interface.
* Instances of this class are managed by the {@link ThreadPoolManager}.
*/
public class DefaultThreadPool
    implements ThreadPool {

    /** By default we use the logger for this class. */
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    /** The name of this thread pool */
    protected final String name;

    /**
     * The executor. Set to {@code null} by {@link #shutdown()}.
     * NOTE(review): the field is neither final nor volatile; methods below take a
     * local snapshot so that a concurrent shutdown cannot cause a NullPointerException
     * between the null check and the use.
     */
    protected ThreadPoolExecutor executor;

    /**
     * Modifiable configuration created from the one handed to the constructor.
     * The constructor corrects out-of-range values in this object.
     */
    protected final ModifiableThreadPoolConfig configuration;

    /**
     * Create a new thread pool.
     * @param name - The name of the thread pool. If null {@link DefaultThreadPoolManager#DEFAULT_THREADPOOL_NAME}
     *               is used
     * @param origConfig - The configuration to build the pool from. It is wrapped into a
     *               {@link ModifiableThreadPoolConfig}; negative min pool size, max pool size
     *               and keep alive time are replaced by defaults (1, {@link Integer#MAX_VALUE}
     *               and 1000 ms respectively).
     */
    public DefaultThreadPool(final String name,
                             final ThreadPoolConfig origConfig) {
        // name
        if ( name != null ) {
            this.name = name;
        } else {
            this.name = DefaultThreadPoolManager.DEFAULT_THREADPOOL_NAME;
        }

        this.logger.info("Initializing thread pool [{}] ...", this.name);

        this.configuration = new ModifiableThreadPoolConfig(origConfig);

        // factory
        final ThreadFactory delegateThreadFactory;
        if (this.configuration.getFactory() == null) {
            delegateThreadFactory = Executors.defaultThreadFactory();
            // report the factory that is really used as the delegate
            logger.debug("Thread pool [{}] ; No ThreadFactory is configured. Will use JVM default thread factory: {}",
                    this.name, delegateThreadFactory.getClass().getName());
        } else {
            delegateThreadFactory = this.configuration.getFactory();
        }

        // Min pool size
        if (this.configuration.getMinPoolSize() < 0) {
            this.configuration.setMinPoolSize(1);
            this.logger.warn("min-pool-size < 0 for pool \"{}\". Set to 1", this.name);
        }

        // Max pool size: a negative value means "no upper limit"
        if ( this.configuration.getMaxPoolSize() < 0 ) {
            this.configuration.setMaxPoolSize(Integer.MAX_VALUE);
        }

        // Set priority and daemon flag
        final ExtendedThreadFactory threadFactory = new ExtendedThreadFactory(
                delegateThreadFactory,
                this.name,
                this.configuration.getPriority(),
                this.configuration.isDaemon()
        );

        // Keep alive time
        if (this.configuration.getKeepAliveTime() < 0) {
            this.configuration.setKeepAliveTime(1000);
            this.logger.warn("keep-alive-time-ms < 0 for pool \"{}\". Set to 1000", this.name);
        }

        // Queue (may raise the min pool size, so it must be created before the executor)
        final BlockingQueue<Runnable> queue = createQueue();

        // Rejection policy
        final RejectedExecutionHandler handler = createRejectedExecutionHandler();

        try {
            this.executor = new ThreadPoolExecutorCleaningThreadLocals(this.configuration.getMinPoolSize(),
                    this.configuration.getMaxPoolSize(),
                    this.configuration.getKeepAliveTime(),
                    TimeUnit.MILLISECONDS,
                    queue,
                    threadFactory,
                    handler,
                    new LoggingThreadLocalChangeListener());
        } catch (IllegalStateException e) {
            logger.warn("Unsupported JRE, cannot register ThreadPoolExecutorCleaningThreadLocals due to '{}', fall back to regular ThreadPoolExecutor", e.getMessage(), e);
            this.executor = new ThreadPoolExecutor(this.configuration.getMinPoolSize(),
                    this.configuration.getMaxPoolSize(),
                    this.configuration.getKeepAliveTime(),
                    TimeUnit.MILLISECONDS,
                    queue,
                    threadFactory,
                    handler);
        }
        // use the resolved name (the constructor argument may be null)
        this.logger.info("Thread pool [{}] initialized.", this.name);
    }

    /**
     * Create the work queue matching the configured queue size:
     * 0 gives a {@link SynchronousQueue}, a positive value a bounded
     * {@code ArrayBlockingQueue} of that capacity and a negative value an
     * unbounded {@link LinkedBlockingQueue}.
     * For the unbounded case the configured min pool size is raised to the max pool size.
     */
    private BlockingQueue<Runnable> createQueue() {
        final int queueSize = this.configuration.getQueueSize();
        if (queueSize == 0) {
            return new SynchronousQueue<Runnable>();
        }
        if (queueSize > 0) {
            return new java.util.concurrent.ArrayBlockingQueue<Runnable>(queueSize);
        }
        // SLING-7407 : queue size is -1 (or negative) == unbounded
        // in this case the max pool size wouldn't have any effect, since the
        // pool is only increased (ie threads only created) when the queue is blocked
        // but with an unbounded queue that never happens, thus you'd always get only
        // at most min-pool-size threads.
        // To fix this somewhat odd behaviour, we now automatically set the min to max for this case:
        final int minPoolSize = this.configuration.getMinPoolSize();
        final int maxPoolSize = this.configuration.getMaxPoolSize();
        if (minPoolSize < maxPoolSize) {
            this.logger.info("min-pool-size ({}) < max-pool-size ({}) for pool \"{}\" which has unbounded queue (queue size -1). Set to {}",
                    minPoolSize, maxPoolSize, this.name, maxPoolSize);
            this.configuration.setMinPoolSize(maxPoolSize);
        }
        return new LinkedBlockingQueue<Runnable>();
    }

    /**
     * Map the configured block policy to the corresponding JDK rejection handler.
     * Returns {@code null} for a policy not listed here (same as before this
     * helper was extracted).
     */
    private RejectedExecutionHandler createRejectedExecutionHandler() {
        switch (this.configuration.getBlockPolicy()) {
            case ABORT :
                return new ThreadPoolExecutor.AbortPolicy();
            case DISCARD :
                return new ThreadPoolExecutor.DiscardPolicy();
            case DISCARDOLDEST :
                return new ThreadPoolExecutor.DiscardOldestPolicy();
            case RUN :
                return new ThreadPoolExecutor.CallerRunsPolicy();
            default :
                return null;
        }
    }

    /**
     * Listener handed to {@link ThreadPoolExecutorCleaningThreadLocals}; it only
     * writes a debug log entry for every reported thread local change.
     */
    private static class LoggingThreadLocalChangeListener implements ThreadLocalChangeListener {

        @Override
        public void changed(Mode mode, Thread thread, ThreadLocal<?> threadLocal, Object value) {
            // NOTE(review): LOGGER is not declared in this file; presumably it is
            // resolved through a type declared elsewhere - confirm that it exists.
            LOGGER.debug("Thread '{}' {} ThreadLocal {} with value {}", thread, mode,
                    threadLocal != null ? threadLocal.getClass() : "<null>", value);
        }
    }

    /**
     * @see org.apache.sling.commons.threads.ThreadPool#getName()
     */
    public String getName() {
        return name;
    }

    /**
     * @see org.apache.sling.commons.threads.ThreadPool#getConfiguration()
     */
    public ThreadPoolConfig getConfiguration() {
        return this.configuration;
    }

    /**
     * Execute the runnable in this pool. A {@code null} runnable is silently ignored.
     * @throws RejectedExecutionException if the pool has already been shut down
     * @see org.apache.sling.commons.threads.ThreadPool#execute(java.lang.Runnable)
     */
    public void execute(final Runnable runnable) {
        final ThreadPoolExecutor current = checkExecutor();
        if ( runnable != null ) {
            if ( logger.isDebugEnabled() ) {
                logOperation(current, "Executing runnable: ", runnable);
            }
            current.execute(runnable);
        }
    }

    /**
     * Submit the callable to this pool.
     * @throws RejectedExecutionException if the pool has already been shut down
     * @see org.apache.sling.commons.threads.ThreadPool#submit(java.util.concurrent.Callable)
     */
    public <T> Future<T> submit(final Callable<T> callable) {
        final ThreadPoolExecutor current = checkExecutor();
        if ( logger.isDebugEnabled() ) {
            logOperation(current, "Submitting callable: ", callable);
        }
        return current.submit(callable);
    }

    /**
     * Submit the runnable to this pool.
     * @throws RejectedExecutionException if the pool has already been shut down
     * @see org.apache.sling.commons.threads.ThreadPool#submit(java.lang.Runnable)
     */
    public Future<?> submit(final Runnable runnable) {
        final ThreadPoolExecutor current = checkExecutor();
        if ( logger.isDebugEnabled() ) {
            logOperation(current, "Submitting runnable: ", runnable);
        }
        return current.submit(runnable);
    }

    /**
     * Shut down the threadpool.
     * Depending on the configuration the executor is shut down gracefully or by
     * interruption. If a positive shutdown wait time is configured, this method waits
     * that long for termination and interrupts the remaining tasks afterwards.
     * Calling this method on an already shut down pool only logs.
     */
    public void shutdown() {
        this.logger.info("Shutting down thread pool [{}] ...", name);
        final ThreadPoolExecutor current = this.executor;
        if ( current != null ) {
            if (this.configuration.isShutdownGraceful()) {
                current.shutdown();
            } else {
                current.shutdownNow();
            }

            try {
                final long waitTimeMs = this.configuration.getShutdownWaitTimeMs();
                if (waitTimeMs > 0 && !current.awaitTermination(waitTimeMs, TimeUnit.MILLISECONDS)) {
                    logger.warn("Running commands have not terminated within {}ms. Will shut them down by interruption",
                            waitTimeMs);
                    current.shutdownNow(); // TODO: shouldn't this be outside the if statement?!
                }
            } catch (final InterruptedException ie) {
                this.logger.error("Cannot shutdown thread pool [{}]", this.name, ie);
                // restore the interrupt status so callers further up can react to it
                Thread.currentThread().interrupt();
            }
            this.executor = null;
        }
        this.logger.info("Thread pool [{}] is shut down.", this.name);
    }

    /**
     * Return the underlying executor.
     * @return The executor or {@code null} once {@link #shutdown()} has been called.
     */
    public ThreadPoolExecutor getExecutor() {
        return this.executor;
    }

    /**
     * Ensure the pool is still usable.
     * @return A snapshot of the current executor, never {@code null}
     * @throws RejectedExecutionException if the pool has already been shut down
     */
    private ThreadPoolExecutor checkExecutor() {
        final ThreadPoolExecutor current = this.executor;
        if ( current == null ) {
            throw new RejectedExecutionException("Thread pool " + this.name + " is already shutdown.");
        }
        return current;
    }

    /**
     * Write a debug entry for the operation together with the current pool statistics.
     * @param current The executor the operation is performed on
     * @param msg Description of the operation
     * @param obj The task being executed or submitted
     */
    private void logOperation(final ThreadPoolExecutor current, final String msg, final Object obj) {
        logger.debug("{} {}, pool={}, active={}, corePoolSize={}, maxPoolSize={}, queueSize={}",
                msg, obj, name,
                current.getActiveCount(),
                current.getCorePoolSize(),
                current.getMaximumPoolSize(),
                current.getQueue().size());
    }
}