| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.concurrent; |
| |
| import org.apache.cassandra.concurrent.InfiniteLoopExecutor.Daemon; |
| import org.apache.cassandra.concurrent.InfiniteLoopExecutor.Interrupts; |
| import org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe; |
| import org.apache.cassandra.utils.JVMStabilityInspector; |
| import org.apache.cassandra.utils.Shared; |
| |
| import static java.lang.Thread.*; |
| import static org.apache.cassandra.concurrent.ExecutorFactory.SimulatorSemantics.NORMAL; |
| import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Daemon.DAEMON; |
| import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Interrupts.UNSYNCHRONIZED; |
| import static org.apache.cassandra.concurrent.NamedThreadFactory.createThread; |
| import static org.apache.cassandra.concurrent.NamedThreadFactory.setupThread; |
| import static org.apache.cassandra.concurrent.ThreadPoolExecutorBuilder.pooledJmx; |
| import static org.apache.cassandra.concurrent.ThreadPoolExecutorBuilder.sequentialJmx; |
| import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES; |
| import static org.apache.cassandra.utils.Shared.Scope.SIMULATION; |
| |
| /** |
| * Entry point for configuring and creating new executors. |
| * |
| * Supports quick and easy construction of default-configured executors via |
| * <li>{@link #sequential(String)} |
| * <li>{@link #pooled(String, int)} |
| * <li>{@link #scheduled(String)} |
| * <li>{@link #scheduled(boolean, String)} |
| * <li>{@link #scheduled(boolean, String, int)} |
| * |
| * Supports custom configuration of executors via |
| * <li>{@link #configureSequential(String)} |
| * <li>{@link #configurePooled(String, int)} |
| * |
| * Supports any of the above with added JMX registration via sub-factories |
| * <li>{@link #withJmx(String)} |
| * <li>{@link #withJmxInternal()} |
| * |
| * Supports any of the above with the resultant executor propagating {@link ExecutorLocals} via sub-factory |
| * <li>{@link #localAware()} |
| * |
| * Supports shared executors via sub-factory {@code localAware().withJMX()} |
| * using {@link LocalAwareSubFactoryWithJMX#shared(String, int, ExecutorPlus.MaximumPoolSizeListener)} |
| */ |
| @Shared(scope = SIMULATION, inner = INTERFACES) |
| public interface ExecutorFactory extends ExecutorBuilderFactory.Jmxable<ExecutorPlus, SequentialExecutorPlus> |
| { |
| public interface LocalAwareSubFactoryWithJMX extends ExecutorBuilderFactory<LocalAwareExecutorPlus, LocalAwareSequentialExecutorPlus> |
| { |
| LocalAwareExecutorPlus shared(String name, int threads, ExecutorPlus.MaximumPoolSizeListener onSetMaxSize); |
| } |
| |
| public interface LocalAwareSubFactory extends ExecutorBuilderFactory<LocalAwareExecutorPlus, LocalAwareSequentialExecutorPlus> |
| { |
| LocalAwareSubFactoryWithJMX withJmx(String jmxPath); |
| default LocalAwareSubFactoryWithJMX withJmxInternal() { return withJmx("internal"); } |
| } |
| |
| public enum SimulatorSemantics |
| { |
| NORMAL, DISCARD |
| } |
| |
| /** |
| * @return a factory that configures executors that propagate {@link ExecutorLocals} to the executing thread |
| */ |
| LocalAwareSubFactory localAware(); |
| |
| /** |
| * @param name the name of the executor, the executor's thread group, and of any worker threads |
| * @return a default-configured {@link ScheduledExecutorPlus} |
| */ |
| default ScheduledExecutorPlus scheduled(String name) { return scheduled(true, name, NORM_PRIORITY); } |
| |
| /** |
| * @param name the name of the executor, the executor's thread group, and of any worker threads |
| * @param simulatorSemantics indicate special semantics for the executor under simulation |
| * @return a default-configured {@link ScheduledExecutorPlus} |
| */ |
| default ScheduledExecutorPlus scheduled(String name, SimulatorSemantics simulatorSemantics) { return scheduled(true, name, NORM_PRIORITY, simulatorSemantics); } |
| |
| /** |
| * @param executeOnShutdown if false, waiting tasks will be cancelled on shutdown |
| * @param name the name of the executor, the executor's thread group, and of any worker threads |
| * @return a {@link ScheduledExecutorPlus} with normal thread priority |
| */ |
| default ScheduledExecutorPlus scheduled(boolean executeOnShutdown, String name) { return scheduled(executeOnShutdown, name, NORM_PRIORITY); } |
| |
| /** |
| * @param executeOnShutdown if false, waiting tasks will be cancelled on shutdown |
| * @param name the name of the executor, the executor's thread group, and of any worker threads |
| * @param priority the thread priority of workers |
| * @return a {@link ScheduledExecutorPlus} |
| */ |
| default ScheduledExecutorPlus scheduled(boolean executeOnShutdown, String name, int priority) { return scheduled(executeOnShutdown, name, priority, NORMAL); } |
| |
| /** |
| * @param executeOnShutdown if false, waiting tasks will be cancelled on shutdown |
| * @param name the name of the executor, the executor's thread group, and of any worker threads |
| * @param priority the thread priority of workers |
| * @param simulatorSemantics indicate special semantics for the executor under simulation |
| * @return a {@link ScheduledExecutorPlus} |
| */ |
| ScheduledExecutorPlus scheduled(boolean executeOnShutdown, String name, int priority, SimulatorSemantics simulatorSemantics); |
| |
| /** |
| * Create and start a new thread to execute {@code runnable} |
| * @param name the name of the thread |
| * @param runnable the task to execute |
| * @param daemon flag to indicate whether the thread should be a daemon or not |
| * @return the new thread |
| */ |
| Thread startThread(String name, Runnable runnable, Daemon daemon); |
| |
| /** |
| * Create and start a new thread to execute {@code runnable}; this thread will be a daemon thread. |
| * @param name the name of the thread |
| * @param runnable the task to execute |
| * @return the new thread |
| */ |
| default Thread startThread(String name, Runnable runnable) |
| { |
| return startThread(name, runnable, DAEMON); |
| } |
| |
| /** |
| * Create and start a new InfiniteLoopExecutor to repeatedly invoke {@code runnable}. |
| * On shutdown, the executing thread will be interrupted; to support clean shutdown |
| * {@code runnable} should propagate {@link InterruptedException} |
| * |
| * @param name the name of the thread used to invoke the task repeatedly |
| * @param task the task to execute repeatedly |
| * @param simulatorSafe flag indicating if the loop thread can be intercepted / rescheduled during cluster simulation |
| * @param daemon flag to indicate whether the loop thread should be a daemon thread or not |
| * @param interrupts flag to indicate whether to synchronize interrupts of the task execution thread |
| * using the task's monitor this can be used to prevent interruption while performing |
| * IO operations which forbid interrupted threads. |
| * See: {@link org.apache.cassandra.db.commitlog.AbstractCommitLogSegmentManager::start} |
| * @return the new thread |
| */ |
| Interruptible infiniteLoop(String name, Interruptible.Task task, SimulatorSafe simulatorSafe, Daemon daemon, Interrupts interrupts); |
| |
| /** |
| * Create and start a new InfiniteLoopExecutor to repeatedly invoke {@code runnable}. |
| * On shutdown, the executing thread will be interrupted; to support clean shutdown |
| * {@code runnable} should propagate {@link InterruptedException} |
| * |
| * @param name the name of the thread used to invoke the task repeatedly |
| * @param task the task to execute repeatedly |
| * @param simulatorSafe flag indicating if the loop thread can be intercepted / rescheduled during cluster simulation |
| * @return the new thread |
| */ |
| default Interruptible infiniteLoop(String name, Interruptible.SimpleTask task, SimulatorSafe simulatorSafe) |
| { |
| return infiniteLoop(name, Interruptible.Task.from(task), simulatorSafe, DAEMON, UNSYNCHRONIZED); |
| } |
| |
| /** |
| * Create a new thread group for use with builders - this thread group will be situated within |
| * this factory's parent thread group, and may be supplied to multiple executor builders. |
| */ |
| ThreadGroup newThreadGroup(String name); |
| |
| public static final class Global |
| { |
| // deliberately not volatile to ensure zero overhead outside of testing; |
| // depend on other memory visibility primitives to ensure visibility |
| private static ExecutorFactory FACTORY = new ExecutorFactory.Default(Global.class.getClassLoader(), null, JVMStabilityInspector::uncaughtException); |
| private static boolean modified; |
| |
| public static ExecutorFactory executorFactory() |
| { |
| return FACTORY; |
| } |
| |
| public static synchronized void unsafeSet(ExecutorFactory executorFactory) |
| { |
| FACTORY = executorFactory; |
| modified = true; |
| } |
| |
| public static synchronized boolean tryUnsafeSet(ExecutorFactory executorFactory) |
| { |
| if (modified) |
| return false; |
| unsafeSet(executorFactory); |
| return true; |
| } |
| } |
| |
| public static final class Default extends NamedThreadFactory.MetaFactory implements ExecutorFactory |
| { |
| public Default(ClassLoader contextClassLoader, ThreadGroup threadGroup, UncaughtExceptionHandler uncaughtExceptionHandler) |
| { |
| super(contextClassLoader, threadGroup, uncaughtExceptionHandler); |
| } |
| |
| @Override |
| public LocalAwareSubFactory localAware() |
| { |
| return new LocalAwareSubFactory() |
| { |
| public ExecutorBuilder<? extends LocalAwareSequentialExecutorPlus> configureSequential(String name) |
| { |
| return ThreadPoolExecutorBuilder.sequential(LocalAwareSingleThreadExecutorPlus::new, contextClassLoader, threadGroup, uncaughtExceptionHandler, name); |
| } |
| |
| public ExecutorBuilder<LocalAwareThreadPoolExecutorPlus> configurePooled(String name, int threads) |
| { |
| return ThreadPoolExecutorBuilder.pooled(LocalAwareThreadPoolExecutorPlus::new, contextClassLoader, threadGroup, uncaughtExceptionHandler, name, threads); |
| } |
| |
| public LocalAwareSubFactoryWithJMX withJmx(String jmxPath) |
| { |
| return new LocalAwareSubFactoryWithJMX() |
| { |
| public ExecutorBuilder<LocalAwareSingleThreadExecutorPlus> configureSequential(String name) |
| { |
| return sequentialJmx(LocalAwareSingleThreadExecutorPlus::new, contextClassLoader, threadGroup, uncaughtExceptionHandler, name, jmxPath); |
| } |
| |
| public ExecutorBuilder<LocalAwareThreadPoolExecutorPlus> configurePooled(String name, int threads) |
| { |
| return pooledJmx(LocalAwareThreadPoolExecutorPlus::new, contextClassLoader, threadGroup, uncaughtExceptionHandler, name, threads, jmxPath); |
| } |
| |
| public LocalAwareExecutorPlus shared(String name, int threads, ExecutorPlus.MaximumPoolSizeListener onSetMaxSize) |
| { |
| return SharedExecutorPool.SHARED.newExecutor(threads, onSetMaxSize, jmxPath, name); |
| } |
| }; |
| } |
| }; |
| } |
| |
| @Override |
| public ExecutorBuilderFactory<ExecutorPlus, SequentialExecutorPlus> withJmx(String jmxPath) |
| { |
| return new ExecutorBuilderFactory<ExecutorPlus, SequentialExecutorPlus>() |
| { |
| @Override |
| public ExecutorBuilder<? extends SequentialExecutorPlus> configureSequential(String name) |
| { |
| return ThreadPoolExecutorBuilder.sequentialJmx(SingleThreadExecutorPlus::new, contextClassLoader, threadGroup, uncaughtExceptionHandler, name, jmxPath); |
| } |
| |
| @Override |
| public ExecutorBuilder<? extends ExecutorPlus> configurePooled(String name, int threads) |
| { |
| return ThreadPoolExecutorBuilder.pooledJmx(ThreadPoolExecutorPlus::new, contextClassLoader, threadGroup, uncaughtExceptionHandler, name, threads, jmxPath); |
| } |
| }; |
| } |
| |
| @Override |
| public ExecutorBuilder<SingleThreadExecutorPlus> configureSequential(String name) |
| { |
| return ThreadPoolExecutorBuilder.sequential(SingleThreadExecutorPlus::new, contextClassLoader, threadGroup, uncaughtExceptionHandler, name); |
| } |
| |
| @Override |
| public ExecutorBuilder<ThreadPoolExecutorPlus> configurePooled(String name, int threads) |
| { |
| return ThreadPoolExecutorBuilder.pooled(ThreadPoolExecutorPlus::new, contextClassLoader, threadGroup, uncaughtExceptionHandler, name, threads); |
| } |
| |
| @Override |
| public ScheduledExecutorPlus scheduled(boolean executeOnShutdown, String name, int priority, SimulatorSemantics simulatorSemantics) |
| { |
| ScheduledThreadPoolExecutorPlus executor = new ScheduledThreadPoolExecutorPlus(newThreadFactory(name, priority)); |
| if (!executeOnShutdown) |
| executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); |
| return executor; |
| } |
| |
| @Override |
| public Thread startThread(String name, Runnable runnable, Daemon daemon) |
| { |
| Thread thread = setupThread(createThread(threadGroup, runnable, name, daemon == DAEMON), |
| Thread.NORM_PRIORITY, |
| contextClassLoader, |
| uncaughtExceptionHandler); |
| thread.start(); |
| return thread; |
| } |
| |
| @Override |
| public Interruptible infiniteLoop(String name, Interruptible.Task task, SimulatorSafe simulatorSafe, Daemon daemon, Interrupts interrupts) |
| { |
| return new InfiniteLoopExecutor(this, name, task, daemon, interrupts); |
| } |
| |
| @Override |
| public ThreadGroup newThreadGroup(String name) |
| { |
| return threadGroup == null ? null : new ThreadGroup(threadGroup, name); |
| } |
| } |
| } |