blob: 5e578a501fef65d70425e26b7e5b0104645990b8 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.cassandra.concurrent;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.cassandra.concurrent.NamedThreadFactory.MetaFactory;
import static java.lang.Thread.NORM_PRIORITY;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQueue;
* Configure a {@link ThreadPoolExecutorPlus}, applying Cassandra's best practices by default
* <li>Core threads may timeout, and use a default {@link #keepAlive} time in {@link #keepAliveUnits}
* <li>Threads share the same {@link ThreadGroup}, which may be configurably a child of a specified {@link ThreadGroup}
* descended from the same parent of the {@link MetaFactory}
* <li>By default queues are unbounded in length
* <li>The default {@link RejectedExecutionHandler} is implementation dependent, but may be overridden
* <li>The default {@link UncaughtExceptionHandler} is inherited from {@link MetaFactory}, which in turn receives it
* from the {@link ExecutorBuilderFactory}
public class ThreadPoolExecutorBuilder<E extends ExecutorPlus> extends MetaFactory implements ExecutorBuilder<E>
static <E extends SequentialExecutorPlus> ExecutorBuilder<E> sequential(Function<ThreadPoolExecutorBuilder<E>, E> constructor, ClassLoader contextClassLoader, ThreadGroup threadGroup, UncaughtExceptionHandler uncaughtExceptionHandler, String name)
ThreadPoolExecutorBuilder<E> result = new ThreadPoolExecutorBuilder<>(constructor, contextClassLoader, threadGroup, uncaughtExceptionHandler, name, 1);
return result;
static <E extends SingleThreadExecutorPlus> ExecutorBuilder<E> sequentialJmx(Function<ThreadPoolExecutorBuilder<E>, E> constructor, ClassLoader contextClassLoader, ThreadGroup threadGroup, UncaughtExceptionHandler uncaughtExceptionHandler, String name, String jmxPath)
return new ThreadPoolExecutorJMXAdapter.Builder<>(sequential(constructor, contextClassLoader, threadGroup, uncaughtExceptionHandler, name), jmxPath);
static <E extends ExecutorPlus> ExecutorBuilder<E> pooled(Function<ThreadPoolExecutorBuilder<E>, E> constructor, ClassLoader contextClassLoader, ThreadGroup threadGroup, UncaughtExceptionHandler uncaughtExceptionHandler, String name, int threads)
return new ThreadPoolExecutorBuilder<>(constructor, contextClassLoader, threadGroup, uncaughtExceptionHandler, name, threads);
static <E extends ThreadPoolExecutorPlus> ExecutorBuilder<E> pooledJmx(Function<ThreadPoolExecutorBuilder<E>, E> constructor, ClassLoader contextClassLoader, ThreadGroup threadGroup, UncaughtExceptionHandler uncaughtExceptionHandler, String name, int threads, String jmxPath)
return new ThreadPoolExecutorJMXAdapter.Builder<>(pooled(constructor, contextClassLoader, threadGroup, uncaughtExceptionHandler, name, threads), jmxPath);
private final Function<ThreadPoolExecutorBuilder<E>, E> constructor;
private final String name;
private final int threads;
private int threadPriority = NORM_PRIORITY;
private Integer queueLimit;
private long keepAlive = 1;
private TimeUnit keepAliveUnits = MINUTES;
private boolean allowCoreThreadTimeouts = true;
private RejectedExecutionHandler rejectedExecutionHandler = null;
protected ThreadPoolExecutorBuilder(Function<ThreadPoolExecutorBuilder<E>, E> constructor, ClassLoader contextClassLoader, ThreadGroup overrideThreadGroup, UncaughtExceptionHandler uncaughtExceptionHandler, String name, int threads)
super(contextClassLoader, overrideThreadGroup, uncaughtExceptionHandler);
this.constructor = constructor; = name;
this.threads = threads;
// core and non-core threads will die after this period of inactivity
public ThreadPoolExecutorBuilder<E> withKeepAlive(long keepAlive, TimeUnit keepAliveUnits)
this.allowCoreThreadTimeouts = true;
this.keepAlive = keepAlive;
this.keepAliveUnits = keepAliveUnits;
return this;
// once started, core threads will never die
public ThreadPoolExecutorBuilder<E> withKeepAlive()
this.allowCoreThreadTimeouts = false;
return this;
public ThreadPoolExecutorBuilder<E> withThreadPriority(int threadPriority)
this.threadPriority = threadPriority;
return this;
public ExecutorBuilder<E> withThreadGroup(ThreadGroup threadGroup)
ThreadGroup current = this.threadGroup;
ThreadGroup parent = threadGroup;
while (parent != null && parent != current)
parent = parent.getParent();
if (parent != current)
throw new IllegalArgumentException("threadGroup may only be overridden with a child of the default threadGroup");
this.threadGroup = threadGroup;
return this;
public ExecutorBuilder<E> withDefaultThreadGroup()
this.threadGroup = null;
return this;
public ThreadPoolExecutorBuilder<E> withQueueLimit(int queueLimit)
this.queueLimit = queueLimit;
return this;
public ThreadPoolExecutorBuilder<E> withRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler)
this.rejectedExecutionHandler = rejectedExecutionHandler;
return this;
public ThreadPoolExecutorBuilder<E> withUncaughtExceptionHandler(UncaughtExceptionHandler uncaughtExceptionHandler)
this.uncaughtExceptionHandler = uncaughtExceptionHandler;
return this;
public E build()
return constructor.apply(this);
NamedThreadFactory newThreadFactory()
return newThreadFactory(name, threadPriority);
BlockingQueue<Runnable> newQueue()
// if our pool can have an infinite number of threads, there is no point having an infinite queue length
int size = queueLimit != null
? queueLimit
: threads == Integer.MAX_VALUE
? 0 : Integer.MAX_VALUE;
return newBlockingQueue(size);
* If our queue blocks on/rejects all submissions, we can configure our core pool size to 0,
* as new threads will always be created for new work, and core threads timeout at the same
* rate as non-core threads.
int coreThreads()
return (queueLimit != null && queueLimit == 0) || threads == Integer.MAX_VALUE ? 0 : threads;
int maxThreads()
return threads;
RejectedExecutionHandler rejectedExecutionHandler(RejectedExecutionHandler ifNotSet)
return rejectedExecutionHandler == null ? ifNotSet : rejectedExecutionHandler;
long keepAlive()
return keepAlive;
TimeUnit keepAliveUnits()
return keepAliveUnits;
boolean allowCoreThreadTimeouts()
return allowCoreThreadTimeouts;