blob: 5e578a501fef65d70425e26b7e5b0104645990b8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.concurrent;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.cassandra.concurrent.NamedThreadFactory.MetaFactory;
import static java.lang.Thread.NORM_PRIORITY;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQueue;
/**
* Configure a {@link ThreadPoolExecutorPlus}, applying Cassandra's best practices by default
* <li>Core threads may timeout, and use a default {@link #keepAlive} time in {@link #keepAliveUnits}
* <li>Threads share the same {@link ThreadGroup}, which may be configurably a child of a specified {@link ThreadGroup}
* descended from the same parent of the {@link MetaFactory}
* <li>By default queues are unbounded in length
* <li>The default {@link RejectedExecutionHandler} is implementation dependent, but may be overridden
* <li>The default {@link UncaughtExceptionHandler} is inherited from {@link MetaFactory}, which in turn receives it
* from the {@link ExecutorBuilderFactory}
*/
public class ThreadPoolExecutorBuilder<E extends ExecutorPlus> extends MetaFactory implements ExecutorBuilder<E>
{
static <E extends SequentialExecutorPlus> ExecutorBuilder<E> sequential(Function<ThreadPoolExecutorBuilder<E>, E> constructor, ClassLoader contextClassLoader, ThreadGroup threadGroup, UncaughtExceptionHandler uncaughtExceptionHandler, String name)
{
ThreadPoolExecutorBuilder<E> result = new ThreadPoolExecutorBuilder<>(constructor, contextClassLoader, threadGroup, uncaughtExceptionHandler, name, 1);
result.withKeepAlive();
return result;
}
static <E extends SingleThreadExecutorPlus> ExecutorBuilder<E> sequentialJmx(Function<ThreadPoolExecutorBuilder<E>, E> constructor, ClassLoader contextClassLoader, ThreadGroup threadGroup, UncaughtExceptionHandler uncaughtExceptionHandler, String name, String jmxPath)
{
return new ThreadPoolExecutorJMXAdapter.Builder<>(sequential(constructor, contextClassLoader, threadGroup, uncaughtExceptionHandler, name), jmxPath);
}
static <E extends ExecutorPlus> ExecutorBuilder<E> pooled(Function<ThreadPoolExecutorBuilder<E>, E> constructor, ClassLoader contextClassLoader, ThreadGroup threadGroup, UncaughtExceptionHandler uncaughtExceptionHandler, String name, int threads)
{
return new ThreadPoolExecutorBuilder<>(constructor, contextClassLoader, threadGroup, uncaughtExceptionHandler, name, threads);
}
static <E extends ThreadPoolExecutorPlus> ExecutorBuilder<E> pooledJmx(Function<ThreadPoolExecutorBuilder<E>, E> constructor, ClassLoader contextClassLoader, ThreadGroup threadGroup, UncaughtExceptionHandler uncaughtExceptionHandler, String name, int threads, String jmxPath)
{
return new ThreadPoolExecutorJMXAdapter.Builder<>(pooled(constructor, contextClassLoader, threadGroup, uncaughtExceptionHandler, name, threads), jmxPath);
}
private final Function<ThreadPoolExecutorBuilder<E>, E> constructor;
private final String name;
private final int threads;
private int threadPriority = NORM_PRIORITY;
private Integer queueLimit;
private long keepAlive = 1;
private TimeUnit keepAliveUnits = MINUTES;
private boolean allowCoreThreadTimeouts = true;
private RejectedExecutionHandler rejectedExecutionHandler = null;
protected ThreadPoolExecutorBuilder(Function<ThreadPoolExecutorBuilder<E>, E> constructor, ClassLoader contextClassLoader, ThreadGroup overrideThreadGroup, UncaughtExceptionHandler uncaughtExceptionHandler, String name, int threads)
{
super(contextClassLoader, overrideThreadGroup, uncaughtExceptionHandler);
this.constructor = constructor;
this.name = name;
this.threads = threads;
}
// core and non-core threads will die after this period of inactivity
public ThreadPoolExecutorBuilder<E> withKeepAlive(long keepAlive, TimeUnit keepAliveUnits)
{
this.allowCoreThreadTimeouts = true;
this.keepAlive = keepAlive;
this.keepAliveUnits = keepAliveUnits;
return this;
}
// once started, core threads will never die
public ThreadPoolExecutorBuilder<E> withKeepAlive()
{
this.allowCoreThreadTimeouts = false;
return this;
}
public ThreadPoolExecutorBuilder<E> withThreadPriority(int threadPriority)
{
this.threadPriority = threadPriority;
return this;
}
@Override
public ExecutorBuilder<E> withThreadGroup(ThreadGroup threadGroup)
{
ThreadGroup current = this.threadGroup;
ThreadGroup parent = threadGroup;
while (parent != null && parent != current)
parent = parent.getParent();
if (parent != current)
throw new IllegalArgumentException("threadGroup may only be overridden with a child of the default threadGroup");
this.threadGroup = threadGroup;
return this;
}
@Override
public ExecutorBuilder<E> withDefaultThreadGroup()
{
this.threadGroup = null;
return this;
}
public ThreadPoolExecutorBuilder<E> withQueueLimit(int queueLimit)
{
this.queueLimit = queueLimit;
return this;
}
public ThreadPoolExecutorBuilder<E> withRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler)
{
this.rejectedExecutionHandler = rejectedExecutionHandler;
return this;
}
public ThreadPoolExecutorBuilder<E> withUncaughtExceptionHandler(UncaughtExceptionHandler uncaughtExceptionHandler)
{
this.uncaughtExceptionHandler = uncaughtExceptionHandler;
return this;
}
@Override
public E build()
{
return constructor.apply(this);
}
NamedThreadFactory newThreadFactory()
{
return newThreadFactory(name, threadPriority);
}
BlockingQueue<Runnable> newQueue()
{
// if our pool can have an infinite number of threads, there is no point having an infinite queue length
int size = queueLimit != null
? queueLimit
: threads == Integer.MAX_VALUE
? 0 : Integer.MAX_VALUE;
return newBlockingQueue(size);
}
/**
* If our queue blocks on/rejects all submissions, we can configure our core pool size to 0,
* as new threads will always be created for new work, and core threads timeout at the same
* rate as non-core threads.
*/
int coreThreads()
{
return (queueLimit != null && queueLimit == 0) || threads == Integer.MAX_VALUE ? 0 : threads;
}
int maxThreads()
{
return threads;
}
RejectedExecutionHandler rejectedExecutionHandler(RejectedExecutionHandler ifNotSet)
{
return rejectedExecutionHandler == null ? ifNotSet : rejectedExecutionHandler;
}
long keepAlive()
{
return keepAlive;
}
TimeUnit keepAliveUnits()
{
return keepAliveUnits;
}
boolean allowCoreThreadTimeouts()
{
return allowCoreThreadTimeouts;
}
}