| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.concurrent; |
| |
| import java.util.List; |
| import java.util.concurrent.*; |
| |
| import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; |
| |
| /** |
| * This class incorporates some Executor best practices for Cassandra. Most of the executors in the system |
| * should use or extend {@link ThreadPoolExecutorPlus}, or in rare exceptions this class. |
| * |
| * This class provides some very basic improvements: |
| * <li>We are configured by {@link ThreadPoolExecutorBuilder} |
| * <li>Tasks rejected due to overflow of the queue block the submitting thread rather than throwing {@link RejectedExecutionException} |
| * <li>{@link RunnableFuture} rejected due to executor shutdown will be cancelled |
| * <li>{@link RunnableFuture} removed by {@link #shutdownNow()} will be cancelled |
| * |
| * We also provide a shutdown hook for JMX registration cleanup. |
| */ |
| public class ThreadPoolExecutorBase extends ThreadPoolExecutor implements ResizableThreadPool |
| { |
| public static final RejectedExecutionHandler blockingExecutionHandler = (task, executor) -> |
| { |
| BlockingQueue<Runnable> queue = executor.getQueue(); |
| try |
| { |
| while (true) |
| { |
| try |
| { |
| if (executor.isShutdown()) |
| throw new RejectedExecutionException(executor + " has shut down"); |
| |
| if (queue.offer(task, 1, TimeUnit.SECONDS)) |
| break; |
| } |
| catch (InterruptedException e) |
| { |
| throw new UncheckedInterruptedException(e); |
| } |
| } |
| } |
| catch (Throwable t) |
| { |
| //Give some notification to the caller the task isn't going to run |
| if (task instanceof java.util.concurrent.Future) |
| ((java.util.concurrent.Future<?>) task).cancel(false); |
| throw t; |
| } |
| }; |
| |
| private Runnable onShutdown; |
| |
| // maximumPoolSize is only used when corePoolSize == 0 |
| // if keepAliveTime < 0 and unit == null, we forbid core thread timeouts (e.g. single threaded executors by default) |
| public ThreadPoolExecutorBase(ThreadPoolExecutorBuilder<?> builder) |
| { |
| super(builder.coreThreads(), builder.maxThreads(), builder.keepAlive(), builder.keepAliveUnits(), builder.newQueue(), builder.newThreadFactory()); |
| allowCoreThreadTimeOut(builder.allowCoreThreadTimeouts()); |
| |
| // block task submissions until queue has room. |
| // this is fighting TPE's design a bit because TPE rejects if queue.offer reports a full queue. |
| // we'll just override this with a handler that retries until it gets in. ugly, but effective. |
| // (there is an extensive analysis of the options here at |
| // http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html) |
| setRejectedExecutionHandler(builder.rejectedExecutionHandler(blockingExecutionHandler)); |
| } |
| |
| // no RejectedExecutionHandler |
| public ThreadPoolExecutorBase(int threads, int keepAlive, TimeUnit keepAliveUnits, BlockingQueue<Runnable> queue, NamedThreadFactory threadFactory) |
| { |
| super(threads, threads, keepAlive, keepAliveUnits, queue, threadFactory); |
| assert queue.isEmpty() : "Executor initialized with non-empty task queue"; |
| allowCoreThreadTimeOut(true); |
| } |
| |
| public void onShutdown(Runnable onShutdown) |
| { |
| this.onShutdown = onShutdown; |
| } |
| |
| public Runnable onShutdown() |
| { |
| return onShutdown; |
| } |
| |
| @Override |
| protected void terminated() |
| { |
| getThreadFactory().close(); |
| } |
| |
| @Override |
| public void shutdown() |
| { |
| try |
| { |
| super.shutdown(); |
| } |
| finally |
| { |
| if (onShutdown != null) |
| onShutdown.run(); |
| } |
| } |
| |
| @Override |
| public List<Runnable> shutdownNow() |
| { |
| try |
| { |
| List<Runnable> cancelled = super.shutdownNow(); |
| for (Runnable c : cancelled) |
| { |
| if (c instanceof java.util.concurrent.Future<?>) |
| ((java.util.concurrent.Future<?>) c).cancel(true); |
| } |
| return cancelled; |
| } |
| finally |
| { |
| if (onShutdown != null) |
| onShutdown.run(); |
| } |
| } |
| |
| @Override |
| public int getActiveTaskCount() |
| { |
| return getActiveCount(); |
| } |
| |
| @Override |
| public int getPendingTaskCount() |
| { |
| return getQueue().size(); |
| } |
| |
| public int getCoreThreads() |
| { |
| return getCorePoolSize(); |
| } |
| |
| public void setCoreThreads(int number) |
| { |
| setCorePoolSize(number); |
| } |
| |
| public int getMaximumThreads() |
| { |
| return getMaximumPoolSize(); |
| } |
| |
| public void setMaximumThreads(int number) |
| { |
| setMaximumPoolSize(number); |
| } |
| |
| @Override |
| public NamedThreadFactory getThreadFactory() |
| { |
| return (NamedThreadFactory) super.getThreadFactory(); |
| } |
| |
| public String toString() |
| { |
| return getThreadFactory().id; |
| } |
| } |